kafka原理
要点
Kafka 产生背景,以及主要功能
Kafka 消息队列的设计思想以及基本组成元素
Kafka 内部核心步骤及原理
客户端生产/消费全流程剖析
Kafka 监控工具及其基本原理
一、Kafka 简介
- Kafka 背景
Kafka 最早是由 LinkedIn(领英) 公司开发的,作为其自身业务消息处理的基础系统,后 LinkedIn 公司将 Kafka 捐赠给 Apache,现在已经成为 Apache 的一个顶级项目,Kafka 作为一个高吞吐的分布式的消息系统,目前已经被很多公司应用在实际的业务中了,并且与许多数据处理框架相结合,比如 Hadoop,Spark 等。
- Kafka 是什么
Kafka 是一个流式处理平台,流处理平台具有三大关键功能:
发布和订阅记录流,类似于消息队列或企业消息系统。
以容错持久的方式存储记录流。
在记录流发生时对其进行处理。
Kafka 通常用于两大类应用程序:
构建实时流数据管道,在系统或应用程序之间可靠地获取数据。
构建实时流应用程序来转换或响应数据流。
二、Kafka 架构与组件介绍
# 消息队列
offset:日志位移,指日志在消息队列中的位置序号
producer:生产者,往消息队列后面追加写入消息
consumer:消费者,一个消费者拥有一个唯一的 GroupID,每个消费者相互独立,消费到的最新位置称为 consumer offset,每个消费者都有自己的位移,用于标识最新消费到哪了
LEO:全称 log end offset,消息队列最尾端的 offset,也是生产者将要写入日志的位置
# 主题(Topic)
如果有多份数据,将不便存放到同一个消息队列,因此引出了主题概念。主题将不同类型的消息区分开来,不同的生产者(producer)可以往不同主题存入数据,不同的消费者(consumer)也可以从不同的主题消费数据。
# 分区(Partition)
当一个主题数据较大时,我们的生产和消费将会遇到性能瓶颈,这时候就需要对主题(topic)进行拆分,以便于生产者(producer)和消费者(consumer)能够多线程写入和消费,于是就有了分区(partition)。
那么,分区是如何拆分的?
kafka 提供了轮询、随机、hash的规则以供生产者使用,轮巡和随机的拆法较为均衡,而 hash 则保证了具备相同特征消息的顺序性。每个分区(partiton)的消费和生产都是相对独立的,所以在顺序上没有保障。
至于 hash 的对象,这里需要先了解下消息的构成,kafka 的每条消息由 key 和 value 组成,hash 的对象则为 key,也就是说,相同 key 的消息会被拆分到同一个分区(partition),以此来保障某类消息的顺序性。
特性:分区只能扩,不能减。
现状:作业帮 kafka 集群当前默认 1 个主题 6 个分区,kafka 容器集群生产端使用 Hash 算法(key 为 pod+tp 保证同 pod 同类型日志被拆分到同一个分区中)
# 消费者组(Consumer-group)
为了加快消费的效率以及消费端的容灾,kafka 引入了消费者组(consumer-group)的概念,消费者组中有 N 个实例,一个分区(partition)只能同时属于一个消费者(consumer)实例,这样可以将消费的压力分摊到各实例上,以便更快的消费。同时,若消费者组中有实例宕机,kafka 会及时剔除故障实例,并重新分配消费方案,消费者组有以下三种分配策略:
这里假设消费者组订阅了 3 个 Topic,每个 Topic 都有 4 个分区。
- Range 分配策略
按照分区轮询分配,若消费者组订阅了多个 topic,可能导致序号靠前的分区压力较大的问题。
2. RoundRobin 分配策略
在 Range 策略的基础上进行改进,会根据消费者分配到的分区数量进行调度,均衡消费者的压力。如下图,这样分配就考虑到了消费者所被分配的分区数量的均衡。
3. Sticky 分配策略(0.11.X 版本引入)
在 RoundRobin 的基础上改进,适用于消费者数量变化时,能尽量保证之前的消费者消费相同分区。若消费者从 3 个变成 4 个,则会触发重新分配,若使用 RoundRobin 策略,则分配后只有极少数分区仍然在原来的消费者上。
而经过 Stickey 策略会尽量保证之前的关联关系。
以下三种情况时候,消费者组将会触发重平衡(Rebalance)
消费者数量变化,即有消费者加入组或者有消费者离组
消费者订阅 topic 数量变化
消费者订阅 topic 的分区数量发生变化,即服务端扩分区的情况
# 副本(Replication)
为提高 kafka 的高可用性,引入了副本(Replication)的概念,即一个分区有 N 个副本,副本分为 Leader 副本和 Follower 副本,Leader 副本负责对外提供服务,Follower 副本则只负责同步 Leader 副本的数据。
LEO:Log End Offset,下一个日志写入的位置的位移
AR:Assigned Replicas 指当前分区中的所有副本
ISR:In-Sync Replicas,同步副本队列,包括 Leader 和 Follower,replica.lag.time.max.ms(延迟时间)两个参数决定副本是否可以加入 ISR 副本队列
OSR:Out-Sync Replicas,非同步副本队列,与 leader 副本同步滞后过多的副本(不包括 leader 副本)组成 OSR
HW:High Water,高水位,一个分区的 HW 取所有 ISR 副本的最小 LEO 值,说白了 HW 就是同步副本队列中都同步到的位置+1,消费者只能消费 HW 之前的数据,这样可以保证数据不丢失。
当 Leader 副本宕机时,kafka 会选举 ISR 中其他的副本为 Leader,当 ISR 中没有任何一个副本可选时候,这时候就看参数unclean.leader.election.enable,若为 true 则可以将 OSR 中的副本选为 leader,但是会造成数据丢失的问题。若为 false 则不会选出 Leader 副本,但是会造成服务不可用的问题。
副本是为了避免服务器宕机或节点不可用的情况,如果多个副本分布在同一个节点上,若服务器宕机则无法应对,所以副本的数量(通过replication.factor参数设置)不能超过节点的数量,这里的节点就是 Broker。
现状:作业帮的 kafka 集群配置的副本数量为 2,即除了 leader 副本外,只有一份备份数据。unclean.leader.election.enable=true,即 leader 副本宕机后 ISR 中为空,也会将 OSR 中的副本选举出来当 Leader 副本。
# Broker
Broker 是 Kafka 的单节点,1 台机器上可以部署多个 Broker,但为了高可用以及稳定性,一般都是 1 台机器部署 1 个 broker。
主题(topic)下的多个分区可以分布在多台 broker 上,也可以分布在同一个 broker 上。
一个分区(partition)的副本必须分布在不同的 broker 上,当 leader 副本发生故障时,kafka 将会把副本同步队列(ISR)中的其他副本选为 leader 副本,继续对外服务。
上图中,共 3 个 Topic,分区下的副本数都是 3 个。
Topic1 有 3 个分区,Topic2 有 2 个分区,Topic3 有 1 个分区。
这三者逻辑关系图如下:
# Zookeeper
ZooKeeper 作为一个分布式的协调服务框架,主要用来解决分布式集群中,应用系统需要面对的各种通用的一致性问题。ZooKeeper 本身可以部署为一个集群,集群的各个节点之间可以通过选举来产生一个 Leader,选举遵循半数以上的原则,一个 Leader,N 个 Follower,Leader 负责读写,Follower 负责读。
在 Kafka 中 ZK 负责管理集群元数据,其管理的数据如下:
# Controller
Controller 是所有 Broker 的管理节点,负责管理整个集群中所有分区和副本状态,可以对其他普通 Broker 节点发号施令执行命令,同时负责更新元数据并向 Zookeeper 同步。
副本选举:当某个分区的 leader 副本出现故障时,由控制器负责为该分区选举新的 leader 副本。
更新集群 Broker 数据:当检测到某个分区的 ISR 集合发生变化时,由控制器负责通知所有 broker 更新其元数据信息。
元数据的管理:当使用 kafka-topics.sh 脚本为某个 topic 增加分区数量时,同样还是由控制器负责分区的重新分配。
三、核心步骤原理
# Controller 选举
在集群启动时,每个 Broker 都会去 ZK 创建一个/controller 临时节点,zookeeper 会保证有且仅有一个 broker 能创建成功,这个 broker 就会成为集群的总控器 controller。
注:Kafka2.8 版本后使用 Kraft 协议进行选举,不再使用 Zookeeper 进行元数据的管理
# 日志存储
日志存储是以 Topic+Partition 的维度进行存储的,一个 Partition 一个文件夹,文件夹下的结构主要包含索引文件(index)和日志文件(log),下图是 kube-txprod-ai-college-pdf-parse 的分区 1 的文件夹构成:
- log 文件
存储实际的消息数据,每个 .log 文件代表一个日志段(log segment),一个日志段默认 1G(log.segment.bytes 参数),文件名通常是该段的起始偏移量(offset)。
baseOffset:该段日志的起始 offset
lastOffset:该段日志的终止 offset
count:该段日志的条数
baseSequence:起始序列号
lastSequence:终止序列号
producerId:生产者 ID
producerEpoch:生产者版本号
partitionLeaderEpoch:分区版本号
isTransactional:是否开启事务
isControl:未知
position:消息在日志文件中的位移(file offset)
CreateTime: 创建时间
size:日志大小
magic:表示本次发布 kafka 服务程序协议版本号
conpresscodec:压缩算法
crc:消息校验值
isvalid:记录是否有效
offset:消息的偏移量
CreateTime:记录创建的时间
keysize:表示 key 的长度
valuesize:表示 value 的长度
sequence:消息的序列号(用于幂等机制)
headerKeys:消息 headers 头的 key
key:消息 key
payload:表示具体的消息
- index 文件
存储偏移量到物理文件位置的映射,这个文件是一个稀疏索引,允许 Kafka 快速查找特定偏移量的消息。通过这个索引,Kafka 可以避免从头开始扫描整个日志文件,从而提高查找效率。
- timeindex 文件
存储消息时间戳到物理文件位置的映射。这个文件允许 Kafka 根据时间戳快速查找消息。它是一个稀疏索引,类似于 .index 文件,但索引的是时间戳而不是偏移量。
- leader-epoch-checkpoint 文件
第三行的 3 和 0 分别表示 epoch 版本和该版本的起始 offset。
记录分区的领导者版本信息。这个文件包含了每个版本的起始偏移量。领导者纪元是 Kafka 用来跟踪分区领导者变化的机制。每次分区领导者发生变化时,纪元号会增加。这个文件帮助 Kafka 在领导者变更时进行数据恢复和一致性检查。
- partition.metadata 文件
存储分区的元数据信息。这个文件包含了分区的一些基本信息,如分区的版本号等。它帮助 Kafka 管理和维护分区的元数据。
# 日志读取
通过 offset 找到对应的 index 文件
用二分法通过 index 中的稀疏索引找到对应的 log position
继续用二分法在 log 文件中找到对应的 offset 日志
# 副本同步机制
拿 test 这个 topic 的分区 0 举例,leader 副本在 broker1 上,follower 副本在 broker0 上。
被选举为 Follower 副本的 Broker,会向 Leader 副本所在的 Broker 发起 fetcher 线程,线程的个数由参数num.replica.fetchers决定,它不会为每一个 Partition 都启动一个线程,而是对一个 Broker 发起 num.replica.fetchers 个线程。
构建 fetcher 请求,会设置 replicaId,该值会代表这个请求是来自副本同步,而不是 comsumer。
发起 fetcher 请求,进行,将拉取到的数据追加本地副本的日志文件中。同步后的文件日志大小等都一致,在无日志差量的情况下,MD5 值一致。
# 元数据同步
Controller 负责元数据的管理,有更新后会提交到 ZK,其他 Broker 将主动去 ZK 拉取元数据,同步到本地。
每台 Broker 的元数据都是全量的,便于分担客户端请求元数据的压力。
l%3
# Leader 副本选举
当分区的 Leader 副本崩溃时,Controller 负责选出新的 Leader 副本,它会按顺序将其他 ISR 中的可用副本选举成 Leader 副本,若 ISR(In-Sync Replications)中没有可用副本,则根据参数 unclean.leader.election.enable 来判断是否要选举 OSR(Out-Sync Replications)中的副本,参数为 True 则从未同步上的副本队列中选取 Leader。
# 消费组重平衡
# 触发与通知
触发重平衡(Rebalence)的三个条件:
组成员发生变化(消费者加入组或者离组)
订阅主题数量发生变化(新增/减少主题订阅)
订阅主体的分区数发生变化(订阅主题的分区数发生变化)
重平衡的通知机制:通过心跳线程通知
消费端参数heartbeat.interval.ms,含义为心跳间隔时间,同时也可以控制通知重平衡的频率,若需要更快感知重平衡则将该值变小。
参数session.timeout.ms,会话超时时间,若该时间内服务端未收到心跳包,则会自动剔除消费者。
# 消费者组状态机
状态机流转主要过程:
# 重平衡流程
首先获取订阅 Topic 的元数据,再找到消费组的协调者 Coordinator 是哪台 Broker,然后注册 Broker。
主要流程分为两步:(协调者是某台 Broker)
加入组,即 JoinGroup
等待协调者分配方案,即 SyncGroup
四、生产消费全流程
将以一条消息的生产到消费全流程,来分享 Kafka 内部做了什么。客户端以 client-go(sarama)为例。
# 生产端发送消息
初始化生产者,获取 kafka 集群的元数据,包括所有 Topic 列表,分区分布等。
根据生产端选择的分区策略,选择消息要发送的分区。
根据 Topic 和 Partition,从获取的元数据中找到 Leader 副本所在的 Broker,并发起连接,鉴权操作也在此验证,将消息发送到本地缓冲区。
触发阈值时批量发送缓冲区消息,触发批量消息发送有几种情况
消息大小大于等于 90KB 时
消息大小大于等于配置的 MaxMessageBytes 时
缓冲区消息数量大于等于配置的 MaxMessages 时
本次发送时间超过 Frequency 时
Kafka 接受到消息后落盘,一个请求内可能会有多个 Topic+Partition 的消息,按路径保存到服务端配置的日志目录,更新 log 文件,更新 index 文件,顺序写入
# 消费端消费消息
初始化消费者,获取元数据
根据消费组名称寻找消费组的 Coordinator,根据特殊主体__consumer_offset(专用于保存消费组相关信息),取 hash(groupID)%partitionNum 的分区中 Leader 副本所在 broker 作为 Coordinator 充当消费组的协调者
对 Coordinator 发起 JoinGroup 请求,上报自己要订阅的信息
对 Coordinator 发起 SyncGroup 请求,获取到消费方案
启动心跳包协程;获取被分配到的所有 Topic-Partition 的 Offset,并为每个 Partition 启动一个处理错误的协程;根据 broker 分组,将目的地为同一个 broker 的 Topic-Partiton 打包进请求参数,向 kafka 发起 poll 请求,获取数据后 consumer 会等所有的 Partition 都处理完手上的数据并标记完成后,再进行下一次请求
客户端 markMessage 标记消费完成,根据客户端配置自动/主动提交本地的 offset 到 Coordinator
鉴权验证过程
- 用户认证 Authentication(SSL or SASL):
客户端连接服务时,提供自身身份信息,服务端验证通过后才准许客户端连接。Kafka 提供了 4 种 security protocol 供选择,可以同时配置多个 listeners,同时支持多种协议接入。其中 SSL、SASL_PLAINTEXT、SASL_SSL 都支持权限控制。
PLAINTEXT:明文传输、不认证
SSL:加密传输、broker 从客户端证书中提取用户信息
SASL_PLAINTEXT:使用 SASL 机制认证,明文传输
SASL_SSL:使用 SASL 认证,并使用 SSL 加密传输
- 权限控制 Authorisation(ACL)
可以设置规则,允许或者禁止哪些用户可以对哪些资源进行什么操作。Kafka 提供了权限控制的接口(kafka.security.authorizer.Authorizer),并且提供了默认的实现,用 zookeeper 存储 acl 规则(默认存储路径 /kafka-acl)。可以通过 bin/kafka-acls.sh 脚本来管理 ACL 规则。
五、Kafka 监控与管理
# Console 控制台
kafka-console是一款轻量级的 Kafka 可视化管理平台(https://github.com/xxd763795151/kafka-console-ui (opens new window)),其支持功能如下:
多集群支持
集群信息
Topic 管理
消费组管理
消息管理
ACL
客户端限流
运维
底层原理即为调用 Kafka 提供的 AdminAPI 来实现对 Kafka 内部数据的查询以及管理,当前我们工单中的创建 Topic 以及 Topic 扩 P 均是通过 Console 代理调用了 Kafka 提供的 AdminAPI 来实现。Console 大部分功能与 Kafka 脚本执行达到的效果一致。
# Kafka 监控
Kafka 监控主要使用Kafka-exporter导出(https://github.com/danielqsj/kafka_exporter (opens new window)),由 Prometheus 进行采集,最后通过 Grafana 进行展示。
exporter 的原理是直接获取 Broker 的元数据,以及通过连接 zk 计算消费延迟等监控数据,主要使用了以下两个核心包:
sarama(核心) :go 实现的 kafka 客户端,连接 broker 获取相关的指标与元数据
kazoo:go 实现的 zk 客户端,连接 kafka 的 zk 集群,主要用于 zk 消费组的 lag 计算
六、参考文档
《Kafka 核心技术与实战》:https://learn.lianglianglee.com/%E4%B8%93%E6%A0%8F/Kafka%E6%A0%B8%E5%BF%83%E6%8A%80%E6%9C%AF%E4%B8%8E%E5%AE%9E%E6%88%98 (opens new window)
《Kafka exporter 调研与改进》:https://cloud.tencent.com/developer/article/1794971 (opens new window)
《Kafka 生产调优手册》:https://www.cnblogs.com/frankcui/p/16319440.html#_label3_0_3_0 (opens new window)
《Kafka 认证,鉴权,加密》:https://mapan1984.github.io/kafka-security/ (opens new window)