news 2026/6/8 12:07:52

【Kafka源码解读和使用指南】第14篇:Kafka分区器源码解析——消息去哪个分区,有学问!

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
【Kafka源码解读和使用指南】第14篇:Kafka分区器源码解析——消息去哪个分区,有学问!

上一篇【第13篇】Kafka序列化器深度解析——自定义Serializer不再是难题
下一篇【第15篇】Kafka集群元数据源码解析——生产者如何"认识"整个集群


摘要

消息经过序列化变成byte[]之后,下一步就是决定发往哪个分区。这一决定看似简单,实则影响深远——分对了负载均衡吞吐翻倍,分错了热点分区全线崩溃。Kafka的默认分区策略用Hash+RoundRobin双剑合璧,2.4版本推出的Sticky Partitioner更是在延迟和批量之间找到了精细平衡。本文将深入源码剖析分区器的工作原理,从DefaultPartitioner到StickyPartitioner,再到手把手教你实现一个按业务Key路由的自定义分区器。读完这篇,分区不再看运气。


一、分区器在KafkaProducer中的位置

先回顾分区器在整个发送链路中的位置——它在消息序列化之后、进入RecordAccumulator之前:

KafkaProducer.send() 调用链: Interceptors.onSend() // ① 拦截器处理 │ ▼ waitOnMetadata() // ② 等待集群元数据就绪 │ ▼ Serializer.serialize() // ③ 序列化Key和Value │ ▼ Partitioner.partition() // ④ 选择目标分区 ← 本文主角 │ ▼ RecordAccumulator.append() // ⑤ 放入缓冲区

从调用链可以看出,分区器需要依赖两个输入:

  1. 已序列化的Key(byte[]):用于计算Hash值
  2. 集群元数据(Cluster对象):需要知道Topic有多少个分区

二、Partitioner接口——只需要实现partition()方法

publicinterfacePartitionerextendsConfigurable,Closeable{/** * 选择目标分区 * @param topic Topic名称 * @param key 消息Key(Java对象,未序列化) * @param keyBytes 已序列化的Key(byte数组) * @param value 消息Value(未序列化) * @param valueBytes 已序列化的Value * @param cluster 集群元数据快照 * @return 分区编号 */intpartition(Stringtopic,Objectkey,byte[]keyBytes,Objectvalue,byte[]valueBytes,Clustercluster);voidclose();}

注意区分两个概念:

  • key(Object):原始Key对象,还没经过序列化
  • keyBytes(byte[]):已经序列化好的Key,可直接用于Hash计算

KafkaProducer调用时,如果ProducerRecord指定了partition字段(即record.partition() != null),就直接用指定的分区,不会调用Partitioner。只有没指定分区时才会走Partitioner.partition()。


三、DefaultPartitioner源码解析——经典的双模式策略

3.1 核心源码

publicclassDefaultPartitionerimplementsPartitioner{// Counter初始化为随机数,避免重启后所有消息都去同一个分区privatefinalAtomicIntegercounter=newAtomicInteger(newRandom().nextInt());// 并发安全的StickyPartition缓存privatefinalConcurrentMap<String,Integer>stickyPartitionCache=newConcurrentHashMap<>();publicintpartition(Stringtopic,Objectkey,byte[]keyBytes,Objectvalue,byte[]valueBytes,Clustercluster){// 获取Topic的分区信息List<PartitionInfo>partitions=cluster.partitionsForTopic(topic);intnumPartitions=partitions.size();if(keyBytes==null){// 情况一:消息没有Key —— Sticky分区策略(2.4+)returnstickyPartitionCache.computeIfAbsent(topic,t->{// 先找可用分区(有Leader的分区)List<PartitionInfo>availablePartitions=cluster.availablePartitionsForTopic(t);if(availablePartitions.isEmpty()){// 没有可用分区,退化为RoundRobinintnextValue=counter.getAndIncrement();returnDefaultPartitioner.toPositive(nextValue)%numPartitions;}else{// 选择一个可用分区并"粘住"intpart=DefaultPartitioner.toPositive(counter.getAndIncrement())%availablePartitions.size();returnavailablePartitions.get(part).partition();}});}else{// 情况二:消息有Key —— Hash取模// murmur2是一种高效的、低碰撞率的哈希算法returnDefaultPartitioner.toPositive(Utils.murmur2(keyBytes))%numPartitions;}}// 将负数转为正数(取绝对值的等价操作)staticinttoPositive(intnumber){returnnumber&0x7fffffff;}}

3.2 两种策略图解

【DefaultPartitioner 分区策略】 消息有Key ──► murmur2(Key) % 分区数 ──► 固定分区 (相同Key → 相同分区 → 顺序保证) 消息无Key ──► Sticky策略 ──► 同一个"批次"粘在同一个可用分区 (2.4+) 批次满后切换到新分区 ──► RoundRobin ──► counter++ % 分区数(逐条轮询) (2.3及之前) 无批量优化,可能产生大量小批次

3.3 为什么counter要用AtomicInteger

KafkaProducer是线程安全的,多个业务线程可能同时调用send()。DefaultPartitioner必须也是线程安全的。这就是为什么用AtomicInteger而不是普通的int——两个线程并发调用counter.getAndIncrement()时,不会出现计数错误。

3.4 toPositive()方法:负数转正数

number & 0x7fffffff这个位掩码操作是为了把负数转成正数。murmur2()可能返回负数(因为返回类型是int,包含符号位),但分区编号必须是≥0的整数。

负数: 1xxxxxxx xxxxxxxx xxxxxxxx xxxxxxxx & 掩码: 0xxxxxxx xxxxxxxx xxxxxxxx xxxxxxxx ────────────────────────────────────────── 结果: 0xxxxxxx xxxxxxxx xxxxxxxx xxxxxxxx ← 永远是正数

四、Sticky Partitioner——2.4版本的性能优化利器

4.1 问题:老版本RoundRobin的痛点

在Kafka 2.3及之前,没有Key的消息使用RoundRobin策略——每条消息随机选一个分区。这会导致什么问题?

【RoundRobin策略产生大量小批次】 Topic: orders (3个分区) msg1 → P0 msg2 → P1 RecordAccumulator中的状态: msg3 → P2 P0: [msg1] ← 只有1条消息就凑满一个批次? msg4 → P0 P1: [msg2] ← 每条消息单独开Batch? msg5 → P1 P2: [msg3] msg6 → P2 结果:每个分区的Batch都只有少量消息 → 发送许多小请求 → 网络开销大

4.2 Sticky策略的优化

Sticky策略的思想是:"粘"在同一个分区上,直到当前Batch满了,再换下一个分区。

【Sticky策略批量优化效果】 msg1 → P0 msg2 → P0 ← 粘住P0 RecordAccumulator中的状态: msg3 → P0 ← 继续粘 P0: [msg1, msg2, msg3, msg4] ← 大Batch msg4 → P0 ← Batch满了! P1: [msg5, msg6, msg7] msg5 → P1 ← 切换到P1 P2: [msg8, msg9] msg6 → P1 msg7 → P1 msg8 → P2 msg9 → P2 结果:每个分区攒了更大的Batch → 减少网络请求 → 吞吐量提升

StickyPartitionCache的具体实现中就一个ConcurrentHashMap<String, Integer>,Key是Topic名,Value是粘住的Partition编号。当Batch满了被Sender取走之后,下次再append新消息时,会重新选一个分区。

4.3 对比总结

对比维度RoundRobin (旧)Sticky (新,2.4+)
分区选择逐条轮询粘住分区,Batch满后切换
Batch填充率低(每个分区各攒一点)高(每个分区攒满再走)
请求数量多(小Batch多)少(大Batch少)
网络开销
消息延迟低(及时发送)略高(等待凑Batch)
适用场景低延迟要求高吞吐要求

五、自定义分区器实战——按业务Key路由

5.1 场景:用户消息优先处理分区

假设你有一个topic叫"user-events",有6个分区。你希望VIP用户的消息发往低编号分区(P0-P1),普通用户消息发往高编号分区(P4-P5),中间分区用于系统消息。

/** * 自定义分区器:VIP用户优先分区 * VIP用户 → P0, P1 * 系统消息 → P2, P3 * 普通用户 → P4, P5 */publicclassVipAwarePartitionerimplementsPartitioner{privatestaticfinalSet<String>VIP_USERS=newHashSet<>(Arrays.asList("vip_001","vip_002","vip_003"// VIP用户白名单));privatestaticfinalStringSYSTEM_KEY="__SYSTEM__";@Overridepublicintpartition(Stringtopic,Objectkey,byte[]keyBytes,Objectvalue,byte[]valueBytes,Clustercluster){List<PartitionInfo>partitions=cluster.partitionsForTopic(topic);intnumPartitions=partitions.size();// 把Key转成字符串StringkeyStr=(keyBytes!=null)?newString(keyBytes):"";if(SYSTEM_KEY.equals(keyStr)){// 系统消息 → P2, P3// 用简单的随机分配intbase=2;intoffset=ThreadLocalRandom.current().nextInt(2);returnbase+offset;}elseif(VIP_USERS.contains(keyStr)){// VIP用户 → P0, P1// 用Hash保证同一VIP用户消息有序inthash=Math.abs(Utils.murmur2(keyBytes));returnhash%2;// P0或P1}else{// 普通用户 → P4, P5// 也用Hash,同一用户的消息在同一分区inthash=Math.abs(Utils.murmur2(keyBytes));return4+(hash%2);// P4或P5}}@Overridepublicvoidclose(){}@Overridepublicvoidconfigure(Map<String,?>configs){}}

5.2 配置使用

Propertiesprops=newProperties();props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.example.VipAwarePartitioner");// 指定自定义分区器props.put("bootstrap.servers","localhost:9092");// ... 其他配置KafkaProducer<String,String>producer=newKafkaProducer<>(props);// VIP用户消息自动路由到P0或P1producer.send(newProducerRecord<>("user-events","vip_001","VIP用户登录"));// 普通用户消息自动路由到P4或P5producer.send(newProducerRecord<>("user-events","normal_user_123","普通用户点击"));

六、分区数与吞吐量的关系——数学不小了

【分区数与吞吐量的关系图】 吞吐量(TPS) ▲ │ ┌────────────────────── │ ┌─────┘ ← 达到瓶颈(磁盘/网络) │ ┌─────┘ │ ┌────┘ ← 线性增长区间 │ ┌────┘ │ ┌──┘ └─┴────┬────┬────┬────┬────┬────┬────► 分区数 1 3 6 9 12 15 18 分区太少 ──► 无法充分利用集群能力 分区太多 ──► 元数据开销大、文件句柄多、Leader选举慢

经验法则

  • 分区数 =max(总吞吐量需求 / 单分区吞吐量, 消费者实例数)
  • 单分区吞吐量一般:~10MB/s 写,~50MB/s 读
  • 分区总数(所有Topic)建议不超过Broker数量的4000倍

七、分区器选型决策

场景推荐策略配置
需要消息顺序Key HashDefaultPartitioner+ 带Key的消息
高吞吐、不关心顺序StickyDefaultPartitioner(默认)
按业务规则路由自定义Partitionerpartitioner.class=xxx
指定分区发送直接指定分区ProducerRecord中指定partition
均匀分布无Key消息RoundRobin需实现自定义Partitioner

本篇小结

分区器看似简单,实则内涵丰富:

  • DefaultPartitioner是双模式:有Key走murmur2哈希(保证同Key顺序),无Key走Sticky(保证批量效率)。Kafka 2.4的Sticky优化是一个典型的"用稍高延迟换更高吞吐"的trade-off案例
  • 自定义分区器的关键是理解输入参数——你拿到的是已序列化的keyBytes和集群元数据,足以实现任意复杂的分区逻辑
  • 分区数量不是越多越好,需要根据吞吐量需求和消费者并发数综合计算
  • 尽量让分区在各个Broker上均匀分布,避免热点——下一篇我们讲集群元数据,看看Producer是怎么知道这些拓扑信息的

上一篇【第13篇】Kafka序列化器深度解析——自定义Serializer不再是难题
下一篇【第15篇】Kafka集群元数据源码解析——生产者如何"认识"整个集群


版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/6/8 12:04:17

UVa 424 Integer Inquiry

题目描述 题目要求计算多个大整数的和。输入包含最多 100100100 行&#xff0c;每行一个非负整数&#xff08;可能非常大&#xff0c;长度不超过 100100100 位&#xff09;&#xff0c;以单独一行的一个 000 表示输入结束。输出这些整数的总和。 输入格式 输入包含若干行&#…

作者头像 李华
网站建设 2026/6/8 12:03:18

FPGA做示波器?我用EGO1开发板+XADC+VGA,实现了心电信号的简易显示系统

基于EGO1开发板的心电信号可视化系统设计与实现 在医疗电子和生物信号处理领域&#xff0c;实时可视化心电信号对于教学演示和原型验证具有重要意义。本文将详细介绍如何利用Xilinx EGO1 FPGA开发板内置的XADC模块和VGA显示接口&#xff0c;构建一个低成本的心电信号采集与显示…

作者头像 李华
网站建设 2026/6/8 12:03:14

手把手教你申请SRRC型号核准:从工信部网站注册到拿证的全流程避坑指南

SRRC型号核准实战指南&#xff1a;中小企业高效取证全流程解析第一次接触SRRC认证的工程师们&#xff0c;往往会被各种专业术语和流程绕得晕头转向。作为深耕无线电认证领域多年的技术顾问&#xff0c;我见过太多企业因为不熟悉规则而耽误产品上市周期。本文将用最直白的语言&a…

作者头像 李华
网站建设 2026/6/8 12:03:12

AIOps 智能运维:从异常检测到根因分析的自动化实践

AIOps 智能运维&#xff1a;从异常检测到根因分析的自动化实践一、运维的"告警疲劳"&#xff1a;每天 1000 条告警&#xff0c;99% 是噪音 云原生环境下的运维面临"告警爆炸"问题。一个中等规模的 K8s 集群&#xff0c;每天可能产生上千条告警——CPU 使用…

作者头像 李华
网站建设 2026/6/8 12:03:12

MM配置实战:评估类(OMSK)与物料类型的标准映射关系解析

1. 评估类(OMSK)与物料类型的基础概念 第一次接触SAP MM模块的评估类配置时&#xff0c;我也曾被各种专业术语绕得头晕。直到有次在客户现场&#xff0c;看到仓库管理员对着系统里一堆物料发愁&#xff0c;才真正理解评估类的重要性。简单来说&#xff0c;**评估类(OMSK)**就像…

作者头像 李华