# 前言

它是基于数组的有界队列。

# 源码分析

# 变量

队列元素的用对象数组存放,同时有两个 index 分别记录插入和移除的位置。

// 元素存储  
final Object[] items;
// 下一个移除的位置
int takeIndex;
// 下一个插入的位置
int putIndex;
// 元素个数
int count;

并发控制使用 ReentrantLock,同时使用两个 Condition 以用来阻塞等待队列空或满的情况。

// 锁
final ReentrantLock lock;
// 队列为空移除元素的阻塞等待
private final Condition notEmpty;
// 队列满插入元素的阻塞等待
private final Condition notFull;

# 构造方法

/**
 * 指定容量,默认是用非公平锁  
 */
public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}
/**
 * 指定容量和是否是公平锁
 * takeIndex 和 putIndex 默认为 0
 */
public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}
/**
 * 指定容量和是否公平锁,并放入一些元素
 */
public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) {
    this(capacity, fair);
    // 获取锁才操作插入
    final ReentrantLock lock = this.lock;
    lock.lock(); 
    try {
        final Object[] items = this.items;
        int i = 0;
        try {
            for (E e : c)
                items[i++] = Objects.requireNonNull(e);
        } catch (ArrayIndexOutOfBoundsException ex) {
            throw new IllegalArgumentException();
        }
        count = i;
        putIndex = (i == capacity) ? 0 : i;
    } finally {
        lock.unlock();
    }
}

# 插入方法

# add(e)

puublic boolean add(E e) {
    // 直接调用父类  AbstractQueue 的 add 方法
    return super.add(e);
}
/**
 * AbstractQueue 的 add 方法
 * 可以看出,与 offer 不同的地方就在于,如果失败,会抛出队列满的异常
 */
public boolean add(E e) {
    if (offer(e))
        return true;
    else
        throw new IllegalStateException("Queue full");
}

# offer(e)

public boolean offer(E e) {
    // 元素为空的话,抛出空指针
    Objects.requireNonNull(e);
    // 获取锁进行插入
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 如果元素已,直接返回 false
        if (count == items.length)
            return false;
        // 否则进行插入
        else {
            enqueue(e);
            return true;
        }
    } finally {
        lock.unlock();
    }
}

# put(e)

public void put(E e) throws InterruptedException {
    // 元素为空的话,抛出空指针
    Objects.requireNonNull(e);
    // 获取锁进行插入
    final ReentrantLock lock = this.lock;
    // 可被中断
    lock.lockInterruptibly();
    try {
        // 如果元素满了,等待它有空位置
        while (count == items.length)
            notFull.await();
        // 直到有空位置,才会进行插入
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

# offer(e, timeout, unit)

public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
	// 元素为空的话,抛出空指针
    Objects.requireNonNull(e);
    // 超时
    long nanos = unit.toNanos(timeout);
    // 获取锁进行插入
    final ReentrantLock lock = this.lock;
    // 可被中断
    lock.lockInterruptibly();
    try {
        // 队列满时进行超时等待 
        while (count == items.length) {
            // 如果超时,直接返回去 false
            if (nanos <= 0L)
                return false;
            nanos = notFull.awaitNanos(nanos);
        }
        enqueue(e);
        return true;
    } finally {
        lock.unlock();
    }
}

# 移除方法

# remove()

直接继承父类方法。

/**
 * AbstractQueue 的 remove 方法
 * 可看出与 poll 方法不同的是,如果没有拿到元素,会抛出异常
 */
public E remove() {
    E x = poll();
    if (x != null)
        return x;
    else
        throw new NoSuchElementException();
}

# poll()

public E poll() {
    // 获取到锁进行移除
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 如果空了返回 null
        return (count == 0) ? null : dequeue();
    } finally {
        lock.unlock();
    }
}

# take()

public E take() throws InterruptedException {
    // 获取到锁进行移除  
    final ReentrantLock lock = this.lock;
    // 可被中断
    lock.lockInterruptibly();
    try {
        //  队列为空时一直等待
        while (count == 0)
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}

# poll(timeout, unit)

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    // 计算超时
    long nanos = unit.toNanos(timeout);
    // 获取到锁进行移除
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        // 超时等待
        while (count == 0) {
            // 如果超时,直接返回 null
            if (nanos <= 0L)
                return null;
            nanos = notEmpty.awaitNanos(nanos);
        }
        return dequeue();
    } finally {
        lock.unlock();
    }
}

# 检查方法

# element()

直接继承自 AbstractQueue 的 element () 方法。

/**
 * AbstractQueue 的方法
 * 可看出,与 peek () 方法不同的是,如果为空,会抛出异常
 */
public E element() {
    E x = peek();
    if (x != null)
        return x;
    else
        throw new NoSuchElementException();
}

# peek()

public E peek() {
    // 获取到锁进行检查
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 如果元素为空返回 null
        return itemAt(takeIndex); 
    } finally {
        lock.unlock();
    }
}

# 核心方法

插入、移除的几种方法只是阻塞策略不同,实际上使用的核心方法是相同的。插入使用的是 enqueue (e) 方法,移除使用的是 dequeue () 方法,这些方法都不考虑异常情况,直接进行插入和移除。下面具体来看下这几种方法。

# enqueue(e)

private void enqueue(E x) {
	// 这里不考虑队列为空的情况,不考虑 count 大于容量,也不考虑锁,都是在外部考虑
  final Object[] items = this.items;
  items[putIndex] = x;
  // 如果 index 越界,从 0 开始重新计数
  if (++putIndex == items.length)
    putIndex = 0;
  count++;
  // 唤醒因为队列为空而阻塞的移除线程
  notEmpty.signal();
}

# dequeue()

private E dequeue() {
  final Object[] items = this.items;
  @SuppressWarnings("unchecked")
  E x = (E) items[takeIndex];
  // 移除元素直接设置为 null
  items[takeIndex] = null;
  if (++takeIndex == items.length)
    takeIndex = 0;
  count--;
  // 这是在通知其他的迭代器删除过期元素
  if (itrs != null)
    itrs.elementDequeued();
  // 唤醒因队列满而阻塞的插入线程
  notFull.signal();
  return x;
}

# 并发控制

在插入和移除时,都会用锁进行同步操作,有可能发生阻塞的同步操作中,会使用可被中断的锁。