一、最基础的队列
最基础的消息队列其实就是一个双端队列,我们可以用双向链表来实现,如下图所示:
- push_front:添加元素到队首;
- pop_tail:从队尾取出元素。
有了这样的数据结构之后,我们就可以在内存中构建一个消息队列,一些任务不停地往队列里添加消息,同时另一些任务不断地从队尾有序地取出这些消息。添加消息的任务我们称为 producer,而取出并使用消息的任务,我们称之为 consumer。
要实现这样的内存消息队列并不难,甚至可以说很容易。但是如果要让它能在应对海量的并发读写时保持高效,还是需要下很多功夫的。
#
二、Redis 的队列
redis 刚好提供了上述的数据结构 ——list。redis list 支持:
- lpush:从队列左边插入数据;
- rpop:从队列右边取出数据。
这正好对应了我们队列抽象的 push_front 和 pop_tail,因此我们可以直接把 redis 的 list 当成一个消息队列来使用。而且 redis 本身对高并发做了很好的优化,内部数据结构经过了精心地设计和优化。所以从某种意义上讲,用 redis 的 list 大概率比你自己重新实现一个 list 强很多。
但另一方面,使用 redis list 作为消息队列也有一些不足,比如:
- 消息持久化:redis 是内存数据库,虽然有 aof 和 rdb 两种机制进行持久化,但这只是辅助手段,这两种手段都是不可靠的。当 redis 服务器宕机时一定会丢失一部分数据,这对于很多业务都是没法接受的。
- 热 key 性能问题:不论是用 codis 还是 twemproxy 这种集群方案,对某个队列的读写请求最终都会落到同一台 redis 实例上,并且无法通过扩容来解决问题。如果对某个 list 的并发读写非常高,就产生了无法解决的热 key,严重可能导致系统崩溃。
- 没有确认机制:每当执行 rpop 消费一条数据,那条消息就被从 list 中永久删除了。如果消费者消费失败,这条消息也没法找回了。你可能说消费者可以在失败时把这条消息重新投递到进队列,但这太理想了,极端一点万一消费者进程直接崩了呢,比如被 kill -9,panic,coredump…
- 不支持多订阅者:一条消息只能被一个消费者消费,rpop 之后就没了。如果队列中存储的是应用的日志,对于同一条消息,监控系统需要消费它来进行可能的报警,BI 系统需要消费它来绘制报表,链路追踪需要消费它来绘制调用关系…… 这种场景 redis list 就没办法支持了。
- 不支持二次消费:一条消息 rpop 之后就没了。如果消费者程序运行到一半发现代码有 bug,修复之后想从头再消费一次就不行了。
对于上述的不足,目前看来第一条(持久化)是可以解决的。很多公司都有团队基于 rocksdb leveldb 进行二次开发,实现了支持 redis 协议的 kv 存储。这些存储已经不是 redis 了,但是用起来和 redis 几乎一样。它们能够保证数据的持久化,但对于上述的其他缺陷也无能为力了。
其实 redis 5.0 开始新增了一个 stream 数据类型,它是专门设计成为消息队列的数据结构,借鉴了很多 kafka 的设计,但是依然还有很多问题… 直接进入到 kafka 的世界它不香吗?
#
三、Kafka
从上面你可以看到,一个真正的消息中间件不仅仅是一个队列那么简单。尤其是当它承载了公司大量业务的时候,它的功能完备性、吞吐量、稳定性、扩展性都有非常苛刻的要求。kafka 应运而生,它是专门设计用来做消息中间件的系统。
前面说 redis list 的不足时,虽然有很多不足,但是如果你仔细思考,其实可以归纳为两点:
- 热 key 的问题无法解决,即:无法通过加机器解决性能问题;
- 数据会被删除:rpop 之后就没了,因此无法满足多个订阅者,无法重新从头再消费,无法做 ack。
这两点也是 kafka 要解决的核心问题。
热 key 的本质问题是数据都集中在一台实例上,所以想办法把它分散到多个机器上就好了。为此,kafka 提出了 partition 的概念。一个队列(redis 中的 list),对应到 kafka 里叫 topic。kafka 把一个 topic 拆成了多个 partition,每个 partition 可以分散到不同的机器上,这样就可以把单机的压力分散到多台机器上。因此 topic 在 kafka 中更多是一个逻辑上的概念,实际存储单元都是 partition。
其实 redis 的 list 也能实现这种效果,不过这需要在业务代码中增加额外的逻辑。比如可以建立 n 个 list,key1, key2, ..., keyn,客户端每次往不同的 key 里 push,消费端也可以同时从 key1 到 keyn 这 n 个 list 中 rpop 消费数据,这就能达到 kafka 多 partition 的效果。所以你可以看到,partition 就是一个非常朴素的概念,用来把请求分散到多台机器。
redis list 中另一个大问题是 rpop 会删除数据,所以 kafka 的解决办法也很简单,不删就行了嘛。kafka 用游标(cursor)解决这个问题。
和 redis list 不同的是,首先 kafka 的 topic(实际上是 partion)是用的单向队列来存储数据的,新数据每次直接追加到队尾。同时它维护了一个游标 cursor,从头开始,每次指向即将被消费的数据的下标。每消费一条,cursor+1 。通过这种方式,kafka 也能和 redis list 一样实现先入先出的语义,但是 kafka 每次只需要更新游标,并不会去删数据。
这样设计的好处太多了,尤其是性能方面,顺序写一直是最大化利用磁盘带宽的不二法门。但我们主要讲讲游标这种设计带来功能上的优势。
首先可以支持消息的 ACK 机制了。由于消息不会被删除,因此可以等消费者明确告知 kafka 这条消息消费成功以后,再去更新游标。这样的话,只要 kafka 持久化存储了游标的位置,即使消费失败进程崩溃,等它恢复时依然可以重新消费
第二是可以支持分组消费:
这里需要引入一个消费组的概念,这个概念非常简单,因为消费组本质上就是一组游标。对于同一个 topic,不同的消费组有各自的游标。监控组的游标指向第二条,BI 组的游标指向第 4 条,trace 组指向到了第 10000 条…… 各消费者游标彼此隔离,互不影响。
通过引入消费组的概念,就可以非常容易地支持多业务方同时消费一个 topic,也就是说所谓的 1-N 的 “广播”,一条消息广播给 N 个订阅方。
最后,通过游标也很容易实现重新消费。因为游标仅仅就是记录当前消费到哪一条数据了,要重新消费的话直接修改游标的值就可以了。你可以把游标重置为任何你想要指定的位置,比如重置到 0 重新开始消费,也可以直接重置到最后,相当于忽略现有所有数据。
因此你可以看到,kafka 这种数据结构相比于 redis 的双向链表有了一个质的飞跃,不仅是性能上,同时也是功能上,全面的领先。
我们可以来看看 kafka 的一个简单的架构图:
从这个图里我们可以看出,topic 是一个逻辑上的概念,不是一个实体。一个 topic 包含多个 partition,partition 分布在多台机器上。这个机器,kafka 中称之为 broker。(kafka 集群中的一个 broker 对应 redis 集群中的一个实例)。对于一个 topic,可以有多个不同的消费组同时进行消费。一个消费组内部可以有多个消费者实例同时进行消费,这样可以提高消费速率。
但是这里需要非常注意的是,一个 partition 只能被消费组中的一个消费者实例来消费。换句话说,消费组中如果有多个消费者,不能够存在两个消费者同时消费一个 partition 的场景。
为什么呢?其实 kafka 要在 partition 级别提供顺序消费的语义,如果多个 consumer 消费一个 partition,即使 kafka 本身是按顺序分发数据的,但是由于网络延迟等各种情况,consumer 并不能保证按 kafka 的分发顺序接收到数据,这样达到消费者的消息顺序就是无法保证的。因此一个 partition 只能被一个 consumer 消费。kafka 各 consumer group 的游标可以表示成类似这样的数据结构:
{ | |
"topic-foo": { | |
"groupA": { | |
"partition-0": 0, | |
"partition-1": 123, | |
"partition-2": 78 | |
}, | |
"groupB": { | |
"partition-0": 85, | |
"partition-1": 9991, | |
"partition-2": 772 | |
}, | |
} | |
} |
了解了 kafka 的宏观架构,你可能会有个疑惑,kafka 的消费如果只是移动游标并不删除数据,那么随着时间的推移数据肯定会把磁盘打满,这个问题该如何解决呢?这就涉及到 kafka 的 retention 机制,也就是消息过期,类似于 redis 中的 expire。
不同的是,redis 是按 key 来过期的,如果你给 redis list 设置了 1 分钟有效期,1 分钟之后 redis 直接把整个 list 删除了。而 kafka 的过期是针对消息的,不会删除整个 topic (partition),只会删除 partition 中过期的消息。不过好在 kafka 的 partition 是单向的队列,因此队列中消息的生产时间都是有序的。因此每次过期删除消息时,从头开始删就行了。
看起来似乎很简单,但仔细想一下还是有不少问题。举例来说,假如 topicA-partition-0 的所有消息被写入到一个文件中,比如就叫 topicA-partition-0.log。我们再把问题简化一下,假如生产者生产的消息在 topicA-partition-0.log 中一条消息占一行,很快这个文件就到 200G 了。现在告诉你,这个文件前 x 行失效了,你应该怎么删除呢?非常难办,这和让你删除一个数组中的前 n 个元素一样,需要把后续的元素向前移动,这涉及到大量的 CPU copy 操作。假如这个文件有 10M,这个删除操作的代价都非常大,更别说 200G 了。
因此,kafka 在实际存储 partition 时又进行了一个拆分。topicA-partition-0 的数据并不是写到一个文件里,而是写到多个 segment 文件里。假如设置的一个 segment 文件大小上限是 100M,当写满 100M 时就会创建新的 segment 文件,后续的消息就写到新创建的 segment 文件,就像我们业务系统的日志文件切割一样。这样做的好处是,当 segment 中所有消息都过期时,可以很容易地直接删除整个文件。而由于 segment 中消息是有序的,看是否都过期就看最后一条是否过期就行了。
# 1. Kafka 中的数据查找
topic 的一个 partition 是一个逻辑上的数组,由多个 segment 组成,如下图所示:
这时候就有一个问题,如果我把游标重置到一个任意位置,比如第 2897 条消息,我怎么读取数据呢?
根据上面的文件组织结构,你可以发现我们需要确定两件事才能读出对应的数据:
- 第 2897 条消息在哪个 segment 文件里;
- 第 2897 条消息在 segment 文件里的什么位置。
为了解决上面两个问题,kafka 有一个非常巧妙的设计。首先,segment 文件的文件名是以该文件里第一条消息的 offset 来命名的。一开始的 segment 文件名是 0.log,然后一直写直到写了 18234 条消息后,发现达到了设置的文件大小上限 100M,然后就创建一个新的 segment 文件,名字是 18234.log……
- /kafka/topic/order_create/partition-0 | |
- 0.log | |
- 18234.log #segment file | |
- 39712.log | |
- 54101.log |
当我们要找 offset 为 x 的消息在哪个 segment 时,只需要通过文件名做一次二分查找就行了。比如 offset 为 2879 的消息(第 2880 条消息),显然就在 0.log 这个 segment 文件里。
定位到 segment 文件之后,另一个问题就是要找到该消息在文件中的位置,也就是偏移量。如果从头开始一条条地找,这个耗时肯定是无法接受的!kafka 的解决办法就是索引文件。
就如 mysql 的索引一样,kafka 为每个 segment 文件创建了一个对应的索引文件。索引文件很简单,每条记录就是一个 kv 组,key 是消息的 offset,value 是该消息在 segment 文件中的偏移量:
offset | position |
---|---|
0 | 0 |
1 | 124 |
2 | 336 |
每个 segment 文件对应一个索引文件:
- /kafka/topic/order_create/partition-0 | |
- 0.log | |
- 0.index | |
- 18234.log #segment file | |
- 18234.index #index file | |
- 39712.log | |
- 39712.index | |
- 54101.log | |
- 54101.index |
有了索引文件,我们就可以拿到某条消息具体的位置,从而直接进行读取。再捋一遍这个流程:
- 当要查询 offset 为 x 的消息
- 利用二分查找找到这条消息在 y.log
- 读取 y.index 文件找到消息 x 的 y.log 中的位置
- 读取 y.log 的对应位置,获取数据
通过这种文件组织形式,我们可以在 kafka 中非常快速地读取出任何一条消息。但这又引出了另一个问题,如果消息量特别大,每条消息都在 index 文件中加一条记录,这将浪费很多空间。
可以简单地计算一下,假如 index 中一条记录 16 个字节(offset 8 + position 8),一亿条消息就是 16*10^8 字节 = 1.6G。对于一个稍微大一点的公司,kafka 用来收集日志的话,一天的量远远不止 1 亿条,可能是数十倍上百倍。这样的话,index 文件就会占用大量的存储。因此,权衡之下 kafka 选择了使用” 稀疏索引 “。
所谓稀疏索引就是并非所有消息都会在 index 文件中记录它的 position,每间隔多少条消息记录一条,比如每间隔 10 条消息记录一条 offset-position:
offset | position |
---|---|
0 | 0 |
10 | 1852 |
20 | 4518 |
30 | 6006 |
40 | 8756 |
50 | 10844 |
这样的话,如果当要查询 offset 为 x 的消息,我们可能没办法查到它的精确位置,但是可以利用二分查找,快速地确定离他最近的那条消息的位置,然后往后多读几条数据就可以读到我们想要的消息了。
比如,当我们要查到 offset 为 33 的消息,按照上表,我们可以利用二分查找定位到 offset 为 30 的消息所在的位置,然后去对应的 log 文件中从该位置开始向后读取 3 条消息,第四条就是我们要找的 33。这种方式其实就是在性能和存储空间上的一个折中,很多系统设计时都会面临类似的选择,牺牲时间换空间还是牺牲空间换时间。
到这里,我们对 kafka 的整体架构应该有了一个比较清晰的认识了。不过在上面的分析中,我故意隐去了 kafka 中另一个非常非常重要的点,就是高可用方面的设计。因为这部分内容比较晦涩,会引入很多分布式理论的复杂性,妨碍我们理解 kafka 的基本模型。在接下来的部分,将着重讨论这个主题。
# 2. Kafka 高可用
高可用(HA)对于企业的核心系统来说是至关重要的。因为随着业务的发展,集群规模会不断增大,而大规模集群中总会出现故障,硬件、网络都是不稳定的。当系统中某些节点各种原因无法正常使用时,整个系统可以容忍这个故障,继续正常对外提供服务,这就是所谓的高可用性。对于有状态服务来说,容忍局部故障本质上就是容忍丢数据(不一定是永久,但是至少一段时间内读不到数据)。
系统要容忍丢数据,最朴素也是唯一的办法就是做备份,让同一份数据复制到多台机器,所谓的冗余,或者说多副本。为此,kafka 引入 leader-follower 的概念。topic 的每个 partition 都有一个 leader,所有对这个 partition 的读写都在该 partition leader 所在的 broker 上进行。partition 的数据会被复制到其它 broker 上,这些 broker 上对应的 partition 就是 follower:
producer 在生产消息时,会直接把消息发送到 partition leader 上,partition leader 把消息写入自己的 log 中,然后等待 follower 来拉取数据进行同步。具体交互如下:
上图中对 producer 进行 ack 的时机非常关键,这直接关系到 kafka 集群的可用性和可靠性。
如果 producer 的数据到达 leader 并成功写入 leader 的 log 就进行 ack
优点:不用等数据同步完成,速度快,吞吐率高,可用性高;
缺点:如果 follower 数据同步未完成时 leader 挂了,就会造成数据丢失,可靠性低。
如果等 follower 都同步完数据时进行 ack
优点:当 leader 挂了之后 follower 中也有完备的数据,可靠性高;
缺点:等所有 follower 同步完成很慢,性能差,容易造成生产方超时,可用性低。
而具体什么时候进行 ack,对于 kafka 来说是可以根据实际应用场景配置的。
其实 kafka 真正的数据同步过程还是非常复杂的,本文主要是想讲一讲 kafka 的一些核心原理,数据同步里面涉及到的很多技术细节,HW epoch 等,就不在此一一展开了。最后展示一下 kafka 的一个全景图:
最后对 kafka 进行一个简要地总结:kafka 通过引入 partition 的概念,让 topic 能够分散到多台 broker 上,提高吞吐率。但是引入多 partition 的代价就是无法保证 topic 维度的全局顺序性,需要这种特性的场景只能使用单个 partition。在内部,每个 partition 以多个 segment 文件的方式进行存储,新来的消息 append 到最新的 segment log 文件中,并使用稀疏索引记录消息在 log 文件中的位置,方便快速读取消息。当数据过期时,直接删除过期的 segment 文件即可。为了实现高可用,每个 partition 都有多个副本,其中一个是 leader,其它是 follower,分布在不同的 broker 上。对 partition 的读写都在 leader 所在的 broker 上完成,follower 只会定时地拉取 leader 的数据进行同步。当 leader 挂了,系统会选出和 leader 保持同步的 follower 作为新的 leader,继续对外提供服务,大大提高可用性。在消费端,kafka 引入了消费组的概念,每个消费组都可以互相独立地消费 topic,但一个 partition 只能被消费组中的唯一一个消费者消费。消费组通过记录游标,可以实现 ACK 机制、重复消费等多种特性。除了真正的消息记录在 segment 中,其它几乎所有 meta 信息都保存在全局的 zookeeper 中。
# 3. 优缺点
(1)优点:kafka 的优点非常多
- 高性能:单机测试能达到 100w tps;
- 低延时:生产和消费的延时都很低,e2e 的延时在正常的 cluster 中也很低;
- 可用性高:replicate + isr + 选举 机制保证;
- 工具链成熟:监控 运维 管理 方案齐全;
- 生态成熟:大数据场景必不可少 kafka stream.
(2)不足
- 无法弹性扩容:对 partition 的读写都在 partition leader 所在的 broker,如果该 broker 压力过大,也无法通过新增 broker 来解决问题;
- 扩容成本高:集群中新增的 broker 只会处理新 topic,如果要分担老 topic-partition 的压力,需要手动迁移 partition,这时会占用大量集群带宽;
- 消费者新加入和退出会造成整个消费组 rebalance:导致数据重复消费,影响消费速度,增加 e2e 延迟;
- partition 过多会使得性能显著下降:ZK 压力大,broker 上 partition 过多让磁盘顺序写几乎退化成随机写。
在了解了 kafka 的架构之后,你可以仔细想一想,为什么 kafka 扩容这么费劲呢?其实这本质上和 redis 集群扩容是一样的!当 redis 集群出现热 key 时,某个实例扛不住了,你通过加机器并不能解决什么问题,因为那个热 key 还是在之前的某个实例中,新扩容的实例起不到分流的作用。kafka 类似,它扩容有两种:新加机器(加 broker)以及给 topic 增加 partition。给 topic 新加 partition 这个操作,你可以联想一下 mysql 的分表。比如用户订单表,由于量太大把它按用户 id 拆分成 1024 个子表 user_order_{0..1023},如果到后期发现还不够用,要增加这个分表数,就会比较麻烦。因为分表总数增多,会让 user_id 的 hash 值发生变化,从而导致老的数据无法查询。所以只能停服做数据迁移,然后再重新上线。kafka 给 topic 新增 partition 一样的道理,比如在某些场景下 msg 包含 key,那 producer 就要保证相同的 key 放到相同的 partition。但是如果 partition 总量增加了,根据 key 去进行 hash,比如 hash (key) % parition_num,得到的结果就不同,就无法保证相同的 key 存到同一个 partition。当然也可以在 producer 上实现一个自定义的 partitioner,保证不论怎么扩 partition 相同的 key 都落到相同的 partition 上,但是这又会使得新增加的 partition 没有任何数据。
其实你可以发现一个问题,kafka 的核心复杂度几乎都在存储这一块。数据如何分片,如何高效的存储,如何高效地读取,如何保证一致性,如何从错误中恢复,如何扩容再平衡……
上面这些不足总结起来就是一个词:scalebility。通过直接加机器就能解决问题的系统才是大家的终极追求。Pulsar 号称云原生时代的分布式消息和流平台,所以接下来我们看看 pulsar 是怎么样的。
#
四、Pulsar
kafka 的核心复杂度是它的存储,高性能、高可用、低延迟、支持快速扩容的分布式存储不仅仅是 kafka 的需求,应该是现代所有系统共同的追求。而 apache 项目底下刚好有一个专门就是为日志存储打造的这样的系统,它叫 bookeeper!
有了专门的存储组件,那么实现一个消息系统剩下的就是如何来使用这个存储系统来实现 feature 了。pulsar 就是这样一个” 计算 - 存储 分离 “的消息系统:
pulsar 利用 bookeeper 作为存储服务,剩下的是计算层。这其实是目前非常流行的架构也是一种趋势,很多新型的存储都是这种” 存算分离 “的架构。比如 tidb,底层存储其实是 tikv 这种 kv 存储。tidb 是更上层的计算层,自己实现 sql 相关的功能。还有的例子就是很多 "持久化"redis 产品,大部分底层依赖于 rocksdb 做 kv 存储,然后基于 kv 存储关系实现 redis 的各种数据结构。
在 pulsar 中,broker 的含义和 kafka 中的 broker 是一致的,就是一个运行的 pulsar 实例。但是和 kafka 不同的是,pulsar 的 broker 是无状态服务,它只是一个”API 接口层 “,负责处理海量的用户请求,当用户消息到来时负责调用 bookeeper 的接口写数据,当用户要查询消息时从 bookeeper 中查数据,当然这个过程中 broker 本身也会做很多缓存之类的。同时 broker 也依赖于 zookeeper 来保存很多元数据的关系。
由于 broker 本身是无状态的,因此这一层可以非常非常容易地进行扩容,尤其是在 k8s 环境下,点下鼠标的事儿。至于消息的持久化,高可用,容错,存储的扩容,这些都通通交给 bookeeper 来解决。
但就像能量守恒定律一样,系统的复杂性也是守恒的。实现既高性能又可靠的存储需要的技术复杂性,不会凭空消失,只会从一个地方转移到另一个地方。就像你写业务逻辑,产品经理提出了 20 个不同的业务场景,就至少对应 20 个 if else,不论你用什么设计模式和架构,这些 if else 不会被消除,只会从从一个文件放到另一个文件,从一个对象放到另一个对象而已。所以那些复杂性一定会出现在 bookeeper 中,并且会比 kafka 的存储实现更为复杂。
但是 pulsar 存算分离架构的一个好处就是,当我们在学习 pulsar 时可以有一个比较明确的界限,所谓的 concern segregation。只要理解 bookeeper 对上层的 broker 提供的 API 语义,即使不了解 bookeeper 内部的实现,也能很好的理解 pulsar 的原理。
接下来你可以思考一个问题:既然 pulsar 的 broker 层是无状态的服务,那么我们是否可以随意在某个 broker 进行对某个 topic 的数据生产呢?
看起来似乎没什么问题,但答案还是否定的 —— 不可以。为什么呢?想一想,假如生产者可以在任意一台 broker 上对 topic 进行生产,比如生产 3 条消息 a b c,三条生产消息的请求分别发送到 broker A B C,那最终怎么保证消息按照 a b c 的顺序写入 bookeeper 呢?这是没办法保证,只有让 a b c 三条消息都发送到同一台 broker,才能保证消息写入的顺序。
既然如此,那似乎又回到和 kafka 一样的问题,如果某个 topic 写入量特别特别大,一个 broker 扛不住怎么办?所以 pulsar 和 kafka 一样,也有 partition 的概念。一个 topic 可以分成多个 partition,为了每个 partition 内部消息的顺序一致,对每个 partition 的生产必须对应同一台 broker。
这里看起来似乎和 kafka 没区别,也是每个 partition 对应一个 broker,但是其实差别很大。为了保证对 partition 的顺序写入,不论 kafka 还是 pulsar 都要求写入请求发送到 partition 对应的 broker 上,由该 broker 来保证写入的顺序性。然而区别在于,kafka 同时会把消息存储到该 broker 上,而 pulsar 是存储到 bookeeper 上。这样的好处是,当 pulsar 的某台 broker 挂了,可以立刻把 partition 对应的 broker 切换到另一个 broker,只要保证全局只有一个 broker 对 topic-partition-x 有写权限就行了,本质上只是做一个所有权转移而已,不会有任何数据的搬迁。
当对 partition 的写请求到达对应 broker 时,broker 就需要调用 bookeeper 提供的接口进行消息存储。和 kafka 一样,pulsar 在这里也有 segment 的概念,而且和 kafka 一样的是,pulsar 也是以 segment 为单位进行存储的(respect respect respect)。
为了说清楚这里,就不得不引入一个 bookeeper 的概念,叫 ledger,也就是账本。可以把 ledger 类比为文件系统上的一个文件,比如在 kafka 中就是写入到 xxx.log 这个文件里。pulsar 以 segment 为单位,存入 bookeeper 中的 ledger。
在 bookeeper 集群中每个节点叫 bookie(为什么集群的实例在 kafka 叫 broker 在 bookeeper 又叫 bookie…… 无所谓,名字而已,作者写了那么多代码,还不能让人开心地命个名啊)。在实例化一个 bookeeper 的 writer 时,就需要提供 3 个参数:
- 节点数 n:bookeeper 集群的 bookie 数;
- 副本数 m:某一个 ledger 会写入到 n 个 bookie 中的 m 个里,也就是说所谓的 m 副本;
- 确认写入数 t:每次向 ledger 写入数据时(并发写入到 m 个 bookie),需要确保收到 t 个 acks,才返回成功。
bookeeper 会根据这三个参数来为我们做复杂的数据同步,所以我们不用担心那些副本啊一致性啊的东西,直接调 bookeeper 的提供的 append 接口就行了,剩下的交给它来完成。
如上图所示,parition 被分为了多个 segment,每个 segment 会写入到 4 个 bookie 其中的 3 个中。比如 segment1 就写入到了 bookie1,2,4 中,segment2 写入到 bookie1,3,4 中…
这其实就相当于把 kafka 某个 partition 的 segment 均匀分布到了多台存储节点上。这样的好处是什么呢?在 kafka 中某个 partition 是一直往同一个 broker 的文件系统中进行写入,当磁盘不够用了,就需要做非常麻烦的扩容 + 迁移数据的操作。而对于 pulsar,由于 partition 中不同 segment 可以保存在 bookeeper 不同的 bookies 上,当大量写入导致现有集群 bookie 磁盘不够用时,我们可以快速地添加机器解决问题,让新的 segment 寻找最合适的 bookie(磁盘空间剩余最多或者负载最低等)进行写入,只要记住 segment 和 bookies 的关系就好了。
由于 partition 以 segment 为粒度均匀的分散到 bookeeper 上的节点上,这使得存储的扩容变得非常非常容易。这也是 Pulsar 一直宣称的存算分离架构的先进性的体现:
- broker 是无状态的,随便扩容;
- partition 以 segment 为单位分散到整个 bookeeper 集群,没有单点,也可以轻易地扩容;
- 当某个 bookie 发生故障,由于多副本的存在,可以另外 t-1 个副本中随意选出一个来读取数据,不间断地对外提供服务,实现高可用。
其实在理解 kafka 的架构之后再来看 pulsar,你会发现 pulsar 的核心就在于 bookeeper 的使用以及一些 metadata 的存储。但是换个角度,正是这个恰当的存储和计算分离的架构,帮助我们分离了关注点,从而能够快速地去学习上手。
# 消费模型
Pulsar 相比于 kafka 另一个比较先进的设计就是对消费模型的抽象,叫做 subscription。通过这层抽象,可以支持用户各种各样的消费场景。还是和 kafka 进行对比,kafka 中只有一种消费模式,即一个或多个 partition 对一个 consumer。如果想要让一个 partition 对多个 consumer,就无法实现了。pulsar 通过 subscription,目前支持 4 种消费方式:
可以把 pulsar 的 subscription 看成 kafka 的 consumer group,但 subscription 更进一步,可以设置这个”consumer group“的消费类型:
- exclusive:消费组里有且仅有一个 consumer 能够进行消费,其它的根本连不上 pulsar;
- failover:消费组里的每个消费者都能连上每个 partition 所在的 broker,但有且仅有一个 consumer 能消费到数据。当这个消费者崩溃了,其它的消费者会被选出一个来接班;
- shared:消费组里所有消费者都能消费 topic 中的所有 partition,消息以 round-robin 的方式来分发;
- key-shared:消费组里所有消费者都能消费到 topic 中所有 partition,但是带有相同 key 的消息会保证发送给同一个消费者。
这些消费模型可以满足多种业务场景,用户可以根据实际情况进行选择。通过这层抽象,pulsar 既支持了 queue 消费模型,也支持了 stream 消费模型,还可以支持其它无数的消费模型(只要有人提 pr),这就是 pulsar 所说的统一了消费模型。
其实在消费模型抽象的底下,就是不同的 cursor 管理逻辑。怎么 ack,游标怎么移动,怎么快速查找下一条需要重试的 msg…… 这都是一些技术细节,但是通过这层抽象,可以把这些细节进行隐藏,让大家更关注于应用。
#
五、存算分离架构
其实技术的发展都是螺旋式的,很多时候你会发现最新的发展方向又回到了 20 年前的技术路线了。
在 20 年前,由于普通计算机硬件设备的局限性,对大量数据的存储是通过 NAS (Network Attached Storage) 这样的 “云端” 集中式存储来完成。但这种方式的局限性也很多,不仅需要专用硬件设备,而且最大的问题就是难以扩容来适应海量数据的存储。
数据库方面也主要是以 Oracle 小型机为主的方案。然而随着互联网的发展,数据量越来越大,Google 后来又推出了以普通计算机为主的分布式存储方案,任意一台计算机都能作为一个存储节点,然后通过让这些节点协同工作组成一个更大的存储系统,这就是 HDFS。
然而移动互联网使得数据量进一步增大,并且 4G 5G 的普及让用户对延迟也非常敏感,既要可靠,又要快,又要可扩容的存储逐渐变成了一种企业的刚需。而且随着时间的推移,互联网应用的流量集中度会越来越高,大企业的这种刚需诉求也越来越强烈。
因此,可靠的分布式存储作为一种基础设施也在不断地完善。它们都有一个共同的目标,就是让你像使用 filesystem 一样使用它们,并且具有高性能高可靠自动错误恢复等多种功能。然而我们需要面对的一个问题就是 CAP 理论的限制,线性一致性(C),可用性(A),分区容错性(P),三者只能同时满足两者。因此不可能存在完美的存储系统,总有那么一些 “不足”。我们需要做的其实就是根据不同的业务场景,选用合适的存储设施,来构建上层的应用。这就是 pulsar 的逻辑,也是 tidb 等 newsql 的逻辑,也是未来大型分布式系统的基本逻辑,所谓的 “云原生”。