Disruptor(二)RingBuffer读取

上一章主要介绍 Ring Buffer 的数据结构,本章主要讲解如何使用 Disruptor 从 Ring Buffer 中读取数据。

1、消费者通过 ProcessingSequenceBarrier 读取数据

能够读取数据的前提是数据已经写入到 Ring Buffer 中,关于数据的写入,后面一章节会详细讲解。
image.png
RingBuffer 的元素的大小是 2 的 n 次方(上面 ringBufferSize 为 8,从序号 0 开始)。消费者(Consumer)是一个想从 RingBuffer 里读取数据的线程,它可以通过访问 ProcessingSequenceBarrier 对象和 RingBuffer 进行交互。消费者也需要知道它将要处理的序号,每个消费者都需要找到下一个它要访问的序号。在上面的例子中,消费者处理完了 RingBuffer 里序号 8 之前(包括 8)的所有数据,那么它期待访问的下一个序号是 9。

2、消费者 BatchEventProcessor

关于消费者如何通过调用 SequenceBarrier 对象的 waitFor()方法,传递它所需要的下一个序号。本章节以 BatchEventProcessor 批量事件处理器为例进行讲解,首先查看类图。
image.png
主要继承 EventProcessor 接口和 Runnable 接口,本章主要介绍 run 方法,对于 BatchEventProcessor 的初始化暂时不做讲解。

public void run() {
    // 线程是否运行
    if (!running.compareAndSet(false, true)) {
        throw new IllegalStateException("Thread is already running");
    }
    // 将ProcessingSequenceBarrier的alerted设置成false
    sequenceBarrier.clearAlert();
    // start事件处理
    notifyStart();

    T event = null;
    // 获取当前事件处理器的下一个sequence
    long nextSequence = sequence.get() + 1L;
    try {
        while (true) {
            try {
                // 从ProcessingSequenceBarrier获取可用的availableSequence
                final long availableSequence = sequenceBarrier.waitFor(nextSequence);
                // 下一个nextSequence比可用的availableSequence小的时候,获取事件,并触发事件处理
                while (nextSequence <= availableSequence) {
                    event = dataProvider.get(nextSequence);
                    // 消费者事件处理
                    eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
                    nextSequence++;
                }
                // 设置当前事件处理器已经处理的sequence
                sequence.set(availableSequence);
            } catch (final TimeoutException e) {
                // 超时处理
                notifyTimeout(sequence.get());
            } catch (final AlertException ex) {
                if (!running.get()) {
                    break;
                }
            } catch (final Throwable ex) {
                // 异常事件处理
                exceptionHandler.handleEventException(ex, nextSequence, event);
                sequence.set(nextSequence);
                nextSequence++;
            }
        }
    } finally {
        // 关闭事件处理
        notifyShutdown();
        running.set(false);
    }
}

拿到了数据后,消费者(Consumer)会更新自己的标识(cursor),消费者(Consumer)现在只需要通过简单通过 ProcessingSequenceBarrier 拿到可用的 Ringbuffer 中的 Sequence 序号就可以可以读取数据了。因为这些新的节点的确已经写入了数据(RingBuffer 本身的序号已经更新),而且消费者对这些节点的唯一操作是读而不是写,因此访问不用加锁。不仅代码实现起来可以更加安全和简单,而且不用加锁使得速度更快。另一个好处是可以用多个消费者(Consumer)去读同一个 RingBuffer,不需要加锁,也不需要用另外的队列来协调不同的线程(消费者)。这样你可以在 Disruptor 的协调下实现真正的并发数据处理。

3、ProcessingSequenceBarrier 获取可用序号

在上面的 BatchEventProcessor 中的 run 方法中有如下调用

final long availableSequence = sequenceBarrier.waitFor(nextSequence);

获取 RingBuffer 最大可访问的 availableSequence 序号,在上面的例子中是 10。
首先看下 ProcessingSequenceBarrier 的类图。
image.png
其实现了 SequenceBarrier 接口,用于和 RingBuffer 之间进行交互,下面主要看下构造函数和 waitFor 函数。

public long waitFor(final long sequence)
        throws AlertException, InterruptedException, TimeoutException {
    // 检查clert异常
    checkAlert();
    // 通过waitStrategy策略获取可用的序号,cursorSequence为当前的Sequence,dependentSequence为依赖的Sequence[]
    long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);
    // 产生比预期的sequence小,可能序号被重置回老的的oldSequence值
    //可参考https://github.com/LMAX-Exchange/disruptor/issues/76
    if (availableSequence <sequence) {
        return availableSequence;
    }
    // 获取最大的可用的已经发布的sequence,可能比sequence小
    // 会在多生产者中出现,当生产者1获取到序号13,生产者2获取到14;生产者1没发布,生产者2发布,会导致获取的可用序号为12,而sequence为13
    return sequencer.getHighestPublishedSequence(sequence, availableSequence);
}

4、WaitStrategy 策略

waitFor 函数的主要功能为获取到可用的 sequence 并返回给事件处理器。SequenceBarrier 内部有一个 WaitStrategy 方法来决定它如何等待这个序号,我现在不会去描述它的细节,代码的注释里已经概括了每一种 WaitStrategy 的优点和缺点,目前的实现方式主要有以下几种,后续会做详细介绍。

  • BlockingWaitStrategy
  • BusySpinWaitStrategy
  • LiteBlockingWaitStrategy
  • PhasedBackoffWaitStrategy
  • SleepingWaitStrategy
  • TimeoutBlockingWaitStrategy
  • YieldingWaitStrategy
https://alicharles.oss-cn-hangzhou.aliyuncs.com/static/images/mp_qrcode.jpg
文章目录
  1. 1、消费者通过 ProcessingSequenceBarrier 读取数据
  2. 2、消费者 BatchEventProcessor
  3. 3、ProcessingSequenceBarrier 获取可用序号
  4. 4、WaitStrategy 策略