kafka原理

2025/7/4 kafka

要点

  1. Kafka 产生背景,以及主要功能

  2. Kafka 消息队列的设计思想以及基本组成元素

  3. Kafka 内部核心步骤及原理

  4. 客户端生产/消费全流程剖析

  5. Kafka 监控工具及其基本原理

一、Kafka 简介

  1. Kafka 背景

Kafka 最早是由 LinkedIn(领英) 公司开发的,作为其自身业务消息处理的基础系统,后 LinkedIn 公司将 Kafka 捐赠给 Apache,现在已经成为 Apache 的一个顶级项目,Kafka 作为一个高吞吐的分布式的消息系统,目前已经被很多公司应用在实际的业务中了,并且与许多数据处理框架相结合,比如 Hadoop,Spark 等。

  1. Kafka 是什么

Kafka 是一个流式处理平台,流处理平台具有三大关键功能:

  • 发布和订阅记录流,类似于消息队列或企业消息系统。

  • 以容错持久的方式存储记录流。

  • 在记录流发生时对其进行处理。

Kafka 通常用于两大类应用程序:

  • 构建实时流数据管道,在系统或应用程序之间可靠地获取数据。

  • 构建实时流应用程序来转换或响应数据流。

二、Kafka 架构与组件介绍

# 消息队列

图片

  • offset:日志位移,指日志在消息队列中的位置序号

  • producer:生产者,往消息队列后面追加写入消息

  • consumer:消费者,一个消费者拥有一个唯一的 GroupID,每个消费者相互独立,消费到的最新位置称为 consumer offset,每个消费者都有自己的位移,用于标识最新消费到哪了

  • LEO:全称 log end offset,消息队列最尾端的 offset,也是生产者将要写入日志的位置

# 主题(Topic)

图片 如果有多份数据,将不便存放到同一个消息队列,因此引出了主题概念。主题将不同类型的消息区分开来,不同的生产者(producer)可以往不同主题存入数据,不同的消费者(consumer)也可以从不同的主题消费数据。

# 分区(Partition)

图片 当一个主题数据较大时,我们的生产和消费将会遇到性能瓶颈,这时候就需要对主题(topic)进行拆分,以便于生产者(producer)和消费者(consumer)能够多线程写入和消费,于是就有了分区(partition)

那么,分区是如何拆分的?

kafka 提供了轮询随机hash的规则以供生产者使用,轮巡和随机的拆法较为均衡,而 hash 则保证了具备相同特征消息的顺序性。每个分区(partiton)的消费和生产都是相对独立的,所以在顺序上没有保障。

至于 hash 的对象,这里需要先了解下消息的构成,kafka 的每条消息由 key 和 value 组成,hash 的对象则为 key,也就是说,相同 key 的消息会被拆分到同一个分区(partition),以此来保障某类消息的顺序性。

特性:分区只能扩,不能减。

现状:作业帮 kafka 集群当前默认 1 个主题 6 个分区,kafka 容器集群生产端使用 Hash 算法(key 为 pod+tp 保证同 pod 同类型日志被拆分到同一个分区中)

# 消费者组(Consumer-group)

图片 为了加快消费的效率以及消费端的容灾,kafka 引入了消费者组(consumer-group)的概念,消费者组中有 N 个实例,一个分区(partition)只能同时属于一个消费者(consumer)实例,这样可以将消费的压力分摊到各实例上,以便更快的消费。同时,若消费者组中有实例宕机,kafka 会及时剔除故障实例,并重新分配消费方案,消费者组有以下三种分配策略:

这里假设消费者组订阅了 3 个 Topic,每个 Topic 都有 4 个分区。

图片

  1. Range 分配策略

按照分区轮询分配,若消费者组订阅了多个 topic,可能导致序号靠前的分区压力较大的问题。

图片 2. RoundRobin 分配策略

在 Range 策略的基础上进行改进,会根据消费者分配到的分区数量进行调度,均衡消费者的压力。如下图,这样分配就考虑到了消费者所被分配的分区数量的均衡。

图片 3. Sticky 分配策略(0.11.X 版本引入)

在 RoundRobin 的基础上改进,适用于消费者数量变化时,能尽量保证之前的消费者消费相同分区。若消费者从 3 个变成 4 个,则会触发重新分配,若使用 RoundRobin 策略,则分配后只有极少数分区仍然在原来的消费者上。

图片 而经过 Stickey 策略会尽量保证之前的关联关系。

图片

以下三种情况时候,消费者组将会触发重平衡(Rebalance)

  1. 消费者数量变化,即有消费者加入组或者有消费者离组

  2. 消费者订阅 topic 数量变化

  3. 消费者订阅 topic 的分区数量发生变化,即服务端扩分区的情况

# 副本(Replication)

图片 为提高 kafka 的高可用性,引入了副本(Replication)的概念,即一个分区有 N 个副本,副本分为 Leader 副本和 Follower 副本,Leader 副本负责对外提供服务,Follower 副本则只负责同步 Leader 副本的数据。

  • LEO:Log End Offset,下一个日志写入的位置的位移

  • AR:Assigned Replicas 指当前分区中的所有副本

  • ISR:In-Sync Replicas,同步副本队列,包括 Leader 和 Follower,replica.lag.time.max.ms(延迟时间)两个参数决定副本是否可以加入 ISR 副本队列

  • OSR:Out-Sync Replicas,非同步副本队列,与 leader 副本同步滞后过多的副本(不包括 leader 副本)组成 OSR

  • HW:High Water,高水位,一个分区的 HW 取所有 ISR 副本的最小 LEO 值,说白了 HW 就是同步副本队列中都同步到的位置+1,消费者只能消费 HW 之前的数据,这样可以保证数据不丢失。

图片

  • 当 Leader 副本宕机时,kafka 会选举 ISR 中其他的副本为 Leader,当 ISR 中没有任何一个副本可选时候,这时候就看参数unclean.leader.election.enable,若为 true 则可以将 OSR 中的副本选为 leader,但是会造成数据丢失的问题。若为 false 则不会选出 Leader 副本,但是会造成服务不可用的问题。

  • 副本是为了避免服务器宕机或节点不可用的情况,如果多个副本分布在同一个节点上,若服务器宕机则无法应对,所以副本的数量(通过replication.factor参数设置)不能超过节点的数量,这里的节点就是 Broker。

现状:作业帮的 kafka 集群配置的副本数量为 2,即除了 leader 副本外,只有一份备份数据。unclean.leader.election.enable=true,即 leader 副本宕机后 ISR 中为空,也会将 OSR 中的副本选举出来当 Leader 副本。

# Broker

图片 Broker 是 Kafka 的单节点,1 台机器上可以部署多个 Broker,但为了高可用以及稳定性,一般都是 1 台机器部署 1 个 broker。

主题(topic)下的多个分区可以分布在多台 broker 上,也可以分布在同一个 broker 上。

一个分区(partition)的副本必须分布在不同的 broker 上,当 leader 副本发生故障时,kafka 将会把副本同步队列(ISR)中的其他副本选为 leader 副本,继续对外服务。

上图中,共 3 个 Topic,分区下的副本数都是 3 个。

Topic1 有 3 个分区,Topic2 有 2 个分区,Topic3 有 1 个分区。

这三者逻辑关系图如下:

图片

# Zookeeper

ZooKeeper 作为一个分布式的协调服务框架,主要用来解决分布式集群中,应用系统需要面对的各种通用的一致性问题。ZooKeeper 本身可以部署为一个集群,集群的各个节点之间可以通过选举来产生一个 Leader,选举遵循半数以上的原则,一个 Leader,N 个 Follower,Leader 负责读写,Follower 负责读。

在 Kafka 中 ZK 负责管理集群元数据,其管理的数据如下:

图片 图片

# Controller

Controller 是所有 Broker 的管理节点,负责管理整个集群中所有分区和副本状态,可以对其他普通 Broker 节点发号施令执行命令,同时负责更新元数据并向 Zookeeper 同步。

  • 副本选举:当某个分区的 leader 副本出现故障时,由控制器负责为该分区选举新的 leader 副本。

  • 更新集群 Broker 数据:当检测到某个分区的 ISR 集合发生变化时,由控制器负责通知所有 broker 更新其元数据信息。

  • 元数据的管理:当使用 kafka-topics.sh 脚本为某个 topic 增加分区数量时,同样还是由控制器负责分区的重新分配。

三、核心步骤原理

# Controller 选举

在集群启动时,每个 Broker 都会去 ZK 创建一个/controller 临时节点,zookeeper 会保证有且仅有一个 broker 能创建成功,这个 broker 就会成为集群的总控器 controller。

图片 注:Kafka2.8 版本后使用 Kraft 协议进行选举,不再使用 Zookeeper 进行元数据的管理

# 日志存储

日志存储是以 Topic+Partition 的维度进行存储的,一个 Partition 一个文件夹,文件夹下的结构主要包含索引文件(index)和日志文件(log),下图是 kube-txprod-ai-college-pdf-parse 的分区 1 的文件夹构成:

图片

  • log 文件

存储实际的消息数据,每个 .log 文件代表一个日志段(log segment),一个日志段默认 1G(log.segment.bytes 参数),文件名通常是该段的起始偏移量(offset)。

图片 baseOffset:该段日志的起始 offset

lastOffset:该段日志的终止 offset

count:该段日志的条数

baseSequence:起始序列号

lastSequence:终止序列号

producerId:生产者 ID

producerEpoch:生产者版本号

partitionLeaderEpoch:分区版本号

isTransactional:是否开启事务

isControl:未知

position:消息在日志文件中的位移(file offset)

CreateTime: 创建时间

size:日志大小

magic:表示本次发布 kafka 服务程序协议版本号

conpresscodec:压缩算法

crc:消息校验值

isvalid:记录是否有效

offset:消息的偏移量

CreateTime:记录创建的时间

keysize:表示 key 的长度

valuesize:表示 value 的长度

sequence:消息的序列号(用于幂等机制)

headerKeys:消息 headers 头的 key

key:消息 key

payload:表示具体的消息

  • index 文件

存储偏移量到物理文件位置的映射,这个文件是一个稀疏索引,允许 Kafka 快速查找特定偏移量的消息。通过这个索引,Kafka 可以避免从头开始扫描整个日志文件,从而提高查找效率。

图片

  • timeindex 文件

存储消息时间戳到物理文件位置的映射。这个文件允许 Kafka 根据时间戳快速查找消息。它是一个稀疏索引,类似于 .index 文件,但索引的是时间戳而不是偏移量。

  • leader-epoch-checkpoint 文件