开源地址:https://github.com/Tencent/phxqueue
PhxQueue 是微信开源的一款基于 Paxos 协议实现的高可用、高吞吐和高可靠的分布式队列,保证At-Least-Once Delivery,在微信内部广泛支持微信支付、公众平台等多个重要业务。
消息队列作为成熟的异步通信模式,对比常用的同步通信模式,有如下优势:
解耦:防止引入过多的 API 给系统的稳定性带来风险;调用方使用不当会给被调用方系统造成压力,被调用方处理不当会降低调用方系统的响应能力。
削峰和流控:消息生产者不会堵塞,突发消息缓存在队列中,消费者按照实际能力读取消息。
复用:一次发布多方订阅。
微信初期使用的分布式队列(称为旧队列)是微信后台自研的重要组件,广泛应用在各种业务场景中,为业务提供解耦、缓存、异步化等能力。
旧队列以 Quorum NRW 作为同步机制,其中 N=3、W=R=2,刷盘方式采用异步刷盘,兼顾了性能和可用性。
随着业务发展,接入业务种类日益增多,旧队列逐渐显得力不从心,主要不足如下:
异步刷盘,数据可靠性堪忧
对于支付相关业务,保证数据可靠是首要需求。
目前大多数分布式队列方案是以同步复制+异步刷盘来保证数据可靠性的,但我们认为需要同步刷盘来进一步提高数据可靠性。
乱序问题
部分业务提出了绝对有序的需求,但 NRW 并不保证顺序性,无法满足需求。
另外旧队列还存在出队去重、负载均衡等其他方面的问题亟需改善。上述种种促使了我们考虑新的方案。
Kafka 是大数据领域常用的消息队列,最初由 LinkedIn 采用 Scala 语言开发,用作 LinkedIn 的活动流追踪和运营系统数据处理管道的基础。
其高吞吐、自动容灾、出入队有序等特性,吸引了众多公司使用,在数据采集、传输场景中发挥着重要作用,详见Powerd By Kafka。
但我们充分调研了 Kafka,认为其在注重数据可靠性的场景下,有如下不足:
Kafka 在开启配置 log.flush.interval.messages=1,打开同步刷盘特性后,吞吐会急剧下降。
该现象由如下因素导致:
SSD 写放大
业务消息平均大小在数 1k 左右。
而 SSD 一次刷盘的最小单位为一个 page size,大小为 4k。
当 Kafka 对大小不足 4k 的消息进行刷盘时,实际写入的物理数据量是消息大小的数倍。导致硬盘写带宽资源被浪费。
业务场景下 Producer batch 效果不好
Kafka Producer batch,简单来说,就是把多个消息打包在一起发送到 Broker,广泛用于大数据场景。按道理,batch效果足够,是能抵消写放大的影响的。
但业务场景下的消息生产不同于大数据场景下的日志生产,每个需要入队的业务请求在业务系统中有独立的上下文,batch难度大。即使在业务和Broker之间加入代理层,将Producer转移到代理层内进行batch,也因代理层的节点数众多,batch效果难以提高,导致写放大无法抵消。
Kafka replica 同步设计概要:
Kafka Broker leader 会跟踪与其保持同步的 follower 列表,该列表称为ISR(即in-sync Replica)。如果一个 follower 宕机,或者落后太多,leader 将把它从ISR中移除。
该同步方式偏重于同步效率,但是在可用性方面表现略显不足:
Broker fail over 过程成功率下降严重
在3 replicas的场景下,leader 均匀分布在各 Broker 上,一个Broker出现故障,就意味着1/3的 leader、follower 离线,这时读写成功率下降:
对于 leader 离线的 partition,暂时无法读写,需要等待 Controller 选举出新的 leader 后才能恢复;
对于 follower 离线的 partition,也暂时无法读写,需要等待一定时长(取决于 replica.lag.time.max.ms,默认10s)后,leader 将故障 follower 从 ISR 中剔除才能恢复。
也就是说,任意一个 Broker 故障时,读写成功率会在一段时间内降为0。
同步延迟取决于最慢节点
在同步复制场景下,需要等待所有节点返回ack。
通过对比 Kafka replica 与 Paxos 的表现,我们认为在同步方式上 Paxos 是更好的选择:
所以,我们基于旧队列,用 Paxos 协议改造了同步逻辑,并且进行了包括同步刷盘之内的多项优化,完成了 PhxQueue。
PhxQueue 目前在微信内部广泛支持微信支付、公众平台等多个重要业务,日均入队达千亿,分钟入队峰值达一亿。
其设计出发点是高数据可靠性,且不失高可用和高吞吐,同时支持多种常见队列特性。
PhxQueue支持的特性如下:
同步刷盘,入队数据绝对不丢,自带内部实时对账
出入队严格有序
多订阅
出队限速
出队重放
所有模块均可平行扩展
存储层批量刷盘、同步,保证高吞吐
存储层支持同城多中心部署
存储层自动容灾/接入均衡
消费者自动容灾/负载均衡
PhxQueue 由下列5个模块组成。
Store 作为队列存储,引入了 PhxPaxos 库,以 Paxos 协议作副本同步。只要多数派节点正常工作及互联,即可提供线性一致性读写服务。
为了提高数据可靠性,同步刷盘作为默认开启特性,且性能不亚于异步刷盘。
在可用性方面,Store 内有多个独立的 paxos group,每个 paxos group 仅 master 提供读写服务,平时 master 动态均匀分布在 Store 内各节点,均衡接入压力,节点出灾时自动切换 master 到其它可用节点。
Producer 作为消息生产者,根据 key 决定消息存储路由。相同 key 的消息默认路由到同一个队列中,保证出队顺序与入队顺序一致。
Consumer 作为消费者,以批量拉取的方式从 Store 拉消息,支持多协程方式批量处理消息。
Consumer 以服务框架的形式提供服务,使用者以实现回调的方式,根据不同主题(Topic),不同处理类型(Handler)定义具体的消息处理逻辑。
Scheduler 的作用是,收集 Consumer 全局负载信息, 对 Consumer 做容灾和负载均衡。当使用者没有这方面的需求时,可以省略部署 Scheduler,此时各 Consumer 根据配置权重决定与队列的处理关系。
部署 Scheduler 后,Scheduler leader 与所有 Conusmer 维持心跳,在收集 Consumer 的负载信息的同时,反向调整 Consumer 与队列的处理关系。
当 Scheduler leader 宕机了后,Scheduler 依赖下述分布式锁服务选举出新 leader,不可用期间仅影响 Consumer 的容灾和负载均衡,不影响 Consumer 的正常消费。
Lock 是一个分布式锁,其接口设计非常通用化,使用者可以选择将 Lock 独立部署,提供通用分布式锁服务。
Lock 在 PhxQueue 中的作用有如下两点:
为 Scheduler 选举 leader;
防止多个 Consumer 同时处理一条队列。
Lock 同样也是可选择部署的模块:
若部署了 Scheduler,就必须部署 Lock 为 Scheduler 选举出 leader;
否则,若业务对重复消费不敏感,可选择不部署 Lock。
这里所指的重复消费场景是:若省略部署 Scheduler 的话,Consumer 需要通过读取配置得知可处理的队列集合;当队列有变更(如队列缩扩容)时,各 Consumer 机器上的配置改变有先有后,这时各 Consumer 在同一时间看到的配置状态可能不一样,导致一段时间内两个 Consumer 都认为自己该消费同一个队列,造成重复消费。Lock 的部署可以避免该场景下的重复消费。(注意,即使省略部署 Lock,该场景仅造成重复消费,而不会造成乱序消费)
PhxQueue Store 通过 PhxPaxos 协议进行副本复制。
PhxPaxos 的工程实现方式分为三层:app 层负责处理业务请求,paxos 层执行 paxos同步过程,状态机层更新业务状态。
其中,app 层发起 paxos 提议,paxos 层各节点通过 paxos 协议共同完成一个 paxos log 的确认,之后状态机以 paxos log 作为的输入作状态转移,更新业务的状态,最后返回状态转移结果给 app 层。
一致的状态机层,加上来自 paxos 层的一致输入,就产生一致的状态转移,从而保证多个节点强一致。
这里我们要基于 PhxPaxos 在状态机层实现一个队列,就需要作如下概念映射:
队列这种模型不涉及数据修改,是有序的数据集合,和 paxos log 的定义很像,所以可以让入队的数据直接作为 paxos log,而状态机只需要保存 paxos log 序列。
instance id 的严格递增特性,使得它可以方便地作为队列偏移。
队列中读偏移之前的数据,认为是可以删除的数据,这点和 check point 的定义一致。
整体上队列状态机和 paxos 能很好地切合。
未经优化的 Paxos 协议并未解决同步刷盘的写放大问题。而且,其副本同步效率不如 Kafka。
原因是,Kafka 的副本同步是流式批量的,而 Paxos 协议是以 paxos log 为单位串行同步,每个 paxos log 的同步开销是 1个RTT + 1次刷盘。
在多DC部署的场景下,ping 时延可达4ms,这样会导致单个 paxos group 的理论最高 TPS 仅250。
我们采用多 paxos group 部署 以及 Group Commit 的方式来同时解决同步刷盘的写放大问题以及Paxos吞吐问题。
如上图, 我们部署多个paxos group,以 paxos group 作为 Group Commit 的单位,一个 paxos group 内对应多个queue,将多个queue在一段时间内入队的数据合并在一起,当等待耗时或积累数据数目达到阀值,才会触发一次Paxos同步和同步刷盘,等待期间前端阻塞。
与Kafka的Producer批量逻辑相比,在存储层以 Group Commit 进行批量合并的好处如下:
业务层无需关注如何组织请求进行批量;
在存储层以 paxos group 为单位的聚合效果比上层聚合效果更好。
下面分别从设计、性能、存储层 failover 过程三方面对比 PhxQueue 与 Kafka。
PhxQueue 架构虽然与 Kafka 等常见分布式队列类似,但设计上仍有不少独特之处。为了能让对 Kafka 有一定了解的读者更方便地了解 PhxQueue,下面列出了两者的对比。
注:以下对比基于相同的数据可靠性场景:少数派节点失效,不会造成数据丢失,且整体依旧可用。
CPU: 64 x Intel(R) Xeon(R) CPU E5-2620 v3 @ 2.40GHz
Memory: 64 GB
Network: 10 Gigabit Ethernet
Disk: SSD Raid 10
Cluster Nodes: 3
Ping: 1ms
开启 Producer Batch:
关闭 Producer Batch:
以上场景,PhxQueue 瓶颈在 cpu,使用率达70% ~ 80%。
PhxQueue 性能与 Kafka 持平;
相同 QPS 下,由于不用等最慢节点返回,PhxQueue 平均耗时比 Kafka 稍优;
关闭 Producer Batch 后,在同步刷盘场景下,PhxQueue 性能可达 Kafka 的2倍,原因是,PhxQueue 存储层在写盘前做了 batch,而 Kafka 没有,所以后者会有写放大。
主要对比杀死存储层的一个节点后,对整体吞吐的影响。
表现:
Failover 期间,在不同阶段程度不同,入队成功率在0% ~ 33%;
Failover 持续时间由租约决定,租约时长默认10s。
测试过程:
将 replica.lag.time.max.ms 从 10s 调整为 60s(延长时间方便观察),然后 kill Broker 0,挑选3个 partition,观察 ISR 变化如下:
第一阶段(未 kill Broker 0):
Topic: test-dis-p100 Partition: 96 Leader: 0 Replicas: 0,1,2 Isr: 1,0,2
Topic: test-dis-p100 Partition: 97 Leader: 1 Replicas: 1,2,0 Isr: 1,0,2
Topic: test-dis-p100 Partition: 98 Leader: 2 Replicas: 2,0,1 Isr: 1,0,2
第二阶段(kill Broker 0 后持续8s):
Topic: test-dis-p100 Partition: 96 Leader: 0 Replicas: 0,1,2 Isr: 1,0,2
Topic: test-dis-p100 Partition: 97 Leader: 1 Replicas: 1,2,0 Isr: 1,0,2
Topic: test-dis-p100 Partition: 98 Leader: 2 Replicas: 2,0,1 Isr: 1,0,2
第三阶段(持续1分钟左右):
Topic: test-dis-p100 Partition: 96 Leader: 1 Replicas: 0,1,2 Isr: 2,1
Topic: test-dis-p100 Partition: 97 Leader: 1 Replicas: 1,2,0 Isr: 2,1,0
Topic: test-dis-p100 Partition: 98 Leader: 2 Replicas: 2,0,1 Isr: 2,1,0
第四阶段(至此入队成功率完全恢复):
Topic: test-dis-p100 Partition: 96 Leader: 1 Replicas: 0,1,2 Isr: 2,1
Topic: test-dis-p100 Partition: 97 Leader: 1 Replicas: 1,2,0 Isr: 2,1
Topic: test-dis-p100 Partition: 98 Leader: 2 Replicas: 2,0,1 Isr: 2,1
其中,第二/三阶段标红处对应的partition入队成功率受损:
第二阶段期间,Partition 96/97/98 均无法写入,入队成功率成功率下降至0%。
第三阶段期间,Partition 96 可继续写入,但 Partition 97/98 无法写入,因为写入要等 Broker 0 回 ack,但 Broker 0 已 kill,入队成功率下降至33%。
而实际观察,第二/三阶段期间完全没吞吐,原因是压测工具不断报连接失败,停止了写入。
压测工具输出:
30551 records sent, 6107.8 records/sec (0.06 MB/sec), 1733.9 ms avg latency, 5042.0 max latency.
30620 records sent, 6117.9 records/sec (0.06 MB/sec), 1771.9 ms avg latency, 5076.0 max latency.
30723 records sent, 6123.8 records/sec (0.06 MB/sec), 1745.4 ms avg latency, 5009.0 max latency.
30716 records sent, 6127.3 records/sec (0.06 MB/sec), 1841.1 ms avg latency, 5299.0 max latency.
30674 records sent, 6133.6 records/sec (0.06 MB/sec), 1621.3 ms avg latency, 4644.0 max latency.
>>> kill Broker 0 here (入队成功率受损)>>>
10580 records sent, 123.4 records/sec (0.00 MB/sec), 1537.1 ms avg latency, 84236.0 max latency. <<---吞吐下降严重
11362 records sent, 132.3 records/sec (0.00 MB/sec), 1658.3 ms avg latency, 84232.0 max latency.
11367 records sent, 132.3 records/sec (0.00 MB/sec), 1582.4 ms avg latency, 84228.0 max latency.
11236 records sent, 130.9 records/sec (0.00 MB/sec), 1694.2 ms avg latency, 84240.0 max latency.
11406 records sent, 132.8 records/sec (0.00 MB/sec), 1650.5 ms avg latency, 84233.0 max latency.
压测工具连接Broker失败日志:
[2017-08-16 15:38:22,844] WARN Connection to node 0 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2017-08-16 15:38:22,859] WARN Connection to node 0 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
原因分析:
Kafka Broker leader 是通过 Controller 选举出来的,ISR 列表是 leader 维护的。
前者的的租约是 Controller 定义的,后者的租约是 Broker 配置 replica.lag.time.max.ms 指定的。
所以,第二阶段持续时间较短,是 Controller 的租约时间决定的,第三阶段持续时间较长,是 replica.lag.time.max.ms 决定的。
当 Broker 0 被 kill 时,前者影响本来 Broker 0 是 leader 的 1/3 partitions 的入队成功率,后者影响 Broker 0 作为 follower 的 2/3 partitions 的入队成功率。
表现:
Failover 期间,入队成功率仅下降至66%;
Failover 持续时间由租约决定,租约时长默认5s。
开启 换队列重试特性(适合没有绝对顺序性要求的业务提高可用性)后,Failover 期间仍有90+%入队成功率。
测试过程:
将 Store master 租约时长从10s调整为60s(延长时间方便观察),然后kill store 0,观察某 Producer 入队成功率:
关闭换队列重试特性:
>>> kill store 0 here (入队成功率受损)>>>
-------------------------------------------
-- total: 192323
-- time(ms): 10015
-- qps: 19203.49
-- routine_sleep: 73.88%
-- retcode cnt percent
-- -1 22097 11.49 <<--- 失败:连接失败
-- 0 125905 65.47 <<--- 成功:仍有66%成功率
-- 10102 44321 23.05 <<--- 失败:提示需要重定向到 master
-- usetime(ms) cnt percent
-- < 1 0 0.00
-- < 2 0 0.00
-- < 5 610 0.32
-- < 10 7344 3.82
-- < 20 18937 9.85
-- < 50 36067 18.75
-- < 100 6971 3.62
-- < 200 20239 10.52
-- < 500 59059 30.71
-- < 1000 30601 15.91
-- >= 1000 12495 6.50
>>> (入队成功率完全恢复)>>>
-------------------------------------------
-- total: 198955
-- time(ms): 10001
-- qps: 19893.51
-- routine_sleep: 98.00%
-- retcode cnt percent
-- 0 198955 100.00 <<--- 成功:100%成功率
-- usetime(ms) cnt percent
-- < 1 0 0.00
-- < 2 2 0.00
-- < 5 5895 2.96
-- < 10 30830 15.50
-- < 20 65887 33.12
-- < 50 95403 47.95
-- < 100 753 0.38
-- < 200 185 0.09
-- < 500 0 0.00
-- < 1000 0 0.00
-- >= 1000 0 0.00
开启换队列重试特性:
>>> kill store 0 here (入队成功率受损)>>>
-------------------------------------------
-- total: 134752
-- time(ms): 10001
-- qps: 13473.85
-- routine_sleep: 77.43%
-- retcode cnt percent
-- -202 14 0.01 <<--- 失败:超时
-- -1 2712 2.01 <<--- 失败:连接失败
-- 0 127427 94.56 <<--- 成功:仍有94%成功率
-- 10102 4572 3.39 <<--- 失败:提示需要重定向到 master
-- 10105 27 0.02 <<--- 失败:master 未选举出来
-- usetime(ms) cnt percent
-- < 1 0 0.00
-- < 2 4 0.00
-- < 5 3284 2.44
-- < 10 10704 7.94
-- < 20 22109 16.41
-- < 50 32752 24.31
-- < 100 4541 3.37
-- < 200 4331 3.21
-- < 500 11265 8.36
-- < 1000 19706 14.62
-- >= 1000 26056 19.34
>>> (入队成功率完全恢复)>>>
-------------------------------------------
-- total: 198234
-- time(ms): 10014
-- qps: 19795.69
-- routine_sleep: 94.36%
-- retcode cnt percent
-- 0 198234 100.00 <<--- 成功:100%成功率
-- usetime(ms) cnt percent
-- < 1 0 0.00
-- < 2 0 0.00
-- < 5 3875 1.95
-- < 10 22978 11.59
-- < 20 53000 26.74
-- < 50 87575 44.18
-- < 100 6204 3.13
-- < 200 6468 3.26
-- < 500 11963 6.03
-- < 1000 5637 2.84
-- >= 1000 534 0.27
在存储层 failover 过程中,PhxQueue 和 Kafka 的入队成功率均有一定时长的下降,PhxQueue 的入队成功率在66% ~ 100%,Kafka 的入队成功率在0% ~ 33%;
PhxQueue 开启换队列重试特性后,failover 过程中入队成功率保持在90+%;
PhxQueue 和 Kafka 均能自动切换 master,最终入队成功率完全恢复。
PhxQueue 在存储层做了很多的努力:实现了 master 自动切换,且仍然保证线性一致,切换期间仍然高可用;保证了同步刷盘的吞吐,其性能不亚于异步刷盘;。
另外实现了大部分队列实用特性,例如出入队顺序一致、多订阅、限速、消息重放等,适用于各种业务场景。
目前 PhxQueue 已在微信内部大规模使用,也正式开源。
我们将保持 PhxQueue 开源版本与内部版本的一致,欢迎读者试用并反馈意见。
开源地址:https://github.com/Tencent/phxqueue
原文来自:微信后台团队
声明:所有来源为“聚合数据”的内容信息,未经本网许可,不得转载!如对内容有异议或投诉,请与我们联系。邮箱:marketing@think-land.com
通过企业关键词查询企业涉讼详情,如裁判文书、开庭公告、执行公告、失信公告、案件流程等等。
IP反查域名是通过IP查询相关联的域名信息的功能,它提供IP地址历史上绑定过的域名信息。
结合权威身份认证的精准人脸风险查询服务,提升人脸应用及身份认证生态的安全性。人脸风险情报库,覆盖范围广、准确性高,数据权威可靠。
全国城市和站点空气质量查询,污染物浓度及空气质量分指数、空气质量指数、首要污染物及空气质量级别、健康指引及建议采取的措施等。
输入手机号和拦截等级,查看是否是风险号码