# 前言
它是一个具有延时的队列,队列中的元素必须实现 getDelay 和 compareTo 方法。
# 源码分析
# Delay
Delayed 接口定义了 getDelay 方法,获取每个元素的延时时间,同时也继承了 Comparable 接口,来比较元素之间的大小。
| public interface Delayed extends Comparable<Delayed> { |
| |
| |
| * 每个元素都必须实现该接口,以便判断延时时间 |
| */ |
| long getDelay(TimeUnit unit); |
| } |
# 变量
队列元素的直接使用优先队列来存储,优先队列使用数组存储,是无界队列。
| |
| private final transient ReentrantLock lock = new ReentrantLock(); |
| |
| private final PriorityQueue<E> q = new PriorityQueue<E>(); |
| |
| private Thread leader = null; |
| |
| private final Condition available = lock.newCondition(); |
# 构造方法
| |
| * 由于存储结构直接使用优先队列,初始化容量使用优先队列默认值,不能进行自定义 |
| */ |
| public DelayQueue() {} |
| |
| |
| * 初始化一些数据 |
| */ |
| public DelayQueue(Collection<? extends E> c) { |
| |
| this.addAll(c); |
| } |
# 插入方法
# add(e)
| public boolean add(E e) { |
| |
| return offer(e); |
| } |
# offer(e)
| public boolean offer(E e) { |
| |
| final ReentrantLock lock = this.lock; |
| lock.lock(); |
| try { |
| |
| q.offer(e); |
| |
| if (q.peek() == e) { |
| |
| leader = null; |
| |
| available.signal(); |
| } |
| return true; |
| } finally { |
| lock.unlock(); |
| } |
| } |
# put(e)
| public void put(E e) { |
| |
| offer(e); |
| } |
# offer(e, timeout, unit)
| public boolean offer(E e, long timeout, TimeUnit unit) { |
| |
| return offer(e); |
| } |
# 移除方法
# remove()
直接继承父类方法。
| |
| * AbstractQueue 的 remove 方法 |
| * 可看出与 poll 方法不同的是,如果没有拿到元素,会抛出异常 |
| */ |
| public E remove() { |
| E x = poll(); |
| if (x != null) |
| return x; |
| else |
| throw new NoSuchElementException(); |
| } |
# poll()
| public E poll() { |
| |
| final ReentrantLock lock = this.lock; |
| lock.lock(); |
| try { |
| |
| E first = q.peek(); |
| |
| if (first == null || first.getDelay(NANOSECONDS) > 0) |
| return null; |
| |
| else |
| return q.poll(); |
| } finally { |
| lock.unlock(); |
| } |
| } |
# take()
| public E take() throws InterruptedException { |
| |
| final ReentrantLock lock = this.lock; |
| lock.lockInterruptibly(); |
| try { |
| for (;;) { |
| |
| E first = q.peek(); |
| |
| if (first == null) |
| available.await(); |
| |
| else { |
| long delay = first.getDelay(NANOSECONDS); |
| |
| if (delay <= 0) |
| return q.poll(); |
| |
| first = null; |
| |
| if (leader != null) |
| available.await(); |
| else { |
| |
| Thread thisThread = Thread.currentThread(); |
| leader = thisThread; |
| try { |
| |
| available.awaitNanos(delay); |
| } finally { |
| |
| if (leader == thisThread) |
| leader = null; |
| } |
| } |
| } |
| } |
| } finally { |
| |
| if (leader == null && q.peek() != null) |
| available.signal(); |
| lock.unlock(); |
| } |
| } |
# poll(timeout, unit)
| public E poll(long timeout, TimeUnit unit) throws InterruptedException { |
| |
| long nanos = unit.toNanos(timeout); |
| |
| final ReentrantLock lock = this.lock; |
| lock.lockInterruptibly(); |
| try { |
| for (;;) { |
| |
| E first = q.peek(); |
| |
| if (first == null) { |
| |
| if (nanos <= 0) |
| return null; |
| |
| else |
| nanos = available.awaitNanos(nanos); |
| } else { |
| |
| long delay = first.getDelay(NANOSECONDS); |
| |
| if (delay <= 0) |
| return q.poll(); |
| |
| if (nanos <= 0) |
| return null; |
| |
| first = null; |
| |
| if (nanos < delay || leader != null) |
| nanos = available.awaitNanos(nanos); |
| else { |
| |
| Thread thisThread = Thread.currentThread(); |
| leader = thisThread; |
| try { |
| |
| long timeLeft = available.awaitNanos(delay); |
| nanos -= delay - timeLeft; |
| } finally { |
| if (leader == thisThread) |
| leader = null; |
| } |
| } |
| } |
| } |
| } finally { |
| if (leader == null && q.peek() != null) |
| available.signal(); |
| lock.unlock(); |
| } |
| } |
# 检查方法
# element()
直接继承自 AbstractQueue 的 element () 方法。
| |
| * AbstractQueue 的方法 |
| * 可看出,与 peek () 方法不同的是,如果为空,会抛出异常 |
| */ |
| public E element() { |
| E x = peek(); |
| if (x != null) |
| return x; |
| else |
| throw new NoSuchElementException(); |
| } |
# peek()
| public E peek() { |
| |
| final ReentrantLock lock = this.lock; |
| lock.lock(); |
| try { |
| |
| return q.peek(); |
| } finally { |
| lock.unlock(); |
| } |
| } |
# 并发控制
延时队列使用一个锁,插入、移除和检查均使用锁进行同步操作,队列为空时,移除线程进行等待,直到队列不空被唤醒。在移除元素时,除了队列为空等待时,第一个元素延时未到也要进行等待,后者的等待没有必要让所有线程都进行等待,这样会浪费资源,只需要选取一个 Leader 进行等待,然后移除即可,其他线程进入 Follower 状态,等待移除下一个元素。在 Leader 等待延时移除元素之后,进入 Processing 状态,要唤醒下一个线程竞争 Leader。这就是 Leader/Follower 模式。