加入收藏 | 设为首页 | 会员中心 | 我要投稿 源码网 (https://www.900php.com/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 服务器 > 搭建环境 > Windows > 正文

每天处理千亿级日志量,Kafka是如何做到的?

发布时间:2019-12-26 18:31:20 所属栏目:Windows 来源:站长网
导读:副标题#e# 之前为大家分享了不少 Kafka 原理解析类的干货,今天咱们一起来看看 360 基于 Kafka 千亿级数据量的深度实践! 图片来自 Pexels 本文主要围绕如下内容分享: 消息队列选型 Kafka 在 360 商业化的现状 Kafka Client 框架 数据高可用 负载均衡 鉴权

每天处理千亿级日志量,Kafka是如何做到的?

我们实现了两个框架:

LogProducer,支持 at least once。

LogConsumer,支持 at least once 和 exactly once 两种语意,其中 exactly once 需要业务去实现 Rollback 接口。

每天处理千亿级日志量,Kafka是如何做到的?

LogProducer 框架的大体思路是通过内存队列将日志发送到 Kafka,当 Kafka 或网络不可用的情况下会写本地磁盘,同时会有一个线程去实时检测 Kafka 或者网络的可用情况,如果恢复就会加载磁盘日志并发送到 Kafka。

我们还支持一种共享内存的策略来代替内存,使用共享内存是为了减少重启过程中日志的丢失数。

每天处理千亿级日志量,Kafka是如何做到的?

LogConsumer 的框架实现,通过 Blocking Queue 将 Consumer 线程和 Worker 线程解耦,因为现实情况是消费逻辑很简单,但是处理逻辑会很复杂。

这样就可以对 Consumer 线程和 Worker 线程做不同的配置,同时通过 Blocking Queue 还可以实现反压机制。

比如 Worker 处理不过来了,这时候 Blocking Queue 就会满,反压到 Consumer 线程会停止消费。

同时我们在 Worker 线程接口里面会提供接口让用户提交到 global offsetmap。

如上图我们提供三个组合接口,如果在业务处理与 Commit 中实现了业务端 Rollback 逻辑, 那么就是 exactly once 语义,默认是 at least once 语义。

数据高可用

之前讲过 Kafka 本身提供 Replica+ISR 的机制来保证数据高可用,但我们觉得这个可能还不够,所以我们还要支持 Rack Aware。

比如 Replica=3 的情况,确保三个副本在不同的物理 Rack 上,这样我们最多能容忍两个物理机架同时出问题而数据仍可用,我们 Rack Aware 方案是与负载均衡方案一起做掉的,具体后面会讲。

每天处理千亿级日志量,Kafka是如何做到的?

值得注意的是 Kafka 官方也支持 Rack Aware,通过在 Broker 端配置 broker.rack 参数可实现。

但有一个限制,必须为每个 Rack 分配数量相同的 Brokers,否则会导致 Replica 分配倾斜,实际情况是 IDC 的 Rack 是很多的,分配到的物理机分布也可能很随机。

一个可以参考的解决思路是采用虚拟 Rack Group 的概念,比如维护 3 个虚拟 Rack Group,申请到的物理机加入到这 3 个 Group 中,并确保 Rack Group 间分配的物理机数量一致。

当然 Rack Group 间物理机不应存在有相同物理 Rack 的情况。

负载均衡

Kafka 的负载均衡功能在 Confluent 商业版本才支持,负载均衡本质上来说是 Replica 分配均匀问题。

我们一开始想通过经典一致性 Hash 来解决,如下图:

每天处理千亿级日志量,Kafka是如何做到的?

然后我们发现经典一次性 Hash 不能满足我们的需求,比如要加一个节点 node5,只能分担节点 node2 的部分负载,不能做全局节点的负载均衡。

每天处理千亿级日志量,Kafka是如何做到的?

于是我们基于虚拟节点的一次性 Hash 的算法实现了一个方案,如图所示:相同的颜色对应同一个物理机,Hash 环上的都是虚拟节点。

这里有四个物理节点,其中 node4 是我们新加的节点。通过虚拟节点可以把物理节点的负载足够均衡地分散出去,所以当我把 node4 加到 Hash 环上的时候,分担了所有物理机的负载。

算法实现的步骤分为两个大的步骤:

①新建 hash circle:通过 vnode_str(比如 hostname-v0)做一个 MD5 的 Hash,得到虚拟节点的 vnode_key,再用 ring 字典来保存虚拟节点到物理节点的映射,同时将 vnode_key 加入到 sorted_keys 的 list 中。

②在 Hash 环中分配 Replica:将(topic_name+partition_num+replica_num)作为 Key 用相同的 MD5 Hash 算法得到 replica_key。

接着二分查找该 replica_key 在 sorted_keys 中的 Position, 最后用 Ring 字典来映射到物理机 Node,至此 Replica 分配完成。

每天处理千亿级日志量,Kafka是如何做到的?

我们基于这个算法解决三个问题:

添加物理节点只需迁移很小一部分数据。

对不同配置的物理机做权重设置,可以支持异构集群的部署。

实现 Replica 的 Rack Aware,物理节点上面会有 Rack 信息,在为 Replica 分配物理节点的时候会记录已经分配的 Rack 信息。

如果有重复的情况,就会把 vnode_key 找到 Position 的位置 +1 找下一个物理节点,我们会确保三个 Replica 的物理 Rack 一定是不一样的(假如 Replica=3)。

(编辑:源码网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

热点阅读