Etcd Watch机制

watch 是 mvcc 包中的一个功能,之所以拿出来说,是因为它确实有很重的逻辑。watch 是监听一个或一组 key,key 的任何变化都会发出消息。某种意义上讲,这就是发布订阅模式。https://segmentfault.com/a/1190000021787055

对比

既然 Watch 机制就是发布订阅模式,我们通过对比 Kafka,来更深入了解 Watch。
首先说明结论:
ETCD 没有消费者组的概念,所以不能代替 Kafka
对比其他方面呢:

ETCD Kafka
消费方式 监听一个 Key 订阅一个 Topic
生产方式 Put(Key, Value) Produce(Topic, Message)
历史消息是否保留 保留 保留
能否从指定位置消费 可以从指定 Revision 消费 可以从指定 offset 消费
能否保证消息不重放 不能 消费者会主动上报 offset,kafka 会保存每个消费者的 offset,消费者重启会从当前进度消费

对比 Kafka 不是试图用 ETCD 代替 Kafka,是想通过对比了解 Watch 的特性和局限性

猜想

在讨论别人是怎么实现的时候,自己总要先猜想下。想的过程中就会发现难点在哪。
我的想法:

type watcher struct {
    key string // 要监听的key

    ch  chan struct{} // 通过ch将消息发出来
}

func loop() {
    for _, w := range []watchers {
        ch <- message
    }
}

解释下,我的想法中,每一个监听者都是一个 watcher,监听者会自己消费自己的 ch,实现消费功能。在服务端需要维护一个 loop,将消息不断的发送到每一个监听者的 ch 中。
我感觉大多数人的最直观想法应该就是这样。
这样做我实现了

  • 订阅发布功能

但我没有做到

  • 同时监听一个范围的 key(比如:我可以监听 key=foo,但不能监听 key=foo ~ fox。这是 ETCD 一个重要的功能)
  • 消费者消费速率不同(比如:按我的设想,有一个消费者出现阻塞,会导致 loop 阻塞)

有了这些想法之后,我们来看看 ETCD 中 Watch 是怎么实现的。

实现

在 MVCC 文章中提到,KV 接口的具体实现是 store 结构体。Watch 的实现是在 store 上封装了一层,叫做:watchableStore,重写了 store 的 Write 方法。
通过 MVCC 中介绍,store 的任何写操作,都需要 Write 方法返回的 TxnWrite。所以这里重写 Write 方法意味这任何写操作都会经过 watchableStore。

func (tw *watchableStoreTxnWrite) End() {
   changes := tw.Changes()

   evs := make([]mvccpb.Event, len(changes))
   for i, change := range changes {
      evs[i].Kv = &changes[i]
   }

   tw.s.notify(rev, evs)
   tw.TxnWrite.End()
}

type watchableStoreTxnWrite struct {
   TxnWrite
   s *watchableStore
}

func (s *watchableStore) Write(trace *traceutil.Trace) TxnWrite {
   return &watchableStoreTxnWrite{s.store.Write(trace), s}
}

以上代码只列出了核心的逻辑,不难看出,watchableStoreTxnWrite 在事务提交时,先将本次变更 changes 打包成 Event,然后调用 notify 来将变更通知出去。最后真正提交事务 TxnWrite.End()
现在待推送的消息(Event)已经通过 notify 方法进入到了 Watch 机制中,我们看看这个消息是如何流转的。
首先需要介绍几个对象:

  • Event

事件。变更的消息是以 Event 的形式发送出去的,Event 包括 KeyValue,同时包括操作类型(Put、Delete 等)

  • watcher

watcher 监听一个或一组 key,如果有变更,watcher 将变更内容通过 chan 发送出去。

  • watcherGroup

顾名思义,一组 watcher。watcherGroup 管理多个 watcher,能够根据 key 快速找到监听该 key 的一个或多个 watcher。

  • watchableStore

继承自 store,在 store 基础上实现了 watch 功能。watchableStore 管理着两个 watcherGroup:synced、unsynced,和一个用于缓存的 victims。victims 是缓存当前未发出去的 Event。

  • watchStream

watchStream 是对 watchableStore 的封装。因为 watchableStore 继承自 store,所以他实现了很多方法,但这些方法并不都是用于 Watch 功能。所以 watchStream 对 watchableStore 再次封装,暴露出与 Watch 有关的方法。
在知道这 5 个对象之后,我们是如何使用 Watch 呢?

func testWatch() {
    s := newWatchableStore()

    w := s.NewWatchStream()

    w.Watch(start_key: foo, end_key: nil)

    w.Watch(start_key: bar, end_key: nil)

    for {
        consume := <- w.Chan()
    }
}

解释下,我们先创建了 watchableStore,这是 ETCD 启动后就创建了的。当我们要使用 Watch 功能时,我们创建了一个 watchStream(s.NewWatchStream)。创建出来的 w 可以监听多个 key:foo、bar。之后我们就可以消费 w.Chan()返回的 chan。foo 或 bar 的任何变化,都会通过这个 chan 发送给消费端 consume。
于是我们便得到下面这幅图:
image.png
可以看到 watchStream 实现了在一大堆 kv 的变化中,过滤出监听的 key,将 key 的变化输出。
紧接着,我们将这幅图补充完整:
image.png
这幅图是什么意思呢?
watchableStore 收到了所有 key 的变更后,将这些 key 交给 synced(watchGroup),synced 能够快速地从所有 key 中找到监听的 key。将这些 key 发送给对应的 watcher,这些 watcher 再通过 chan 将变更信息发送出去。
synced 是怎么快速找到符合条件的 key 呢?
ETCD 中使用了 map 和 adt(红黑树)来实现。
不单独使用 map 是因为 watch 可以监听一个范围的 key。如果只监听一个 key
watch(start_key: foo, end_key: nil)
我们可以这样存储
map[key]*watcher
这样可以根据 key 快速找到对应的 watcher,ETCD 也是这样做的。
但对于一组 key 呢?
watch(start_key: foo, end_key: fop)
这里我监听了从 foo->fop 之间的所有 key,理论上这些 key 的数目是无限的,所以无法再使用 map。
比如:key=fooac 也属于监听范围。
ETCD 用 adt 来存储这种 key。
image.png
adt 的实现这里不做介绍,只用知道 adt 能够根据 key=fooac 快速地找到所属范围 foo->fop。

adt 的原理推荐这篇文章:https://www.jianshu.com/p/e13...
adt 的 go 实现:go.etcd.io/etcd/pkg/ad

在找到 watcher 后,调用 watcher 的 send()方法,将变更的 Event 发送出去。
这就是上述图的意思,也就是正常的 Watch 流程。

各种场景

上图所述是正常流程,但是会有很多不正常的情况发生。
上图可以看到,消息都是通过一个 Chan 发送出去,但如果消费者消费速度慢,Chan 就容易堆积。Chan 的空间不可能无限大,那就必然会有满的时候,满了后该怎么办呢?
接下来就要讨论上图 unsynced、victims 的作用了。
Chan 什么时候会满呢?
image.png
代码中 Chan 的长度是 1024。不过这也是一个随机值,只是没有现在更好的选择。
一旦满了,会发生以下操作:

func (s *watchableStore) notify() {
    var victim watcherBatch
    ...
    w.minRev = rev + 1           // w是当前watcher
    if victim == nil {
       victim = make(watcherBatch)
    }
    w.victim = true              // w被标记为受损的
    victim[w] = eb               // eb是当前的变更消息EventBatch
    s.synced.delete(w)
    ...
    s.addVictim(victim)          // 将victim添加到s的victims中
}

watcher 会记录当前的 Revision,并将自身标记为受损的。此次的变更操作会被保存到 watchableStore 的 victims 中。同时该 watcher 会被从 synced 踢出。
假设此时有一个写操作:foo=f1。而正好 Chan 此时刚满,则监听 foo 的 watcher 将从 synced 中踢出,同时 foo=f1 被保存到 victims 中
image.png
接下来对 foo 的任何变更,该 watcher 都不会记录。那这些消息就都丢掉了吗?当然不是,watcher 变成受损状态时记录下了当时的 Revision,这个很重要。
这时要说到两个工作协程了:

// 我们在创建watchableStore时,会同时启动两个工作协程
go s.syncWatchersLoop()
go s.syncVictimsLoop()

顾名思义,第一个协程用于将 unsynced 的 watcher 同步为 synced。
第二个协程用于循环清除 watchableStore 中的 victims。
在上面的场景中,我们知道,队列满时,当时变更的 Event 被放入了 victims 中。这个协程就会试图清除这个 Event。怎么清除呢?协程会不断尝试让 watcher 发送这个 Event,一旦队列不满,watcher 将这个 Event 发出后。该 watcher 就被划入了 unsycned 中,同时不再是受损状态。
image.png
此时 syncWatchersLoop 协程就开始起作用。由于在受损状态下,这个 watcher 已经错过了很多消息。为了追回进度,协程会根据 watcher 保存的 Revision,找出受损之后所有的消息,将关于 foo 的消息全部给 watcher,当 watcher 将这些消息都发送出去后。watcher 就脱离了 unsynced,成为了 synced。
至此就解决了 Chan 满导致的问题。同时也阐明了 Watch 的设计实现。

https://www.jianshu.com/p/0c1c462c19d0

介绍

查询是后台领域经常使用的一种数据同步方式。但是在一些场景,需求方需要针对一些数据变化做出响应。虽然定期轮询也可以满足部分的需求,但在以下场景中就不太合适了。

  • 存活检测:为了检测低于%1 异常。轮询会一直耗费查询资源。
  • 实时响应:变化响应时间要尽可能小,但是轮询周期越小,消耗的资源也越多。

因此数据层往往在“增删改查”这 4 种基本接口之外还会提供一个 watch 接口用来实时推送数据变化事件。

Watch 机制设计

watch 机制是一个典型的 CS 架构,其中数据需求方作为 client,数据提供方作为 server。由 client 向 server 发起请求,server 端推送数据给 client。

版本号

带有 watch 机制的存储系统往往都是具有版本功能的。具备版本功能的系统可以实现以下两个功能:

  • 因为某些原因(崩溃、重启)client 可能错失部分历史事件。恢复之后 client 可以利用版本号重新接收这些事件。
  • client 可以在请求参数中附带版本号表示该版本号之前的历史数据已经接收。server 可以过滤掉过时事件只发送新的事件给 client。具体如下图所示。

image.png

连接层

连接层讨论的是 client 和 server 之间通信的协议。相较于单机程序之间的 IPC 通信,分布式系统网络通信的 IO 成本是非常大的。下面就连接层实现方式、性能和开发成本展开具体分析。

http 长轮询

http 长轮询是一种非常容易实现的 watch 手段,因此也是使用最广泛的。比如 etcd v2、consul 等都使用了 http 长轮询技术来 watch 事件。具体实现原理如下图。
image.png
http 长轮询的优点是实现简单、兼容性好,不需要额外开发客户端程序。但是这样的实现意味每接收一个 event 都需要至少走完一个 http 请求应答流程。这对于 watch key 非常多的系统,负荷是相当大的。假设某个后台系统需要 watch 10W 个 key,每个 http 轮询超时时间为 100s。计算下来即使在空闲的时候系统需要承受并发 10W 个连接和 1K/QSP 的请求量。

长连接

长连接模式是对 http 长轮询的一种优化。不同于 http 长轮询每个连接都只能处理一个事件,长连接模式一个连接可以接收多个事件,通过减少了 tcp 三次握手的开销,提高了资源利用率。
image.png
但相对而言长连接模式开发成本要比 http 长轮询高,主要体现在:

  • 需要定义事件流的序列化和反序列化协议。目前没有公认的标准,只能私有定制应用于内部系统。
  • 没有成熟的反向代理组件。需要考虑在大规模部署下的负载均衡问题。

长连接模式虽然减少了 tcp 握手的开销,但每个 watch key 都需要一个连接。假设某个后台系统需要 watch 10w 个 key,就需要建立 10W 个连接,这很容易消耗光 server 的 socket 和内存资源。

多路复用

多路复用是通信工程上的概念,指的是将多个低速信道整合到一个高速信道进行传输,从而有效地利用了高速信道。通过使用多路复用,可以避免维护多条线路,从而有效地节约运营成本。
网络工程上有很多方面借鉴了多路复用的思想。比如 L4 层的 TCP、UDP 就是复用了 L3 层的 IP 层的通道。同理我们也可以在 tcp 上层定义更高层协议来复用 tcp 连接。如下图,虽然端到端之间只有一个 tcp 连接。但在逻辑层上可以抽象出多个双工的 session stream。每个 session stream 负责一个 watch 任务。假设一个客户端需要 watch 1k 个 key,原先按照需要 1k 个连接,但现在只需要一个连接即可。
image.png
多路复用核心就是在 tcp 连接上构建一个逻辑层。该逻辑层负责处理一下内容:

  • 建立、关闭 stream 需要的控制帧。
  • 接收时将 tcp 连接里的数据流,拆分为一个个 frame,再按照协议封装到对于的 stream 中。
  • 发送时将 stream 里的数据流分割成 frame,再通过 tcp 层发送出去。
  • 为了防止 stream 之间相互干扰抢占带宽资源,需要设计流控机制公平调度。
  • 空闲 stream 保活通信,维持 session 会话心跳。
  • 封装好类似 socket 的 Read 和 Write 的接口,方便业务调用。

可以看出,多路复用技术是开发是比较复杂的。但值得庆幸的是,业内已经有了成熟可靠的工具和标准。HTTP/2 定义了多路复用的协议,grpc 实现多语言版本的接口。我们完全可以秉着拿来主义的思想直接来用,比如 etcd v3 版本就是使用 grpc stream 模式来处理 watch 的。

The etcd3 API multiplexes watches on a single connection. Instead of opening a new connection, a client registers a watcher on a bidirectional gRPC stream. The stream delivers events tagged with a watcher’s registered ID. Multiple watch streams can even share the same TCP connection. Multiplexing and stream connection sharing reduce etcd3’s memory footprint by at least an order of magnitude.

consul 的 watch 也采用了多路复用技术。它自己实现了一个多路复用库yamux,虽然没有 http/2 和 grpc 那么完备,但是还是可以供愿意自己练手的同学参考学习。

watch gateway

对于一般的场合,多路复用已经有很好的性能表现了。但是 etd v3.2 版本提出一个进一步提高 watch 性能的优化方案 watch gateway。
考虑到 k8s 使用场合可能存在有上万个 watcher。一旦事件触发 etcd 需要广播给所有的 watcher,就会带来相当大的性能消耗,甚至会影响的读写性能。但在实际场景,这些 watcher 很有可能监听的资源是重复的,比如每个 api-server 监听的资源都是一样的。为了优化这种大量重复监听 watcher 的场景,etcd v3.2 版本设计了 gateway 组件。gateway 可以聚合 watch 相同范围 key 的 watcher。举例说明如下图,client1、client2、client3 都对事件 a 感兴趣,如果直接请求 server,server 需要负担 3 个 watcher。但如果通过 gateway 聚合,可以合并 3 个 watch 变成 1 个,这样可以降低 server 的压力
image.png
本质上来说,gateway 只是将广播的压力从 server 转移到自己身上去了。但是 server 作为存储服务器,一般都是有状态的。无论是扩容还是迁移都是有一定成本的。但是 gateway 是一个无状态的服务,完全可以根据实际需求横向部署 gateway 服务器来降低存储层的压力。
下图是 etcd v3.2 使用 watch-gateway 性能提升的对比图。在不使用 gateway 时,随着 watcher 的增多,写和 watch 速率下降。但是使用了 gateway 之后,watch 数量增加对性能没有影响。详细文档见(https://coreos.com/blog/etcd-3.2-announcement)
image.png存储
watch 机制实现的另一个核心问题就是如何存储数据。按照存储类型来分,可以分为内存和硬盘里两大类。下面会根据具体场景来讨论这两种类型的数据格式的设计。

历史数据存储

watch 机制的特点决定了存储系统需要保存历史数据。举例说明,如下图一个数据同步场景。client 向 server watch 同步数据,历史同步数据已经达到 1G。某个时间点网络异常导致 client 和 server 之间通信中断,watch 被迫停止。在网络中断时间内,server 的数据发生了变更。当网络恢复的时候,client 重新发送 watch 请求,希望能够从 version=10001 继续获取事件。但此时 server 端的 version=10010,并且没有保存历史数据。客户端发现数据丢失,只好作废之前同步的数据,重新同步高达 1G 的数据,等追上之后再继续 watch。
image.png
保留历史数据可以简化客户端的工作,但是这也给存储方带来了极大的压力。

滑动事件窗口

内存型数据库一个缺点是容量相对有限。如果在数据更改频繁的情况下保留历史数据的话,有可能导致内存溢出。因此内存型数据库往往采用滑动事件窗口来作为妥协方案。
滑动事件窗口就是一个简单的回环数组。不断的插入新事件、淘汰掉超过大小的旧事件。因为窗口的大小是固定的,因此不会出现内存溢出。
如果 watch 的版本命中了滑动事件窗口里的事件版本,就可以返回给 client。
image.png
滑动事件窗口的缺点是显而易见的。对于修改频繁的系统,滑动事件窗口可以保存的事件时间非常短,很有可能丢失事件。这个是内存型存储系统的硬伤,没办法根本解决。目前 etcd v2、consul 和 k8s api server 都是采用这样的机制。

多版本存储

相较于受限容量的内存型数据库,磁盘数据库的空间就大很多了。有能力存储足够旧的历史版本数据。比如etcd v3就是存储了多个版本的数据。
简单来说,etcd v3 在内存里维护一个 B 树,存储的是 Key 和这个 Key 所有的版本列表。磁盘里维护了一个 B+树,存储的是版本和 KV 的实际内容。磁盘 B+树是实际的数据,内存 B 数一个二级索引。查找某个 Key 某个版本的数据可以分为以下两步:

  • 通过内存 B 数查找到 Key 对应的版本列表。再从版本列表中找到里查询版本参数最近的一个版本号 Version。
  • 再到磁盘 B+树中查找 Version 对应的数据信息。

image.png
因为需要将 Key 存储在内存中,etcd 的实际存储量也是非常有限的。按照 etcd 文档,默认存储是 2G、最大可配置到 8G。当然这个比内存型的 consul 容量还是大的多了。

事件触发

和其他的存储系统不一样的是,watch 存储系统需要在某个 key 变更的时候通知到 client。这就需要设计对应的触发响应机制。watcher 往往不仅仅监听单个的 key,还可能是监听某个前缀或是范围 key,只要其中之一有变化,就需要触发事件。
事件触发最简单的实现方式就是采用遍历的方法:当某个 Key 发生变化时,逐个遍历 watcher,一旦发现满足条件的 watcher 就发送数据。这种 O(n)复杂度的处理方式固然简单,但随着 watcher 数量的增多,带来的性能损失也是越来越大的。下面介绍两种应用于工程的数据结构。

radix 树

说到前缀匹配,很容易想到和前缀匹配相关的数据结构 radix 树。在计算机科学中,基数树,或称 Patricia trie/tree,或 crit bit tree,压缩前缀树,是一种更节省空间的 Trie(前缀树)。对于基数树的每个节点,如果该节点是唯一的子树的话,就和父节点合并。
image.png
radix.png
如图当前缀匹配 watch ro 的时候,可以通过 radix 树找到 om 节点。之后切割 o、m 节点并将 watcher 挂载在 o 节点上。如果 o 下面的节点有任何的变化,都会通过回调通知 watcher 触发事件。
consul 就是采用 radix 树来存储 KV 数据的。但是 radix 树只能解决前缀匹配的问题,无法解决范围 Key 的问题。因此 consul 是不支持范围 key watch 的。

区间树

radix 作为一个树的问题在于它太长了,需要大量使用间接指针。对于内存型存储结构还算好,但对于磁盘数据结构而言,多次间接查找是非常消耗性能的。目前 B+树还是最适合查找的磁盘数据结构。但 B+树没法反向查找某个 Key 是否在某个 watcher 范围内。为了解决这个问题,etcd v3 采用了区间树。
区间树是在红黑树基础上进行扩展得到的支持以区间为元素的动态集合的操作,其中每个节点的关键值是区间的左端点。通过建立这种特定的结构,可是使区间的元素的查找和插入都可以在 O(lgn)的时间内完成。
image.png
区间树.png
关于区间树原理本文不再赘述,感兴趣的同学可以查阅算法导论。简而言之,每个 watcher 将自己的监听范围[start,end]封装成一个节点插入区间树。当某个 Key 发生变化需要查找对应 watcher 的时候,就可以利用区间树快速查找到重叠的 watcher。

应用场景

之前说的是 watch 的实现机制。下面谈谈 watch 的应用场景。利用(list watch 机制)的方式解决以下场景的读性能瓶颈问题:

  • 读多写少
  • 可以接受最终一致性
  • 数据量不大,可以存储在内存中

服务发现

image.png
服务发现.png
服务发现场景恰好满足了上述的 3 个条件,因此非常适合采用 watch 同步机制来减缓服务发现服务器的读压力。每个客户端可以利用 list watch 缓存一份同步数据到本地,程序直接查询本地缓存,性能非常优异。

k8s api-server

image.png
k8s-server.png
k8s api 每个 server 利用 list watch 机制保留一份和 etcd 数据同步的缓存。当接收到查询请求时,直接读取缓存数据返回给客户端。对于新增、修改和删除请求直接透传给 etcd。
需要注意的是,在服务发现场景里客户端不会修改缓存数据,但 api-server 是可以修改数据的。一旦涉及数据修改,就会有数据一致性的问题。假设原先数据 a=1,之后客户端写入 a=2。写入成功后让客户端读取另外一个 server 的数据,有可能读取到 a=1(watch 有时间差)。这就产生了读写不一致的问题。
image.png
当然实际情况 k8s 是不会出现上述读写不一致的现象的。解决方法是 ResourceVersion 管理。k8s 里每个 Object 都有对应的 ResourceVersion,其实这个就是 etcd 的 revision,也就是 watch 的版本号。这个版本号是自增的,对于每个请求 k8s 都要求客户端在请求里带上特点的版本号。api-server 在收到客户端请求后会对比自身缓存里的版本信息,如果小于客户端的版本信息则需要阻塞等待新数据同步。只有缓存数据版本大于等于客户端请求的版本信息才可以返回数据给客户端。
image.png

总结

本文主要讨论了 watch 机制的具体实现和一些应用场景。虽然要实现一个简易的 watch 机制很容易,但随着业务发展,数据量和请求量逐步上升,就不得不就各个环节进行优化。虽然 etcd 和 consul 都是基于 raft 的 KV 数据库,但两者发展的方向已经越来越不相同。etcd 是伴随着 k8s 不断成长,在性能优化上一步步改进。consul 则是向着服务发现场景不断进步。当从 watch 机制实现上来看,consul 做的确不如 etcd 做的好,但在实际应用上,很难找到一个像 k8s 一样对性能要求如此严苛的场景。可以说 k8s 采用了 etcd,也是 etcd 的幸运。

参考

其他系统文章可参考
https://zhuanlan.zhihu.com/p/369782579

https://alicharles.oss-cn-hangzhou.aliyuncs.com/static/images/mp_qrcode.jpg
文章目录
  1. 对比
  2. 猜想
  3. 实现
    1. 各种场景
  • 介绍
  • Watch 机制设计
    1. 版本号
    2. 连接层
      1. http 长轮询
      2. 长连接
      3. 多路复用
      4. watch gateway
      5. 历史数据存储
        1. 滑动事件窗口
        2. 多版本存储
      6. 事件触发
        1. radix 树
        2. 区间树
  • 应用场景
    1. 服务发现
    2. k8s api-server
  • 总结
  • 参考