这一篇重点看下消费者的处理逻辑,我们来分析下disruptor为什么快?

在分析Disruptor的消费者的处理逻辑的时候,如果有多个消费者,Disruptor如何处理这些消费者的竞争关系? handleEventsWith和handleEventsWithWorkerPool又有什么区别?

我自己简单写了一个测试类来进行测试。

package com.lmax.disruptor.example;

import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import com.lmax.disruptor.util.DaemonThreadFactory;

/**
 * @Auther: 李泳权
 * @Date: 2019/10/10 14:05
 * @Description:
 */
public class SimpleDisruptorTest {
    static final int RING_SIZE = 8;
    public static void main(String[] args) throws InterruptedException
    {
        Disruptor<SimpleMessage> disruptor = new Disruptor<>(
                SimpleMessage.FACTORY, RING_SIZE, DaemonThreadFactory.INSTANCE, ProducerType.MULTI,
                new BlockingWaitStrategy());
        //disruptor.handleEventsWith(new Consumer()).then(new Consumer());
        //disruptor.handleEventsWith(new Consumer[]{new Consumer(),new Consumer()});
        disruptor.handleEventsWithWorkerPool(new Consumer[]{new Consumer(),new Consumer()});
        final RingBuffer<SimpleMessage> ringBuffer = disruptor.getRingBuffer();
        Publisher p = new Publisher();
        System.out.println("publishing " + RING_SIZE + " messages");
        int i = 0;
        for (; i < RING_SIZE; i++)
        {
            ringBuffer.publishEvent(p, (long)i);
            Thread.sleep(10);
        }
        System.out.println("start disruptor");
        disruptor.start();
        System.out.println("continue publishing");
        while (true)
        {
            ringBuffer.publishEvent(p, (long)i);
            Thread.sleep(10);
            i++;
        }
    }

    public static class Publisher implements EventTranslatorOneArg<SimpleMessage, Long>
    {
        @Override
        public void translateTo(SimpleMessage message, long sequence,Long value)
        {
            message.setValue(value);
            message.setSequence(sequence);
        }
    }

    public static class Consumer implements EventHandler<SimpleMessage>,WorkHandler<SimpleMessage>
    {
        @Override
        public void onEvent(SimpleMessage event, long sequence, boolean endOfBatch) throws Exception
        {
            System.out.println("sequence:"+sequence+","+"event:"+event.toString());
        }

        @Override
        public void onEvent(SimpleMessage event) throws Exception {
            System.out.println("event:"+event.toString());
        }
    }

    public static class SimpleMessage{
        private long value;
        private long sequence;
        private static final EventFactory<SimpleMessage> FACTORY = new EventFactory<SimpleMessage>()
        {
            @Override
            public SimpleMessage newInstance()
            {
                return new SimpleMessage();
            }
        };

        public long getValue() {
            return value;
        }

        public void setValue(long value) {
            this.value = value;
        }

        public long getSequence() {
            return sequence;
        }

        public void setSequence(long sequence) {
            this.sequence = sequence;
        }

        @Override
        public String toString() {
            return "SimpleMessage{" +
                    "value=" + value +
                    ", sequence=" + sequence +
                    '}';
        }
    }
}

对于同一条消息m,handleEventsWith会让每个消费者都分别消费一遍(消息广播)。而handleEventsWithWorkerPool的多个消费者会竞争消费,对于一条消息只会被一个消费者消费。

我们看disruptor的方法介绍

    /**
     * Set up a {@link WorkerPool} to distribute an event to one of a pool of work handler threads.
     * Each event will only be processed by one of the work handlers.
     * The Disruptor will automatically start this processors when {@link #start()} is called.
     *
     * @param workHandlers the work handlers that will process events.
     * @return a {@link EventHandlerGroup} that can be used to chain dependencies.
     */
    @SafeVarargs
    @SuppressWarnings("varargs")
    public final EventHandlerGroup<T> handleEventsWithWorkerPool(final WorkHandler<T>... workHandlers)
    {
        return createWorkerPool(new Sequence[0], workHandlers);
    }
graph LR
m1-->c1(consumer1)
m1-->c2(consumer2)
m2-->c1(consumer1)
m2-->c2(consumer2)
graph LR
m1-->c1(consumer1)
m2-->c2(consumer2)

我们关注下下面的两个类WorkProcessor和BatchEventProcessor

graph BT
E(EventProcessor)
WorkProcessor-->E
BatchEventProcessor-->E

handleEventsWith会创建BatchEventProcessor对象,handleEventsWithWorkerPool会构建WorkProcessor。

我们看下官方的类描述

/**
 * Convenience class for handling the batching semantics of consuming entries from a {@link RingBuffer}
 * and delegating the available events to an {@link EventHandler}.
 * <p>
 * If the {@link EventHandler} also implements {@link LifecycleAware} it will be notified just after the thread
 * is started and just before the thread is shutdown.
 *
 * @param <T> event implementation storing the data for sharing during exchange or parallel coordination of an event.
 */
public final class BatchEventProcessor<T>
    implements EventProcessor

BatchEventProcessor是批处理事件模型。

/**
 * <p>A {@link WorkProcessor} wraps a single {@link WorkHandler}, effectively consuming the sequence
 * and ensuring appropriate barriers.</p>
 *
 * <p>Generally, this will be used as part of a {@link WorkerPool}.</p>
 *
 * @param <T> event implementation storing the details for the work to processed.
 */
public final class WorkProcessor<T>
    implements EventProcessor
{

WorkProcessor对于每条消息只会处理一次。

我们看下BatchEventProcessor的处理逻辑。

    private void processEvents()
    {
        T event = null;
        long nextSequence = sequence.get() + 1L;

        while (true)
        {
            try
            {
                final long availableSequence = sequenceBarrier.waitFor(nextSequence);
                if (batchStartAware != null)
                {
                    batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
                }

                while (nextSequence <= availableSequence)
                {
                    event = dataProvider.get(nextSequence);
                    //事件触发
                    eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
                    nextSequence++;
                }

                sequence.set(availableSequence);
            }
            ...
        }
    }

一开始nextSequence = sequence.get() + 1L;

availableSequence相当于写入的尾指针

graph LR
m0("-1")-->m1(0)
m1-->m2(1)
m2-->m3(2)
m3-->m4(3)
m4-->m5(4)
subgraph availableSequence
m4
end
subgraph sequence
m0
end
subgraph nextSequence
m1
end

nextSequence会不断地往后移。当nextSequence>availableSequence,会把sequence更新为availableSequence。然后再读取下一个availableSequence的位点

graph LR
m0("-1")-->m1(0)
m1-->m2(1)
m2-->m3(2)
m3-->m4(3)
m4-->m5(4)
subgraph availableSequence
m4
end
subgraph sequence
m0
end
subgraph nextSequence
m2
end
graph LR
m0("-1")-->m1(0)
m1-->m2(1)
m2-->m3(2)
m3-->m4(3)
m4-->m5(4)
subgraph sequence
subgraph availableSequence
m4
end
end
subgraph nextSequence
m5
end

由于BatchEventProcessor是并行的模型,对于sequence和nextSequence并不会产生竞争,因此也需要锁来协调。

我们需要重点关注的是WorkProcessor这个类。

            try
            {
                // if previous sequence was processed - fetch the next sequence and set
                // that we have successfully processed the previous sequence
                // typically, this will be true
                // this prevents the sequence getting too far forward if an exception
                // is thrown from the WorkHandler
                if (processedSequence)
                {
                    processedSequence = false;
                    do
                    {
                        nextSequence = workSequence.get() + 1L;
                        sequence.set(nextSequence - 1L);
                    }
                    while (!workSequence.compareAndSet(nextSequence - 1L, nextSequence));
                }

                if (cachedAvailableSequence >= nextSequence)
                {
                    event = ringBuffer.get(nextSequence);
                    workHandler.onEvent(event);
                    processedSequence = true;
                }
                else
                {
                    cachedAvailableSequence = sequenceBarrier.waitFor(nextSequence);
                }
            }

我们用一个图来模拟整个过程

graph LR
subgraph 消费者指针
m2(1)
end
m1(0)-->m2
m2-->m3(2)
subgraph 生产者指针
m4(3)
end
m3-->m4
m4-->m5(4)
c1(消费者1)
c2(消费者2)
c1==>|抢消息1|m2
c2==>|抢消息1|m2

假设消费者1抢成功,并成功把 消费者的指针往下挪一位。

graph LR
subgraph 消费者指针
m3
end
m1(0)-->m2(1)
m2-->m3(2)
subgraph 生产者指针
m4(3)
end
m3-->m4
m4-->m5(4)
c1(消费者1)
c2(消费者2)
c1==>|抢成功|m2
c1==>|事件处理|c1
c2==>|继续抢消费者指针|m3

消费者1抢到消息1,然后接下来进行事件处理。抢不到消费的消费者2继续抢消费者指针。

graph LR
m1(0)-->m2(1)
m2-->m3(2)
subgraph 生产者指针
subgraph 消费者指针
m4(3)
end
end
m3-->m4
m4-->m5(4)
c1(消费者1)
c2(消费者2)
c1==>|继续抢消费者指针|m4
c2==>|抢成功|m3
c2==>|事件处理|c2

消费者2抢消息2成功,接着继续处理消息。消费者1抢消息3

graph LR
m1(0)-->m2(1)
m2-->m3(2)
subgraph 生产者指针
m4(3)
end
subgraph 消费者指针
m5
end
m3-->m4
m4-->m5(4)
c1(消费者1)
c2(消费者2)
c1==>|抢成功|m4
c1==>|事件处理|c1
c2==>|等待新消息|m5

总结

问题