DelayQueue源码分析

DelayQueue/BlockingQueue

Posted by Jay on June 25, 2019

DelayQueue源码分析

DelayQueue是一个包含Delayed类型元素的无界阻塞队列,只有在元素延迟时间过期后才能获取这些元素。DelayQueue的特性基本上是由BlockingQueue、PriorityQueue和Delayed的特性来决定的。DelayQueue通过Delayed接口,使得不同元素之间能按照剩余的延迟时间进行排序(Delayed.getDelay()),然后通过PriorityQueue(优先队列,基于小顶堆),使得Delayed.getDelay()最小的元素或者最早过期的元素能最先被处理,然后利用BlockingQueue,将元素处理的操作阻塞住。

一、Delayed接口分析

public interface Delayed extends Comparable<Delayed> {

  	// 返回剩余的延迟时间。value<=0,表示延迟已过期
    long getDelay(TimeUnit unit);
}

An implementation of this interface must define a {@code compareTo} method that provides an ordering consistent with its {@code getDelay} method.

Delayed接口继承自Comparable接口,并且有一个getDelay(TimeUnit)方法用于获取剩余延迟时间。同时该接口定义中指出,Delayed接口的实现类必须实现compareTo()方法,并且compareTo()方法必须提供与getDelay()方法一致的排序规则和顺序。下面DelayQueue中的元素实现了Delayed接口,并保存在优先队列(小顶堆)PriorityQueue中,排序规则便是getDelay()方法的值。

二、DelayQueue分析

1.重要属性
// 重入锁,用于线程同步
private final transient ReentrantLock lock = new ReentrantLock(); 
// 优先队列(小顶堆),存放Delayed类型元素。Delayed.getDelay()值小的元素在堆顶。
private final PriorityQueue<E> q = new PriorityQueue<E>(); 

// 当前等待获取过期元素的leader线程
// 1.当有线程已成为leader时,该线程是限时等待元素,其他follower线程是无限期的等待元素。
// 2.当leader获取到元素并从take()、poll()返回时,它必须唤醒其他等待的线程,
// 除非其他线程成为了新leader(队列头部元素被具有更早过期时间的元素替换)
// 3.当堆顶元素(队列头部元素)被具有更早过期时间的元素替换时,leader属性置为null,
// 唤醒任何等待的线程(原leader或followers)
// 4.任何等待的线程都有可能获取和失去leader资格
private Thread leader = null;

// 1.当堆顶元素(队列头部元素)被具有更早过期时间的元素替换时,调用available.signal();
// 2.当leader线程获取到过期元素,在返回之前,调用available.signal()唤醒某个线程,使之能成为leader
private final Condition available = lock.newCondition();
2.构造函数
public DelayQueue() {} // 构造空的DelayQueue

public DelayQueue(Collection<? extends E> c) { // 构造一个初始包含c中元素的DelayQueue
    this.addAll(c);
}
3.添加元素的方法(非阻塞)

添加元素的方法有add(E e),offer(E e),put(E e)和offer(E e, long timeout, TimeUnit unit)。

(1) add(E e)
public boolean add(E e) {
    return offer(e);
}
(2) put(E e)
public void put(E e) {
    offer(e);
}
(3) offer(E e, long timeout, TimeUnit unit)
public boolean offer(E e, long timeout, TimeUnit unit) {
    return offer(e);
}
(4) offer(E e)

以上add(E e),put(E e)和offer(E e, long timeout, TimeUnit unit)最终都调用到了offer(E e)方法。下面分析其实现。

public boolean offer(E e) { // 非阻塞式添加元素
    final ReentrantLock lock = this.lock;
    lock.lock(); // 先获取互斥锁
    try {
        q.offer(e); // e保存到优先队列q中,堆顶元素可能发生了变化
      	// 下面有两种情形:
        // 1.第一次存入元素,所以q.peek() == e,唤醒等待的线程;
        // 2.非第一次添加元素,添加后,该元素上浮到堆顶,此时之前线程等待的队列头部元素发生了变化,
        // 不管之前线程leader与followers,直接将leader置为null,重新选择leader线程(执行
        // 唤醒操作,可能是原leader或follower线程被唤醒,成为新leader),执行元素的获取。
        if (q.peek() == e) {
            leader = null;
            available.signal();
        }
        return true; // 因为DelayQueue是无界队列,所以添加操作总是返回成功
    } finally {
        lock.unlock(); // 释放锁
    }
}
4.获取元素的操作

获取元素的方法有poll(),take()和poll(long timeout, TimeUnit unit)。

(1) poll()(非阻塞)

获取并删除队列的头部元素,如果还未过期,则返回null。

// 获取并删除队列的头部元素,如果还未过期,则返回null
public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock(); // 获取互斥锁
    try {
        E first = q.peek(); // 获取堆顶元素(优先队列头部元素)
        if (first == null || first.getDelay(NANOSECONDS) > 0)// 如果为null或者还未过期,返回null
            return null;
        else
            return q.poll(); // 否则,返回已过期的头部元素
    } finally {
        lock.unlock(); // 释放锁
    }
}
(2) take()(阻塞)

阻塞式地获取过期的元素。

// 阻塞式地获取过期的元素
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly(); // 获取互斥锁,可被中断
    try {
        for (;;) {
            E first = q.peek(); // 队列头部元素
            if (first == null)
                available.await(); // 队头元素为空(无元素),直接等待
            else {
                long delay = first.getDelay(NANOSECONDS); // 获取元素过期还需要等待的时间
                if (delay <= 0)
                    return q.poll(); // 元素已过期,可以直接取出、返回
                first = null; // 线程在等待时不能持有队头元素的引用,防止内存泄漏
                if (leader != null) // leader线程已存在,该线程直接等待
                  	// 等待线程可能被leader线程唤醒,也可能被添加元素的线程唤醒(选择新leader)
                    available.await();
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread; // 当前线程成为leader线程
                    try {
                        available.awaitNanos(delay); // 限时等待
                    } finally {
                      	// 1.该leader线程限时等待完成,可以获取队头过期元素返回了,
                        // 并将leader置为null
                        // 2.有可能等待期间队头元素发生变化(新元素添加),此时
                        // leader != thisThread(leader重新选择)
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
      	// 如果leader线程为null且队头任务不为空,唤醒其中一个等待线程,使之能成为新leader
        if (leader == null && q.peek() != null)
            available.signal();
        lock.unlock(); // 释放锁
    }
}
(3)poll(long timeout, TimeUnit unit)(阻塞)

限时获取队头过期的元素,如果在指定时间内没有元素过期,则返回null。

// 限时获取队头过期的元素,如果在指定时间内没有元素过期,则返回null
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    long nanos = unit.toNanos(timeout); // 最多等待的时间(纳秒)
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly(); // 获取互斥锁,可被中断
    try {
        for (;;) {
            E first = q.peek(); // 队列头部元素
            if (first == null) {
                if (nanos <= 0) // 元素为null(队列为空)且已超时,则直接返回null
                    return null;
                else // 元素为null(队列为空)且未超时,则等待nanos时间
                    nanos = available.awaitNanos(nanos);
            } else {
                long delay = first.getDelay(NANOSECONDS); // 获取元素过期还需要等待的时间
                if (delay <= 0) // 元素已过期,可以直接取出、返回
                    return q.poll();
                if (nanos <= 0) // 已超时,则直接返回null
                    return null;
                first = null; // 线程在等待时不能持有队头任务的引用,防止内存泄漏
              	// leader线程已存在或者最多等待的时间小于delay,该线程直接等待nanos时间
                if (nanos < delay || leader != null)
                  	// 等待线程可能被leader线程唤醒,也可能被添加元素的线程唤醒(选择新leader)
                    nanos = available.awaitNanos(nanos);
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread; // 当前线程成为leader线程
                    try {
                        long timeLeft = available.awaitNanos(delay); // 等待delay时间
                        nanos -= delay - timeLeft; // 还可以等待的时间
                    } finally {
                      // 1.该leader线程限时等待完成,可以获取队头过期元素返回了,
                      // 并将leader置为null
                      // 2.有可能等待期间队头元素发生变化(新元素添加,并成为堆顶),此时
                      // leader != thisThread(leader重新选择)
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
      	// 如果leader线程为null且队头任务不为空,唤醒其中一个等待线程,使之能成为新leader
        if (leader == null && q.peek() != null)
            available.signal();
        lock.unlock(); // 释放锁
    }
}
5.DelayQueue总结
  • DelayQueue内部使用PriorityQueue来保存元素和维护元素顺序;
  • DelayQueue存储的元素必须实现Delayed接口,通过实现Delayed接口,可以获取到元素的剩余延迟时间,以及可以比较元素大小(Delayed 继承Comparable);
  • DelayQueue通过一个重入锁来控制元素的入队出队行为;
  • PriorityQueue只是负责存储数据以及维护元素的顺序,对于元素是否过期以及取数据则是在DelayQueue中进行判断控制的。

三、参考文献