kafka原理

2025/7/4 kafka

要点

  1. Kafka 产生背景,以及主要功能

  2. Kafka 消息队列的设计思想以及基本组成元素

  3. Kafka 内部核心步骤及原理

  4. 客户端生产/消费全流程剖析

  5. Kafka 监控工具及其基本原理

一、Kafka 简介

  1. Kafka 背景

Kafka 最早是由 LinkedIn(领英) 公司开发的,作为其自身业务消息处理的基础系统,后 LinkedIn 公司将 Kafka 捐赠给 Apache,现在已经成为 Apache 的一个顶级项目,Kafka 作为一个高吞吐的分布式的消息系统,目前已经被很多公司应用在实际的业务中了,并且与许多数据处理框架相结合,比如 Hadoop,Spark 等。

  1. Kafka 是什么

Kafka 是一个流式处理平台,流处理平台具有三大关键功能:

  • 发布和订阅记录流,类似于消息队列或企业消息系统。

  • 以容错持久的方式存储记录流。

  • 在记录流发生时对其进行处理。

Kafka 通常用于两大类应用程序:

  • 构建实时流数据管道,在系统或应用程序之间可靠地获取数据。

  • 构建实时流应用程序来转换或响应数据流。

二、Kafka 架构与组件介绍

# 消息队列

图片

  • offset:日志位移,指日志在消息队列中的位置序号

  • producer:生产者,往消息队列后面追加写入消息

  • consumer:消费者,一个消费者拥有一个唯一的 GroupID,每个消费者相互独立,消费到的最新位置称为 consumer offset,每个消费者都有自己的位移,用于标识最新消费到哪了

  • LEO:全称 log end offset,消息队列最尾端的 offset,也是生产者将要写入日志的位置

# 主题(Topic)

图片 如果有多份数据,将不便存放到同一个消息队列,因此引出了主题概念。主题将不同类型的消息区分开来,不同的生产者(producer)可以往不同主题存入数据,不同的消费者(consumer)也可以从不同的主题消费数据。

# 分区(Partition)

图片 当一个主题数据较大时,我们的生产和消费将会遇到性能瓶颈,这时候就需要对主题(topic)进行拆分,以便于生产者(producer)和消费者(consumer)能够多线程写入和消费,于是就有了分区(partition)

那么,分区是如何拆分的?

kafka 提供了轮询随机hash的规则以供生产者使用,轮巡和随机的拆法较为均衡,而 hash 则保证了具备相同特征消息的顺序性。每个分区(partiton)的消费和生产都是相对独立的,所以在顺序上没有保障。

至于 hash 的对象,这里需要先了解下消息的构成,kafka 的每条消息由 key 和 value 组成,hash 的对象则为 key,也就是说,相同 key 的消息会被拆分到同一个分区(partition),以此来保障某类消息的顺序性。

特性:分区只能扩,不能减。

现状:作业帮 kafka 集群当前默认 1 个主题 6 个分区,kafka 容器集群生产端使用 Hash 算法(key 为 pod+tp 保证同 pod 同类型日志被拆分到同一个分区中)

# 消费者组(Consumer-group)

图片 为了加快消费的效率以及消费端的容灾,kafka 引入了消费者组(consumer-group)的概念,消费者组中有 N 个实例,一个分区(partition)只能同时属于一个消费者(consumer)实例,这样可以将消费的压力分摊到各实例上,以便更快的消费。同时,若消费者组中有实例宕机,kafka 会及时剔除故障实例,并重新分配消费方案,消费者组有以下三种分配策略:

这里假设消费者组订阅了 3 个 Topic,每个 Topic 都有 4 个分区。

图片

  1. Range 分配策略

按照分区轮询分配,若消费者组订阅了多个 topic,可能导致序号靠前的分区压力较大的问题。

图片 2. RoundRobin 分配策略

在 Range 策略的基础上进行改进,会根据消费者分配到的分区数量进行调度,均衡消费者的压力。如下图,这样分配就考虑到了消费者所被分配的分区数量的均衡。

图片 3. Sticky 分配策略(0.11.X 版本引入)

在 RoundRobin 的基础上改进,适用于消费者数量变化时,能尽量保证之前的消费者消费相同分区。若消费者从 3 个变成 4 个,则会触发重新分配,若使用 RoundRobin 策略,则分配后只有极少数分区仍然在原来的消费者上。

图片 而经过 Stickey 策略会尽量保证之前的关联关系。

图片

以下三种情况时候,消费者组将会触发重平衡(Rebalance)

  1. 消费者数量变化,即有消费者加入组或者有消费者离组

  2. 消费者订阅 topic 数量变化

  3. 消费者订阅 topic 的分区数量发生变化,即服务端扩分区的情况

# 副本(Replication)

图片 为提高 kafka 的高可用性,引入了副本(Replication)的概念,即一个分区有 N 个副本,副本分为 Leader 副本和 Follower 副本,Leader 副本负责对外提供服务,Follower 副本则只负责同步 Leader 副本的数据。

  • LEO:Log End Offset,下一个日志写入的位置的位移

  • AR:Assigned Replicas 指当前分区中的所有副本

  • ISR:In-Sync Replicas,同步副本队列,包括 Leader 和 Follower,replica.lag.time.max.ms(延迟时间)两个参数决定副本是否可以加入 ISR 副本队列

  • OSR:Out-Sync Replicas,非同步副本队列,与 leader 副本同步滞后过多的副本(不包括 leader 副本)组成 OSR

  • HW:High Water,高水位,一个分区的 HW 取所有 ISR 副本的最小 LEO 值,说白了 HW 就是同步副本队列中都同步到的位置+1,消费者只能消费 HW 之前的数据,这样可以保证数据不丢失。

图片

  • 当 Leader 副本宕机时,kafka 会选举 ISR 中其他的副本为 Leader,当 ISR 中没有任何一个副本可选时候,这时候就看参数unclean.leader.election.enable,若为 true 则可以将 OSR 中的副本选为 leader,但是会造成数据丢失的问题。若为 false 则不会选出 Leader 副本,但是会造成服务不可用的问题。

  • 副本是为了避免服务器宕机或节点不可用的情况,如果多个副本分布在同一个节点上,若服务器宕机则无法应对,所以副本的数量(通过replication.factor参数设置)不能超过节点的数量,这里的节点就是 Broker。

现状:作业帮的 kafka 集群配置的副本数量为 2,即除了 leader 副本外,只有一份备份数据。unclean.leader.election.enable=true,即 leader 副本宕机后 ISR 中为空,也会将 OSR 中的副本选举出来当 Leader 副本。

# Broker

图片 Broker 是 Kafka 的单节点,1 台机器上可以部署多个 Broker,但为了高可用以及稳定性,一般都是 1 台机器部署 1 个 broker。

主题(topic)下的多个分区可以分布在多台 broker 上,也可以分布在同一个 broker 上。

一个分区(partition)的副本必须分布在不同的 broker 上,当 leader 副本发生故障时,kafka 将会把副本同步队列(ISR)中的其他副本选为 leader 副本,继续对外服务。

上图中,共 3 个 Topic,分区下的副本数都是 3 个。

Topic1 有 3 个分区,Topic2 有 2 个分区,Topic3 有 1 个分区。

这三者逻辑关系图如下:

图片

# Zookeeper

ZooKeeper 作为一个分布式的协调服务框架,主要用来解决分布式集群中,应用系统需要面对的各种通用的一致性问题。ZooKeeper 本身可以部署为一个集群,集群的各个节点之间可以通过选举来产生一个 Leader,选举遵循半数以上的原则,一个 Leader,N 个 Follower,Leader 负责读写,Follower 负责读。

在 Kafka 中 ZK 负责管理集群元数据,其管理的数据如下:

图片 图片

# Controller

Controller 是所有 Broker 的管理节点,负责管理整个集群中所有分区和副本状态,可以对其他普通 Broker 节点发号施令执行命令,同时负责更新元数据并向 Zookeeper 同步。

  • 副本选举:当某个分区的 leader 副本出现故障时,由控制器负责为该分区选举新的 leader 副本。

  • 更新集群 Broker 数据:当检测到某个分区的 ISR 集合发生变化时,由控制器负责通知所有 broker 更新其元数据信息。

  • 元数据的管理:当使用 kafka-topics.sh 脚本为某个 topic 增加分区数量时,同样还是由控制器负责分区的重新分配。

三、核心步骤原理

# Controller 选举

在集群启动时,每个 Broker 都会去 ZK 创建一个/controller 临时节点,zookeeper 会保证有且仅有一个 broker 能创建成功,这个 broker 就会成为集群的总控器 controller。

图片 注:Kafka2.8 版本后使用 Kraft 协议进行选举,不再使用 Zookeeper 进行元数据的管理

# 日志存储

日志存储是以 Topic+Partition 的维度进行存储的,一个 Partition 一个文件夹,文件夹下的结构主要包含索引文件(index)和日志文件(log),下图是 kube-txprod-ai-college-pdf-parse 的分区 1 的文件夹构成:

图片

  • log 文件

存储实际的消息数据,每个 .log 文件代表一个日志段(log segment),一个日志段默认 1G(log.segment.bytes 参数),文件名通常是该段的起始偏移量(offset)。

图片 baseOffset:该段日志的起始 offset

lastOffset:该段日志的终止 offset

count:该段日志的条数

baseSequence:起始序列号

lastSequence:终止序列号

producerId:生产者 ID

producerEpoch:生产者版本号

partitionLeaderEpoch:分区版本号

isTransactional:是否开启事务

isControl:未知

position:消息在日志文件中的位移(file offset)

CreateTime: 创建时间

size:日志大小

magic:表示本次发布 kafka 服务程序协议版本号

conpresscodec:压缩算法

crc:消息校验值

isvalid:记录是否有效

offset:消息的偏移量

CreateTime:记录创建的时间

keysize:表示 key 的长度

valuesize:表示 value 的长度

sequence:消息的序列号(用于幂等机制)

headerKeys:消息 headers 头的 key

key:消息 key

payload:表示具体的消息

  • index 文件

存储偏移量到物理文件位置的映射,这个文件是一个稀疏索引,允许 Kafka 快速查找特定偏移量的消息。通过这个索引,Kafka 可以避免从头开始扫描整个日志文件,从而提高查找效率。

图片

  • timeindex 文件

存储消息时间戳到物理文件位置的映射。这个文件允许 Kafka 根据时间戳快速查找消息。它是一个稀疏索引,类似于 .index 文件,但索引的是时间戳而不是偏移量。

  • leader-epoch-checkpoint 文件

图片 第三行的 3 和 0 分别表示 epoch 版本和该版本的起始 offset。

记录分区的领导者版本信息。这个文件包含了每个版本的起始偏移量。领导者纪元是 Kafka 用来跟踪分区领导者变化的机制。每次分区领导者发生变化时,纪元号会增加。这个文件帮助 Kafka 在领导者变更时进行数据恢复和一致性检查。

  • partition.metadata 文件

存储分区的元数据信息。这个文件包含了分区的一些基本信息,如分区的版本号等。它帮助 Kafka 管理和维护分区的元数据。

# 日志读取

  1. 通过 offset 找到对应的 index 文件

  2. 用二分法通过 index 中的稀疏索引找到对应的 log position

  3. 继续用二分法在 log 文件中找到对应的 offset 日志

图片

# 副本同步机制

拿 test 这个 topic 的分区 0 举例,leader 副本在 broker1 上,follower 副本在 broker0 上。

图片

  • 被选举为 Follower 副本的 Broker,会向 Leader 副本所在的 Broker 发起 fetcher 线程,线程的个数由参数num.replica.fetchers决定,它不会为每一个 Partition 都启动一个线程,而是对一个 Broker 发起 num.replica.fetchers 个线程。

  • 构建 fetcher 请求,会设置 replicaId,该值会代表这个请求是来自副本同步,而不是 comsumer。

  • 发起 fetcher 请求,进行,将拉取到的数据追加本地副本的日志文件中。同步后的文件日志大小等都一致,在无日志差量的情况下,MD5 值一致。

图片

# 元数据同步

Controller 负责元数据的管理,有更新后会提交到 ZK,其他 Broker 将主动去 ZK 拉取元数据,同步到本地。

每台 Broker 的元数据都是全量的,便于分担客户端请求元数据的压力。

图片l%3

# Leader 副本选举

当分区的 Leader 副本崩溃时,Controller 负责选出新的 Leader 副本,它会按顺序将其他 ISR 中的可用副本选举成 Leader 副本,若 ISR(In-Sync Replications)中没有可用副本,则根据参数 unclean.leader.election.enable 来判断是否要选举 OSR(Out-Sync Replications)中的副本,参数为 True 则从未同步上的副本队列中选取 Leader。

图片

# 消费组重平衡

# 触发与通知

触发重平衡(Rebalence)的三个条件:

  1. 组成员发生变化(消费者加入组或者离组)

  2. 订阅主题数量发生变化(新增/减少主题订阅)

  3. 订阅主体的分区数发生变化(订阅主题的分区数发生变化)

重平衡的通知机制:通过心跳线程通知

消费端参数heartbeat.interval.ms,含义为心跳间隔时间,同时也可以控制通知重平衡的频率,若需要更快感知重平衡则将该值变小。

参数session.timeout.ms,会话超时时间,若该时间内服务端未收到心跳包,则会自动剔除消费者。

# 消费者组状态机

图片 状态机流转主要过程:

图片

# 重平衡流程

首先获取订阅 Topic 的元数据,再找到消费组的协调者 Coordinator 是哪台 Broker,然后注册 Broker。

主要流程分为两步:(协调者是某台 Broker)

  1. 加入组,即 JoinGroup

  2. 等待协调者分配方案,即 SyncGroup

图片

四、生产消费全流程

将以一条消息的生产到消费全流程,来分享 Kafka 内部做了什么。客户端以 client-go(sarama)为例。

# 生产端发送消息

图片

  1. 初始化生产者,获取 kafka 集群的元数据,包括所有 Topic 列表,分区分布等。

  2. 根据生产端选择的分区策略,选择消息要发送的分区。

  3. 根据 Topic 和 Partition,从获取的元数据中找到 Leader 副本所在的 Broker,并发起连接,鉴权操作也在此验证,将消息发送到本地缓冲区

  4. 触发阈值时批量发送缓冲区消息,触发批量消息发送有几种情况

    1. 消息大小大于等于 90KB 时

    2. 消息大小大于等于配置的 MaxMessageBytes 时

    3. 缓冲区消息数量大于等于配置的 MaxMessages 时

    4. 本次发送时间超过 Frequency 时

  5. Kafka 接受到消息后落盘,一个请求内可能会有多个 Topic+Partition 的消息,按路径保存到服务端配置的日志目录,更新 log 文件,更新 index 文件,顺序写入

# 消费端消费消息

图片

  1. 初始化消费者,获取元数据

  2. 根据消费组名称寻找消费组的 Coordinator,根据特殊主体__consumer_offset(专用于保存消费组相关信息),取 hash(groupID)%partitionNum 的分区中 Leader 副本所在 broker 作为 Coordinator 充当消费组的协调者

  3. 对 Coordinator 发起 JoinGroup 请求,上报自己要订阅的信息

  4. 对 Coordinator 发起 SyncGroup 请求,获取到消费方案

  5. 启动心跳包协程;获取被分配到的所有 Topic-Partition 的 Offset,并为每个 Partition 启动一个处理错误的协程;根据 broker 分组,将目的地为同一个 broker 的 Topic-Partiton 打包进请求参数,向 kafka 发起 poll 请求,获取数据后 consumer 会等所有的 Partition 都处理完手上的数据并标记完成后,再进行下一次请求

  6. 客户端 markMessage 标记消费完成,根据客户端配置自动/主动提交本地的 offset 到 Coordinator

鉴权验证过程

图片

  1. 用户认证 Authentication(SSL or SASL):

客户端连接服务时,提供自身身份信息,服务端验证通过后才准许客户端连接。Kafka 提供了 4 种 security protocol 供选择,可以同时配置多个 listeners,同时支持多种协议接入。其中 SSL、SASL_PLAINTEXT、SASL_SSL 都支持权限控制。

  • PLAINTEXT:明文传输、不认证

  • SSL:加密传输、broker 从客户端证书中提取用户信息

  • SASL_PLAINTEXT:使用 SASL 机制认证,明文传输

  • SASL_SSL:使用 SASL 认证,并使用 SSL 加密传输

  1. 权限控制 Authorisation(ACL)

可以设置规则,允许或者禁止哪些用户可以对哪些资源进行什么操作。Kafka 提供了权限控制的接口(kafka.security.authorizer.Authorizer),并且提供了默认的实现,用 zookeeper 存储 acl 规则(默认存储路径 /kafka-acl)。可以通过 bin/kafka-acls.sh 脚本来管理 ACL 规则。

五、Kafka 监控与管理

# Console 控制台

图片 kafka-console是一款轻量级的 Kafka 可视化管理平台(https://github.com/xxd763795151/kafka-console-ui (opens new window)),其支持功能如下:

  • 多集群支持

  • 集群信息

  • Topic 管理

  • 消费组管理

  • 消息管理

  • ACL

  • 客户端限流

  • 运维

底层原理即为调用 Kafka 提供的 AdminAPI 来实现对 Kafka 内部数据的查询以及管理,当前我们工单中的创建 Topic 以及 Topic 扩 P 均是通过 Console 代理调用了 Kafka 提供的 AdminAPI 来实现。Console 大部分功能与 Kafka 脚本执行达到的效果一致。

# Kafka 监控

图片 Kafka 监控主要使用Kafka-exporter导出(https://github.com/danielqsj/kafka_exporter (opens new window)),由 Prometheus 进行采集,最后通过 Grafana 进行展示。

exporter 的原理是直接获取 Broker 的元数据,以及通过连接 zk 计算消费延迟等监控数据,主要使用了以下两个核心包:

  • sarama(核心) :go 实现的 kafka 客户端,连接 broker 获取相关的指标与元数据

  • kazoo:go 实现的 zk 客户端,连接 kafka 的 zk 集群,主要用于 zk 消费组的 lag 计算

六、参考文档

Last Updated: 2025/9/7
只爱西经
林一