上一章主要介绍 Ring Buffer 的数据结构,本章主要讲解如何使用 Disruptor 从 Ring Buffer 中读取数据。
1、消费者通过 ProcessingSequenceBarrier 读取数据
能够读取数据的前提是数据已经写入到 Ring Buffer 中,关于数据的写入,后面一章节会详细讲解。
RingBuffer 的元素的大小是 2 的 n 次方(上面 ringBufferSize 为 8,从序号 0 开始)。消费者(Consumer)是一个想从 RingBuffer 里读取数据的线程,它可以通过访问 ProcessingSequenceBarrier 对象和 RingBuffer 进行交互。消费者也需要知道它将要处理的序号,每个消费者都需要找到下一个它要访问的序号。在上面的例子中,消费者处理完了 RingBuffer 里序号 8 之前(包括 8)的所有数据,那么它期待访问的下一个序号是 9。
2、消费者 BatchEventProcessor
关于消费者如何通过调用 SequenceBarrier 对象的 waitFor()方法,传递它所需要的下一个序号。本章节以 BatchEventProcessor 批量事件处理器为例进行讲解,首先查看类图。
主要继承 EventProcessor 接口和 Runnable 接口,本章主要介绍 run 方法,对于 BatchEventProcessor 的初始化暂时不做讲解。
public void run() {
// 线程是否运行
if (!running.compareAndSet(false, true)) {
throw new IllegalStateException("Thread is already running");
}
// 将ProcessingSequenceBarrier的alerted设置成false
sequenceBarrier.clearAlert();
// start事件处理
notifyStart();
T event = null;
// 获取当前事件处理器的下一个sequence
long nextSequence = sequence.get() + 1L;
try {
while (true) {
try {
// 从ProcessingSequenceBarrier获取可用的availableSequence
final long availableSequence = sequenceBarrier.waitFor(nextSequence);
// 下一个nextSequence比可用的availableSequence小的时候,获取事件,并触发事件处理
while (nextSequence <= availableSequence) {
event = dataProvider.get(nextSequence);
// 消费者事件处理
eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
nextSequence++;
}
// 设置当前事件处理器已经处理的sequence
sequence.set(availableSequence);
} catch (final TimeoutException e) {
// 超时处理
notifyTimeout(sequence.get());
} catch (final AlertException ex) {
if (!running.get()) {
break;
}
} catch (final Throwable ex) {
// 异常事件处理
exceptionHandler.handleEventException(ex, nextSequence, event);
sequence.set(nextSequence);
nextSequence++;
}
}
} finally {
// 关闭事件处理
notifyShutdown();
running.set(false);
}
}
拿到了数据后,消费者(Consumer)会更新自己的标识(cursor),消费者(Consumer)现在只需要通过简单通过 ProcessingSequenceBarrier 拿到可用的 Ringbuffer 中的 Sequence 序号就可以可以读取数据了。因为这些新的节点的确已经写入了数据(RingBuffer 本身的序号已经更新),而且消费者对这些节点的唯一操作是读而不是写,因此访问不用加锁。不仅代码实现起来可以更加安全和简单,而且不用加锁使得速度更快。另一个好处是可以用多个消费者(Consumer)去读同一个 RingBuffer,不需要加锁,也不需要用另外的队列来协调不同的线程(消费者)。这样你可以在 Disruptor 的协调下实现真正的并发数据处理。
3、ProcessingSequenceBarrier 获取可用序号
在上面的 BatchEventProcessor 中的 run 方法中有如下调用
final long availableSequence = sequenceBarrier.waitFor(nextSequence);
获取 RingBuffer 最大可访问的 availableSequence 序号,在上面的例子中是 10。
首先看下 ProcessingSequenceBarrier 的类图。
其实现了 SequenceBarrier 接口,用于和 RingBuffer 之间进行交互,下面主要看下构造函数和 waitFor 函数。
public long waitFor(final long sequence)
throws AlertException, InterruptedException, TimeoutException {
// 检查clert异常
checkAlert();
// 通过waitStrategy策略获取可用的序号,cursorSequence为当前的Sequence,dependentSequence为依赖的Sequence[]
long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);
// 产生比预期的sequence小,可能序号被重置回老的的oldSequence值
//可参考https://github.com/LMAX-Exchange/disruptor/issues/76
if (availableSequence <sequence) {
return availableSequence;
}
// 获取最大的可用的已经发布的sequence,可能比sequence小
// 会在多生产者中出现,当生产者1获取到序号13,生产者2获取到14;生产者1没发布,生产者2发布,会导致获取的可用序号为12,而sequence为13
return sequencer.getHighestPublishedSequence(sequence, availableSequence);
}
4、WaitStrategy 策略
waitFor 函数的主要功能为获取到可用的 sequence 并返回给事件处理器。SequenceBarrier 内部有一个 WaitStrategy 方法来决定它如何等待这个序号,我现在不会去描述它的细节,代码的注释里已经概括了每一种 WaitStrategy 的优点和缺点,目前的实现方式主要有以下几种,后续会做详细介绍。
- BlockingWaitStrategy
- BusySpinWaitStrategy
- LiteBlockingWaitStrategy
- PhasedBackoffWaitStrategy
- SleepingWaitStrategy
- TimeoutBlockingWaitStrategy
- YieldingWaitStrategy