之前看过Disruptor的实现,然而如果需要真正地了解Disruptor的为什么快,我们需要对既有的内存队列做一个分析(没有对比就没有伤害)。
下面列举了一些常见的内存队列:
| 队列 | 有界性 | 锁 | 数据结构 | 
|---|---|---|---|
| ArrayBlockingQueue | bounded | 加锁 | arraylist | 
| LinkedBlockingQueue | optionally-bounded | 加锁 | linkedlist | 
| ConcurrentLinkedQueue | unbounded | 无锁 | linkedlist | 
| LinkedTransferQueue | unbounded | 无锁 | linkedlist | 
| PriorityBlockingQueue | unbounded | 加锁 | heap | 
| DelayQueue | unbounded | 加锁 | heap | 
我将重点分析前面3个队列。
- ArrayBlockingQueue
- LinkedBlockingQueue
- ConcurrentLinkedQueue

这3个类同样继承AbstractQueue
接口Queue的定义
先来复习一下queue接口的定义。
public interface Queue<E> extends Collection<E> {
    /**
     * Inserts the specified element into this queue if it is possible to do so
     * immediately without violating capacity restrictions, returning
     * {@code true} upon success and throwing an {@code IllegalStateException}
     * if no space is currently available.
     *
     * @param e the element to add
     * @return {@code true} (as specified by {@link Collection#add})
     * @throws IllegalStateException if the element cannot be added at this
     *         time due to capacity restrictions
     * @throws ClassCastException if the class of the specified element
     *         prevents it from being added to this queue
     * @throws NullPointerException if the specified element is null and
     *         this queue does not permit null elements
     * @throws IllegalArgumentException if some property of this element
     *         prevents it from being added to this queue
     */
    boolean add(E e);
    /**
     * Inserts the specified element into this queue if it is possible to do
     * so immediately without violating capacity restrictions.
     * When using a capacity-restricted queue, this method is generally
     * preferable to {@link #add}, which can fail to insert an element only
     * by throwing an exception.
     *
     * @param e the element to add
     * @return {@code true} if the element was added to this queue, else
     *         {@code false}
     * @throws ClassCastException if the class of the specified element
     *         prevents it from being added to this queue
     * @throws NullPointerException if the specified element is null and
     *         this queue does not permit null elements
     * @throws IllegalArgumentException if some property of this element
     *         prevents it from being added to this queue
     */
    boolean offer(E e);
    /**
     * Retrieves and removes the head of this queue.  This method differs
     * from {@link #poll poll} only in that it throws an exception if this
     * queue is empty.
     *
     * @return the head of this queue
     * @throws NoSuchElementException if this queue is empty
     */
    E remove();
    /**
     * Retrieves and removes the head of this queue,
     * or returns {@code null} if this queue is empty.
     *
     * @return the head of this queue, or {@code null} if this queue is empty
     */
    E poll();
    /**
     * Retrieves, but does not remove, the head of this queue.  This method
     * differs from {@link #peek peek} only in that it throws an exception
     * if this queue is empty.
     *
     * @return the head of this queue
     * @throws NoSuchElementException if this queue is empty
     */
    E element();
    /**
     * Retrieves, but does not remove, the head of this queue,
     * or returns {@code null} if this queue is empty.
     *
     * @return the head of this queue, or {@code null} if this queue is empty
     */
    E peek();
}
- add
添加一个元素到队列
- offer
添加一个元素到队列。跟add的区别是,add添加失败后会抛异常,而offer是返回false。
- remove
出队
- poll
出队,跟remove的区别是,队列为空,remove抛异常,而poll会返回一个null
- element
返回队列的头,不会删除。
- peek
返回队列的头,不会删除。跟element的区别是,如果队列为空,element抛异常,而peek返回null
一般业务使用offer,poll,peek居多。
ArrayBlockingQueue
顾名思义,使用数组存储队列的元素。通过一个写指针(putIndex)和取指针(takeIndex)来控制队列的入队和出队。
graph LR
0-->1
subgraph takeIndex
1
end
1-->2
2-->3
subgraph putIndex
4
end
3-->4
当putIndex=数组的大小时,指针又会回到0。
如图,下一个元素入队会导致putIndex到达0。
graph LR
0-->1
subgraph takeIndex
1
end
1-->2
2-->3
subgraph putIndex
0
end
3-->4
当putIndex和takeIndex到达同一个位置,有两种可能。
- 队列已满
- 队列为空
ArrayBlockingQueue通过一个属性count来区分这两种场景。
- 队列已满。如果写入会阻塞生产者(有两个offer方法,带timeout的才会阻塞)
- 队列为空。如果出队会阻塞消费者(有两个poll方法,带timeout的才会阻塞)
下面看下源码
- 入队
public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
        checkNotNull(e);
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
    //加锁
        lock.lockInterruptibly();
        try {
            //队列已满,等待timeout
            while (count == items.length) {
                if (nanos <= 0)
                    return false;
                nanos = notFull.awaitNanos(nanos);
            }
            //入队
            enqueue(e);
            return true;
        } finally {
            lock.unlock();
        }
    }
private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
    //队列已满,指针回到0点
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
    //唤醒阻塞等待的生产者
        notEmpty.signal();
    }
- 出队
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
    //加锁
        lock.lockInterruptibly();
        try {
            //队列为空,阻塞等待
            while (count == 0) {
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
    //对象置位空
        items[takeIndex] = null;
    //取指针+1
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal();
        return x;
    }
总结:
ArrayBlockingQueue是最简单的队列实现,通过锁来保证在多线程环境下的数据一致性。加锁的原因是由于putIndex和takeIndex在到达数组的length-1时,需要把指针挪回0的位置,而disruptor的指针是一直自增,可以利用AtmoicLong的CAS来保证线程安全。
LinkedBlockingQueue
先看下官方的介绍。
/**
 * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
 * linked nodes.
 * This queue orders elements FIFO (first-in-first-out).
 * The <em>head</em> of the queue is that element that has been on the
 * queue the longest time.
 * The <em>tail</em> of the queue is that element that has been on the
 * queue the shortest time. New elements
 * are inserted at the tail of the queue, and the queue retrieval
 * operations obtain elements at the head of the queue.
 * Linked queues typically have higher throughput than array-based queues but
 * less predictable performance in most concurrent applications.
有几点关键信息
- 头尾指针的定义
- 链表队列的吞吐量高于数组队列
对于第二点个人持保留意见。
- 数组队列在物理存储上是一个连续的空间,计算机对于这种数据的连续读取是有优化的。
- 每次的入队都需要重新申请一个内存块,数据队列的空间是预申请。
- 出队后的对象是一个死链,需要通过GC释放
LinkedBlockingQueue在锁的层面有做优化。写锁和读锁是分开,锁的粒度比ArrayBlockingQueue要小。
/** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();
    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();
入队
public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        final AtomicInteger count = this.count;
        if (count.get() == capacity)
            return false;
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            if (count.get() < capacity) {
                enqueue(node);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }
/**
     * Links node at end of queue.
     *
     * @param node the node
     */
    private void enqueue(Node<E> node) {
        // assert putLock.isHeldByCurrentThread();
        // assert last.next == null;
        last = last.next = node;
    }
加锁,尾指针(last)修改
出队的逻辑雷同。
总结
这篇文章主要分析了ArrayBlockingQueue和LinkedBlockingQueue,这两个队列在并发的场景下,都是使用了锁来保证线程安全。ConcurrentLinkedQueue的实现会稍复杂一些,计划在下一篇文章单独进行分析。
