Disruptor(三)RingBuffer单生产者写入

上一章主要介绍了消费者从 RingBuffer 读取数据,本章主要介绍单个生产者如何向 RingBuffer 数据写入数据。在 RingBuffer 数据写入过程中如何不要让 Ring 重叠,写入后通知消费者,生产者一端的批处理,以及多个生产者如何协同工作。
在 RingBuffer 写入数据的过程涉及到两阶段提交(two-phasecommit)
1)生产者需要申请 buffer 里的下一个节点。
2)当生产者向节点写完数据,需要调用调用 publish 发布数据。

1、单个生产者 SingleProducerSequencer 数据写入

在后台由 ProducerSequencer 负责所有的交互细节,来从 RingBuffer 中找到下一个节点,然后才允许生产者向它写入数据。
image.png
在图中一个生产者写入 RingBuffer,SingleProducerSequencer 对象拥有所有正在访问 RingBuffer 的消费者 gatingSequences 列表(区别于队列需要追踪队列的头和尾,而且它们有时候会指向相同的位置),Disruptor 中由消费者负责通知它们处理到了哪个序列号,而不是 RingBuffer。
如果想确定我们没有让 RingBuffer 重叠,需要检查所有的消费者们都读到了哪里。在上图中有 2 个消费者,一个消费者顺利的读到了最大序号 13(用蓝色高亮),第二个消费者有点儿落后停在序号 6。因此消费者 2 在赶上消费者 1 之前要跑完整个 RingBuffer 一圈的距离。
现在生产者想要写入 RingBuffer 中序号 6 占据的节点,因为它是 RingBuffer 当前游标的下一个节点。但是 SingleProducerSequencer 明白现在不能写入,因为有一个消费者正在占用它。所以 SingleProducerSequencer 停下来自旋(spins),等待,直到那个消费者离开。

2、申请下一个节点

现在可以想像消费者 2 已经处理完了一批节点,并且向前移动了它的序号。可能它挪到了序号 9(因为消费端的批处理方式,现实中我会预计它到达 13)
image.png
上图显示了当消费者 2 挪动到序号 9 时发生的情况。SingleProducerSequencer 会看到下一个节点序号 6 那个已经可以用了。它会抢占这个节点上的 Entry(我还没有特别介绍 Entry 对象,基本上它是一个放写入到某个序号的 RingBuffer 数据的桶),把下一个序号(14)更新成 Entry 的序号,然后把 Entry 返回给生产者。生产者可以接着往 Entry 里写入数据。

3、提交新的数据

将生产的数据提交,通知消费之。
image.png
绿色表示最近写入的 Entry,序号是 14,通过 publish 方法提交,设置 RingBuffer 的 cursor 为 14,通知消费者 14 被更新了,可以读取了(不同的 WaitStrategy 实现以不同的方式来实现提醒,取决于它是否采用阻塞模式)。现在消费者 2 可以读 Entry14 的数据进行消费了。
看完上面的原理后下面分析 SingleProducerSequencer 是如何获取序号和提交数据的。**

4、SingleProducerSequencer 生产者类图

image.png
SingleProducerSequencer 继承 AbstractSequencer,实现了 Sequencer 接口。
Sequencer 提供增加删除消费者序列,创建 SequenceBarrier,获取最小序号,和最大发布的序号。
Cursored 获取当前的游标。
Sequenced 获取当前 ringbuffer 大小,获取想一个序号,以及提交数据接口。

5、消费者和生产者直接的关联

首先看下 AbstractSequencer 中定义

// 生产者的当前的游标位置
protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
// 消费者当前处理的序号集合
protected volatile Sequence[] gatingSequences = new Sequence[0];

由于 volatile 只能保存可见性和禁止编译器优化,当时不能保证互斥性,多线程并发读写的话会有问题。

private static final AtomicReferenceFieldUpdater<AbstractSequencer, Sequence[]> SEQUENCE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(AbstractSequencer.class, Sequence[].class, "gatingSequences");

使用 AtomicReferenceFieldUpdater 原子字段更新解决多线程更新 gatingSequences 问题
具体实现参照 SequenceGroups 中使用 CAS 进行更新。

public final void addGatingSequences(Sequence... gatingSequences) {
    SequenceGroups.addSequences(this, SEQUENCE_UPDATER, this, gatingSequences);
}
public boolean removeGatingSequence(Sequence sequence) {
    return SequenceGroups.removeSequence(this, SEQUENCE_UPDATER, sequence);
}

6、生产者使用 next 获取下一个可用的序号

public long next(int n) {
    if (n <1) {
        throw new IllegalArgumentException("n must be > 0");
    }
    // 当前的最小序号(单个生产者为生产者的游标)
    long nextValue = this.nextValue;
    // 下一个序号
    long nextSequence = nextValue + n;
    // 重叠点位置
    long wrapPoint = nextSequence - bufferSize;
    // 缓存的消费者处理的序号
    long cachedGatingSequence = this.cachedValue;
    // wrapPoint > cachedGatingSequence,
    // 重叠位置大于缓存的消费者处理的序号,说明有消费者没有处理完成,不能够防止数据
    // cachedGatingSequence > nextValue
    // 只会在https://github.com/LMAX-Exchange/disruptor/issues/76情况下存在
    if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) {
        long minSequence;
        // 等待不重叠后退出循环
        while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue))) {
            // 通知消费者处理事件
            waitStrategy.signalAllWhenBlocking();
            // 生产者等待的时候后自旋,后续需要使用策略
            LockSupport.parkNanos(1L);
        }
        // 缓存消费者和生产者的最小序号
        this.cachedValue = minSequence;
    }
    // 设置生产者下一个可用的的序号
    this.nextValue = nextSequence;
    return nextSequence;
}

7、生产者使用 publish 发布数据

public void publish(long sequence) { // 设置生产者的游标序号
    cursor.set(sequence);
    // 通知消费者处理事件
    waitStrategy.signalAllWhenBlocking();
}

当发布数据后,消费者 sequenceBarrier.waitFor(nextSequence)就能够获取 RingBuffer 最大可访问的 availableSequence 序号,处理数据了。

8、消费者消费数据

再回忆下 ProcessingSequenceBarrier 的 waitFor 函数,其中调用到了 sequencer.getHighestPublishedSequence(sequence, availableSequence);

public long waitFor(final long sequence)
        throws AlertException, InterruptedException, TimeoutException {
    // 检查clert异常
    checkAlert();
    // 通过waitStrategy策略获取可用的序号,cursorSequence为当前的Sequence,dependentSequence为依赖的Sequence[]
    long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);
    // 产生比预期的sequence小,可能序号被重置回老的的oldSequence值
    //可参考https://github.com/LMAX-Exchange/disruptor/issues/76
    if (availableSequence <sequence) {
        return availableSequence;
    }
    // 获取最大的可用的已经发布的sequence,可能比sequence小
    // 会在多生产者中出现,当生产者1获取到序号13,生产者2获取到14;生产者1没发布,生产者2发布,会导致获取的可用序号为12,而sequence为13
    return sequencer.getHighestPublishedSequence(sequence, availableSequence);
}
public long getHighestPublishedSequence(long lowerBound, long availableSequence) {
    return availableSequence;
}

在 SingleProducerSequencer 的 getHighestPublishedSequence 方法中直接返回可用的 availableSequence,通知消费者消费数据。通过以上步骤,生产者和消费者就协同起来了。

https://alicharles.oss-cn-hangzhou.aliyuncs.com/static/images/mp_qrcode.jpg
文章目录
  1. 1、单个生产者 SingleProducerSequencer 数据写入
  2. 2、申请下一个节点
  3. 3、提交新的数据
  4. 4、SingleProducerSequencer 生产者类图
  5. 5、消费者和生产者直接的关联
  6. 6、生产者使用 next 获取下一个可用的序号
  7. 7、生产者使用 publish 发布数据
  8. 8、消费者消费数据