# 前言

它是一个具有优先级的无界阻塞队列,但由于底层是数组实现,可能出现 OOM。

# 源码分析

# 变量

队列元素的用对象数组存放,用一个 size 记录元素个数。

// 默认数组容量
private static final int DEFAULT_INITIAL_CAPACITY = 11;
// 最大数组容量,减 8 是为了给数组头留些空间,防止 OOM
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
// 存放元素的数组
private transient Object[] queue;
// 数组中的元素个数
private transient int size;
// 优先队列的元素大小比较器,默认是 null
private transient Comparator<? super E> comparator;

并发控制使用 ReentrantLock,同时使用一个 Condition 以用来阻塞等待队列空的情况,无界队列不存在满的情况,队列满的时候会抛出 OOM 异常。

// 锁
private final ReentrantLock lock;
// 队列为空移除元素的阻塞等待
private final Condition notEmpty;
// 用于分配的自旋锁,通过 CAS 获取
private transient volatile int allocationSpinLock;

# 构造方法

/**
 * 使用默认容量
 */
public PriorityBlockingQueue() {
  this(DEFAULT_INITIAL_CAPACITY, null);
}
/**
 * 指定初始化容量
 */
public PriorityBlockingQueue(int initialCapacity) {
  this(initialCapacity, null);
}
/**
 * 指定初始化容量和比较器
 */
public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) {
  if (initialCapacity < 1)
    throw new IllegalArgumentException();
  this.lock = new ReentrantLock();
  this.notEmpty = lock.newCondition();
  this.comparator = comparator;
  this.queue = new Object[initialCapacity];
}
/**
 * 初始化数据
 */
public PriorityBlockingQueue(Collection<? extends E> c) {
  this.lock = new ReentrantLock();
  this.notEmpty = lock.newCondition();
  // 是否需要堆排序
  boolean heapify = true;
  // 是否需要检查 null 元素
  boolean screen = true;
  // 已排序,直接使用结构,不需要用堆
  if (c instanceof SortedSet<?>) {
    SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
    this.comparator = (Comparator<? super E>) ss.comparator();
    heapify = false;
  }
  // 已经是优先队列,直接使用结构,不需要过滤 null 元素,不需要用堆
  else if (c instanceof PriorityBlockingQueue<?>) {
    PriorityBlockingQueue<? extends E> pq =
      (PriorityBlockingQueue<? extends E>) c;
    this.comparator = (Comparator<? super E>) pq.comparator();
    screen = false;
    if (pq.getClass() == PriorityBlockingQueue.class) // exact match
      heapify = false;
  }
  Object[] a = c.toArray();
  int n = a.length;
  // 构造成对象数组结构
  if (a.getClass() != Object[].class)
    a = Arrays.copyOf(a, n, Object[].class);
  // 检查 null 元素
  if (screen && (n == 1 || this.comparator != null)) {
    for (int i = 0; i < n; ++i)
      if (a[i] == null)
        throw new NullPointerException();
  }
  this.queue = a;
  this.size = n;
  // 堆排序
  if (heapify)
    heapify();
}

# 插入方法

# add(e)

public boolean add(E e) {
  // 直接调用 offer 方法,由于是无界阻塞队列,所以不需要
  return offer(e);
}

# offer(e)

public boolean offer(E e) {
  if (e == null)
    throw new NullPointerException();
  // 获取锁进行插入
  final ReentrantLock lock = this.lock;
  lock.lock();
  //n 是元素大小,cap 是数组长度
  int n, cap;
  Object[] array;
  // 数组扩容
  while ((n = size) >= (cap = (array = queue).length))
    tryGrow(array, cap);
  try {
    Comparator<? super E> cmp = comparator;
    // 自然排序
    if (cmp == null)
      siftUpComparable(n, e, array);
    // 使用自定义比较器排序
    else
      siftUpUsingComparator(n, e, array, cmp);
    // 更新数组元素大小
    size = n + 1;
    // 唤醒等待消费的线程
    notEmpty.signal();
  } finally {
    lock.unlock();
  }
  return true;
}

# put(e)

public void put(E e) {
  // 直接调用 offer 方法,由于是无界阻塞队列,所以不需要阻塞
  offer(e);
}

# offer(e, timeout, unit)

public boolean offer(E e, long timeout, TimeUnit unit) {
  // 直接调用 offer 方法,不需要超时阻塞
  return offer(e); 
}

# 移除方法

# remove()

直接继承父类方法。

/**
 * AbstractQueue 的 remove 方法
 * 可看出与 poll 方法不同的是,如果没有拿到元素,会抛出异常
 */
public E remove() {
  E x = poll();
  if (x != null)
    return x;
  else
    throw new NoSuchElementException();
}

# poll()

public E poll() {
  // 获取到锁并拿到堆顶元素,并重新构造堆
  final ReentrantLock lock = this.lock;
  lock.lock();
  try {
    return dequeue();
  } finally {
    lock.unlock();
  }
}

# take()

public E take() throws InterruptedException {
  // 可中断锁,阻塞获取堆顶元素,并重新构造堆
  final ReentrantLock lock = this.lock;
  lock.lockInterruptibly();
  E result;
  try {
    while ( (result = dequeue()) == null)
      notEmpty.await();
  } finally {
    lock.unlock();
  }
  return result;
}

# poll(timeout, unit)

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
  // 可中断锁,超时阻塞获取堆顶元素,并重新构造堆
  long nanos = unit.toNanos(timeout);
  final ReentrantLock lock = this.lock;
  lock.lockInterruptibly();
  E result;
  try {
    while ( (result = dequeue()) == null && nanos > 0)
      nanos = notEmpty.awaitNanos(nanos);
  } finally {
    lock.unlock();
  }
  return result;
}

# 检查方法

# element()

直接继承自 AbstractQueue 的 element () 方法。

/**
 * AbstractQueue 的方法
 * 可看出,与 peek () 方法不同的是,如果为空,会抛出异常
 */
public E element() {
  E x = peek();
  if (x != null)
    return x;
  else
    throw new NoSuchElementException();
}

# peek()

public E peek() {
  // 用锁获取堆顶元素
  final ReentrantLock lock = this.lock;
  lock.lock();
  try {
    return (size == 0) ? null : (E) queue[0];
  } finally {
    lock.unlock();
  }
}

# 核心方法

优先阻塞队列是基于数组的无界队列,所以数组会进行动态扩容。tryGrow (Object [] array, int oldCap) 方法对数组进行了扩容,通过 CAS 设置一个标志位,保证只有一个线程在进行扩容。

移除元素时,使用一个公共的方法 dequeue () 来获取堆顶元素,并重新生成小顶堆。

同时,优先队列使用的是堆排序,其核心方法会有堆相关的操作,方法 heapify () 负责构造堆,方法 siftUpComparable、siftUpUsingComparator 负责上浮操作,方法 siftDownComparable、siftDownUsingComparator 负责下沉操作。siftUpComparable 和 siftUpUsingComparator 两者区别是,一个使用自然排序,一个使用自定义排序,siftDownComparable 和 siftDownUsingComparator 同理。

# tryGrow(Object[] array, int oldCap)

private void tryGrow(Object[] array, int oldCap) {
  // 先释放,后面重新获取锁,这是为了让其他线程能够操作队列,提高并发效率
  lock.unlock(); 
  Object[] newArray = null;
  // CAS 获取锁,然后进行扩容
  if (allocationSpinLock == 0 &&
      UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                               0, 1)) {
    try {
      // 扩容策略:如果旧的容量小于 64,增加旧容量 + 2 的大小,如果大于 64,增加旧容量的一半
      int newCap = oldCap + ((oldCap < 64) ?
                             (oldCap + 2) : 
                             (oldCap >> 1));
      // 超过了最大容量
      if (newCap - MAX_ARRAY_SIZE > 0) {    
        int minCap = oldCap + 1;
        // 旧容量增加 1 个超过最大容量,直接抛出 OOM 异常
        if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
          throw new OutOfMemoryError();
        // 新容量设置为最大容量
        newCap = MAX_ARRAY_SIZE;
      }
      // 新容量比旧容量大,且其他线程没有进行过扩容,则初始化新的数组
      if (newCap > oldCap && queue == array)
        newArray = new Object[newCap];
    } finally {
      allocationSpinLock = 0;
    }
  }
  // 新的数组没有初始化,说明当前线程无法进行扩容,可能是 CAS 没有获取到锁,让出 CPU 执行权,让别的线程尽快进行扩容
  if (newArray == null) 
    Thread.yield();
  // 加锁,进行扩容或者后续的插入操作
  lock.lock();
  // 新数组初始化过且其他线程没有进行过扩容,将新数组赋值给旧数组,进行扩容
  if (newArray != null && queue == array) {
    queue = newArray;
    System.arraycopy(array, 0, newArray, 0, oldCap);
  }
}

# dequeue()

可以看出,这里没有加锁,是因为在 poll、offer、take 等方法中,进行了加锁,这里就已经是同步操作了。

private E dequeue() {
  // 新的大小
  int n = size - 1;
  if (n < 0)
    return null;
  else {
    Object[] array = queue;
    E result = (E) array[0];
    // 将最后一个元素拿出,进行下沉操作,然后将最后一个元素放在空位置
    E x = (E) array[n];
    array[n] = null;
    Comparator<? super E> cmp = comparator;
    if (cmp == null)
      siftDownComparable(0, x, array, n);
    else
      siftDownUsingComparator(0, x, array, n, cmp);
    // 更新大小
    size = n;
    return result;
  }
}

# heapify()

private void heapify() {
  Object[] array = queue;
  int n = size;
  // 对前一半元素进行下沉操作
  int half = (n >>> 1) - 1;
  Comparator<? super E> cmp = comparator;
  if (cmp == null) {
    for (int i = half; i >= 0; i--)
      siftDownComparable(i, (E) array[i], array, n);
  }
  else {
    for (int i = half; i >= 0; i--)
      siftDownUsingComparator(i, (E) array[i], array, n, cmp);
  }
}

# siftUpComparable(int k, T x, Object[] array)

上浮操作一般用于新增元素,将新增的元素放在堆尾,然后从下到上遍历,直到将元素放在一个合适的位置。

/**
 * @param k 填充的位置
 * @param x 插入的元素
 * @param array 堆
 */
private static <T> void siftUpComparable(int k, T x, Object[] array) {
  // 要求插入的元素支持自然排序
  Comparable<? super T> key = (Comparable<? super T>) x;
  while (k > 0) {
    // 父节点
    int parent = (k - 1) >>> 1;
    Object e = array[parent];
    // 如果比父节点大,直接退出
    if (key.compareTo((T) e) >= 0)
      break;
    // 比父节点小,与父节点进行交换,继续进行上浮
    array[k] = e;
    k = parent;
  }
  // 将元素插入到最后的父节点
  array[k] = key;
}

# siftDownComparable(int k, T x, Object[] array, int n)

下沉操作一般用于堆排序以及移除堆顶时,从上到下遍历,将较小的元素放在父节点上,最后将插入的元素放在空余位置。

/**
 * @param k 填充的位置
 * @param x 插入的元素
 * @param array 堆
 * @param 堆的大小
 */
private static <T> void siftDownComparable(int k, T x, Object[] array, int n) {
  if (n > 0) {
    // 要求插入的元素支持自然排序
    Comparable<? super T> key = (Comparable<? super T>)x;
    // 找到最后一个非叶子节点
    int half = n >>> 1;       
    // 扫描一半元素,即所有的非叶子节点
    while (k < half) {
      // 左右节点最小值的位置,先假设左子节点最小
      int child = (k << 1) + 1; 
      // 左右节点的最小值
      Object c = array[child];
      // 右子节点
      int right = child + 1;
      // 如果右子节点更小,进行更新
      if (right < n &&
          ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
        c = array[child = right];
      // 如果当前节点比左右节点都小直接退出
      if (key.compareTo((T) c) <= 0)
        break;
      // 当前节点比子节点小,将小值节点提上来,继续进行下沉
      array[k] = c;
      k = child;
    }
    // 下沉结束,将元素插入到最后的子节点上
    array[k] = key;
  }
}

# 并发控制

插入、移除和检查的操作,都会用锁同步进行。

在数组扩容时,首先会释放掉插入方法加的锁,然后通过 CAS 获取扩容锁进行扩容,真正进行扩容的时候,会再次加锁进行扩容。这里先释放掉锁,最后再加锁,是为了减少同步操作,提高扩容的效率。