基于版本jdk1.7.0_80

java.util.concurrent.DelayQueue

代码如下

/*
* ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*/ /*
*
*
*
*
*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*/ package java.util.concurrent;
import java.util.concurrent.locks.*;
import java.util.*; /**
* An unbounded {@linkplain BlockingQueue blocking queue} of
* <tt>Delayed</tt> elements, in which an element can only be taken
* when its delay has expired. The <em>head</em> of the queue is that
* <tt>Delayed</tt> element whose delay expired furthest in the
* past. If no delay has expired there is no head and <tt>poll</tt>
* will return <tt>null</tt>. Expiration occurs when an element's
* <tt>getDelay(TimeUnit.NANOSECONDS)</tt> method returns a value less
* than or equal to zero. Even though unexpired elements cannot be
* removed using <tt>take</tt> or <tt>poll</tt>, they are otherwise
* treated as normal elements. For example, the <tt>size</tt> method
* returns the count of both expired and unexpired elements.
* This queue does not permit null elements.
*
* <p>This class and its iterator implement all of the
* <em>optional</em> methods of the {@link Collection} and {@link
* Iterator} interfaces.
*
* <p>This class is a member of the
* <a href="{@docRoot}/../technotes/guides/collections/index.html">
* Java Collections Framework</a>.
*
* @since 1.5
* @author Doug Lea
* @param <E> the type of elements held in this collection
*/ public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E> { private transient final ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>(); /**
* Thread designated to wait for the element at the head of
* the queue. This variant of the Leader-Follower pattern
* (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
* minimize unnecessary timed waiting. When a thread becomes
* the leader, it waits only for the next delay to elapse, but
* other threads await indefinitely. The leader thread must
* signal some other thread before returning from take() or
* poll(...), unless some other thread becomes leader in the
* interim. Whenever the head of the queue is replaced with
* an element with an earlier expiration time, the leader
* field is invalidated by being reset to null, and some
* waiting thread, but not necessarily the current leader, is
* signalled. So waiting threads must be prepared to acquire
* and lose leadership while waiting.
*/
private Thread leader = null; /**
* Condition signalled when a newer element becomes available
* at the head of the queue or a new thread may need to
* become leader.
*/
private final Condition available = lock.newCondition(); /**
* Creates a new <tt>DelayQueue</tt> that is initially empty.
*/
public DelayQueue() {} /**
* Creates a <tt>DelayQueue</tt> initially containing the elements of the
* given collection of {@link Delayed} instances.
*
* @param c the collection of elements to initially contain
* @throws NullPointerException if the specified collection or any
* of its elements are null
*/
public DelayQueue(Collection<? extends E> c) {
this.addAll(c);
} /**
* Inserts the specified element into this delay queue.
*
* @param e the element to add
* @return <tt>true</tt> (as specified by {@link Collection#add})
* @throws NullPointerException if the specified element is null
*/
public boolean add(E e) {
return offer(e);
} /**
* Inserts the specified element into this delay queue.
*
* @param e the element to add
* @return <tt>true</tt>
* @throws NullPointerException if the specified element is null
*/
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e);
if (q.peek() == e) {
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
} /**
* Inserts the specified element into this delay queue. As the queue is
* unbounded this method will never block.
*
* @param e the element to add
* @throws NullPointerException {@inheritDoc}
*/
public void put(E e) {
offer(e);
} /**
* Inserts the specified element into this delay queue. As the queue is
* unbounded this method will never block.
*
* @param e the element to add
* @param timeout This parameter is ignored as the method never blocks
* @param unit This parameter is ignored as the method never blocks
* @return <tt>true</tt>
* @throws NullPointerException {@inheritDoc}
*/
public boolean offer(E e, long timeout, TimeUnit unit) {
return offer(e);
} /**
* Retrieves and removes the head of this queue, or returns <tt>null</tt>
* if this queue has no elements with an expired delay.
*
* @return the head of this queue, or <tt>null</tt> if this
* queue has no elements with an expired delay
*/
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E first = q.peek();
if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
return null;
else
return q.poll();
} finally {
lock.unlock();
}
} /**
* Retrieves and removes the head of this queue, waiting if necessary
* until an element with an expired delay is available on this queue.
*
* @return the head of this queue
* @throws InterruptedException {@inheritDoc}
*/
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null)
available.await();
else {
long delay = first.getDelay(TimeUnit.NANOSECONDS);
if (delay <= 0)
return q.poll();
else if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
} /**
* Retrieves and removes the head of this queue, waiting if necessary
* until an element with an expired delay is available on this queue,
* or the specified wait time expires.
*
* @return the head of this queue, or <tt>null</tt> if the
* specified waiting time elapses before an element with
* an expired delay becomes available
* @throws InterruptedException {@inheritDoc}
*/
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null) {
if (nanos <= 0)
return null;
else
nanos = available.awaitNanos(nanos);
} else {
long delay = first.getDelay(TimeUnit.NANOSECONDS);
if (delay <= 0)
return q.poll();
if (nanos <= 0)
return null;
if (nanos < delay || leader != null)
nanos = available.awaitNanos(nanos);
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
long timeLeft = available.awaitNanos(delay);
nanos -= delay - timeLeft;
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
} /**
* Retrieves, but does not remove, the head of this queue, or
* returns <tt>null</tt> if this queue is empty. Unlike
* <tt>poll</tt>, if no expired elements are available in the queue,
* this method returns the element that will expire next,
* if one exists.
*
* @return the head of this queue, or <tt>null</tt> if this
* queue is empty.
*/
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return q.peek();
} finally {
lock.unlock();
}
} public int size() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return q.size();
} finally {
lock.unlock();
}
} /**
* @throws UnsupportedOperationException {@inheritDoc}
* @throws ClassCastException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
* @throws IllegalArgumentException {@inheritDoc}
*/
public int drainTo(Collection<? super E> c) {
if (c == null)
throw new NullPointerException();
if (c == this)
throw new IllegalArgumentException();
final ReentrantLock lock = this.lock;
lock.lock();
try {
int n = 0;
for (;;) {
E first = q.peek();
if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
break;
c.add(q.poll());
++n;
}
return n;
} finally {
lock.unlock();
}
} /**
* @throws UnsupportedOperationException {@inheritDoc}
* @throws ClassCastException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
* @throws IllegalArgumentException {@inheritDoc}
*/
public int drainTo(Collection<? super E> c, int maxElements) {
if (c == null)
throw new NullPointerException();
if (c == this)
throw new IllegalArgumentException();
if (maxElements <= 0)
return 0;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int n = 0;
while (n < maxElements) {
E first = q.peek();
if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
break;
c.add(q.poll());
++n;
}
return n;
} finally {
lock.unlock();
}
} /**
* Atomically removes all of the elements from this delay queue.
* The queue will be empty after this call returns.
* Elements with an unexpired delay are not waited for; they are
* simply discarded from the queue.
*/
public void clear() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.clear();
} finally {
lock.unlock();
}
} /**
* Always returns <tt>Integer.MAX_VALUE</tt> because
* a <tt>DelayQueue</tt> is not capacity constrained.
*
* @return <tt>Integer.MAX_VALUE</tt>
*/
public int remainingCapacity() {
return Integer.MAX_VALUE;
} /**
* Returns an array containing all of the elements in this queue.
* The returned array elements are in no particular order.
*
* <p>The returned array will be "safe" in that no references to it are
* maintained by this queue. (In other words, this method must allocate
* a new array). The caller is thus free to modify the returned array.
*
* <p>This method acts as bridge between array-based and collection-based
* APIs.
*
* @return an array containing all of the elements in this queue
*/
public Object[] toArray() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return q.toArray();
} finally {
lock.unlock();
}
} /**
* Returns an array containing all of the elements in this queue; the
* runtime type of the returned array is that of the specified array.
* The returned array elements are in no particular order.
* If the queue fits in the specified array, it is returned therein.
* Otherwise, a new array is allocated with the runtime type of the
* specified array and the size of this queue.
*
* <p>If this queue fits in the specified array with room to spare
* (i.e., the array has more elements than this queue), the element in
* the array immediately following the end of the queue is set to
* <tt>null</tt>.
*
* <p>Like the {@link #toArray()} method, this method acts as bridge between
* array-based and collection-based APIs. Further, this method allows
* precise control over the runtime type of the output array, and may,
* under certain circumstances, be used to save allocation costs.
*
* <p>The following code can be used to dump a delay queue into a newly
* allocated array of <tt>Delayed</tt>:
*
* <pre>
* Delayed[] a = q.toArray(new Delayed[0]);</pre>
*
* Note that <tt>toArray(new Object[0])</tt> is identical in function to
* <tt>toArray()</tt>.
*
* @param a the array into which the elements of the queue are to
* be stored, if it is big enough; otherwise, a new array of the
* same runtime type is allocated for this purpose
* @return an array containing all of the elements in this queue
* @throws ArrayStoreException if the runtime type of the specified array
* is not a supertype of the runtime type of every element in
* this queue
* @throws NullPointerException if the specified array is null
*/
public <T> T[] toArray(T[] a) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return q.toArray(a);
} finally {
lock.unlock();
}
} /**
* Removes a single instance of the specified element from this
* queue, if it is present, whether or not it has expired.
*/
public boolean remove(Object o) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return q.remove(o);
} finally {
lock.unlock();
}
} /**
* Returns an iterator over all the elements (both expired and
* unexpired) in this queue. The iterator does not return the
* elements in any particular order.
*
* <p>The returned iterator is a "weakly consistent" iterator that
* will never throw {@link java.util.ConcurrentModificationException
* ConcurrentModificationException}, and guarantees to traverse
* elements as they existed upon construction of the iterator, and
* may (but is not guaranteed to) reflect any modifications
* subsequent to construction.
*
* @return an iterator over the elements in this queue
*/
public Iterator<E> iterator() {
return new Itr(toArray());
} /**
* Snapshot iterator that works off copy of underlying q array.
*/
private class Itr implements Iterator<E> {
final Object[] array; // Array of all elements
int cursor; // index of next element to return;
int lastRet; // index of last element, or -1 if no such Itr(Object[] array) {
lastRet = -1;
this.array = array;
} public boolean hasNext() {
return cursor < array.length;
} @SuppressWarnings("unchecked")
public E next() {
if (cursor >= array.length)
throw new NoSuchElementException();
lastRet = cursor;
return (E)array[cursor++];
} public void remove() {
if (lastRet < 0)
throw new IllegalStateException();
Object x = array[lastRet];
lastRet = -1;
// Traverse underlying queue to find == element,
// not just a .equals element.
lock.lock();
try {
for (Iterator it = q.iterator(); it.hasNext(); ) {
if (it.next() == x) {
it.remove();
return;
}
}
} finally {
lock.unlock();
}
}
} }

0. DelayQueue简介

无界的阻塞队列,线程安全,只能存放实现了Delayed接口的对象,存入的对象只能在到期后才能取出。

1. 接口分析

DelayQueue继承于AbstractQueue抽象类

BlockingQueue<E>(阻塞队列语义)接口

2. DelayQueue原理概述

DelayQueue内部维护了一个PriorityQueue,存入PriorityQueue的对象必须实现Delayed接口,Delayed接口又继承于Comparable接口,对象必须正确实现Delayed接口的语义。这样PriorityQueue就能根据对象的到期时间进行排序,每次poll/take的时候,会检查PriorityQueue中的第一个元素,如果它没有到期,说明整个队列中的所有对象都未到期,不能取出。这样就实现了存入的对象只能在到期后才能取出的语义。

DelayQueue内部还维护了一个ReentrantLock变量lock,每次poll/offer操作都会加锁,这样就实现了线程安全的语义。

DelayQueue内部还维护了lock的Condition变量available,用它来控制队列为空时继续poll元素会被阻塞,如果有新元素被放入则将阻塞线程唤醒,这样就实现了阻塞队列的语义。

3. Delayed接口

public interface Delayed extends Comparable<Delayed> {

    /**
* Returns the remaining delay associated with this object, in the
* given time unit.
*
* @param unit the time unit
* @return the remaining delay; zero or negative values indicate
* that the delay has already elapsed
*/
long getDelay(TimeUnit unit);
}

很简单的代码,可以看出,任何实现了Delayed接口的类,都必须要实现getDelay与compareTo方法,这两个方法必须被正确实现,否则DelayQueue将无法正常工作

long getDelay(TimeUnit unit) -> 根据传入的时间单位,返回当前对象距离到期还有多久时间(不能无视单位随意返回值,因为DelayQueue会根据这个返回时间设置等待时间,乱设可能会导致多余的自旋占用CPU)

public int compareTo(T o); -> 与传入的另外一个实现了Delayed接口的对象比较,两者谁先到期

4. leader线程

DelayQueue有一个很有趣的设计,内部维护了一个Thread类型的leader变量,其注释如下:

大概意思是说如果有多个线程在等待对象到期,只有一个线程会被设置为leader线程,这个leader线程会根据最近到期的元素来设置等待时间,其他线程都是永久等待。leader线程等待超时后会去取元素,然后唤醒其他等待线程。

关键点在于,只有一个线程会超时等待,其他线程永久等待(超时等待要维护计时器,开销肯定相对较大),这样就减少了开销。再一次体现了Doug Lea大神对性能的追求,膜之。

5. DelayQueue.offer方法解析

    /**
* Inserts the specified element into this delay queue.
*
* @param e the element to add
* @return <tt>true</tt>
* @throws NullPointerException if the specified element is null
*/
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();//加锁保证线程安全
try {
q.offer(e);//向内部维护的PriorityQueue插入元素,由于插入的对象实现了Comparable接口,距离超时时间最近的对象会排在队首
if (q.peek() == e) {//如果新插入的对象就被排在队首了,那么leader线程的等待时间就不正确了,需要将leader线程唤醒
leader = null;
available.signal();//唤醒leader线程
}
return true;
} finally {
lock.unlock();//解锁
}
}

6. DelayQueue.take方法解析

    /**
* Retrieves and removes the head of this queue, waiting if necessary
* until an element with an expired delay is available on this queue.
*
* @return the head of this queue
* @throws InterruptedException {@inheritDoc}
*/
public E take() throws InterruptedException {//阻塞可中断
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();//加全局可中断锁
try {
for (;;) {
E first = q.peek();//获取队首元素,这个元素的等待时间是最短的
if (first == null)
available.await();//如果队列为空,那也没法根据队首元素设置线程等待时间了,就直接无限等待
else {
long delay = first.getDelay(TimeUnit.NANOSECONDS);//用ns为单位,获取队首元素的等待时间
if (delay <= 0)//如果这个元素已经过期了,直接出队
return q.poll();
else if (leader != null)//如果队首元素还没有过期,而且leader线程存在,当前线程选择永久等待,leader线程会将其唤醒
available.await();
else {//将自己设置为leader线程
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);//根据队首元素设置自己的等待超时时间
} finally {
if (leader == thisThread)//如果leader线程还是自己(可能在offer方法中被修改)
leader = null;//让出leader位置
}
}
}
}
} finally {
if (leader == null && q.peek() != null)//如果没有leader线程,并且优先队列不为空,那么唤醒一个等待线程
available.signal();
lock.unlock();//解锁
}
}

这段代码写得很好,重点关注对available条件与leader变量的操作

7. DelayQueue.poll方法解析

    /**
* Retrieves and removes the head of this queue, or returns <tt>null</tt>
* if this queue has no elements with an expired delay.
*
* @return the head of this queue, or <tt>null</tt> if this
* queue has no elements with an expired delay
*/
public E poll() {//非阻塞方法
final ReentrantLock lock = this.lock;
lock.lock();//加锁
try {
E first = q.peek();//获取优先队列的队首元素,如果队首元素已经超时,则poll队首元素并返回,否则返回null
if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
return null;
else
return q.poll();
} finally {
lock.unlock();
}
} /**
* Retrieves and removes the head of this queue, waiting if necessary
* until an element with an expired delay is available on this queue,
* or the specified wait time expires.
*
* @return the head of this queue, or <tt>null</tt> if the
* specified waiting time elapses before an element with
* an expired delay becomes available
* @throws InterruptedException {@inheritDoc}
*/
public E poll(long timeout, TimeUnit unit) throws InterruptedException {//超时可中断方法
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();//全局可中断锁
try {
for (;;) {
E first = q.peek();//查找队首元素
if (first == null) {//如果队列为空,则根据传入的超时时间设置线程等待时间
if (nanos <= 0)
return null;
else
nanos = available.awaitNanos(nanos);
} else {
long delay = first.getDelay(TimeUnit.NANOSECONDS);//获取队首元素的等待时间
if (delay <= 0)
return q.poll();
if (nanos <= 0)
return null;
if (nanos < delay || leader != null)//如果leader的等待时间比当前线程的等待时间长,当前线程设置等待时间并等待
nanos = available.awaitNanos(nanos);
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
long timeLeft = available.awaitNanos(delay);
nanos -= delay - timeLeft;
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}

8. DelayQueue的迭代器

    /**
* Returns an iterator over all the elements (both expired and
* unexpired) in this queue. The iterator does not return the
* elements in any particular order.
*
* <p>The returned iterator is a "weakly consistent" iterator that
* will never throw {@link java.util.ConcurrentModificationException
* ConcurrentModificationException}, and guarantees to traverse
* elements as they existed upon construction of the iterator, and
* may (but is not guaranteed to) reflect any modifications
* subsequent to construction.
*
* @return an iterator over the elements in this queue
*/
public Iterator<E> iterator() {
return new Itr(toArray());//将PriorityQueue中的所有元素复制一份
} /**
* Snapshot iterator that works off copy of underlying q array.
*/
private class Itr implements Iterator<E> {
final Object[] array; // Array of all elements
int cursor; // index of next element to return;
int lastRet; // index of last element, or -1 if no such Itr(Object[] array) {
lastRet = -1;
this.array = array;
} public boolean hasNext() {
return cursor < array.length;
} @SuppressWarnings("unchecked")
public E next() {//遍历数组
if (cursor >= array.length)
throw new NoSuchElementException();
lastRet = cursor;
return (E)array[cursor++];
} public void remove() {
if (lastRet < 0)
throw new IllegalStateException();
Object x = array[lastRet];//获取上一次返回的元素,也就是将要被删除的元素
lastRet = -1;
// Traverse underlying queue to find == element,
// not just a .equals element.
lock.lock();//加全局锁
try {
for (Iterator it = q.iterator(); it.hasNext(); ) {//遍历底层的PriorityQueue,删除对应的元素(不一定真的能找到这个元素)
if (it.next() == x) {
it.remove();
return;
}
}
} finally {
lock.unlock();
}
}
}

迭代器是弱一致的,初始化迭代器时,会创建底层PriorityQueue中所有元素的一个拷贝,遍历操作会在这个拷贝上进行(弱一致,原PriorityQueue的修改无法在迭代器中体现)

用迭代器删除元素时,会记下试图删除的元素,然后去原PriorityQueue里寻找,如果找到相同的元素,则将其删除(不一定真的有删除动作,因为原PriorityQueue可能已经将这个元素出队了)

由于迭代器操作的实际上是PriorityQueue的一个快照,所以无论如何不会抛出ConcurrentModificationException

迭代器的遍历次序没有保证

05-11 22:43