# 前言它是基于链表的无界队列。
# 源码分析# 数据结构# Node
static class Node < E > { E item; Node < E > next; Node ( E x) { item = x; } }
# 变量数据结构使用链表存储,维护一个链表的头节点和尾节点,队列长度使用一个原子性的计数。
private final int capacity; private final AtomicInteger count = new AtomicInteger ( ) ; transient Node < E > head; private transient Node < E > last;
和 ArrayBLockingQueue 不同的是,它使用了两个锁,一个插入锁,一个移除锁。
private final ReentrantLock takeLock = new ReentrantLock ( ) ; private final Condition notEmpty = takeLock. newCondition ( ) ; private final ReentrantLock putLock = new ReentrantLock ( ) ; private final Condition notFull = putLock. newCondition ( ) ;
# 构造方法 * 容量默认 Integer 最大值 */ public LinkedBlockingQueue ( ) { this ( Integer . MAX_VALUE ) ; } * 指定容量 */ public LinkedBlockingQueue ( int capacity) { if ( capacity <= 0 ) throw new IllegalArgumentException ( ) ; this . capacity = capacity; last = head = new Node < E > ( null ) ; } * 初始化数据 */ public LinkedBlockingQueue ( Collection < ? extends E > c) { this ( Integer . MAX_VALUE ) ; final ReentrantLock putLock = this . putLock; putLock. lock ( ) ; try { int n = 0 ; for ( E e : c) { if ( e == null ) throw new NullPointerException ( ) ; if ( n == capacity) throw new IllegalStateException ( "Queue full" ) ; enqueue ( new Node < E > ( e) ) ; ++ n; } count. set ( n) ; } finally { putLock. unlock ( ) ; } }
# 插入方法# add(e)直接继承的父类方法,父类调用 offer () 方法,返回 false 的话会抛队列满了的异常。
# offer(e)public boolean offer ( E e) { if ( e == null ) throw new NullPointerException ( ) ; final AtomicInteger count = this . count; if ( count. get ( ) == capacity) return false ; int c = - 1 ; Node < E > node = new Node < E > ( e) ; final ReentrantLock putLock = this . putLock; putLock. lock ( ) ; try { if ( count. get ( ) < capacity) { enqueue ( node) ; c = count. getAndIncrement ( ) ; if ( c + 1 < capacity) notFull. signal ( ) ; } } finally { putLock. unlock ( ) ; } if ( c == 0 ) signalNotEmpty ( ) ; return c >= 0 ; }
# put(e)public void put ( E e) throws InterruptedException { if ( e == null ) throw new NullPointerException ( ) ; int c = - 1 ; Node < E > node = new Node < E > ( e) ; final ReentrantLock putLock = this . putLock; final AtomicInteger count = this . count; putLock. lockInterruptibly ( ) ; try { while ( count. get ( ) == capacity) { notFull. await ( ) ; } enqueue ( node) ; c = count. getAndIncrement ( ) ; if ( c + 1 < capacity) notFull. signal ( ) ; } finally { putLock. unlock ( ) ; } if ( c == 0 ) signalNotEmpty ( ) ; }
# offer(e, timeout, unit)public boolean offer ( E e, long timeout, TimeUnit unit) throws InterruptedException { if ( e == null ) throw new NullPointerException ( ) ; long nanos = unit. toNanos ( timeout) ; int c = - 1 ; final ReentrantLock putLock = this . putLock; final AtomicInteger count = this . count; putLock. lockInterruptibly ( ) ; try { while ( count. get ( ) == capacity) { if ( nanos <= 0 ) return false ; nanos = notFull. awaitNanos ( nanos) ; } enqueue ( new Node < E > ( e) ) ; c = count. getAndIncrement ( ) ; if ( c + 1 < capacity) notFull. signal ( ) ; } finally { putLock. unlock ( ) ; } if ( c == 0 ) signalNotEmpty ( ) ; return true ; }
# 移除方法# remote()直接继承父类 remove (),调用子类 poll () 方法,如果获取到的元素为 null,就抛出队列为空的异常。
# poll()public E poll ( ) { final AtomicInteger count = this . count; if ( count. get ( ) == 0 ) return null ; E x = null ; int c = - 1 ; final ReentrantLock takeLock = this . takeLock; takeLock. lock ( ) ; try { if ( count. get ( ) > 0 ) { x = dequeue ( ) ; c = count. getAndDecrement ( ) ; if ( c > 1 ) notEmpty. signal ( ) ; } } finally { takeLock. unlock ( ) ; } if ( c == capacity) signalNotFull ( ) ; return x; }
# take()public E take ( ) throws InterruptedException { E x; int c = - 1 ; final AtomicInteger count = this . count; final ReentrantLock takeLock = this . takeLock; takeLock. lockInterruptibly ( ) ; try { while ( count. get ( ) == 0 ) { notEmpty. await ( ) ; } x = dequeue ( ) ; c = count. getAndDecrement ( ) ; if ( c > 1 ) notEmpty. signal ( ) ; } finally { takeLock. unlock ( ) ; } if ( c == capacity) signalNotFull ( ) ; return x; }
# poll(timeout, unit)public E poll ( long timeout, TimeUnit unit) throws InterruptedException { E x = null ; int c = - 1 ; long nanos = unit. toNanos ( timeout) ; final AtomicInteger count = this . count; final ReentrantLock takeLock = this . takeLock; takeLock. lockInterruptibly ( ) ; try { while ( count. get ( ) == 0 ) { if ( nanos <= 0 ) return null ; nanos = notEmpty. awaitNanos ( nanos) ; } x = dequeue ( ) ; c = count. getAndDecrement ( ) ; if ( c > 1 ) notEmpty. signal ( ) ; } finally { takeLock. unlock ( ) ; } if ( c == capacity) signalNotFull ( ) ; return x; }
# 检查方法# element()直接继承父类方法。
# peek()public E peek ( ) { if ( count. get ( ) == 0 ) return null ; final ReentrantLock takeLock = this . takeLock; takeLock. lock ( ) ; try { Node < E > first = head. next; if ( first == null ) return null ; else return first. item; } finally { takeLock. unlock ( ) ; } }
# 核心方法# enqueue(e)private void enqueue ( Node < E > node) { last = last. next = node; }
# dequeue()private E dequeue ( ) { Node < E > h = head; Node < E > first = h. next; h. next = h; head = first; E x = first. item; first. item = null ; return x; }
# signalNotEmpty()private void signalNotEmpty ( ) { final ReentrantLock takeLock = this . takeLock; takeLock. lock ( ) ; try { notEmpty. signal ( ) ; } finally { takeLock. unlock ( ) ; } }
# signalNotFull()private void signalNotFull ( ) { final ReentrantLock putLock = this . putLock; putLock. lock ( ) ; try { notFull. signal ( ) ; } finally { putLock. unlock ( ) ; } }
# 并发控制链表阻塞队列使用的锁是一种双锁算法,两个锁分别控制插入和移除的同步。那么双锁算法是怎么控制插入和移除的并发呢?
对于空或满队列,插入和移除线程会有一个无法进行操作,此时无线程并发;对于既可插入又可移除的队列,插入和移除的操作如图所示,从上到下分别是:空队列、插入线程插入 1、插入线程插入 2 和移除线程移除 1。
首先是一个空队列,插入 1 之后,插入 2 的线程和移除 1 的线程同时执行,插入线程更改的数据有:原 head 的 next 节点、原 first 的值、head 指向原 first,移除线程更改的数据有:原 first 的 next 执行新增节点、原 tail 指向新增节点,可以看出两个线程的操作并不冲突。