# 前言

它是基于链表的无界队列。

# 源码分析

# 数据结构

# Node

static class Node<E> {
  E item;
  Node<E> next;
  Node(E x) { item = x; }
}

# 变量

数据结构使用链表存储,维护一个链表的头节点和尾节点,队列长度使用一个原子性的计数。

// 队列容量,默认 Integer 的最大值
private final int capacity;
// 队列元素数量
private final AtomicInteger count = new AtomicInteger();
// 队列头节点,头节点的下一个节点才是队列的第一个节点,头节点元素值为 null
transient Node<E> head;
// 队列尾节点
private transient Node<E> last;

和 ArrayBLockingQueue 不同的是,它使用了两个锁,一个插入锁,一个移除锁。

// 移除锁
private final ReentrantLock takeLock = new ReentrantLock();
// 阻塞等待队列非空
private final Condition notEmpty = takeLock.newCondition();
// 插入锁
private final ReentrantLock putLock = new ReentrantLock();
// 阻塞等待队列有空位置
private final Condition notFull = putLock.newCondition();

# 构造方法

/**
 * 容量默认 Integer 最大值
 */
public LinkedBlockingQueue() {
  this(Integer.MAX_VALUE);
}
/**
 * 指定容量
 */
public LinkedBlockingQueue(int capacity) {
  if (capacity <= 0) throw new IllegalArgumentException();
  this.capacity = capacity;
  // 头节点和尾节点初始化为空
  last = head = new Node<E>(null);
}
/**
 * 初始化数据
 */
public LinkedBlockingQueue(Collection<? extends E> c) {
  this(Integer.MAX_VALUE);
  final ReentrantLock putLock = this.putLock;
  // 这里不会发生冲突,但是需要保证可见性
  putLock.lock(); 
  try {
    int n = 0;
    for (E e : c) {
      if (e == null)
        throw new NullPointerException();
      if (n == capacity)
        throw new IllegalStateException("Queue full");
      enqueue(new Node<E>(e));
      ++n;
    }
    count.set(n);
  } finally {
    putLock.unlock();
  }
}

# 插入方法

# add(e)

直接继承的父类方法,父类调用 offer () 方法,返回 false 的话会抛队列满了的异常。

# offer(e)

public boolean offer(E e) {
  if (e == null) throw new NullPointerException();
  final AtomicInteger count = this.count;
  // 队列满了
  if (count.get() == capacity)
    return false;
  int c = -1;
  Node<E> node = new Node<E>(e);
  // 获取插入锁进行插入
  final ReentrantLock putLock = this.putLock;
  putLock.lock();
  try {
    if (count.get() < capacity) {
      // 插入,不会对 count 做修改
      enqueue(node);
      //c 是之前的元素数量
      c = count.getAndIncrement();
      // 如果还有空位置,唤醒等待插入的线程
      if (c + 1 < capacity)
        notFull.signal();
    }
  } finally {
    putLock.unlock();
  }
  // 如果之前队列为空,唤醒等待移除的线程
  if (c == 0)
    signalNotEmpty();
  return c >= 0;
}

# put(e)

public void put(E e) throws InterruptedException {
  if (e == null) throw new NullPointerException();
  int c = -1;
  Node<E> node = new Node<E>(e);
  final ReentrantLock putLock = this.putLock;
  final AtomicInteger count = this.count;
  // 可中断锁
  putLock.lockInterruptibly();
  try {
    // 阻塞等待插入
    while (count.get() == capacity) {
      notFull.await();
    }
    // 插入
    enqueue(node);
    c = count.getAndIncrement();
		// 如果还有空位,唤醒等待插入的线程
    if (c + 1 < capacity)
      notFull.signal();
  } finally {
    putLock.unlock();
  }
  // 如果之前是空,唤醒之前等待移除的线程
  if (c == 0)
    signalNotEmpty();
}

# offer(e, timeout, unit)

public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
  if (e == null) throw new NullPointerException();
  long nanos = unit.toNanos(timeout);
  int c = -1;
  final ReentrantLock putLock = this.putLock;
  final AtomicInteger count = this.count;
  putLock.lockInterruptibly();
  try {
    // 超时等待
    while (count.get() == capacity) {
      if (nanos <= 0)
        return false;
      nanos = notFull.awaitNanos(nanos);
    }
    enqueue(new Node<E>(e));
    c = count.getAndIncrement();
    if (c + 1 < capacity)
      notFull.signal();
  } finally {
    putLock.unlock();
  }
  // 如果之前是空,唤醒之前等待移除的线程
  if (c == 0)
    signalNotEmpty();
  return true;
}

# 移除方法

# remote()

直接继承父类 remove (),调用子类 poll () 方法,如果获取到的元素为 null,就抛出队列为空的异常。

# poll()

public E poll() {
  final AtomicInteger count = this.count;
  // 如果队列为空,直接返回 null
  if (count.get() == 0)
    return null;
  E x = null;
  int c = -1;
  // 获取移除锁进行移除
  final ReentrantLock takeLock = this.takeLock;
  takeLock.lock();
  try {
    if (count.get() > 0) {
      x = dequeue();
      c = count.getAndDecrement();
      // 如果队列还有元素,唤醒等待移除的线程
      if (c > 1)
        notEmpty.signal();
    }
  } finally {
    takeLock.unlock();
  }
  // 如果之前满了,唤醒等待插入的线程
  if (c == capacity)
    signalNotFull();
  return x;
}

# take()

public E take() throws InterruptedException {
  E x;
  int c = -1;
  final AtomicInteger count = this.count;
  final ReentrantLock takeLock = this.takeLock;
  takeLock.lockInterruptibly();
  try {
    // 阻塞等待直到队列有元素被唤醒
    while (count.get() == 0) {
      notEmpty.await();
    }
    x = dequeue();
    c = count.getAndDecrement();
    // 唤醒正在等待移除的元素
    if (c > 1)
      notEmpty.signal();
  } finally {
    takeLock.unlock();
  }
  // 如果之前容量达到上限,唤醒等待插入的元素
  if (c == capacity)
    signalNotFull();
  return x;
}

# poll(timeout, unit)

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
  E x = null;
  int c = -1;
  long nanos = unit.toNanos(timeout);
  final AtomicInteger count = this.count;
  final ReentrantLock takeLock = this.takeLock;
  takeLock.lockInterruptibly();
  try {
    // 超时等待
    while (count.get() == 0) {
      if (nanos <= 0)
        return null;
      nanos = notEmpty.awaitNanos(nanos);
    }
    x = dequeue();
    c = count.getAndDecrement();
    if (c > 1)
      notEmpty.signal();
  } finally {
    takeLock.unlock();
  }
  if (c == capacity)
    signalNotFull();
  return x;
}

# 检查方法

# element()

直接继承父类方法。

# peek()

public E peek() {
  if (count.get() == 0)
    return null;
  // 获取移除锁,直接读取第一个节点,从这里可以看出,第一个节点是头节点的下一个节点
  final ReentrantLock takeLock = this.takeLock;
  takeLock.lock();
  try {
    Node<E> first = head.next;
    if (first == null)
      return null;
    else
      return first.item;
  } finally {
    takeLock.unlock();
  }
}

# 核心方法

# enqueue(e)

private void enqueue(Node<E> node) {
  // 直接在尾结点插入元素
  last = last.next = node;
}

# dequeue()

private E dequeue() {
  //head 的 next 指向自己,first 变为新的 head,且元素值设为 null,这里不会修改 first 的 next,所以当队列只有一个元素时,也不会和 enqueue (e) 方法冲突
  Node<E> h = head;
  Node<E> first = h.next;
  h.next = h; 
  head = first;
  E x = first.item;
  first.item = null;
  return x;
}

# signalNotEmpty()

private void signalNotEmpty() {
  // 获取移除锁,唤醒等待移除的线程
  final ReentrantLock takeLock = this.takeLock;
  takeLock.lock();
  try {
    notEmpty.signal();
  } finally {
    takeLock.unlock();
  }
}

# signalNotFull()

private void signalNotFull() {
  // 获取插入锁,唤醒等待插入的线程
  final ReentrantLock putLock = this.putLock;
  putLock.lock();
  try {
    notFull.signal();
  } finally {
    putLock.unlock();
  }
}

# 并发控制

链表阻塞队列使用的锁是一种双锁算法,两个锁分别控制插入和移除的同步。那么双锁算法是怎么控制插入和移除的并发呢?

对于空或满队列,插入和移除线程会有一个无法进行操作,此时无线程并发;对于既可插入又可移除的队列,插入和移除的操作如图所示,从上到下分别是:空队列、插入线程插入 1、插入线程插入 2 和移除线程移除 1。

链表阻塞队列插入和移除并发操作

首先是一个空队列,插入 1 之后,插入 2 的线程和移除 1 的线程同时执行,插入线程更改的数据有:原 head 的 next 节点、原 first 的值、head 指向原 first,移除线程更改的数据有:原 first 的 next 执行新增节点、原 tail 指向新增节点,可以看出两个线程的操作并不冲突。