# 前言
它是一个具有优先级的无界阻塞队列,但由于底层是数组实现,可能出现 OOM。
# 源码分析
# 变量
队列元素的用对象数组存放,用一个 size 记录元素个数。
| |
| private static final int DEFAULT_INITIAL_CAPACITY = 11; |
| |
| |
| private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; |
| |
| |
| private transient Object[] queue; |
| |
| |
| private transient int size; |
| |
| |
| private transient Comparator<? super E> comparator; |
并发控制使用 ReentrantLock,同时使用一个 Condition 以用来阻塞等待队列空的情况,无界队列不存在满的情况,队列满的时候会抛出 OOM 异常。
| |
| private final ReentrantLock lock; |
| |
| |
| private final Condition notEmpty; |
| |
| |
| private transient volatile int allocationSpinLock; |
# 构造方法
| |
| * 使用默认容量 |
| */ |
| public PriorityBlockingQueue() { |
| this(DEFAULT_INITIAL_CAPACITY, null); |
| } |
| |
| |
| * 指定初始化容量 |
| */ |
| public PriorityBlockingQueue(int initialCapacity) { |
| this(initialCapacity, null); |
| } |
| |
| |
| * 指定初始化容量和比较器 |
| */ |
| public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) { |
| if (initialCapacity < 1) |
| throw new IllegalArgumentException(); |
| this.lock = new ReentrantLock(); |
| this.notEmpty = lock.newCondition(); |
| this.comparator = comparator; |
| this.queue = new Object[initialCapacity]; |
| } |
| |
| |
| * 初始化数据 |
| */ |
| public PriorityBlockingQueue(Collection<? extends E> c) { |
| this.lock = new ReentrantLock(); |
| this.notEmpty = lock.newCondition(); |
| |
| boolean heapify = true; |
| |
| boolean screen = true; |
| |
| if (c instanceof SortedSet<?>) { |
| SortedSet<? extends E> ss = (SortedSet<? extends E>) c; |
| this.comparator = (Comparator<? super E>) ss.comparator(); |
| heapify = false; |
| } |
| |
| else if (c instanceof PriorityBlockingQueue<?>) { |
| PriorityBlockingQueue<? extends E> pq = |
| (PriorityBlockingQueue<? extends E>) c; |
| this.comparator = (Comparator<? super E>) pq.comparator(); |
| screen = false; |
| if (pq.getClass() == PriorityBlockingQueue.class) |
| heapify = false; |
| } |
| Object[] a = c.toArray(); |
| int n = a.length; |
| |
| if (a.getClass() != Object[].class) |
| a = Arrays.copyOf(a, n, Object[].class); |
| |
| if (screen && (n == 1 || this.comparator != null)) { |
| for (int i = 0; i < n; ++i) |
| if (a[i] == null) |
| throw new NullPointerException(); |
| } |
| this.queue = a; |
| this.size = n; |
| |
| if (heapify) |
| heapify(); |
| } |
# 插入方法
# add(e)
| public boolean add(E e) { |
| |
| return offer(e); |
| } |
# offer(e)
| public boolean offer(E e) { |
| if (e == null) |
| throw new NullPointerException(); |
| |
| final ReentrantLock lock = this.lock; |
| lock.lock(); |
| |
| int n, cap; |
| Object[] array; |
| |
| while ((n = size) >= (cap = (array = queue).length)) |
| tryGrow(array, cap); |
| try { |
| Comparator<? super E> cmp = comparator; |
| |
| if (cmp == null) |
| siftUpComparable(n, e, array); |
| |
| else |
| siftUpUsingComparator(n, e, array, cmp); |
| |
| size = n + 1; |
| |
| notEmpty.signal(); |
| } finally { |
| lock.unlock(); |
| } |
| return true; |
| } |
# put(e)
| public void put(E e) { |
| |
| offer(e); |
| } |
# offer(e, timeout, unit)
| public boolean offer(E e, long timeout, TimeUnit unit) { |
| |
| return offer(e); |
| } |
# 移除方法
# remove()
直接继承父类方法。
| |
| * AbstractQueue 的 remove 方法 |
| * 可看出与 poll 方法不同的是,如果没有拿到元素,会抛出异常 |
| */ |
| public E remove() { |
| E x = poll(); |
| if (x != null) |
| return x; |
| else |
| throw new NoSuchElementException(); |
| } |
# poll()
| public E poll() { |
| |
| final ReentrantLock lock = this.lock; |
| lock.lock(); |
| try { |
| return dequeue(); |
| } finally { |
| lock.unlock(); |
| } |
| } |
# take()
| public E take() throws InterruptedException { |
| |
| final ReentrantLock lock = this.lock; |
| lock.lockInterruptibly(); |
| E result; |
| try { |
| while ( (result = dequeue()) == null) |
| notEmpty.await(); |
| } finally { |
| lock.unlock(); |
| } |
| return result; |
| } |
# poll(timeout, unit)
| public E poll(long timeout, TimeUnit unit) throws InterruptedException { |
| |
| long nanos = unit.toNanos(timeout); |
| final ReentrantLock lock = this.lock; |
| lock.lockInterruptibly(); |
| E result; |
| try { |
| while ( (result = dequeue()) == null && nanos > 0) |
| nanos = notEmpty.awaitNanos(nanos); |
| } finally { |
| lock.unlock(); |
| } |
| return result; |
| } |
# 检查方法
# element()
直接继承自 AbstractQueue 的 element () 方法。
| |
| * AbstractQueue 的方法 |
| * 可看出,与 peek () 方法不同的是,如果为空,会抛出异常 |
| */ |
| public E element() { |
| E x = peek(); |
| if (x != null) |
| return x; |
| else |
| throw new NoSuchElementException(); |
| } |
# peek()
| public E peek() { |
| |
| final ReentrantLock lock = this.lock; |
| lock.lock(); |
| try { |
| return (size == 0) ? null : (E) queue[0]; |
| } finally { |
| lock.unlock(); |
| } |
| } |
# 核心方法
优先阻塞队列是基于数组的无界队列,所以数组会进行动态扩容。tryGrow (Object [] array, int oldCap) 方法对数组进行了扩容,通过 CAS 设置一个标志位,保证只有一个线程在进行扩容。
移除元素时,使用一个公共的方法 dequeue () 来获取堆顶元素,并重新生成小顶堆。
同时,优先队列使用的是堆排序,其核心方法会有堆相关的操作,方法 heapify () 负责构造堆,方法 siftUpComparable、siftUpUsingComparator 负责上浮操作,方法 siftDownComparable、siftDownUsingComparator 负责下沉操作。siftUpComparable 和 siftUpUsingComparator 两者区别是,一个使用自然排序,一个使用自定义排序,siftDownComparable 和 siftDownUsingComparator 同理。
# tryGrow(Object[] array, int oldCap)
| private void tryGrow(Object[] array, int oldCap) { |
| |
| lock.unlock(); |
| Object[] newArray = null; |
| |
| if (allocationSpinLock == 0 && |
| UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, |
| 0, 1)) { |
| try { |
| |
| int newCap = oldCap + ((oldCap < 64) ? |
| (oldCap + 2) : |
| (oldCap >> 1)); |
| |
| if (newCap - MAX_ARRAY_SIZE > 0) { |
| int minCap = oldCap + 1; |
| |
| if (minCap < 0 || minCap > MAX_ARRAY_SIZE) |
| throw new OutOfMemoryError(); |
| |
| newCap = MAX_ARRAY_SIZE; |
| } |
| |
| if (newCap > oldCap && queue == array) |
| newArray = new Object[newCap]; |
| } finally { |
| allocationSpinLock = 0; |
| } |
| } |
| |
| if (newArray == null) |
| Thread.yield(); |
| |
| lock.lock(); |
| |
| if (newArray != null && queue == array) { |
| queue = newArray; |
| System.arraycopy(array, 0, newArray, 0, oldCap); |
| } |
| } |
# dequeue()
可以看出,这里没有加锁,是因为在 poll、offer、take 等方法中,进行了加锁,这里就已经是同步操作了。
| private E dequeue() { |
| |
| int n = size - 1; |
| if (n < 0) |
| return null; |
| else { |
| Object[] array = queue; |
| E result = (E) array[0]; |
| |
| E x = (E) array[n]; |
| array[n] = null; |
| Comparator<? super E> cmp = comparator; |
| if (cmp == null) |
| siftDownComparable(0, x, array, n); |
| else |
| siftDownUsingComparator(0, x, array, n, cmp); |
| |
| size = n; |
| return result; |
| } |
| } |
# heapify()
| private void heapify() { |
| Object[] array = queue; |
| int n = size; |
| |
| int half = (n >>> 1) - 1; |
| Comparator<? super E> cmp = comparator; |
| if (cmp == null) { |
| for (int i = half; i >= 0; i--) |
| siftDownComparable(i, (E) array[i], array, n); |
| } |
| else { |
| for (int i = half; i >= 0; i--) |
| siftDownUsingComparator(i, (E) array[i], array, n, cmp); |
| } |
| } |
# siftUpComparable(int k, T x, Object[] array)
上浮操作一般用于新增元素,将新增的元素放在堆尾,然后从下到上遍历,直到将元素放在一个合适的位置。
| |
| * @param k 填充的位置 |
| * @param x 插入的元素 |
| * @param array 堆 |
| */ |
| private static <T> void siftUpComparable(int k, T x, Object[] array) { |
| |
| Comparable<? super T> key = (Comparable<? super T>) x; |
| while (k > 0) { |
| |
| int parent = (k - 1) >>> 1; |
| Object e = array[parent]; |
| |
| if (key.compareTo((T) e) >= 0) |
| break; |
| |
| array[k] = e; |
| k = parent; |
| } |
| |
| array[k] = key; |
| } |
# siftDownComparable(int k, T x, Object[] array, int n)
下沉操作一般用于堆排序以及移除堆顶时,从上到下遍历,将较小的元素放在父节点上,最后将插入的元素放在空余位置。
| |
| * @param k 填充的位置 |
| * @param x 插入的元素 |
| * @param array 堆 |
| * @param 堆的大小 |
| */ |
| private static <T> void siftDownComparable(int k, T x, Object[] array, int n) { |
| if (n > 0) { |
| |
| Comparable<? super T> key = (Comparable<? super T>)x; |
| |
| int half = n >>> 1; |
| |
| while (k < half) { |
| |
| int child = (k << 1) + 1; |
| |
| Object c = array[child]; |
| |
| int right = child + 1; |
| |
| if (right < n && |
| ((Comparable<? super T>) c).compareTo((T) array[right]) > 0) |
| c = array[child = right]; |
| |
| if (key.compareTo((T) c) <= 0) |
| break; |
| |
| array[k] = c; |
| k = child; |
| } |
| |
| array[k] = key; |
| } |
| } |
# 并发控制
插入、移除和检查的操作,都会用锁同步进行。
在数组扩容时,首先会释放掉插入方法加的锁,然后通过 CAS 获取扩容锁进行扩容,真正进行扩容的时候,会再次加锁进行扩容。这里先释放掉锁,最后再加锁,是为了减少同步操作,提高扩容的效率。