# 前言

它是一个具有延时的队列,队列中的元素必须实现 getDelay 和 compareTo 方法。

# 源码分析

# Delay

Delayed 接口定义了 getDelay 方法,获取每个元素的延时时间,同时也继承了 Comparable 接口,来比较元素之间的大小。

public interface Delayed extends Comparable<Delayed> {
    /**
     * 每个元素都必须实现该接口,以便判断延时时间
     */
    long getDelay(TimeUnit unit);
}

# 变量

队列元素的直接使用优先队列来存储,优先队列使用数组存储,是无界队列。

// 锁
private final transient ReentrantLock lock = new ReentrantLock();
// 使用优先队列来存储
private final PriorityQueue<E> q = new PriorityQueue<E>();
// 等待线程,用来等待队列第一个元素的延时,使用 Leader/Follower 模式
private Thread leader = null;
// 移除或检查元素时,队列为空则阻塞等待
private final Condition available = lock.newCondition();

# 构造方法

/**
 * 由于存储结构直接使用优先队列,初始化容量使用优先队列默认值,不能进行自定义
 */
public DelayQueue() {}
/**
 * 初始化一些数据
 */
public DelayQueue(Collection<? extends E> c) {
  // AbstractQueue 的 addAll 方法,遍历进行插入,插入方法由阻塞队列自己重写
  this.addAll(c);
}

# 插入方法

# add(e)

public boolean add(E e) {
  // 直接调用 offer 方法,由于是无界阻塞队列,所以不需要抛出队列已满的异常
  return offer(e);
}

# offer(e)

public boolean offer(E e) {
  // 获取锁进行插入
  final ReentrantLock lock = this.lock;
  lock.lock();
  try {
    // 插入优先队列
    q.offer(e);
    // 插入成功
    if (q.peek() == e) {
      // ?
      leader = null;
      // 唤醒移除等待线程
      available.signal();
    }
    return true;
  } finally {
    lock.unlock();
  }
}

# put(e)

public void put(E e) {
  // 直接调用 offer 方法,由于是无界阻塞队列,所以不需要阻塞
  offer(e);
}

# offer(e, timeout, unit)

public boolean offer(E e, long timeout, TimeUnit unit) {
  // 直接调用 offer 方法,无界队列不需要不需要超时阻塞
  return offer(e); 
}

# 移除方法

# remove()

直接继承父类方法。

/**
 * AbstractQueue 的 remove 方法
 * 可看出与 poll 方法不同的是,如果没有拿到元素,会抛出异常
 */
public E remove() {
  E x = poll();
  if (x != null)
    return x;
  else
    throw new NoSuchElementException();
}

# poll()

public E poll() {
  // 获取锁进行操作
  final ReentrantLock lock = this.lock;
  lock.lock();
  try {
    // 先获取第一个
    E first = q.peek();
    // 第一个是空或者还没到时间,返回 null
    if (first == null || first.getDelay(NANOSECONDS) > 0)
      return null;
    // 否则移除第一个
    else
      return q.poll();
  } finally {
    lock.unlock();
  }
}

# take()

public E take() throws InterruptedException {
  // 获取锁进行操作
  final ReentrantLock lock = this.lock;
  lock.lockInterruptibly();
  try {
    for (;;) {
      // 先获取第一个
      E first = q.peek();
      // 队列为空进行等待
      if (first == null)
        available.await();
      // 队列不空尝试取出第一个
      else {
        long delay = first.getDelay(NANOSECONDS);
        // 延时已到,取出
        if (delay <= 0)
          return q.poll();
        // 延迟未到,先把 first 设为 null,下次 first 可能变了
        first = null; 
        //leader 不为空,说明有线程在延时等待,进入 Follower 状态
        if (leader != null)
          available.await();
        else {
          // 没有还 Leader,将 leader 设置为当前线程,进入 Leader 状态
          Thread thisThread = Thread.currentThread();
          leader = thisThread;
          try {
            // 当前线程进行延时等待
            available.awaitNanos(delay);
          } finally {
            // 延时已到,如果当前线程是 Leader 状态,释放 leader,进入 Processing 状态
            if (leader == thisThread)
              leader = null;
          }
        }
      }
    }
  } finally {
    // 没有线程在延时等待且第一个元素有值,唤醒下一个 Leader
    if (leader == null && q.peek() != null)
      available.signal();
    lock.unlock();
  }
}

# poll(timeout, unit)

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
  // 等待时间
  long nanos = unit.toNanos(timeout);
  // 获取锁进行操作
  final ReentrantLock lock = this.lock;
  lock.lockInterruptibly();
  try {
    for (;;) {
      // 先获取到第一个元素
      E first = q.peek();
      // 队列为空
      if (first == null) {
        // 阻塞等待时间已到,直接返回 null
        if (nanos <= 0)
          return null;
        // 阻塞等待时间未到,进行超时阻塞等待
        else
          nanos = available.awaitNanos(nanos);
      } else {
        // 队列不空
        long delay = first.getDelay(NANOSECONDS);
        // 延时已到,直接移除第一个元素
        if (delay <= 0)
          return q.poll();
        // 延时未到且已到超时时间,不能移除第一个元素,返回 null
        if (nanos <= 0)
          return null;
        // 未到阻塞超时时间
        first = null; 
        // 阻塞时间小于延时时间或者有其他线程在等待延时,进行超时等待
        if (nanos < delay || leader != null)
          nanos = available.awaitNanos(nanos);
        else {
          // 阻塞时间大于延时时间且没有其他线程在等待延时,当前线程直接等待延时时间
          Thread thisThread = Thread.currentThread();
          leader = thisThread;
          try {
            // 同时更新阻塞超时时间
            long timeLeft = available.awaitNanos(delay);
            nanos -= delay - timeLeft;
          } finally {
            if (leader == thisThread)
              leader = null;
          }
        }
      }
    }
  } finally {
    if (leader == null && q.peek() != null)
      available.signal();
    lock.unlock();
  }
}

# 检查方法

# element()

直接继承自 AbstractQueue 的 element () 方法。

/** 
 * AbstractQueue 的方法
 * 可看出,与 peek () 方法不同的是,如果为空,会抛出异常
 */
public E element() {  
  E x = peek();  
  if (x != null)    
    return x;  
  else    
    throw new NoSuchElementException();
}

# peek()

public E peek() {
  // 获取锁进行操作
  final ReentrantLock lock = this.lock;
  lock.lock();
  try {
    // 直接使用优先队列的方法
    return q.peek();
  } finally {
    lock.unlock();
  }
}

# 并发控制

延时队列使用一个锁,插入、移除和检查均使用锁进行同步操作,队列为空时,移除线程进行等待,直到队列不空被唤醒。在移除元素时,除了队列为空等待时,第一个元素延时未到也要进行等待,后者的等待没有必要让所有线程都进行等待,这样会浪费资源,只需要选取一个 Leader 进行等待,然后移除即可,其他线程进入 Follower 状态,等待移除下一个元素。在 Leader 等待延时移除元素之后,进入 Processing 状态,要唤醒下一个线程竞争 Leader。这就是 Leader/Follower 模式。