Kafka 的基本工作原理
Kafka 的基本架构
Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据,具有高性能、持久化、多副本备份、横向扩展能力,这离不开其架构设计
消息队列的本质就是生产者将消息发布,然后消费者消费其中的消息。
Kafka也一样,生产者将消息发布到主题,然后消费者订阅主题并消费其中的消息。
如下图
生产者将数据生产出来,交给 broker 进行存储,然后消费者需要消费数据了,就从broker中去拿出数据来,然后完成一系列对数据的处理操作,这个过程展开说明就是下面图片,体现了 Kafka 在分布式和高并发上的特点
其中有这么一个细节,就是 producer 产生了数据,就会自动的 push 到 broker 中,但是消费者 consumer 拉取就是主动的去使用 pull 拉取
多个 broker 协同合作,producer 和 consumer 部署在各个业务逻辑中被频繁的调用,三者通过 zookeeper 管理协调请求和转发,zookeeper作为其分布式协调框架,很好的将消息生产、消息存储、消息消费的过程结合在一起。
Kafka的组件
上述的工作流程中的整个流程,会涉及到如下组件
代理节点 Broker
代理节点Broker 就代表单个 Kafka 服务器,是集群的 “物理节点”
- 每个 Broker 有唯一 ID(如 Broker 0、Broker 1),方便集群识别;
- Broker 不存储全局元数据(如 Topic 列表、分区分布),需依赖 ZooKeeper(旧版)或 KRaft(新版)管理;
- 客户端(生产者 / 消费者)只需连接任意一个 Broker,即可通过其获取集群全量信息,无需连接所有节点。
主题 Topic
消息的 “逻辑分类单元”,类似 “文件夹”
- Topic 是逻辑概念,不直接存储数据,数据实际存在其下的 Partition 中;
- Topic 需提前创建,创建时可指定分区数、副本数等核心参数,后续可动态调整分区数(但副本数调整需谨慎);
- 示例:电商系统中,“订单创建”“物流更新” 可分别作为两个 Topic,避免消息混杂。
分区 Partition
Topic 的 “物理分片”,是数据存储的最小单元
- 有序性:每个 Partition 内的消息按发送顺序生成唯一偏移量(Offset,类似 “行号”),仅保证单分区有序,不保证跨分区有序;
- 持久化:消息以日志文件(.log)形式存储在 Broker 本地磁盘,支持长期留存(可配置留存时间 / 大小);
- 副本机制:每个 Partition
的副本分为「Leader」和「Follower」:
- Leader:唯一处理读写请求的副本,客户端仅与 Leader 交互;
- Follower:同步 Leader 的日志,Leader 故障时,由 ZooKeeper/KRaft 从 Follower 中选举新 Leader,保证高可用;
- 通过多分区实现 “并行处理”(多个消费者可同时消费不同分区),通过多副本实现 “高可靠”(避免单节点故障丢失数据)。
副本 Replication
分区的备份,每一个分区都有多个副本,副本的作用是做备胎。每个分区可以有多个 副本(Replica),副本之间保存相同的消息数据(通过复制机制同步)。
- 当主分区(Leader)故障的时候会选择一个备胎(Follower)上位,成为Leader。Follower和 Leader绝对是在不同的机器,同一机器对同一个分区也只可能存放一个副本(包括自己)。通过 Leader 选举机制,在 Broker 故障时自动切换副本角色,保证分区服务不中断。
- 在kafka中默认副本的最大数量是10个,且副本的数量不能大于Broker的数量。
- 消费者拉取消息时,只能从分区的 Leader 副本读取数据(Follower 仅用于同步,不提供读服务)。消费组的分区分配是基于 “Leader 副本所在的 Broker” 进行的,确保消费者与 Leader 之间的网络通信效率。
生产者 Producer
向 Kafka 发送消息的 “数据源头”
- 分区选择:发送消息时,可通过「分区键(Key)」指定分区(相同 Key 的消息会进入同一分区,保证顺序),无 Key 则按轮询 / 哈希自动分配;
- 性能优化:支持「批量发送」(累积一定数量 / 时间再发送)和「消息压缩」(如 Gzip、Snappy),减少网络 IO 和磁盘占用;
- 可靠性保障:可配置「acks 机制」(0 = 不等待确认,1 = 等待 Leader 确认,-1 = 等待所有副本确认),确保消息不丢失。
消费者 Consumer
从 Kafka 读取消息的 “数据终端”
- 消费模式:采用「拉取模式(Pull)」,即消费者主动向 Broker 请求消息(避免 Broker 推送过载);
- 偏移量(Offset)管理:消费者会记录已消费到的
Offset(默认保存在 Kafka 内置的
__consumer_offsets主题中),重启后可从上次中断的位置继续消费,避免重复消费或漏消费; - 注意:单个消费者可消费多个分区,但一个分区只能被同一消费者组内的一个消费者消费(避免重复消费)。
消费者组 Consumer Group
一组消费者的集合,实现 “负载均衡” 和 “消息广播”
- 核心规则:
- 若消费者组内消费者数量 ≤ 分区数:每个消费者分配 1~N 个分区(负载均衡);
- 若消费者组内消费者数量 > 分区数:多余消费者处于 “空闲状态”(无分区可消费);
- 典型场景:
- 负载均衡:同一服务的多个实例组成消费者组,共同消费一个 Topic,提高消费速度;
- 消息广播:不同服务(如 “订单服务”“统计服务”)分别创建消费者组,订阅同一 Topic,实现消息被多服务同时消费。
协调工具,旧版核心 ZooKeeper
管理 Kafka 集群元数据和分布式协调
- 旧版 Kafka(2.8 之前)依赖 ZooKeeper 实现:
- 存储元数据:Topic 列表、分区分布、Leader/Follower 角色、消费者组 Offset 等;
- 分布式协调:Broker 加入 / 退出集群的通知、Leader 选举触发等;
- 局限性:ZooKeeper 集群部署复杂、性能瓶颈明显,因此新版 Kafka(2.8+)推出 KRaft 模式,替代 ZooKeeper 实现元数据管理,降低部署成本。
KRaft(Kafka Raft,新版协调机制)这个后面再说
消息队列的通信模式和机制
Kafka 的通信流程由 4 个核心角色协同完成,每个角色职责明确,共同构成消息从产生到消费的全链路。
| 角色(Component) | 核心职责 | 通信中的作用 |
|---|---|---|
| Producer(生产者) | 发送消息到 Kafka 集群 | 消息的 “源头”,主动向 Broker 推送数据 |
| Broker(服务节点) | 存储消息、接收生产者请求、响应消费者请求 | 消息的 “中转站 / 仓库”,是通信的核心载体 |
| Consumer(消费者) | 从 Broker 拉取消息并处理 | 消息的 “终点”,主动从 Broker 获取数据 |
| ZooKeeper(协调者) | 管理 Broker 集群元数据、消费组偏移量(旧版) | 通信的 “指挥中心”,保证集群一致性 |
点对点模式
点对点模式通常是基于拉取和轮询的消息传送模型,这个模型的特点是发送到队列的消息被一个且只有一个消费者进行处理。
生产者将消息放入消息队列后,由消费者主动的去拉取消息进行消费。
点对点模型的的优点是消费者拉取消息的频率可以由自己控制。但是消息队列是否有消息需要消费,在消费者端无法感知,所以在消费者端需要额外的线程去监控。
基于 Topic 的 发布订阅 机制
机制工作流程
Kafka 没有采用简单的 “点对点” 通信,而是通过 Topic(主题) 实现 “发布 - 订阅” 模式,支持一对多、多对多的灵活通信。
发布订阅模式是一个基于消息送的消息传送模型,该模型可以有多种不同的订阅者。生产者将消息放入消息队列后,队列会将消息推送给订阅过该类消息的消费者,由于是消费者被动接收推送,所以无需感知消息队列是否有待消费的消息。
但是consumer1、consumer2、consumer3由于机器性能不一样,所以处理消息的能力也会不一样,但消息队列却无法感知消费者消费的速度!所以推送的速度成了发布订阅模模式的一个问题!假设三个消费者处理速度分别是8M/s、5M/s、2M/s,如果队列推送的速度为5M/s,则consumer3无法承受!如果队列推送的速度为2M/s,则consumer1、consumer2会出现资源的极大浪费!这就涉及到后面的 Kafka 负载均衡。
那么,什么是基于 Topic 的 发布订阅 机制
首先,生产者发送消息时,必须指定消息所属的 Topic(如 “用户行为日志”“订单支付通知”),相当于给消息打了分类标签。
然后,消费者要获取消息,必须订阅对应的 Topic,只能消费自己订阅的 Topic 中的消息。一个 Topic 可以被多个消费组订阅,实现 “一条消息被多个系统消费”(如订单消息同时被 “物流系统”“财务系统” 消费)。
那么完整的通信流程以经典的 “用户下单” 场景为例,可拆解为 5 个步骤:
生产者发送消息:订单系统(生产者)生成 “订单创建” 消息,指定发送到 Topic “order_topic”,并设置消息的 Key(如用户 ID,用于将同一用户的订单路由到同一 Partition)。
Broker 接收与存储:Kafka 集群中的 Broker 接收消息,根据 Key 计算 Partition 编号,将消息追加到对应 Partition 的日志文件中,并返回 “写入成功” 响应给生产者。
消费者订阅与拉取:物流系统的消费者组(consumer_group_logistics)订阅 “order_topic”,组内的 2 个消费者分别向 Broker 拉取自己负责的 Partition 的消息(消费者主动拉取,而非 Broker 推送,避免消费能力不足导致的积压)。
消费者处理消息:消费者接收到消息后,调用物流接口创建物流单,处理完成后,向 Broker 提交自己的消费偏移量(如 “已消费到 Partition 1 的偏移量 100”)。
集群协调:ZooKeeper(或 Kafka 自身的 Controller,新版)实时监控 Broker 状态(如是否宕机)和消费组偏移量,若某个 Broker 宕机,会自动将其 Partition 分配给其他 Broker;若某个消费者宕机,会将其负责的 Partition 重新分配给组内其他消费者,保证通信不中断。
该模式的高可用和高并发特性
为了支撑高吞吐,Kafka 会将一个 Topic 拆分为多个 Partition(分区),每个 Partition 是独立的消息队列,通信时可并行处理
- 生产者可将消息均匀发送到不同 Partition,实现 “并行写入”,提升发送速度。
- 消费者组内的多个消费者,可分别消费不同 Partition 的消息(一个 Partition 只能被组内一个消费者消费),实现 “并行读取”,避免单消费者瓶颈。
- 分区是 Kafka 并行处理的最小单位:生产者可并行向多个分区写入消息,消费者可并行从多个分区读取消息,直接提升整体吞吐量。
Kafka的负载均衡就是在这里实现的,Kafka 将 Topic 的消息分散到多个分区,避免单分区的读写瓶颈(例如:一个 Topic 有 10 个分区,可支持 10 倍于单分区的并发写入)。
例如,一个 Topic 有 3 个 Partition,消费组有 3 个消费者,则每个消费者对应 1 个 Partition,并行拉取消息。
但是生产者发送消息时,需确定消息写入哪个分区,规则由
partitioner.class 配置(默认是
DefaultPartitioner):
- 若指定了 Partition 编号:直接写入该分区。
- 若未指定但指定了 Key:通过 Key 的哈希值对分区数取模,确保同一 Key 的消息进入同一分区(适合按用户 / 订单等维度聚合消息)。
- 若既无 Partition 也无 Key:采用轮询(Round-Robin)策略,均匀分配到各分区。
这里还会涉及到副本的概念,Kafka 的分区(Partition)和副本(Replica)是支撑其高吞吐、高可用的核心设计,两者通常协同工作
- 副本本质上是分区的备份,每个分区可以有多个 副本(Replica),副本之间保存相同的消息数据(通过复制机制同步)。一个分区的多个副本会被分配到不同的 Broker 上(由 Kafka Controller 负责分配),避免单个 Broker 宕机导致所有副本失效。示例:若集群有 3 个 Broker,一个分区配置 3 个副本(1 个 Leader + 2 个 Follower),则 3 个副本会分别存放在 3 个不同的 Broker 上。
- 副本分为两种角色:
- Leader 副本:负责处理该分区的所有读写请求(生产者写入、消费者读取都只与 Leader 交互)。消费者拉取消息时,只能从分区的 Leader 副本读取数据(Follower 仅用于同步,不提供读服务)。
- Follower 副本:仅负责从 Leader 同步消息数据,作为 “备胎”;若 Leader 宕机,某个 Follower 会被选举为新 Leader。
- 副本是高可用的核心机制,它能够避免因为分区的故障(单点故障)导致的数据丢失(例如:一个分区有 3 个副本,即使 1 个 Broker 宕机,另外 2 个副本仍能提供服务),他的分配选举和大部分分布式一样,通过 Leader 选举机制,在 Broker 故障时自动切换副本角色,保证分区服务不中断。
总结一下就是
- 分区是 Kafka 并行处理的核心,通过 “分而治之” 提升吞吐,同时保障单分区内的消息顺序。
- 副本是 Kafka 高可用的核心,通过 “冗余备份 + 自动故障转移” 避免数据丢失和服务中断。
而且 Kafka 允许根据业务需求,在 “可靠性” 和 “性能” 之间做权衡,提供 3 种通信语义
At Most Once(最多一次):消息可能丢失,不会重复。
机制:消费者拉取消息后,先提交 “消费偏移量”(记录已消费到的位置),再处理消息;若处理过程崩溃,未处理的消息会被跳过。
At Least Once(至少一次):消息不会丢失,可能重复。
机制:消费者先处理消息,处理完成后再提交偏移量;若提交前崩溃,重启后会重新拉取已处理过的消息。
Exactly Once(精确一次):消息不丢失、不重复,仅消费一次。
机制:通过 “事务” 实现(Kafka 0.11+ 支持),生产者发送消息和消费者提交偏移量原子化,要么都成功,要么都失败。
而 Kafka 通过消费组实现 “负载均衡” 与 “广播”
负载均衡(默认):同一消费组内的消费者,共同分摊 Topic 的 Partition 消费任务。
例:Topic 有 5 个 Partition,消费组有 2 个消费者,则一个消费者消费 3 个 Partition,另一个消费 2 个,避免单消费者压力过大。
广播(自定义):让每个消费者属于不同的消费组,每个消费组订阅同一个 Topic。
例:3 个消费组(物流组、财务组、风控组)都订阅 “订单 Topic”,则每个组都会收到全量订单消息,实现 “一条消息多系统共享”。
Kafka的持久化——基于日志的持久化通信
Kafka 如何设计他那个持久化
和其他消息队列一样,Kafka 也具有持久化的能力,那么
Kafka的消息是如何持久化的?它默认的存储机制是什么?
首先,Kafka 的消息不是 “用完即删”,而是以 日志文件(Log) 形式持久化存储在 Broker 的磁盘上,这种机制是基于日志的持久化通信
Kafka 的消息持久化主要依赖于它的一个核心组件:日志文件(logfiles)。Kafka 会将消息分成若干个段(segment),并将这些段保存在磁盘上。每条消息会被追加到当前的日志文件的末尾,Kafka默认通过顺序写入的方式来存储数据,这样的方式使得磁盘I/O效率非常高。另外,Kafka使用了零拷贝(zero-copy)技术来进一步提升效率。
Apache Kafka作为一款分布式流处理平台,其核心功能之一就是高效、可靠地持久化海量消息。
Kafka 是主打持久化的,Kafka的设计哲学是”将消息持久化作为首要特性”,这与许多传统消息系统形成鲜明对比,后者通常将消息持久化作为可选项或次要特性。
这个思路一定上有点类似 Redis 的 AOF
Kafka默认存储机制
Kafka默认采用分区日志(Partitioned Log)的存储机制,这是一种简单却非常有效的设计:
上面说了,Kafka 的日志并非单一文件,而是以 “分区为单位、分段存储” 的多层结构,这种设计既保证了顺序读写的高效性,又解决了大文件管理的性能问题。
考虑分区,日志也是可以分区的,而且消息存储的基本单位就是分区的日志,每个分区(Partition)对应一个独立的日志目录(存储路径由
log.dirs 配置,默认
/tmp/kafka-logs),目录名格式为
{topic}-{partition}(如 order_topic-0 表示
order_topic 的第 0 个分区)。
而分区日志是消息的 “逻辑容器”,所有发送到该分区的消息会按时间顺序追加到日志中,且每个消息在日志中对应唯一的偏移量(Offset)—— 本质是消息在日志中的 “字节位置索引”,用于标记消息的顺序和消费位置。
而且为避免单个日志文件过大(如几十 GB)导致的读写性能下降,Kafka 会将分区日志拆分为多个日志分段(Log Segment),每个分段是一组物理文件
- 数据文件(.log):存储实际的消息数据(二进制格式),文件名以该分段中第一条消息的偏移量命名(如
00000000000000000000.log表示从偏移量 0 开始的分段)。 - 索引文件(.index):消息偏移量到物理文件位置的映射索引(加速消息查找),格式为
{baseOffset}.index。 - 时间索引文件(.timeindex):消息时间戳到偏移量的映射索引(支持按时间范围查询消息),格式为
{baseOffset}.timeindex。 - 快照文件(.snapshot):用于记录分区的某些元数据(如事务相关信息)。
所以 Kafka 的存储目录结构通常如下
1 | /topic-name |
日志如何进行创建的,到什么时候,日志持久化文件会创建并且滚动?
当一个日志分段满足以下任一条件时,Kafka 会自动创建新的分段:
- 数据文件大小达到阈值(
log.segment.bytes,默认 1GB)。 - 数据文件创建时间超过阈值(
log.roll.hours,默认 7 天,避免长期不写的分段一直占用空间)。
这种 “分段滚动” 机制让每个分段文件大小可控,既保证了磁盘 IO 效率(小文件随机读写更快),又方便后续的数据清理(直接删除旧分段)。
消息写入和读取的流程
Kafka消息持久化的完整流程如下:
消息从生产者发送到最终写入磁盘,经历了 “内存缓冲→磁盘持久化” 的过程
- 生产者发送:生产者将消息批量发送给Broker
- 内存缓冲:Broker将消息追加到对应Partition的Page Cache中
- 确认响应:根据配置的ACK级别,Broker可能立即响应或等待刷盘后响应
- 磁盘持久化:操作系统异步或Kafka主动将数据从Page Cache刷入磁盘
欸我擦,Redis 的 AOF
呃呃,我展开说一下详细的步骤,那么消息写入的详细过程实际上是这样的
生产者发送消息:生产者将消息发送到分区的 Leader 副本,消息包含 Key、Value、时间戳、Headers 等元数据。
Leader 接收并写入页缓存:Leader 副本接收到消息后,先将消息追加到页缓存(Page Cache)—— 操作系统为磁盘文件分配的内存缓存,而非 Kafka 进程内的缓存。
注:页缓存由操作系统管理,相比进程内缓存,可减少 JVM GC 压力,且能利用操作系统的预读 / 写优化
异步刷盘(默认策略):Kafka 不会立即将页缓存中的消息写入磁盘,而是由操作系统根据 “脏页比例”“时间间隔” 等策略异步刷盘(
log.flush.interval.messages和log.flush.interval.ms)可配置强制刷盘阈值,但默认禁用,依赖 OS 自身机制)。这种 “先写内存、再异步刷盘” 的方式,避免了频繁磁盘 IO 对性能的影响,同时通过页缓存保证了消息在进程崩溃时不丢失(只要未断电,页缓存数据仍在)。
副本同步:Leader 写入消息后,Follower 副本会主动拉取消息并重复上述写入流程,最终所有 ISR 列表中的副本都存储相同的消息(通过
acks参数控制同步确认级别)。
实际上还会有一些可靠性保障的机制体现
- 持久化确认:通过
acks参数控制消息写入的可靠性,这个参数我忘了)貌似默认下,要求所有副本写入 - 而且Broker 重启时,会扫描所有日志分段文件,通过索引文件恢复消息的偏移量映射,保证消息顺序和消费位置的正确性。
- 消息写入后不会被修改(日志是 “追加” 的),且保留时间可配置,支持消费者 “从头消费”“从指定偏移量消费” 等回溯场景,如数据重放、故障恢复。
那么写入是这样的,读取反过来想就可以了
消费者从日志中读取消息时,Kafka 借助索引文件实现快速定位,这样避免了全文件扫描,保证了读取性能。
消费者指定偏移量
消费者通过
offset告知 Broker 要读取的消息位置(如 “从偏移量 1000 开始读取”)。定位日志分段
Broker 根据偏移量,通过二分查找确定该偏移量所在的日志分段(例如:偏移量 1500 落在
00000000000000001000.log分段中,因为该分段的起始偏移量是 1000,下一个分段起始偏移量是 2000)。通过索引定位物理位置
在分段的
.index文件中,查找小于等于目标偏移量的最大索引项,获取该消息在.log文件中的物理偏移量(字节位置)。读取消息
从
.log文件的物理偏移量处开始读取消息(支持批量读取,通过fetch.min.bytes和fetch.max.bytes控制批量大小),返回给消费者。
那么读取是如何保证高效率的呢?这涉及到了顺序读的关键设计
消费者通常按偏移量递增顺序读取消息(如 “从上次消费的位置继续读”),而日志分段是顺序追加的,因此磁盘 IO 以顺序读为主(顺序读性能接近内存)。
而且上面的索引加速,对于 “从指定偏移量 / 时间戳读取” 的随机读场景,可将查找时间从 O (n) 降至 O (log n)。
存储优化技术
Kafka通过多种技术优化存储性能
顺序写入:Kafka只进行追加(append-only)写入,避免了磁盘随机I/O,大幅提升吞吐量。
零拷贝:消费者读取时,数据直接从Page Cache通过DMA传输到网卡,减少CPU拷贝开销
而且上述的过程,都是支持批量的
日志的清理与 retention
注意,Kafka 的消息默认保留七天,单个Segment文件大小默认是 1GB
Kafka 日志会无限追加,若不清理会耗尽磁盘空间。通过配置 retention 策略,可自动删除过期数据或压缩重复数据。
时间基清理(Time-Based Retention)
当消息存储时间超过
log.retention.hours(默认 168 小时,即 7 天),或分段文件最后修改时间超过阈值时,整个分段会被删除。大小基清理(Size-Based Retention)
当分区日志总大小超过
log.retention.bytes(默认 1GB),会按 “从旧到新” 的顺序删除分段,直到总大小低于阈值。日志压缩(Log Compaction)
对于 “键值对” 消息(如用户配置、状态更新),保留相同 Key 的最新版本消息,删除旧版本(通过
log.cleanup.policy=compact启用)。
而且,Kafka 后台有专门的 Log Cleaner
线程池(log.cleaner.threads,默认 1
个),定期扫描符合清理条件的分区日志,执行删除或压缩操作,他不会影响正常的读写性能。
Kafka 的工作流程分析
重看偏移量
如何理解偏移量
在上面说了,Kafka 中消息都是面向 topic 的。生产者生产消息,消费者消费消息
在 Kafka 中,一个 topic 可以分为多个 partition,一个 partition 分为多个 segment,每个 segment 对应两个文件:.index 和 .log 文件
topic 是逻辑上的概念,而 patition 是物理上的概念,每个 patition 对应一个 log 文件,而 log 文件中存储的就是 producer生产的数据,patition 生产的数据会被不断的添加到 log 文件的末端,且每条数据都有自己的 offset。
消费组中的每个消费者,都是实时记录自己消费到哪个 offset,以便出错恢复,从上次的位置继续消费。
那么,这个偏移量,我们究竟要如何理解?是文件中的索引吗?
实际上,偏移量这个词有一定的迷惑性,容易让人“忘本”,偏移量 实际上是每条消息的 序号。它为每条消息提供了一个唯一的标识。通过偏移量,消费者可以准确地找到并读取特定的消息。
偏移量在 Topic 中是唯一的吗?
偏移量 仅在每个分区内是唯一的。偏移量是分区的本地概念,Topic-A 分区0的 offset=5 和 分区1的 offset=5 是完全不同的消息。
一个 Topic
可能有多个分区,每个分区的消息都有一个递增的偏移量。因此,Kafka
不需要对所有分区的消息偏移量进行全局同步。每个分区独立管理自己的偏移量,这不仅减少了复杂性,还降低了性能开销。Kafka
使用 64 位长整型(long)作为偏移量,其最大值为
2^63 - 1。
消息一旦被写入分区,它的偏移量就固定不变了。这样,消费者按照偏移量的顺序来读取消息。
而且偏移量是由 服务端 决定的。因为客户端无法了解其他客户端的偏移量情况,如果由客户端决定,那么则需要额外的同步成本来实现全局偏移量管理。所以,消息的偏移量是由 Kafka 服务端来维护的。
如果没有偏移量,会发生什么?
消费者从 Kafka 拉取消息后,Kafka 服务器就不知道这个消费者读到哪里了。如果这个消费者程序崩溃,重启后它要么从头开始消费(造成重复处理),要么不知道从哪开始,从而丢失消息。偏移量就是为了解决“消费者当前读到哪儿了”这个问题。
Kafka 是如何根据消息偏移量定位到文件中的位置
既然,我们都知道 Kafka 的消息存储在文件中,那么 Kafka 是如何根据消息偏移量定位到文件中的位置?
等等等等,Kafka 是如何知道要查找哪个文件?
别忘了上面说的,每个日志段文件的文件名就是该段的 起始偏移量。通过文件名,你可以知道消息存储在哪个文件中。接着,Kafka 通过该文件的索引文件来找到消息的具体位置,只不过这个文件名需要解析一下。
继续,这就要说一下,偏移量的一个工作机制了,索引文件
上面我们说了,Kafka 的每个分区(Partition)会被拆分为多个日志分段(Log Segment),每个分段对应一组文件,其中,就包含了索引文件。
每个索引条目大小为 8 字节,其中 4 个表示偏移量,4 个表示段中的字节位置。
Kafka 为每个分区的每个段维护一个索引文件,该文件记录了每个消息的偏移量及其对应的存储位置,通常是消息的起始位置,因为每条消息的结束位置可以通过消息头来确定。通过索引文件,Kafka 可以非常快速地定位到某个偏移量对应的消息存储位置。
定位的逻辑就是通过偏移量的大小,判断它属于哪个分段。
判断完属于哪个分段了,那么如何在索引文件中查找偏移量对应的物理位置,注意到每个分段的.index文件是稀疏索引,所以,Kafka
并非为每条消息都建立索引,而是每隔一定字节(由log.index.interval.bytes配置,默认
4KB)建立一条索引项,以此平衡存储空间和查询效率。
查询的逻辑就是在索引文件中通过二分查找,找到 “小于或等于目标 Offset 的最大索引项”。
拿到了拿到物理位置后,Kafka
会在.log数据文件中从该位置开始顺序扫描,直到找到目标
Offset 对应的消息。
这是因为稀疏索引的特性:索引项记录的是 “最近的前一个偏移量” 的位置,因此需要从该位置开始向后扫描,直到找到精确的 Offset。
下图展示了Kafka 基于偏移量查找数据的过程:
图中的Segment-0对应起始 Offset
为0的分段,包含0.index(索引文件)和0.log(数据消息文件);Segment-1对应起始
Offset
为6的分段,包含6.index和6.log。
若要查找Offset=3,由于0 ≤ 3 < 6,因此属于Segment-0(起始
Offset 为0的分段)。
由图中的索引格式,可以指定匹配到索引项(3, 756),即该消息在0.log文件中的物理位置是756字节处。
然后拿到物理位置756后,Kafka
会在0.log文件中从该位置开始顺序扫描,直到找到目标
Offset 对应的消息。
这种设计也让我联想到 Socket 通道 中的数据处理。Socket 通道也是连续的字节流,服务端根据 消息头 来解析报文的起始和结束位置,从而确定数据的边界。服务端可以连续地从字节流中提取完整的报文进行处理。
相同之处:
- 都是通过消息头来解析数据边界。
不同之处:
- Socket 通道中的数据是即时消费的,而 Kafka 的数据存储在磁盘中,等待被消费。
- Kafka 消费者并不是从头开始消费,而是从特定的偏移量位置开始消费,这需要额外的定位操作,即通过索引文件来定位消息的起始位置
试着理解一下,Kafka 如何高效读取多条消息?
假设我们要读取一批消息,比如 100 条。显然,Kafka 不会每次都查找索引文件并定位单独的消息偏移量。实际情况是,Kafka 会根据 起始消息的位置 和 最后一条消息的位置,一次性将这一段消息批量读取出来。通过这种方式,Kafka 避免了多次查询索引文件,从而显著提高了读取效率。
这种批量读取的实现,很类似上面的实现
消费者是如何与偏移量交互的
那么消费者是如何与偏移量交互的?引出了偏移量机制的核心问题, “消费者如何提交(Commit)偏移量”?
首先,偏移量由消费者管理,Kafka Broker 只负责存储消息和偏移量,但提交和跟踪消费进度的责任在消费者端
当前偏移量:消费者实例自己维护的一个值,表示它下一次将要拉取的消息的偏移量。例如,消费者刚读完 offset=5,那么它的当前偏移量就是 6。
已提交偏移量:
消费者将自己当前的消费进度(即当前偏移量)保存到 Kafka
的一个特殊主题(__consumer_offsets)中。这个被保存的值就是“已提交偏移量”。它标志着“这个消费者群组已经成功处理了该偏移量之前的所有消息”。
算法竞赛中有一种算法叫做 ST 表,你可以参考这个理解 Kafka 如何维护这个偏移量
偏移量通常是自动提交的,消费者客户端会定期自动将当前偏移量提交到 Kafka。
但是,这样就可能导致出现“至少一次”处理的问题
假设自动提交间隔是5秒。在第3秒时,消费者拉取了一批消息(offset 10-19)并开始处理。但在处理到 offset=15 时,消费者崩溃了。由于时间未到5秒,偏移量(还停留在上一次提交的 offset=10)没有被提交。当消费者重启后,它会从已提交的 offset=10 开始重新拉取消息,导致 offset=10 到 15 的消息被处理了两次。
所以,开发者通过调用 consumer.commitSync()(同步)或
consumer.commitAsync()(异步)来主动控制提交时机。
这样,工作流程就变成了这样
- 拉取一批消息。
- 处理这一批消息(例如,写入数据库、进行计算)。
- 在处理成功后,手动提交偏移量。
- 继续拉取下一批。
这种方式确保了:只有消息被成功处理后,偏移量才会被更新。 即使消费者在处理过程中崩溃,重启后它也会从最后一个已提交的偏移量开始,重新处理那些尚未提交的消息,从而保证消息不会丢失
这种也不是高枕无忧的,很明显,会导致重复处理的问题,一般使用需要结合严格的幂等性。
别急,当一个全新的消费者群组启动,或者它要读取的偏移量在 Kafka 中不存在时(比如因为数据过期被删除了),应该怎么办?
这时就需要祭出“偏移量重置策略”。
auto.offset.reset = earliest:从分区最早的消息开始消费(从0开始)。auto.offset.reset = latest(默认):从分区最新的消息开始消费(只消费启动后新来的消息)。auto.offset.reset = none:如果找不到偏移量,就抛出异常。
这个策略在新消费者群组第一次启动时尤其重要。
生产者(Producer)发送消息
找到这样一张好图,可以作为 Kafka 发送数据流程的总结
最开始,生产者要构建消息,将待发送的数据打包为消息对象,通常包含消息的 key 和 value,key 用于选择分区,而 value 是实际数据。
一般会对 Key 和 Value 进行序列化(如 JSON、Avro 序列化),便于网络传输和 Broker 存储。
然后,生产者根据预设的分区策略,确定当前消息要发送到 Topic 的哪个 Partition。分区选择可以是显式指定,也可以通过 Kafka 的分区器(分区选择策略)根据 key 值选择目标分区。
然后会在这里配置配置acks参数,选择不同的可靠性策略,供日志持久化那边决定咋整持久化的事情
acks=0:生产者发送消息后不等待 Broker 确认,直接返回成功,速度最快但可能丢消息。acks=1:仅等待消息写入 Partition 的 “首领副本(Leader)” 后,Broker 返回确认,兼顾速度和基本可靠性。acks=-1/all:等待消息写入 Leader 及所有 “跟随副本(Follower)” 后,Broker 返回确认,可靠性最高但延迟略高。
Broker 集群存储与同步消息
准备工作完成之后,producer 从 zookeeper
(老版本依赖zookeeper的原因)的 “/brokers/…/state”节点找到该 partition
的 leader broker,将消息发送给该 leader,若发送失败(如网络波动、Leader
宕机),会根据配置的重试次数(retries)自动重试,避免消息丢失。
在上述中,每个 Partition 会选举一个 Leader 副本,其余为 Follower 副本。Follower 副本,仅负责从 Leader 副本同步消息日志,当 Leader 宕机时,从 Follower 中重新选举新 Leader,实现高可用。这上面都说过了。
leader broker
代理节点接收到消息之后,就会开始进行持久化的操作,将消息按顺序追加到
Partition
的日志文件(.log)末尾,同时生成索引文件(.index、.timeindex),其中,log文件就实际是存储message的地方,消息靠
offset 唯一确定在parition内的位置。
ISR 机制会在这里体现,Follower 副本定期向 Leader 发送同步请求,拉取新写入的消息并追加到本地日志,然后 followers 节点会从 leader 节点 pull 消息(按照acks策略来),写入本地 log 后向 leader 发送 ACK,其中,ACK 应答机制保证消息不丢失
Broker 根据配置的 “日志保留策略”(如按时间保留 7 天、按文件大小保留),定期清理过期或超量的日志片段,避免磁盘占满。而且 kafka读取特定消息的时间复杂度是O(1),所以这里删除过期的文件并不会提高kafka的性能
有趣的小事情,就是如果往不存在的topic写数据,能不能写入成功呢?kafka会自动创建topic,分区和副本的数量根据默认配置都是1。
消费者读取消息
消息存储在log文件后,消费者就可以进行消费了。
在讲消息队列通信的两种模式的时候讲到过点对点模式和发布订阅模式。Kafka采用的是发布订阅模式,消费者通过 “拉取模式” 主动从 Broker 获取消息,且同一消费者组内的消费逻辑确保消息不重复、不遗漏。
消费者在启动的时候,会指定所属的消费者组,并订阅一个或多个 Topic;Kafka 会为订阅的 Topic 分配对应的 Partition。
那么 Partition 如何分配的?消费者组内的 Consumer 会通过预设策略(如 Range、RoundRobin、Sticky)分配要消费的 Partition:
- Range:按 Partition 序号平均分配,若 Partition 数不整除 Consumer 数,前几个 Consumer 会多分配 1 个 Partition。
- RoundRobin:将 Partition 和 Consumer 按顺序轮询匹配,实现更均匀的分配。
- 同一 Consumer 组内,一个 Partition 仅分配给一个 Consumer;一个 Consumer 可分配多个 Partition。
然后,消费者通过 “拉取(Pull)” 模式向 Broker
请求消息,可通过配置fetch.min.bytes(最小拉取字节数)、fetch.max.wait.ms(最长等待时间)平衡吞吐量和延迟。
消费者每次拉取消息时,会指定当前要读取的 Offset(初始为最早 Offset 或最新 Offset),Broker 根据 Offset 返回对应的消息。
消费者处理完消息后,会提交 Offset(手动提交或自动提交),告知 Kafka “已消费到该位置”;若消费者宕机,重启后可从上次提交的 Offset 继续消费,避免重复或丢失。
消费者处理拉取到的消息(如写入数据库、触发业务逻辑),处理成功后提交 Offset。
若消费者组内有 Consumer 加入或退出(如扩容、宕机),Kafka 会触发 “重平衡(Rebalance)”,重新分配 Partition 给剩余 Consumer,确保消费不中断。
最后,让我们把偏移量机制放到完整的 Kafka 工作流程中看:
- 生产者 将消息发送到指定主题的某个分区。
- Broker 将消息追加到分区日志的末尾,并赋予一个递增的偏移量。
- 消费者 启动,并订阅一个主题。
- 消费者 向 Broker
查询它上次已提交的偏移量。如果是新群组,则根据
auto.offset.reset策略决定起始位置。 - 消费者 开始从该偏移量顺序拉取消息。
- 消费者 处理拉取到的消息(业务逻辑)。
- 消费者
在处理成功后(手动或自动)提交偏移量,将处理进度持久化到
__consumer_offsets主题。 - 重复步骤 5-7。







