# 前言
它是基于数组的有界队列。
# 源码分析
# 变量
队列元素的用对象数组存放,同时有两个 index 分别记录插入和移除的位置。
| |
| final Object[] items; |
| |
| |
| int takeIndex; |
| |
| |
| int putIndex; |
| |
| |
| int count; |
并发控制使用 ReentrantLock,同时使用两个 Condition 以用来阻塞等待队列空或满的情况。
| |
| final ReentrantLock lock; |
| |
| |
| private final Condition notEmpty; |
| |
| |
| private final Condition notFull; |
# 构造方法
| |
| * 指定容量,默认是用非公平锁 |
| */ |
| public ArrayBlockingQueue(int capacity) { |
| this(capacity, false); |
| } |
| |
| |
| * 指定容量和是否是公平锁 |
| * takeIndex 和 putIndex 默认为 0 |
| */ |
| public ArrayBlockingQueue(int capacity, boolean fair) { |
| if (capacity <= 0) |
| throw new IllegalArgumentException(); |
| this.items = new Object[capacity]; |
| lock = new ReentrantLock(fair); |
| notEmpty = lock.newCondition(); |
| notFull = lock.newCondition(); |
| } |
| |
| |
| * 指定容量和是否公平锁,并放入一些元素 |
| */ |
| public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) { |
| this(capacity, fair); |
| |
| |
| final ReentrantLock lock = this.lock; |
| lock.lock(); |
| try { |
| final Object[] items = this.items; |
| int i = 0; |
| try { |
| for (E e : c) |
| items[i++] = Objects.requireNonNull(e); |
| } catch (ArrayIndexOutOfBoundsException ex) { |
| throw new IllegalArgumentException(); |
| } |
| count = i; |
| putIndex = (i == capacity) ? 0 : i; |
| } finally { |
| lock.unlock(); |
| } |
| } |
# 插入方法
# add(e)
| puublic boolean add(E e) { |
| |
| return super.add(e); |
| } |
| |
| |
| * AbstractQueue 的 add 方法 |
| * 可以看出,与 offer 不同的地方就在于,如果失败,会抛出队列满的异常 |
| */ |
| public boolean add(E e) { |
| if (offer(e)) |
| return true; |
| else |
| throw new IllegalStateException("Queue full"); |
| } |
# offer(e)
| public boolean offer(E e) { |
| |
| Objects.requireNonNull(e); |
| |
| final ReentrantLock lock = this.lock; |
| lock.lock(); |
| try { |
| |
| if (count == items.length) |
| return false; |
| |
| else { |
| enqueue(e); |
| return true; |
| } |
| } finally { |
| lock.unlock(); |
| } |
| } |
# put(e)
| public void put(E e) throws InterruptedException { |
| |
| Objects.requireNonNull(e); |
| |
| final ReentrantLock lock = this.lock; |
| |
| lock.lockInterruptibly(); |
| try { |
| |
| while (count == items.length) |
| notFull.await(); |
| |
| enqueue(e); |
| } finally { |
| lock.unlock(); |
| } |
| } |
# offer(e, timeout, unit)
| public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { |
| |
| Objects.requireNonNull(e); |
| |
| long nanos = unit.toNanos(timeout); |
| |
| final ReentrantLock lock = this.lock; |
| |
| lock.lockInterruptibly(); |
| try { |
| |
| while (count == items.length) { |
| |
| if (nanos <= 0L) |
| return false; |
| nanos = notFull.awaitNanos(nanos); |
| } |
| enqueue(e); |
| return true; |
| } finally { |
| lock.unlock(); |
| } |
| } |
# 移除方法
# remove()
直接继承父类方法。
| |
| * AbstractQueue 的 remove 方法 |
| * 可看出与 poll 方法不同的是,如果没有拿到元素,会抛出异常 |
| */ |
| public E remove() { |
| E x = poll(); |
| if (x != null) |
| return x; |
| else |
| throw new NoSuchElementException(); |
| } |
# poll()
| public E poll() { |
| |
| final ReentrantLock lock = this.lock; |
| lock.lock(); |
| try { |
| |
| return (count == 0) ? null : dequeue(); |
| } finally { |
| lock.unlock(); |
| } |
| } |
# take()
| public E take() throws InterruptedException { |
| |
| final ReentrantLock lock = this.lock; |
| |
| lock.lockInterruptibly(); |
| try { |
| |
| while (count == 0) |
| notEmpty.await(); |
| return dequeue(); |
| } finally { |
| lock.unlock(); |
| } |
| } |
# poll(timeout, unit)
| public E poll(long timeout, TimeUnit unit) throws InterruptedException { |
| |
| long nanos = unit.toNanos(timeout); |
| |
| final ReentrantLock lock = this.lock; |
| lock.lockInterruptibly(); |
| try { |
| |
| while (count == 0) { |
| |
| if (nanos <= 0L) |
| return null; |
| nanos = notEmpty.awaitNanos(nanos); |
| } |
| return dequeue(); |
| } finally { |
| lock.unlock(); |
| } |
| } |
# 检查方法
# element()
直接继承自 AbstractQueue 的 element () 方法。
| |
| * AbstractQueue 的方法 |
| * 可看出,与 peek () 方法不同的是,如果为空,会抛出异常 |
| */ |
| public E element() { |
| E x = peek(); |
| if (x != null) |
| return x; |
| else |
| throw new NoSuchElementException(); |
| } |
# peek()
| public E peek() { |
| |
| final ReentrantLock lock = this.lock; |
| lock.lock(); |
| try { |
| |
| return itemAt(takeIndex); |
| } finally { |
| lock.unlock(); |
| } |
| } |
# 核心方法
插入、移除的几种方法只是阻塞策略不同,实际上使用的核心方法是相同的。插入使用的是 enqueue (e) 方法,移除使用的是 dequeue () 方法,这些方法都不考虑异常情况,直接进行插入和移除。下面具体来看下这几种方法。
# enqueue(e)
| private void enqueue(E x) { |
| |
| final Object[] items = this.items; |
| items[putIndex] = x; |
| |
| if (++putIndex == items.length) |
| putIndex = 0; |
| count++; |
| |
| notEmpty.signal(); |
| } |
# dequeue()
| private E dequeue() { |
| final Object[] items = this.items; |
| @SuppressWarnings("unchecked") |
| E x = (E) items[takeIndex]; |
| |
| items[takeIndex] = null; |
| if (++takeIndex == items.length) |
| takeIndex = 0; |
| count--; |
| |
| if (itrs != null) |
| itrs.elementDequeued(); |
| |
| notFull.signal(); |
| return x; |
| } |
# 并发控制
在插入和移除时,都会用锁进行同步操作,有可能发生阻塞的同步操作中,会使用可被中断的锁。