万字详解Kafka,快速上手
适合读者:已经能写一个普通 Spring Boot 后端项目,但对 Kafka、消息中间件、微服务事件驱动架构还不熟。
阅读目标:看完后能看懂公司代码里的 KafkaTemplate、@KafkaListener、group-id、topic、partition、offset、重试、死信队列、幂等等常见设计,不至于接手代码时一脸懵。
参考资料主要来自官方文档:
1. 先建立一个直觉:Kafka 不是“高级版队列”,而是“公司事件流水账” 你可以先把 Kafka 想象成公司里的一本高速、分布式、可长期保存的“业务流水账”。
传统接口调用是:
1 用户下单 -> 订单服务直接调用库存服务 -> 调用支付服务 -> 调用优惠券服务 -> 调用通知服务
这种方式简单直观,但服务之间强耦合。订单服务要知道库存、支付、优惠券、通知服务的接口地址、超时策略、错误处理方式。一个下游慢了,上游也可能被拖慢。
Kafka 的事件驱动方式是:
1 2 3 4 5 6 7 用户下单 -> 订单服务保存订单 -> 订单服务发送 OrderCreatedEvent 到 Kafka -> 库存服务消费这个事件扣库存 -> 优惠券服务消费这个事件核销优惠券 -> 通知服务消费这个事件发短信/推送 -> 数据服务消费这个事件做报表
订单服务只负责告诉世界:“订单创建了”。至于谁关心这件事,由其他服务自己订阅。
这就是 Kafka 在公司里的核心价值:
解耦 :生产者不需要知道消费者是谁。
削峰 :高峰期消息先进 Kafka,下游慢慢消费。
异步化 :主链路只做必要操作,非核心动作异步处理。
可回放 :消费者处理错了,可以从旧 offset 重新消费。
多订阅方 :同一个事件可以被多个团队、多个服务独立消费。
数据基建 :业务事件可以流向风控、推荐、搜索、数仓、监控。
Kafka 官方把它称为 event streaming platform。通俗点说,就是用事件把不同系统连接起来,并且这些事件不是“看完即删”,而是在 topic 中按保留策略存一段时间。
2. Kafka 的核心名词 2.1 Event / Record / Message:消息 Kafka 中一条消息通常叫 event、record 或 message。它一般包含:
key:消息键,用于分区路由,比如 orderId、userId。
value:消息体,比如订单创建事件 JSON。
timestamp:时间戳。
headers:元数据,比如 traceId、tenantId、sourceApp。
示例:
1 2 3 4 5 6 7 8 { "eventId" : "7c91d5b8-7f5d-4cc2-b4ef-111111111111" , "eventType" : "ORDER_CREATED" , "orderId" : 10086 , "userId" : 9527 , "amount" : 35.50 , "occurredAt" : "2026-04-25T10:30:00+08:00" }
公司里常见习惯:
eventId 用来做幂等。
eventType 表示事件类型。
occurredAt 表示业务发生时间。
traceId 放在 header 或消息体里,方便链路追踪。
2.2 Topic:消息主题 Topic 是消息的逻辑分类。可以理解为一个业务事件流。
常见命名:
1 2 3 4 5 6 trade.order.created trade.order.paid trade.order.cancelled user.profile.changed coupon.used inventory.stock.deducted
公司项目中 topic 往往不会随手起名,而是有规范:
按业务域:trade、user、coupon、risk。
按事件名:order.created、order.paid。
按环境隔离:dev.trade.order.created、prod.trade.order.created,或者通过不同 Kafka 集群隔离。
不要把 topic 当成方法名。更推荐表达“发生了什么”,而不是“让谁做什么”。
好:
一般:
前者是事件,后者像命令。事件更容易被多个下游复用。
2.3 Partition:分区 一个 topic 会被拆成多个 partition。分区是 Kafka 并行能力和顺序保证的关键。
1 2 3 4 Topic: trade.order.created Partition 0: msg0, msg1, msg2... Partition 1: msg0, msg1, msg2... Partition 2: msg0, msg1, msg2...
Kafka 只保证同一个 partition 内消息有序 ,不保证整个 topic 全局有序。
如果你发送消息时指定 key,比如 orderId,Kafka 会根据 key 决定分区。相同 key 通常会进入同一个分区,因此同一个订单的事件能保持顺序。
这点在公司非常重要:
1 2 3 4 orderId=10086 的消息: OrderCreated OrderPaid OrderShipped
如果都用 orderId 作为 key,它们大概率会在同一个 partition,消费者看到的顺序更稳定。
但注意:如果后期增加 topic 分区数,key 到 partition 的映射可能变化。对强顺序要求非常高的场景,需要提前设计分区数,或者通过业务锁、状态机兜底。
2.4 Producer:生产者 Producer 是发送消息到 Kafka 的应用。比如订单服务创建订单后,发送 OrderCreatedEvent。
Spring Boot 项目里你经常会看到:
1 kafkaTemplate.send("trade.order.created" , orderId.toString(), event);
第一个参数是 topic,第二个参数是 key,第三个参数是 value。
2.5 Consumer:消费者 Consumer 是从 Kafka 读取消息并处理的应用。比如优惠券服务监听订单创建事件,然后发新人券。
Spring Boot 项目里常见:
1 2 3 4 @KafkaListener(topics = "trade.order.created", groupId = "coupon-service") public void onOrderCreated (OrderCreatedEvent event) { couponService.issueCoupon(event.userId()); }
2.6 Consumer Group:消费者组 消费者组是 Kafka 最容易让新人迷糊的概念。
同一个 topic,可以被多个 group 独立消费。
1 2 3 4 5 6 7 8 9 10 11 Topic: trade.order.created Group: coupon-service coupon-service 实例 1 coupon-service 实例 2 Group: notification-service notification-service 实例 1 Group: data-warehouse-sync sync 实例 1
这里 coupon-service、notification-service、data-warehouse-sync 都能完整收到 trade.order.created 的消息。它们互不影响。
同一个 group 内,多个实例分摊 partition。
假设 topic 有 4 个 partition,coupon-service 部署 2 个实例:
1 2 coupon-service 实例 A: partition 0, partition 1 coupon-service 实例 B: partition 2, partition 3
如果部署 4 个实例:
1 2 3 4 实例 A: partition 0 实例 B: partition 1 实例 C: partition 2 实例 D: partition 3
如果部署 8 个实例,但只有 4 个 partition,那么最多 4 个实例能消费,另外 4 个会空闲。
结论:
想让不同业务都收到同一批消息:用不同 group-id。
想让同一业务横向扩容:同一个 group-id 部署多个实例。
消费并发上限基本受 partition 数影响。
2.7 Offset:消费进度 Offset 是分区内消息的位置。消费者处理到哪里了,需要记录。
例如:
1 2 3 4 partition 0: offset 0: msgA offset 1: msgB offset 2: msgC
消费者消费完 offset=1,就会提交下一次要读的位置。Kafka 官方文档中特别强调,提交的 offset 应该是下一条要读取的消息位置。
公司里你会听到:
“这个 group 的 offset 卡住了”
“消费积压了”
“把 offset 回退一下重放”
“这个消费者重复消费了”
这些都和 offset 有关。
3. Kafka 在公司常见的 8 类使用场景 3.1 订单主链路解耦 下单成功后,订单服务只同步完成核心动作:
校验参数。
写订单表。
写支付单或待支付状态。
发送订单创建事件。
其他动作异步:
发券。
发短信。
计算推荐特征。
同步搜索索引。
写数仓。
风控审计。
这样订单接口不会被所有下游拖慢。
3.2 削峰填谷 大促时用户请求暴涨。比如支付成功后要发积分,如果积分服务扛不住瞬间流量,就可以让支付服务把 PaymentSuccessEvent 写入 Kafka。积分服务按自己能力慢慢消费。
这不代表 Kafka 能凭空解决所有容量问题。它只是把压力从“同步打爆下游”变成“消息积压后慢慢追”。你仍然要关注:
消息积压量。
消费速度。
延迟 SLA。
下游数据库写入能力。
3.3 异步通知 短信、邮件、App Push、站内信通常不应该阻塞主接口。主服务发送事件,通知服务消费事件。
3.4 数据同步 业务服务产生事件,搜索服务消费后更新 Elasticsearch,数据团队消费后写入数仓。
这类消费一般要求:
可重放。
可追踪。
对消息 schema 变更敏感。
3.5 延迟重试和最终一致性 消费者调用下游失败,比如库存服务消费订单事件时数据库暂时不可用,可以稍后重试。
常见做法:
本地短重试:消费线程里快速重试 2 到 3 次。
延迟重试 topic:失败后写入 retry topic,过一段时间再消费。
死信队列:多次失败后进入 DLT/DLQ,人工或平台处理。
3.6 日志和埋点采集 用户行为、接口日志、业务指标可以写入 Kafka,再由 Flink、Spark、ClickHouse、Hive 等系统处理。
3.7 事件驱动微服务 一个服务状态变化后发事件,其他服务订阅并更新自己的本地视图。
例如商品服务发:
营销服务消费后更新活动价格缓存,搜索服务消费后更新索引,推荐服务消费后更新特征。
3.8 缓存失效和本地副本更新 配置、商品、商家状态变化后,通过 Kafka 广播事件,各服务清理缓存或更新本地缓存。
4. 公司代码里 Kafka 通常长什么样 4.1 Maven 依赖 Spring Boot 项目常用 spring-kafka:
1 2 3 4 5 6 7 8 9 10 <dependency > <groupId > org.springframework.kafka</groupId > <artifactId > spring-kafka</artifactId > </dependency > <dependency > <groupId > org.springframework.kafka</groupId > <artifactId > spring-kafka-test</artifactId > <scope > test</scope > </dependency >
如果公司用了内部 starter,你可能看不到直接依赖,而是类似:
1 2 3 4 <dependency > <groupId > com.company.middleware</groupId > <artifactId > company-kafka-spring-boot-starter</artifactId > </dependency >
内部 starter 通常帮你封装了:
bootstrap servers。
认证信息。
topic 前缀。
序列化配置。
统一异常处理。
监控埋点。
traceId 透传。
4.2 application.yml 一个比较典型的配置:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 spring: application: name: order-service kafka: bootstrap-servers: localhost:9092 producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.springframework.kafka.support.serializer.JsonSerializer acks: all retries: 3 properties: enable.idempotence: true linger.ms: 20 compression.type: zstd spring.json.add.type.headers: false consumer: group-id: order-service key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer enable-auto-commit: false auto-offset-reset: latest properties: spring.json.trusted.packages: "com.example.order.event" listener: ack-mode: record concurrency: 3
逐项解释:
bootstrap-servers:Kafka broker 地址。客户端用它发现整个集群,不必写全所有 broker,但生产建议配置多个。
acks: all:生产者等待 ISR 副本确认,可靠性最高。
enable.idempotence: true:生产者幂等,降低重试导致重复写入的风险。Kafka 新版本默认会在不冲突配置下启用。
compression.type:消息压缩。业务 JSON 较大时常见。
group-id:消费者组名。
enable-auto-commit: false:业务处理成功后再提交 offset,更可控。
auto-offset-reset: latest:没有历史 offset 时从最新消息开始。测试或补偿任务可能用 earliest。
ack-mode: record:一条消息处理成功提交一条,语义清晰。
concurrency:Spring Kafka listener 并发数,通常不要超过 topic partition 数太多。
4.3 事件对象 DTO 1 2 3 4 5 6 7 8 9 10 11 12 13 package com.example.order.event;import java.math.BigDecimal;import java.time.OffsetDateTime;public record OrderCreatedEvent ( String eventId, Long orderId, Long userId, BigDecimal amount, OffsetDateTime occurredAt ) {}
实际公司里可能会加更多字段:
1 2 3 4 5 6 7 8 9 10 11 public record OrderCreatedEvent ( String eventId, String eventVersion, Long orderId, Long userId, BigDecimal amount, String sourceApp, String traceId, OffsetDateTime occurredAt ) {}
建议思考:
这个事件是“事实”还是“命令”?
下游需要哪些字段才能独立处理?
字段以后能不能兼容增加?
是否需要 eventVersion?
是否需要 eventId 做幂等?
5. Producer:怎么优雅地发送消息 5.1 最简单的发送 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 package com.example.order.producer;import com.example.order.event.OrderCreatedEvent;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.stereotype.Component;@Component public class OrderEventProducer { private static final String ORDER_CREATED_TOPIC = "trade.order.created" ; private final KafkaTemplate<String, OrderCreatedEvent> kafkaTemplate; public OrderEventProducer (KafkaTemplate<String, OrderCreatedEvent> kafkaTemplate) { this .kafkaTemplate = kafkaTemplate; } public void sendOrderCreated (OrderCreatedEvent event) { String key = event.orderId().toString(); kafkaTemplate.send(ORDER_CREATED_TOPIC, key, event); } }
新人容易忽略 key。不要随便传 null。如果消息和订单相关,通常用 orderId;和用户相关,通常用 userId。这样同一个业务实体的消息更容易落到同一个 partition。
5.2 发送后处理成功/失败 Kafka 发送是异步的。Spring Kafka 的 KafkaTemplate#send 返回 CompletableFuture。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 package com.example.order.producer;import com.example.order.event.OrderCreatedEvent;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.stereotype.Component;@Component public class OrderEventProducer { private static final Logger log = LoggerFactory.getLogger(OrderEventProducer.class); private static final String ORDER_CREATED_TOPIC = "trade.order.created" ; private final KafkaTemplate<String, OrderCreatedEvent> kafkaTemplate; public OrderEventProducer (KafkaTemplate<String, OrderCreatedEvent> kafkaTemplate) { this .kafkaTemplate = kafkaTemplate; } public void sendOrderCreated (OrderCreatedEvent event) { String key = event.orderId().toString(); kafkaTemplate.send(ORDER_CREATED_TOPIC, key, event) .whenComplete((result, ex) -> { if (ex != null ) { log.error("send order created event failed, orderId={}, eventId={}" , event.orderId(), event.eventId(), ex); return ; } log.info("send order created event success, topic={}, partition={}, offset={}, orderId={}" , result.getRecordMetadata().topic(), result.getRecordMetadata().partition(), result.getRecordMetadata().offset(), event.orderId()); }); } }
注意:如果发送失败只是打日志,业务上可能丢事件。关键链路不要只写上面的简单代码,还要考虑“业务数据写库成功,但 Kafka 发送失败”怎么办。
5.3 关键问题:数据库写成功了,消息发送失败怎么办? 这是公司里非常常见的架构问题。
错误写法:
1 2 3 4 5 @Transactional public void createOrder (CreateOrderCommand command) { Order order = orderRepository.save(command.toOrder()); kafkaTemplate.send("trade.order.created" , order.getId().toString(), event); }
这个代码有几个问题:
数据库事务提交成功后,Kafka 可能发送失败。
Kafka 发送成功后,数据库事务可能回滚。
send 是异步的,方法返回时不代表消息一定成功。
更稳妥的常见方案是 Outbox Pattern 。
1 2 3 4 5 6 7 8 1. 在同一个数据库事务里: - 写 orders 表 - 写 outbox_event 表 2. 后台任务或 CDC 组件扫描 outbox_event: - 发送 Kafka - 成功后标记 SENT - 失败后重试
示意表:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 CREATE TABLE outbox_event ( id BIGINT PRIMARY KEY AUTO_INCREMENT, event_id VARCHAR (64 ) NOT NULL UNIQUE , aggregate_type VARCHAR (64 ) NOT NULL , aggregate_id VARCHAR (64 ) NOT NULL , topic VARCHAR (128 ) NOT NULL , message_key VARCHAR (128 ) NOT NULL , payload JSON NOT NULL , status VARCHAR (16 ) NOT NULL , retry_count INT NOT NULL DEFAULT 0 , next_retry_time DATETIME NOT NULL , created_at DATETIME NOT NULL , updated_at DATETIME NOT NULL );
订单创建:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @Transactional public Long createOrder (CreateOrderCommand command) { Order order = orderRepository.save(command.toOrder()); OrderCreatedEvent event = OrderCreatedEventFactory.from(order); outboxEventRepository.save(OutboxEvent.pending( event.eventId(), "Order" , order.getId().toString(), "trade.order.created" , order.getId().toString(), event )); return order.getId(); }
后台发布:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 @Scheduled(fixedDelay = 1000) public void publishPendingEvents () { List<OutboxEvent> events = outboxEventRepository.findReadyToPublish(100 ); for (OutboxEvent event : events) { try { kafkaTemplate.send(event.getTopic(), event.getMessageKey(), event.getPayload()) .get(3 , TimeUnit.SECONDS); event.markSent(); } catch (Exception ex) { event.markFailedAndScheduleRetry(); } } outboxEventRepository.saveAll(events); }
这类代码你在大厂项目中经常会看到变体。名字可能不叫 outbox,可能叫:
event_publish
mq_message
reliable_message
transaction_message
domain_event
你接手代码时可以搜这些关键词。
6. Consumer:怎么消费消息 6.1 最简单的消费者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 package com.example.coupon.consumer;import com.example.order.event.OrderCreatedEvent;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.stereotype.Component;@Component public class OrderCreatedConsumer { private static final Logger log = LoggerFactory.getLogger(OrderCreatedConsumer.class); private final CouponService couponService; public OrderCreatedConsumer (CouponService couponService) { this .couponService = couponService; } @KafkaListener( topics = "trade.order.created", groupId = "coupon-service" ) public void onMessage (OrderCreatedEvent event) { log.info("receive order created event, orderId={}, eventId={}" , event.orderId(), event.eventId()); couponService.issueAfterOrderCreated(event.userId(), event.orderId(), event.eventId()); } }
你需要读懂三个点:
topics:监听哪个 topic。
groupId:当前服务属于哪个消费者组。
方法参数:消息 value 反序列化后的对象。
6.2 消费者一定要考虑幂等 Kafka 消费大多数业务默认按 at least once 理解:消息不会轻易丢,但可能重复。
重复可能来自:
消费者处理成功,但提交 offset 前宕机。
消费者处理超时,被 group 踢出,partition 分配给其他实例。
业务手动重放消息。
生产者重试导致极端场景重复。
你自己的补偿任务重复发送。
所以消费者业务逻辑必须幂等。
错误想法:
1 Kafka 不会重复,所以我直接插入一条发券记录。
正确想法:
1 Kafka 可能重复,所以我用 eventId 或 businessKey 做唯一约束。
例子:
1 2 3 4 5 6 7 8 9 CREATE TABLE coupon_issue_record ( id BIGINT PRIMARY KEY AUTO_INCREMENT, event_id VARCHAR (64 ) NOT NULL , user_id BIGINT NOT NULL , order_id BIGINT NOT NULL , coupon_id BIGINT NOT NULL , created_at DATETIME NOT NULL , UNIQUE KEY uk_event_id (event_id) );
业务代码:
1 2 3 4 5 6 7 8 9 @Transactional public void issueAfterOrderCreated (Long userId, Long orderId, String eventId) { if (couponIssueRecordRepository.existsByEventId(eventId)) { return ; } Coupon coupon = couponRepository.createForUser(userId); couponIssueRecordRepository.save(new CouponIssueRecord (eventId, userId, orderId, coupon.getId())); }
更常见的唯一键也可能是:
1 2 3 uk_order_coupon_scene(order_id, coupon_scene) uk_user_activity(user_id, activity_id) uk_event_id(event_id)
6.3 消费失败怎么办 消费失败不是小概率事件。常见失败包括:
JSON 反序列化失败。
业务参数非法。
下游 RPC 超时。
数据库死锁。
Redis 超时。
第三方接口失败。
一般处理策略:
1 2 3 可恢复异常:重试 不可恢复异常:进死信 未知异常:有限重试后进死信
Spring Kafka 可以配置 DefaultErrorHandler:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 package com.example.kafka.config;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.common.TopicPartition;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;import org.springframework.kafka.core.ConsumerFactory;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.kafka.listener.DefaultErrorHandler;import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;import org.springframework.util.backoff.FixedBackOff;@Configuration public class KafkaConsumerConfig { @Bean public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory ( ConsumerFactory<String, Object> consumerFactory, KafkaTemplate<String, Object> kafkaTemplate ) { ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory <>(); factory.setConsumerFactory(consumerFactory); factory.setCommonErrorHandler(defaultErrorHandler(kafkaTemplate)); return factory; } private DefaultErrorHandler defaultErrorHandler (KafkaTemplate<String, Object> kafkaTemplate) { DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer ( kafkaTemplate, (ConsumerRecord<?, ?> record, Exception ex) -> new TopicPartition (record.topic() + ".DLT" , record.partition()) ); DefaultErrorHandler handler = new DefaultErrorHandler ( recoverer, new FixedBackOff (1000L , 2L ) ); handler.addNotRetryableExceptions(IllegalArgumentException.class); return handler; } }
含义:
每条失败消息间隔 1 秒重试。
最多重试 2 次,加上首次消费共 3 次。
仍失败则写入原 topic 对应的 .DLT。
IllegalArgumentException 这类明确业务参数错误不重试,直接进死信。
死信 topic 不是垃圾桶,而是待处理问题池。公司里通常会有:
死信告警。
死信查询平台。
人工修复后重投。
定时补偿任务。
7. Producer 配置怎么取舍 7.1 acks Kafka 官方文档中,acks 表示 producer 认为请求完成前需要多少确认。
常见值:
acks=0:不等 broker 确认,最快但最不可靠。
acks=1:leader 写入就确认,如果 follower 还没复制 leader 就挂,可能丢。
acks=all:ISR 副本确认,可靠性最强。
公司核心业务一般倾向:
1 2 3 4 spring: kafka: producer: acks: all
对于日志埋点类、允许少量丢失的场景,可能为了吞吐做不同选择。
7.2 enable.idempotence 生产者幂等能避免某些重试导致的重复写入。Kafka 新版本默认在没有冲突配置时启用。
建议核心业务明确写:
1 2 3 4 5 spring: kafka: producer: properties: enable.idempotence: true
它要求:
acks=all
retries > 0
max.in.flight.requests.per.connection <= 5
不要把它理解成“业务绝对不会重复消费”。它主要解决 producer 到 Kafka 写入侧的重复问题,不替代消费者幂等。
7.3 compression.type 常见:
1 2 3 4 5 spring: kafka: producer: properties: compression.type: zstd
压缩能降低网络和磁盘压力,但会增加 CPU 开销。业务 JSON 比较大、吞吐较高时很常见。
7.4 linger.ms 和 batch.size Producer 会把消息攒成 batch 发送。
linger.ms:最多等多久凑 batch。
batch.size:batch 大小。
低延迟接口不宜设置太大。高吞吐日志类场景可以适当增大。
7.5 发送 key 的选择 这是架构思考题,不是代码细节。
订单事件:
1 kafkaTemplate.send(topic, orderId.toString(), event);
用户事件:
1 kafkaTemplate.send(topic, userId.toString(), event);
商家事件:
1 kafkaTemplate.send(topic, merchantId.toString(), event);
选择 key 时想清楚:
哪个维度需要有序?
哪个维度会不会特别热点?
partition 数能否支撑吞吐?
后续是否可能按这个 key 做排查?
如果某个大商家、超级用户、热门活动产生大量消息,用它做 key 可能造成单 partition 热点。
8. Consumer 配置怎么取舍 8.1 group-id group-id 决定消费进度归属。
同一个业务应用多个实例,应该使用同一个 group:
1 2 3 4 spring: kafka: consumer: group-id: coupon-service
如果你本地调试不想影响测试环境公共 group,可以临时改成:
1 2 3 4 spring: kafka: consumer: group-id: coupon-service-local-yourname
否则你本地启动服务可能抢走测试环境消息,师兄师姐会很快注意到。
8.2 enable.auto.commit Kafka consumer 默认可以自动提交 offset。但公司业务通常更喜欢业务处理成功后再提交。
建议:
1 2 3 4 spring: kafka: consumer: enable-auto-commit: false
Spring Kafka listener 配合 ack mode 处理 offset。
8.3 auto.offset.reset 当没有历史 offset,或者 offset 已经过期时,从哪里开始消费。
latest:从最新消息开始。线上常见,避免新 group 一启动就扫历史海量消息。
earliest:从最早消息开始。补偿、重建索引、测试常用。
none:没有 offset 直接报错,适合强控制场景。
新人本地调试常见疑惑:
很可能是:
group 已经有 offset。
auto-offset-reset=latest。
消息已经过了 retention。
你监听错 topic 或环境。
8.4 max.poll.records 一次 poll 最多拉多少条。
如果单条处理慢,不要拉太多,否则处理时间过长可能导致 rebalance。
1 2 3 4 spring: kafka: consumer: max-poll-records: 100
8.5 max.poll.interval.ms 消费者两次 poll 之间允许的最大处理时间。业务处理非常慢时要注意这个配置,否则 Kafka 会认为消费者挂了,把 partition 分配给别人。
但更推荐优化消费逻辑:
单条消息处理尽量短。
慢任务拆出去。
批量写库。
控制 max.poll.records。
必要时增加 partition 和消费者实例。
9. Topic 设计:架构构思时先问这些问题 9.1 一个 topic 放一种事件,还是多种事件? 方案 A:一个 topic 一种事件
1 2 3 trade.order.created trade.order.paid trade.order.cancelled
优点:
语义清晰。
消费者订阅简单。
权限、保留时间、监控容易做。
缺点:
方案 B:一个 topic 放同一聚合的多种事件
消息里用 eventType 区分:
1 2 3 4 { "eventType" : "ORDER_CREATED" , "payload" : { } }
优点:
缺点:
消费者要过滤不关心的事件。
schema 管理更复杂。
新手建议:优先理解公司现有规范。没有规范时,核心业务更倾向“一类业务事实一个 topic”,不要过度合并。
9.2 分区数怎么估? 分区数影响:
并行消费能力。
单 partition 顺序。
broker 资源。
后续扩容复杂度。
粗略思路:
1 2 3 4 需要消费吞吐 = 峰值消息量 / 可接受延迟 单消费者实例吞吐 = 压测得出 需要并发数 = 需要消费吞吐 / 单实例吞吐 partition 数 >= 需要并发数
例如:
1 2 3 4 峰值每秒 3000 条订单事件 单消费者实例每秒处理 500 条 至少需要 6 个并发消费者 topic partition 可以设置为 8 或 12
别随意设置 1000 个 partition。partition 太多会增加 broker、controller、文件句柄、rebalance 等成本。
9.3 retention 保留多久? Topic 默认按时间保留消息。Kafka 4.2 文档中 retention.ms 默认是 7 天。
业务思考:
普通业务事件:保留 3 到 7 天?
关键交易事件:保留更久?
日志埋点:看成本,可能 1 到 3 天。
compacted 状态 topic:可能用 cleanup.policy=compact 保留每个 key 最新值。
9.4 delete 还是 compact? cleanup.policy=delete:
适合事件流:
cleanup.policy=compact:
适合状态流:
10. 微服务架构里 Kafka 放在哪一层 一个比较健康的事件驱动微服务结构:
1 2 3 4 5 6 7 8 9 10 11 12 13 Controller -> Application Service -> Domain Service -> Repository -> Domain Event / Outbox Kafka Publisher -> 只负责把事件可靠发布出去 Kafka Consumer -> 接收事件 -> 做幂等 -> 调用本服务 Application Service
10.1 不要在 Controller 里直接发 Kafka 不推荐:
1 2 3 4 5 6 @PostMapping("/orders") public Long createOrder (@RequestBody CreateOrderRequest request) { Order order = orderService.create(request); kafkaTemplate.send("trade.order.created" , order.getId().toString(), event); return order.getId(); }
更推荐:
1 2 3 4 @PostMapping("/orders") public Long createOrder (@RequestBody CreateOrderRequest request) { return orderApplicationService.create(request.toCommand()); }
事件发布逻辑放在 application service、domain event、outbox publisher 中。
10.2 Consumer 不要写成巨大的业务垃圾桶 不推荐:
1 2 3 4 5 6 7 8 @KafkaListener(topics = "trade.order.created") public void onMessage (OrderCreatedEvent event) { }
更推荐:
1 2 3 4 @KafkaListener(topics = "trade.order.created", groupId = "coupon-service") public void onMessage (OrderCreatedEvent event) { couponApplicationService.handleOrderCreated(event); }
Listener 只做:
日志。
参数基本校验。
trace 上下文。
调用 application service。
抛出异常让统一 error handler 处理。
10.3 Consumer 处理外部副作用时要小心 消费消息后可能会:
写本地数据库。
调下游 RPC。
更新缓存。
发送另一条 Kafka 消息。
每一步都可能失败。设计时要明确:
哪一步必须成功?
哪一步可以补偿?
重复执行会怎样?
失败后是否重试?
重试是否会放大下游压力?
是否需要死信?
11. 一套完整小例子:订单创建后给用户发券 11.1 架构图 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 用户 | v Order Service | 1. 写 orders 表 | 2. 写 outbox_event 表 v Outbox Publisher | | send trade.order.created v Kafka | | groupId=coupon-service v Coupon Service | 1. 根据 eventId 幂等 | 2. 创建优惠券 | 3. 记录 coupon_issue_record v MySQL
11.2 topic 定义 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 package com.example.kafka.config;import org.apache.kafka.clients.admin.NewTopic;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configuration public class KafkaTopicConfig { @Bean public NewTopic orderCreatedTopic () { return new NewTopic ("trade.order.created" , 12 , (short ) 3 ); } @Bean public NewTopic orderCreatedDltTopic () { return new NewTopic ("trade.order.created.DLT" , 12 , (short ) 3 ); } }
说明:
12 是 partition 数。
3 是 replication factor。
生产环境 topic 可能由平台统一创建,不一定允许业务代码自动创建。
11.3 事件对象 1 2 3 4 5 6 7 8 9 10 11 12 13 14 package com.example.order.event;import java.math.BigDecimal;import java.time.OffsetDateTime;public record OrderCreatedEvent ( String eventId, String eventVersion, Long orderId, Long userId, BigDecimal amount, OffsetDateTime occurredAt ) {}
11.4 订单服务写 outbox 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 package com.example.order.application;import com.example.order.domain.Order;import com.example.order.event.OrderCreatedEvent;import com.example.order.outbox.OutboxEvent;import java.time.OffsetDateTime;import java.util.UUID;import org.springframework.stereotype.Service;import org.springframework.transaction.annotation.Transactional;@Service public class OrderApplicationService { private final OrderRepository orderRepository; private final OutboxEventRepository outboxEventRepository; public OrderApplicationService ( OrderRepository orderRepository, OutboxEventRepository outboxEventRepository ) { this .orderRepository = orderRepository; this .outboxEventRepository = outboxEventRepository; } @Transactional public Long createOrder (CreateOrderCommand command) { Order order = Order.create(command.userId(), command.amount()); orderRepository.save(order); OrderCreatedEvent event = new OrderCreatedEvent ( UUID.randomUUID().toString(), "1.0" , order.getId(), order.getUserId(), order.getAmount(), OffsetDateTime.now() ); outboxEventRepository.save(OutboxEvent.pending( event.eventId(), "trade.order.created" , event.orderId().toString(), event )); return order.getId(); } }
11.5 Outbox 发布器 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 package com.example.order.outbox;import java.util.List;import java.util.concurrent.TimeUnit;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.scheduling.annotation.Scheduled;import org.springframework.stereotype.Component;import org.springframework.transaction.annotation.Transactional;@Component public class OutboxPublisher { private static final Logger log = LoggerFactory.getLogger(OutboxPublisher.class); private final OutboxEventRepository outboxEventRepository; private final KafkaTemplate<String, Object> kafkaTemplate; public OutboxPublisher ( OutboxEventRepository outboxEventRepository, KafkaTemplate<String, Object> kafkaTemplate ) { this .outboxEventRepository = outboxEventRepository; this .kafkaTemplate = kafkaTemplate; } @Scheduled(fixedDelay = 1000) @Transactional public void publish () { List<OutboxEvent> events = outboxEventRepository.findReadyToPublish(100 ); for (OutboxEvent event : events) { try { kafkaTemplate.send(event.getTopic(), event.getMessageKey(), event.getPayload()) .get(3 , TimeUnit.SECONDS); event.markSent(); } catch (Exception ex) { log.error("publish outbox event failed, eventId={}" , event.getEventId(), ex); event.markRetryLater(); } } } }
真实项目中还会考虑:
多实例并发扫描时的抢锁。
失败退避。
最大发送次数。
发送中的状态。
批量更新。
CDC 替代轮询。
11.6 优惠券服务消费 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 package com.example.coupon.consumer;import com.example.order.event.OrderCreatedEvent;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.stereotype.Component;@Component public class OrderCreatedConsumer { private static final Logger log = LoggerFactory.getLogger(OrderCreatedConsumer.class); private final CouponApplicationService couponApplicationService; public OrderCreatedConsumer (CouponApplicationService couponApplicationService) { this .couponApplicationService = couponApplicationService; } @KafkaListener( topics = "trade.order.created", groupId = "coupon-service", containerFactory = "kafkaListenerContainerFactory" ) public void onOrderCreated (OrderCreatedEvent event) { log.info("consume order created event, orderId={}, userId={}, eventId={}" , event.orderId(), event.userId(), event.eventId()); couponApplicationService.issueCouponForOrder(event); } }
11.7 幂等发券 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 package com.example.coupon.application;import com.example.order.event.OrderCreatedEvent;import org.springframework.stereotype.Service;import org.springframework.transaction.annotation.Transactional;@Service public class CouponApplicationService { private final CouponRepository couponRepository; private final CouponIssueRecordRepository issueRecordRepository; public CouponApplicationService ( CouponRepository couponRepository, CouponIssueRecordRepository issueRecordRepository ) { this .couponRepository = couponRepository; this .issueRecordRepository = issueRecordRepository; } @Transactional public void issueCouponForOrder (OrderCreatedEvent event) { if (issueRecordRepository.existsByEventId(event.eventId())) { return ; } Coupon coupon = Coupon.newUserOrderCoupon(event.userId(), event.orderId()); couponRepository.save(coupon); issueRecordRepository.save(CouponIssueRecord.of( event.eventId(), event.userId(), event.orderId(), coupon.getId() )); } }
数据库唯一约束兜底:
1 2 ALTER TABLE coupon_issue_recordADD UNIQUE KEY uk_event_id (event_id);
不要只靠 existsByEventId,并发下可能两个线程同时判断不存在。唯一键是最终防线。
12. 手动提交 offset:什么时候需要 大部分 Spring Kafka 项目不需要你手写 Acknowledgment。但你可能会在代码里看到:
1 2 3 4 5 6 7 8 9 @KafkaListener(topics = "trade.order.created", groupId = "coupon-service") public void onMessage (OrderCreatedEvent event, Acknowledgment ack) { try { couponApplicationService.issueCouponForOrder(event); ack.acknowledge(); } catch (Exception ex) { throw ex; } }
这表示业务显式确认后才提交 offset。
对应配置类似:
1 2 3 4 spring: kafka: listener: ack-mode: manual
要注意:
成功处理后再 ack。
不要 catch 异常后直接 ack,否则失败消息会被跳过。
复杂批量消费时,ack 粒度要特别谨慎。
13. 批量消费:吞吐更高,但复杂度也更高 批量消费适合:
写数仓。
批量入库。
日志处理。
对单条实时性要求不高的场景。
配置:
1 2 3 4 5 6 spring: kafka: listener: type: batch consumer: max-poll-records: 500
代码:
1 2 3 4 @KafkaListener(topics = "user.behavior.events", groupId = "behavior-writer") public void onMessages (List<UserBehaviorEvent> events) { behaviorRepository.batchInsert(events); }
风险:
一批里一条失败,整批怎么处理?
如何定位失败消息?
如何部分提交?
是否会导致重复写入更多?
新人优先掌握单条消费。批量消费读懂即可,改动时要谨慎。
14. 顺序性:Kafka 能保证什么,不能保证什么 Kafka 能保证:
同一个 partition 内有序。
同一个 key 通常进入同一个 partition。
同一个消费者处理某个 partition 时按 offset 顺序读取。
Kafka 不能天然保证:
整个 topic 全局有序。
跨 topic 有序。
下游处理完成顺序一定和读取顺序一致,尤其你自己开线程池时。
如果业务要求“同一个订单事件严格有序”,你要:
producer 用 orderId 作为 key。
不随意增加 partition。
consumer 不要把同一订单事件扔进无序线程池。
用订单状态机兜底,例如不能从 CREATED 直接跳到 SHIPPED。
状态机兜底非常重要,因为分布式系统里只靠消息顺序很脆。
15. 事务和 Exactly Once:先理解边界,不要迷信 Kafka 支持事务,Spring Kafka 也支持 Exactly Once Semantics。它主要适合:
1 read from Kafka -> process -> write to Kafka
例如流处理应用从 topic A 读,处理后写 topic B,并把 offset 和写入放进同一个 Kafka 事务。
但很多业务是:
1 read from Kafka -> write MySQL / call RPC / update Redis
这时 Kafka 事务不能自动保证外部系统 exactly once。你仍然需要:
数据库唯一键。
幂等表。
状态机。
outbox。
补偿任务。
所以公司里更常见的落地原则是:
1 Kafka 提供 at least once + 业务幂等 = 最终一致
新人不必一上来追求 exactly once。先把幂等、重试、死信、监控做好。
16. Schema 演进:消息字段怎么改才不炸下游 公司里 Kafka topic 可能被多个服务消费。你随便改字段,可能让别的团队消费者挂掉。
16.1 兼容性原则 推荐:
新增字段:给默认值,消费者不依赖也能跑。
废弃字段:先保留,标注 deprecated,等所有消费者升级后再删。
枚举新增:消费者要有 unknown 分支。
字段改名:不要直接改,先新增字段双写。
危险:
删除字段。
改字段类型,比如 Long 改 String。
改语义但不改版本。
复用老字段表达新含义。
16.2 eventVersion 简单项目可以加:
1 2 3 { "eventVersion" : "1.0" }
消费者按版本处理:
1 2 3 4 5 if ("1.0" .equals(event.eventVersion())) { handleV1(event); } else { throw new IllegalArgumentException ("unsupported event version: " + event.eventVersion()); }
更成熟的公司可能使用 Avro / Protobuf / Schema Registry。你实习时先看项目里是否有:
.proto
.avsc
schema-registry
SpecificRecord
KafkaAvroSerializer
ProtobufSerializer
17. 监控和排障:线上出问题时看什么 17.1 消费积压 lag Lag 大致表示:
1 某个 partition 最新 offset - 当前 group 已提交 offset
lag 上升说明消费跟不上生产。
常见原因:
下游数据库慢。
消费者异常反复重试。
消费者实例数不够。
partition 数不够。
某个 key 热点导致单 partition 堵住。
消费者频繁 rebalance。
17.2 生产失败率 关注:
send error 数。
timeout。
record too large。
authorization failed。
unknown topic。
17.3 消费失败率 关注:
listener exception。
retry 次数。
DLT 消息数。
反序列化失败。
17.4 Rebalance Rebalance 是消费者组重新分配 partition。偶尔发生正常,频繁发生就有问题。
常见原因:
服务频繁重启。
单条消息处理太久。
max.poll.interval.ms 太小。
GC 停顿。
网络抖动。
17.5 日志里一定要带关键信息 Producer 日志建议带:
1 topic, key, eventId, businessId, traceId
Consumer 日志建议带:
1 topic, partition, offset, key, groupId, eventId, businessId, traceId
Spring Kafka listener 可以接收 ConsumerRecord:
1 2 3 4 5 6 7 8 9 10 11 12 13 @KafkaListener(topics = "trade.order.created", groupId = "coupon-service") public void onMessage (ConsumerRecord<String, OrderCreatedEvent> record) { OrderCreatedEvent event = record.value(); log.info("consume event, topic={}, partition={}, offset={}, key={}, eventId={}" , record.topic(), record.partition(), record.offset(), record.key(), event.eventId()); couponApplicationService.issueCouponForOrder(event); }
18. 本地怎么练习 18.1 Docker Compose 启动 Kafka 下面是一个单节点 Kafka 示例,适合本地学习。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 services: kafka: image: apache/kafka:4.0.0 container_name: kafka ports: - "9092:9092" environment: KAFKA_NODE_ID: 1 KAFKA_PROCESS_ROLES: broker,controller KAFKA_CONTROLLER_QUORUM_VOTERS: 1 @kafka:9093 KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
启动:
18.2 创建 topic 1 2 3 4 5 6 docker exec -it kafka /opt/kafka/bin/kafka-topics.sh \ --bootstrap-server localhost:9092 \ --create \ --topic trade.order.created \ --partitions 3 \ --replication-factor 1
查看 topic:
1 2 3 4 docker exec -it kafka /opt/kafka/bin/kafka-topics.sh \ --bootstrap-server localhost:9092 \ --describe \ --topic trade.order.created
18.3 命令行生产和消费 生产:
1 2 3 4 5 docker exec -it kafka /opt/kafka/bin/kafka-console-producer.sh \ --bootstrap-server localhost:9092 \ --topic trade.order.created \ --property parse.key=true \ --property key.separator=:
输入:
1 10086:{"eventId":"e1","orderId":10086,"userId":9527,"amount":35.50}
消费:
1 2 3 4 5 docker exec -it kafka /opt/kafka/bin/kafka-console-consumer.sh \ --bootstrap-server localhost:9092 \ --topic trade.order.created \ --group coupon-service-local \ --from-beginning
18.4 看 consumer group 1 2 3 4 docker exec -it kafka /opt/kafka/bin/kafka-consumer-groups.sh \ --bootstrap-server localhost:9092 \ --describe \ --group coupon-service-local
重点看:
1 2 3 CURRENT-OFFSET LOG-END-OFFSET LAG
19. Spring Boot 测试怎么写 Spring 官方提供 @EmbeddedKafka,可以在测试中启动嵌入式 Kafka。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 package com.example.coupon.consumer;import com.example.order.event.OrderCreatedEvent;import java.math.BigDecimal;import java.time.OffsetDateTime;import java.util.UUID;import org.junit.jupiter.api.Test;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.kafka.test.context.EmbeddedKafka;@SpringBootTest @EmbeddedKafka( topics = "trade.order.created", partitions = 3, bootstrapServersProperty = "spring.kafka.bootstrap-servers" ) class OrderCreatedConsumerTest { @Autowired private KafkaTemplate<String, OrderCreatedEvent> kafkaTemplate; @Test void shouldIssueCouponWhenOrderCreated () { OrderCreatedEvent event = new OrderCreatedEvent ( UUID.randomUUID().toString(), "1.0" , 10086L , 9527L , new BigDecimal ("35.50" ), OffsetDateTime.now() ); kafkaTemplate.send("trade.order.created" , event.orderId().toString(), event); } }
异步测试建议用 Awaitility:
1 2 3 4 await().atMost(Duration.ofSeconds(5 )) .untilAsserted(() -> { assertThat(issueRecordRepository.existsByEventId(event.eventId())).isTrue(); });
20. 接手公司 Kafka 代码时的阅读路线 20.1 先找配置 搜索:
1 2 3 4 5 6 7 spring.kafka bootstrap-servers group-id KafkaProperties ConcurrentKafkaListenerContainerFactory DefaultErrorHandler DeadLetterPublishingRecoverer
看清楚:
连接哪个 Kafka 集群。
consumer group 是什么。
序列化方式是什么。
是否自动提交 offset。
重试和死信怎么配置。
并发数是多少。
20.2 再找生产者 搜索:
1 2 3 KafkaTemplate .send( ProducerRecord
每个发送点问:
发送到哪个 topic?
key 是什么?
消息体是什么?
发送失败怎么处理?
是否在事务里?
是否有 outbox 或可靠消息机制?
20.3 再找消费者 搜索:
1 2 3 4 @KafkaListener MessageListener ConsumerRecord Acknowledgment
每个消费者问:
topic 是什么?
groupId 是什么?
是否批量消费?
是否手动 ack?
是否幂等?
失败后重试还是进死信?
处理逻辑是否调用外部系统?
20.4 最后看运维和平台约束 问师兄师姐:
topic 由谁创建?
分区数能不能改?
消息保留多久?
有没有统一 MQ 平台?
死信在哪里看?
消息能不能重投?
消费积压在哪看?
是否有 topic 命名规范?
是否有 schema 规范?
21. 常见坑位清单 坑 1:本地消费者抢了测试环境消息 原因:本地用了和测试服务一样的 group-id。
解决:
1 2 3 4 spring: kafka: consumer: group-id: coupon-service-local-yourname
坑 2:以为 Kafka 不会重复消费 Kafka 业务落地通常按至少一次处理。消费者必须幂等。
坑 3:消费失败后 catch 住异常不抛 错误:
1 2 3 4 5 try { handle(event); } catch (Exception ex) { log.error("failed" , ex); }
这样 listener 可能认为处理成功并提交 offset,消息就丢给业务了。
更合理:
1 2 3 4 5 6 try { handle(event); } catch (Exception ex) { log.error("failed" , ex); throw ex; }
让统一错误处理器决定重试和死信。
坑 4:没有 key,导致同一业务实体消息乱序 订单事件尽量用 orderId 作为 key,用户事件尽量用 userId。
坑 5:把慢 RPC 放在消费线程里无限重试 这样会阻塞 partition,导致积压。应该有限重试,失败进 retry topic 或 DLT。
坑 6:改消息字段不通知下游 Kafka topic 可能有很多消费者。字段变更要兼容,最好有 schema 规范和版本管理。
坑 7:partition 数小于期望并发 同一个 consumer group 内,消费并发上限受 partition 数限制。只有 3 个 partition,部署 20 个实例也不会有 20 个实例同时消费这个 topic。
坑 8:死信队列无人处理 DLT 不是终点。要有告警、排查、修复、重投机制。
22. 面试和实习中常被问的问题 22.1 Kafka 为什么快? 可以从这些角度答:
顺序写磁盘。
Page Cache。
批量发送和批量拉取。
零拷贝相关优化。
分区并行。
消费者按 offset 顺序读取,状态维护轻。
不用一上来背源码,先讲清楚设计思路。
22.2 Kafka 如何保证消息不丢? 生产侧:
acks=all
retries > 0
enable.idempotence=true
关键业务使用 outbox 或事务消息方案
Broker 侧:
replication factor 通常大于 1
min.insync.replicas 合理配置
禁止不安全 leader 选举
消费侧:
业务处理成功后再提交 offset
消费失败重试
死信兜底
22.3 Kafka 如何保证顺序?
Kafka 只保证单 partition 内顺序。
相同 key 通常进入同一 partition。
如果要同一订单有序,用 orderId 做 key。
不要在消费者内部破坏顺序。
用业务状态机兜底。
22.4 Kafka 会不会重复消费? 会。按至少一次理解。消费者必须幂等。
22.5 消息积压怎么办? 先定位:
是所有 partition 都积压,还是某个 partition 积压?
消费者是否报错重试?
下游 DB/RPC 是否慢?
消费者实例数是否不足?
partition 数是否不足?
是否有热点 key?
再处理:
修复异常。
扩消费者实例。
提高批处理效率。
优化下游。
必要时扩 partition。
对热点 key 做业务拆分。
22.6 Kafka 和 RabbitMQ 有什么区别? 粗略理解:
RabbitMQ 更像传统消息队列,强调队列、路由、确认、复杂路由模型。
Kafka 更像分布式事件日志,强调高吞吐、持久化、可回放、多订阅方、流处理。
公司选择不一定是技术绝对优劣,也和历史包袱、平台建设、团队经验有关。
23. 给入职前 2 周的学习路线 第 1 到 2 天:概念和命令行
理解 topic、partition、offset、consumer group。
用 Docker 启动 Kafka。
用命令行生产消费消息。
查看 consumer group lag。
第 3 到 5 天:Spring Boot 生产消费
写一个 producer API。
写一个 @KafkaListener。
用 JSON 序列化对象。
加 eventId 并做幂等。
故意抛异常观察重试。
第 6 到 8 天:可靠消息和 outbox
写订单表和 outbox 表。
本地实现定时发布 outbox。
模拟 Kafka 发送失败。
理解“数据库事务”和“消息发送”之间的不一致问题。
第 9 到 11 天:重试、死信、补偿
配置 DefaultErrorHandler。
配置 DLT。
写一个死信重投接口或脚本。
学会看失败日志里的 topic、partition、offset。
第 12 到 14 天:架构复盘
画一个订单事件驱动架构图。
练习回答:为什么要异步?为什么要幂等?为什么会重复?积压怎么办?
看一个开源 Spring Kafka demo 或公司入职资料。
24. 你在公司里应该怎么问问题 刚入职时不要害怕问,但问题要尽量具体。
不太好的问法:
更好的问法:
1 2 3 我看到 coupon-service 监听 trade.order.created,groupId 是 coupon-service。 这个消费者里用 eventId 做了唯一键幂等,失败后会进 trade.order.created.DLT。 我想确认一下:这个 DLT 是平台自动告警,还是需要我们服务自己做重投入口?
这种问法会让别人觉得你已经认真读过代码,只是缺少上下文。
25. 最后总结:记住这张脑图 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 Kafka 公司落地 核心概念 topic: 事件分类 partition: 并行和顺序单位 offset: 消费进度 group-id: 消费者组 key: 分区路由和局部顺序 生产者 KafkaTemplate key 选择 acks=all enable.idempotence=true 发送失败处理 outbox 保证数据库和消息最终一致 消费者 @KafkaListener 业务幂等 成功后提交 offset 有限重试 DLT 兜底 监控 lag 架构 事件表达事实 主链路做核心动作 下游异步订阅 最终一致 schema 兼容演进 排障 topic / partition / offset / key group lag retry count DLT traceId
你现在不需要把 Kafka 所有底层细节都背下来。实习接手业务代码,最重要的是形成这几个条件反射:
看到 producer,问:消息是否可靠发出?key 选得对不对?
看到 consumer,问:是否幂等?失败怎么处理?
看到 group-id,问:这是独立订阅还是同组扩容?
看到 partition,问:并发和顺序是否符合业务?
看到 offset,问:处理成功和提交进度之间有没有风险?
看到 DLT,问:有没有告警和重投机制?
把这些想清楚,你就已经比“只会调 API”的新手稳很多了。