Disruptor(四)RingBuffer多生产者写入

上一章主要介绍了单个生产者如何向 RingBuffer 数据写入数据,如何不要让 Ring 重叠,写入后通知消费者,生产者一端的批处理,以及多个生产者如何协同工作,本章主要介绍多生产者向 RingBuffer 数据写入数据。

1、多生产者 MultiProducerSequencer 申请下一个节点

image.png
和单生产者不同的是在 next 方法中会直接通过 cursor.compareAndSet(current, next)设置生产者的游标 cursor 的 sequence。大家很可能会问设置了生产者的游标后,没有提交数据之前,多生产者场景中消费者是否就能够获取到数据,答案是否定的,在 MultiProducerSequencer 实现的 getHighestPublishedSequence 的方法和单生产者有所区别,后面会详细讲解。

2、多生产者 MultiProducerSequencer 提交数据

和单生产者的区别是使用 setAvailable 将数据设置成可用状态。
在多个生产者的场景下,还需要其他东西来追踪序号。这个序号是指当前可写入的序号。注意这和“向 RingBuffer 的游标加 1”不一样,如果你有一个以上的生产者同时在向 RingBuffer 写入,就有可能出现某些 Entry 正在被生产者写入但还没有提交的情况。
生产者 1 拿到序号 14,生产者 2 拿到序号 15。现在假设生产者 1 因为某些原因没有来得及提交数据。
生产者 2 通过 setAvailable(15)请求完成提交数据,如图所示。
image.png
当这个时候消费者通过 waitFor(14),返回的结果会为 13,不错任何事件处理。
当生产者 1 通过 setAvailable(14)请求完成提交数据,如图所示。
image.png
以 BatchEventProcessor 的 run 实现会处理 14 和 15 位置上的数据,在下一次通过 waitFor(16)获取可用的数据。

3、MutiProducerSequencer 生产者类图。

image.png
MutiProducerSequencer 继承 AbstractSequencer,实现了 Sequencer 接口。
Sequencer 提供增加删除消费者序列,创建 SequenceBarrier,获取最小序号,和最大发布的序号。
Cursored 获取当前的游标。
Sequenced 获取当前 ringbuffer 大小,获取想一个序号,以及提交数据接口。
消费者和生产者之间的关联和单生产者一样,不做重复介绍。

4、多生产者通过 next 获取下一个可用的序号

public long next(int n) {
    if (n <1) {
        throw new IllegalArgumentException("n must be > 0");
    }
    long current;
    long next;
    do {
        // ringbuffer当前生产者cursor
        current = cursor.get();
        // 下一个可用的序号
        next = current + n;
        // 重叠点位置
        long wrapPoint = next - bufferSize;
        // 缓存的消费者处理的序号
        long cachedGatingSequence = gatingSequenceCache.get();
        // wrapPoint > cachedGatingSequence,
        // 重叠位置大于缓存的消费者处理的序号,说明有消费者没有处理完成,不能够防止数据
        // cachedGatingSequence > nextValue
        // 只会在https://github.com/LMAX-Exchange/disruptor/issues/76情况下存在
        if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current) {
            // 获取消费者和生产者最小的序号
            long gatingSequence = Util.getMinimumSequence(gatingSequences, current);
            // 仍然重叠
            if (wrapPoint > gatingSequence) {
                // 通知消费者处理事件
                waitStrategy.signalAllWhenBlocking();
                // 生产者等待的时候后自旋,后续需要使用策略
                LockSupport.parkNanos(1);
                continue;
            }
            // 没有重叠的话,设置消费者缓存
            gatingSequenceCache.set(gatingSequence);
        }
        // 没有重叠,直接将RingBuffer的序号设置成next
        else if (cursor.compareAndSet(current, next)) {
            break;
        }
    }
    while (true);
    // 返回可用的序号
    return next;
}

5、多生产者通过 publish 提交数据

public void publish(final long sequence) {
    // 将sequence设置为可用状态
    setAvailable(sequence);
    // 通知消费者处理事件
    waitStrategy.signalAllWhenBlocking();
}

多生产者在获取序号 next 方法中就已经设置了 cusor,提交数据的时候是将该 sequence 设置成可用状态,才能够被消费者使用。

6、消费者消费数据

再回忆下 ProcessingSequenceBarrier 的 waitFor 函数,其中调用到了 sequencer.getHighestPublishedSequence(sequence,availableSequence);

public long waitFor(final long sequence)
        throws AlertException, InterruptedException, TimeoutException {
    // 检查clert异常
    checkAlert();
    // 通过waitStrategy策略获取可用的序号,cursorSequence为当前的Sequence,dependentSequence为依赖的Sequence[]
    long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);
    // 产生比预期的sequence小,可能序号被重置回老的的oldSequence值
    //可参考https://github.com/LMAX-Exchange/disruptor/issues/76
    if (availableSequence <sequence) {
        return availableSequence;
    }
    // 获取最大的可用的已经发布的sequence,可能比sequence小
    // 会在多生产者中出现,当生产者1获取到序号13,生产者2获取到14;生产者1没发布,生产者2发布,会导致获取的可用序号为12,而sequence为13
    return sequencer.getHighestPublishedSequence(sequence, availableSequence);
}

获取最大的可用的已经发布的 sequence

public long getHighestPublishedSequence(long lowerBound, long availableSequence) {
    for (long sequence = lowerBound; sequence <= availableSequence; sequence++) {
        // 判断是否可用
        if (!isAvailable(sequence)) {
            return sequence - 1;
        }
    }
    return availableSequence;
}

其中判断 isAvailable 通过 availableBuffer 进行判断

public boolean isAvailable(long sequence) {
    // 计算((int) sequence) & indexMask的索引index
    int index = calculateIndex(sequence);
    // 计算(int) (sequence >>> indexShift) ringbuffer的slot的设置次数
    int flag = calculateAvailabilityFlag(sequence);
    // index在数组中的偏移量
    long bufferAddress = (index * SCALE) + BASE;
    // 如果和flag相等,说明可用
    return UNSAFE.getIntVolatile(availableBuffer, bufferAddress) == flag;
}

内部使用的变量如下。

// availableBuffer跟踪每个ringbuffer的slot槽的状态,是否可用
private final int[] availableBuffer = new int[bufferSize]; // 初始值为-1
private final int indexMask = bufferSize - 1;
private final int indexShift = Util.log2(bufferSize);

通过以上方式就能够判断当前的 sequence 是否可用了。
通过在 MutiProducerSequencer 的 getHighestPublishedSequence 方法中直接返回可用的 availableSequence,通知消费者消费数据,生产者和消费者就协同起来了。

https://alicharles.oss-cn-hangzhou.aliyuncs.com/static/images/mp_qrcode.jpg
文章目录
  1. 1、多生产者 MultiProducerSequencer 申请下一个节点
  2. 2、多生产者 MultiProducerSequencer 提交数据
    1. 3、MutiProducerSequencer 生产者类图。
      1. 4、多生产者通过 next 获取下一个可用的序号
  3. 5、多生产者通过 publish 提交数据
  4. 6、消费者消费数据