万字详解Kafka,快速上手

适合读者:已经能写一个普通 Spring Boot 后端项目,但对 Kafka、消息中间件、微服务事件驱动架构还不熟。

阅读目标:看完后能看懂公司代码里的 KafkaTemplate@KafkaListenergroup-idtopicpartitionoffset、重试、死信队列、幂等等常见设计,不至于接手代码时一脸懵。

参考资料主要来自官方文档:


1. 先建立一个直觉:Kafka 不是“高级版队列”,而是“公司事件流水账”

你可以先把 Kafka 想象成公司里的一本高速、分布式、可长期保存的“业务流水账”。

传统接口调用是:

1
用户下单 -> 订单服务直接调用库存服务 -> 调用支付服务 -> 调用优惠券服务 -> 调用通知服务

这种方式简单直观,但服务之间强耦合。订单服务要知道库存、支付、优惠券、通知服务的接口地址、超时策略、错误处理方式。一个下游慢了,上游也可能被拖慢。

Kafka 的事件驱动方式是:

1
2
3
4
5
6
7
用户下单
-> 订单服务保存订单
-> 订单服务发送 OrderCreatedEvent 到 Kafka
-> 库存服务消费这个事件扣库存
-> 优惠券服务消费这个事件核销优惠券
-> 通知服务消费这个事件发短信/推送
-> 数据服务消费这个事件做报表

订单服务只负责告诉世界:“订单创建了”。至于谁关心这件事,由其他服务自己订阅。

这就是 Kafka 在公司里的核心价值:

  • 解耦:生产者不需要知道消费者是谁。
  • 削峰:高峰期消息先进 Kafka,下游慢慢消费。
  • 异步化:主链路只做必要操作,非核心动作异步处理。
  • 可回放:消费者处理错了,可以从旧 offset 重新消费。
  • 多订阅方:同一个事件可以被多个团队、多个服务独立消费。
  • 数据基建:业务事件可以流向风控、推荐、搜索、数仓、监控。

Kafka 官方把它称为 event streaming platform。通俗点说,就是用事件把不同系统连接起来,并且这些事件不是“看完即删”,而是在 topic 中按保留策略存一段时间。


2. Kafka 的核心名词

2.1 Event / Record / Message:消息

Kafka 中一条消息通常叫 event、record 或 message。它一般包含:

  • key:消息键,用于分区路由,比如 orderIduserId
  • value:消息体,比如订单创建事件 JSON。
  • timestamp:时间戳。
  • headers:元数据,比如 traceId、tenantId、sourceApp。

示例:

1
2
3
4
5
6
7
8
{
"eventId": "7c91d5b8-7f5d-4cc2-b4ef-111111111111",
"eventType": "ORDER_CREATED",
"orderId": 10086,
"userId": 9527,
"amount": 35.50,
"occurredAt": "2026-04-25T10:30:00+08:00"
}

公司里常见习惯:

  • eventId 用来做幂等。
  • eventType 表示事件类型。
  • occurredAt 表示业务发生时间。
  • traceId 放在 header 或消息体里,方便链路追踪。

2.2 Topic:消息主题

Topic 是消息的逻辑分类。可以理解为一个业务事件流。

常见命名:

1
2
3
4
5
6
trade.order.created
trade.order.paid
trade.order.cancelled
user.profile.changed
coupon.used
inventory.stock.deducted

公司项目中 topic 往往不会随手起名,而是有规范:

  • 按业务域:tradeusercouponrisk
  • 按事件名:order.createdorder.paid
  • 按环境隔离:dev.trade.order.createdprod.trade.order.created,或者通过不同 Kafka 集群隔离。

不要把 topic 当成方法名。更推荐表达“发生了什么”,而不是“让谁做什么”。

好:

1
trade.order.created

一般:

1
send-coupon-message

前者是事件,后者像命令。事件更容易被多个下游复用。

2.3 Partition:分区

一个 topic 会被拆成多个 partition。分区是 Kafka 并行能力和顺序保证的关键。

1
2
3
4
Topic: trade.order.created
Partition 0: msg0, msg1, msg2...
Partition 1: msg0, msg1, msg2...
Partition 2: msg0, msg1, msg2...

Kafka 只保证同一个 partition 内消息有序,不保证整个 topic 全局有序。

如果你发送消息时指定 key,比如 orderId,Kafka 会根据 key 决定分区。相同 key 通常会进入同一个分区,因此同一个订单的事件能保持顺序。

这点在公司非常重要:

1
2
3
4
orderId=10086 的消息:
OrderCreated
OrderPaid
OrderShipped

如果都用 orderId 作为 key,它们大概率会在同一个 partition,消费者看到的顺序更稳定。

但注意:如果后期增加 topic 分区数,key 到 partition 的映射可能变化。对强顺序要求非常高的场景,需要提前设计分区数,或者通过业务锁、状态机兜底。

2.4 Producer:生产者

Producer 是发送消息到 Kafka 的应用。比如订单服务创建订单后,发送 OrderCreatedEvent

Spring Boot 项目里你经常会看到:

1
kafkaTemplate.send("trade.order.created", orderId.toString(), event);

第一个参数是 topic,第二个参数是 key,第三个参数是 value。

2.5 Consumer:消费者

Consumer 是从 Kafka 读取消息并处理的应用。比如优惠券服务监听订单创建事件,然后发新人券。

Spring Boot 项目里常见:

1
2
3
4
@KafkaListener(topics = "trade.order.created", groupId = "coupon-service")
public void onOrderCreated(OrderCreatedEvent event) {
couponService.issueCoupon(event.userId());
}

2.6 Consumer Group:消费者组

消费者组是 Kafka 最容易让新人迷糊的概念。

同一个 topic,可以被多个 group 独立消费。

1
2
3
4
5
6
7
8
9
10
11
Topic: trade.order.created

Group: coupon-service
coupon-service 实例 1
coupon-service 实例 2

Group: notification-service
notification-service 实例 1

Group: data-warehouse-sync
sync 实例 1

这里 coupon-servicenotification-servicedata-warehouse-sync 都能完整收到 trade.order.created 的消息。它们互不影响。

同一个 group 内,多个实例分摊 partition。

假设 topic 有 4 个 partition,coupon-service 部署 2 个实例:

1
2
coupon-service 实例 A: partition 0, partition 1
coupon-service 实例 B: partition 2, partition 3

如果部署 4 个实例:

1
2
3
4
实例 A: partition 0
实例 B: partition 1
实例 C: partition 2
实例 D: partition 3

如果部署 8 个实例,但只有 4 个 partition,那么最多 4 个实例能消费,另外 4 个会空闲。

结论:

  • 想让不同业务都收到同一批消息:用不同 group-id
  • 想让同一业务横向扩容:同一个 group-id 部署多个实例。
  • 消费并发上限基本受 partition 数影响。

2.7 Offset:消费进度

Offset 是分区内消息的位置。消费者处理到哪里了,需要记录。

例如:

1
2
3
4
partition 0:
offset 0: msgA
offset 1: msgB
offset 2: msgC

消费者消费完 offset=1,就会提交下一次要读的位置。Kafka 官方文档中特别强调,提交的 offset 应该是下一条要读取的消息位置。

公司里你会听到:

  • “这个 group 的 offset 卡住了”
  • “消费积压了”
  • “把 offset 回退一下重放”
  • “这个消费者重复消费了”

这些都和 offset 有关。


3. Kafka 在公司常见的 8 类使用场景

3.1 订单主链路解耦

下单成功后,订单服务只同步完成核心动作:

  • 校验参数。
  • 写订单表。
  • 写支付单或待支付状态。
  • 发送订单创建事件。

其他动作异步:

  • 发券。
  • 发短信。
  • 计算推荐特征。
  • 同步搜索索引。
  • 写数仓。
  • 风控审计。

这样订单接口不会被所有下游拖慢。

3.2 削峰填谷

大促时用户请求暴涨。比如支付成功后要发积分,如果积分服务扛不住瞬间流量,就可以让支付服务把 PaymentSuccessEvent 写入 Kafka。积分服务按自己能力慢慢消费。

这不代表 Kafka 能凭空解决所有容量问题。它只是把压力从“同步打爆下游”变成“消息积压后慢慢追”。你仍然要关注:

  • 消息积压量。
  • 消费速度。
  • 延迟 SLA。
  • 下游数据库写入能力。

3.3 异步通知

短信、邮件、App Push、站内信通常不应该阻塞主接口。主服务发送事件,通知服务消费事件。

3.4 数据同步

业务服务产生事件,搜索服务消费后更新 Elasticsearch,数据团队消费后写入数仓。

这类消费一般要求:

  • 可重放。
  • 可追踪。
  • 对消息 schema 变更敏感。

3.5 延迟重试和最终一致性

消费者调用下游失败,比如库存服务消费订单事件时数据库暂时不可用,可以稍后重试。

常见做法:

  • 本地短重试:消费线程里快速重试 2 到 3 次。
  • 延迟重试 topic:失败后写入 retry topic,过一段时间再消费。
  • 死信队列:多次失败后进入 DLT/DLQ,人工或平台处理。

3.6 日志和埋点采集

用户行为、接口日志、业务指标可以写入 Kafka,再由 Flink、Spark、ClickHouse、Hive 等系统处理。

3.7 事件驱动微服务

一个服务状态变化后发事件,其他服务订阅并更新自己的本地视图。

例如商品服务发:

1
item.price.changed

营销服务消费后更新活动价格缓存,搜索服务消费后更新索引,推荐服务消费后更新特征。

3.8 缓存失效和本地副本更新

配置、商品、商家状态变化后,通过 Kafka 广播事件,各服务清理缓存或更新本地缓存。


4. 公司代码里 Kafka 通常长什么样

4.1 Maven 依赖

Spring Boot 项目常用 spring-kafka

1
2
3
4
5
6
7
8
9
10
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>

如果公司用了内部 starter,你可能看不到直接依赖,而是类似:

1
2
3
4
<dependency>
<groupId>com.company.middleware</groupId>
<artifactId>company-kafka-spring-boot-starter</artifactId>
</dependency>

内部 starter 通常帮你封装了:

  • bootstrap servers。
  • 认证信息。
  • topic 前缀。
  • 序列化配置。
  • 统一异常处理。
  • 监控埋点。
  • traceId 透传。

4.2 application.yml

一个比较典型的配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
spring:
application:
name: order-service

kafka:
bootstrap-servers: localhost:9092

producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
acks: all
retries: 3
properties:
enable.idempotence: true
linger.ms: 20
compression.type: zstd
spring.json.add.type.headers: false

consumer:
group-id: order-service
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
enable-auto-commit: false
auto-offset-reset: latest
properties:
spring.json.trusted.packages: "com.example.order.event"

listener:
ack-mode: record
concurrency: 3

逐项解释:

  • bootstrap-servers:Kafka broker 地址。客户端用它发现整个集群,不必写全所有 broker,但生产建议配置多个。
  • acks: all:生产者等待 ISR 副本确认,可靠性最高。
  • enable.idempotence: true:生产者幂等,降低重试导致重复写入的风险。Kafka 新版本默认会在不冲突配置下启用。
  • compression.type:消息压缩。业务 JSON 较大时常见。
  • group-id:消费者组名。
  • enable-auto-commit: false:业务处理成功后再提交 offset,更可控。
  • auto-offset-reset: latest:没有历史 offset 时从最新消息开始。测试或补偿任务可能用 earliest
  • ack-mode: record:一条消息处理成功提交一条,语义清晰。
  • concurrency:Spring Kafka listener 并发数,通常不要超过 topic partition 数太多。

4.3 事件对象 DTO

1
2
3
4
5
6
7
8
9
10
11
12
13
package com.example.order.event;

import java.math.BigDecimal;
import java.time.OffsetDateTime;

public record OrderCreatedEvent(
String eventId,
Long orderId,
Long userId,
BigDecimal amount,
OffsetDateTime occurredAt
) {
}

实际公司里可能会加更多字段:

1
2
3
4
5
6
7
8
9
10
11
public record OrderCreatedEvent(
String eventId,
String eventVersion,
Long orderId,
Long userId,
BigDecimal amount,
String sourceApp,
String traceId,
OffsetDateTime occurredAt
) {
}

建议思考:

  • 这个事件是“事实”还是“命令”?
  • 下游需要哪些字段才能独立处理?
  • 字段以后能不能兼容增加?
  • 是否需要 eventVersion
  • 是否需要 eventId 做幂等?

5. Producer:怎么优雅地发送消息

5.1 最简单的发送

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package com.example.order.producer;

import com.example.order.event.OrderCreatedEvent;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class OrderEventProducer {

private static final String ORDER_CREATED_TOPIC = "trade.order.created";

private final KafkaTemplate<String, OrderCreatedEvent> kafkaTemplate;

public OrderEventProducer(KafkaTemplate<String, OrderCreatedEvent> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}

public void sendOrderCreated(OrderCreatedEvent event) {
String key = event.orderId().toString();
kafkaTemplate.send(ORDER_CREATED_TOPIC, key, event);
}
}

新人容易忽略 key。不要随便传 null。如果消息和订单相关,通常用 orderId;和用户相关,通常用 userId。这样同一个业务实体的消息更容易落到同一个 partition。

5.2 发送后处理成功/失败

Kafka 发送是异步的。Spring Kafka 的 KafkaTemplate#send 返回 CompletableFuture

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package com.example.order.producer;

import com.example.order.event.OrderCreatedEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class OrderEventProducer {

private static final Logger log = LoggerFactory.getLogger(OrderEventProducer.class);
private static final String ORDER_CREATED_TOPIC = "trade.order.created";

private final KafkaTemplate<String, OrderCreatedEvent> kafkaTemplate;

public OrderEventProducer(KafkaTemplate<String, OrderCreatedEvent> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}

public void sendOrderCreated(OrderCreatedEvent event) {
String key = event.orderId().toString();

kafkaTemplate.send(ORDER_CREATED_TOPIC, key, event)
.whenComplete((result, ex) -> {
if (ex != null) {
log.error("send order created event failed, orderId={}, eventId={}",
event.orderId(), event.eventId(), ex);
return;
}

log.info("send order created event success, topic={}, partition={}, offset={}, orderId={}",
result.getRecordMetadata().topic(),
result.getRecordMetadata().partition(),
result.getRecordMetadata().offset(),
event.orderId());
});
}
}

注意:如果发送失败只是打日志,业务上可能丢事件。关键链路不要只写上面的简单代码,还要考虑“业务数据写库成功,但 Kafka 发送失败”怎么办。

5.3 关键问题:数据库写成功了,消息发送失败怎么办?

这是公司里非常常见的架构问题。

错误写法:

1
2
3
4
5
@Transactional
public void createOrder(CreateOrderCommand command) {
Order order = orderRepository.save(command.toOrder());
kafkaTemplate.send("trade.order.created", order.getId().toString(), event);
}

这个代码有几个问题:

  • 数据库事务提交成功后,Kafka 可能发送失败。
  • Kafka 发送成功后,数据库事务可能回滚。
  • send 是异步的,方法返回时不代表消息一定成功。

更稳妥的常见方案是 Outbox Pattern

1
2
3
4
5
6
7
8
1. 在同一个数据库事务里:
- 写 orders 表
- 写 outbox_event 表

2. 后台任务或 CDC 组件扫描 outbox_event:
- 发送 Kafka
- 成功后标记 SENT
- 失败后重试

示意表:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
CREATE TABLE outbox_event (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
event_id VARCHAR(64) NOT NULL UNIQUE,
aggregate_type VARCHAR(64) NOT NULL,
aggregate_id VARCHAR(64) NOT NULL,
topic VARCHAR(128) NOT NULL,
message_key VARCHAR(128) NOT NULL,
payload JSON NOT NULL,
status VARCHAR(16) NOT NULL,
retry_count INT NOT NULL DEFAULT 0,
next_retry_time DATETIME NOT NULL,
created_at DATETIME NOT NULL,
updated_at DATETIME NOT NULL
);

订单创建:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Transactional
public Long createOrder(CreateOrderCommand command) {
Order order = orderRepository.save(command.toOrder());

OrderCreatedEvent event = OrderCreatedEventFactory.from(order);

outboxEventRepository.save(OutboxEvent.pending(
event.eventId(),
"Order",
order.getId().toString(),
"trade.order.created",
order.getId().toString(),
event
));

return order.getId();
}

后台发布:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Scheduled(fixedDelay = 1000)
public void publishPendingEvents() {
List<OutboxEvent> events = outboxEventRepository.findReadyToPublish(100);

for (OutboxEvent event : events) {
try {
kafkaTemplate.send(event.getTopic(), event.getMessageKey(), event.getPayload())
.get(3, TimeUnit.SECONDS);
event.markSent();
} catch (Exception ex) {
event.markFailedAndScheduleRetry();
}
}

outboxEventRepository.saveAll(events);
}

这类代码你在大厂项目中经常会看到变体。名字可能不叫 outbox,可能叫:

  • event_publish
  • mq_message
  • reliable_message
  • transaction_message
  • domain_event

你接手代码时可以搜这些关键词。


6. Consumer:怎么消费消息

6.1 最简单的消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package com.example.coupon.consumer;

import com.example.order.event.OrderCreatedEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class OrderCreatedConsumer {

private static final Logger log = LoggerFactory.getLogger(OrderCreatedConsumer.class);

private final CouponService couponService;

public OrderCreatedConsumer(CouponService couponService) {
this.couponService = couponService;
}

@KafkaListener(
topics = "trade.order.created",
groupId = "coupon-service"
)
public void onMessage(OrderCreatedEvent event) {
log.info("receive order created event, orderId={}, eventId={}",
event.orderId(), event.eventId());

couponService.issueAfterOrderCreated(event.userId(), event.orderId(), event.eventId());
}
}

你需要读懂三个点:

  • topics:监听哪个 topic。
  • groupId:当前服务属于哪个消费者组。
  • 方法参数:消息 value 反序列化后的对象。

6.2 消费者一定要考虑幂等

Kafka 消费大多数业务默认按 at least once 理解:消息不会轻易丢,但可能重复。

重复可能来自:

  • 消费者处理成功,但提交 offset 前宕机。
  • 消费者处理超时,被 group 踢出,partition 分配给其他实例。
  • 业务手动重放消息。
  • 生产者重试导致极端场景重复。
  • 你自己的补偿任务重复发送。

所以消费者业务逻辑必须幂等。

错误想法:

1
Kafka 不会重复,所以我直接插入一条发券记录。

正确想法:

1
Kafka 可能重复,所以我用 eventId 或 businessKey 做唯一约束。

例子:

1
2
3
4
5
6
7
8
9
CREATE TABLE coupon_issue_record (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
event_id VARCHAR(64) NOT NULL,
user_id BIGINT NOT NULL,
order_id BIGINT NOT NULL,
coupon_id BIGINT NOT NULL,
created_at DATETIME NOT NULL,
UNIQUE KEY uk_event_id (event_id)
);

业务代码:

1
2
3
4
5
6
7
8
9
@Transactional
public void issueAfterOrderCreated(Long userId, Long orderId, String eventId) {
if (couponIssueRecordRepository.existsByEventId(eventId)) {
return;
}

Coupon coupon = couponRepository.createForUser(userId);
couponIssueRecordRepository.save(new CouponIssueRecord(eventId, userId, orderId, coupon.getId()));
}

更常见的唯一键也可能是:

1
2
3
uk_order_coupon_scene(order_id, coupon_scene)
uk_user_activity(user_id, activity_id)
uk_event_id(event_id)

6.3 消费失败怎么办

消费失败不是小概率事件。常见失败包括:

  • JSON 反序列化失败。
  • 业务参数非法。
  • 下游 RPC 超时。
  • 数据库死锁。
  • Redis 超时。
  • 第三方接口失败。

一般处理策略:

1
2
3
可恢复异常:重试
不可恢复异常:进死信
未知异常:有限重试后进死信

Spring Kafka 可以配置 DefaultErrorHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package com.example.kafka.config;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
public class KafkaConsumerConfig {

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory(
ConsumerFactory<String, Object> consumerFactory,
KafkaTemplate<String, Object> kafkaTemplate
) {
ConcurrentKafkaListenerContainerFactory<String, Object> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setCommonErrorHandler(defaultErrorHandler(kafkaTemplate));
return factory;
}

private DefaultErrorHandler defaultErrorHandler(KafkaTemplate<String, Object> kafkaTemplate) {
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(
kafkaTemplate,
(ConsumerRecord<?, ?> record, Exception ex) ->
new TopicPartition(record.topic() + ".DLT", record.partition())
);

DefaultErrorHandler handler = new DefaultErrorHandler(
recoverer,
new FixedBackOff(1000L, 2L)
);

handler.addNotRetryableExceptions(IllegalArgumentException.class);
return handler;
}
}

含义:

  • 每条失败消息间隔 1 秒重试。
  • 最多重试 2 次,加上首次消费共 3 次。
  • 仍失败则写入原 topic 对应的 .DLT
  • IllegalArgumentException 这类明确业务参数错误不重试,直接进死信。

死信 topic 不是垃圾桶,而是待处理问题池。公司里通常会有:

  • 死信告警。
  • 死信查询平台。
  • 人工修复后重投。
  • 定时补偿任务。

7. Producer 配置怎么取舍

7.1 acks

Kafka 官方文档中,acks 表示 producer 认为请求完成前需要多少确认。

常见值:

  • acks=0:不等 broker 确认,最快但最不可靠。
  • acks=1:leader 写入就确认,如果 follower 还没复制 leader 就挂,可能丢。
  • acks=all:ISR 副本确认,可靠性最强。

公司核心业务一般倾向:

1
2
3
4
spring:
kafka:
producer:
acks: all

对于日志埋点类、允许少量丢失的场景,可能为了吞吐做不同选择。

7.2 enable.idempotence

生产者幂等能避免某些重试导致的重复写入。Kafka 新版本默认在没有冲突配置时启用。

建议核心业务明确写:

1
2
3
4
5
spring:
kafka:
producer:
properties:
enable.idempotence: true

它要求:

  • acks=all
  • retries > 0
  • max.in.flight.requests.per.connection <= 5

不要把它理解成“业务绝对不会重复消费”。它主要解决 producer 到 Kafka 写入侧的重复问题,不替代消费者幂等。

7.3 compression.type

常见:

1
2
3
4
5
spring:
kafka:
producer:
properties:
compression.type: zstd

压缩能降低网络和磁盘压力,但会增加 CPU 开销。业务 JSON 比较大、吞吐较高时很常见。

7.4 linger.msbatch.size

Producer 会把消息攒成 batch 发送。

  • linger.ms:最多等多久凑 batch。
  • batch.size:batch 大小。

低延迟接口不宜设置太大。高吞吐日志类场景可以适当增大。

7.5 发送 key 的选择

这是架构思考题,不是代码细节。

订单事件:

1
kafkaTemplate.send(topic, orderId.toString(), event);

用户事件:

1
kafkaTemplate.send(topic, userId.toString(), event);

商家事件:

1
kafkaTemplate.send(topic, merchantId.toString(), event);

选择 key 时想清楚:

  • 哪个维度需要有序?
  • 哪个维度会不会特别热点?
  • partition 数能否支撑吞吐?
  • 后续是否可能按这个 key 做排查?

如果某个大商家、超级用户、热门活动产生大量消息,用它做 key 可能造成单 partition 热点。


8. Consumer 配置怎么取舍

8.1 group-id

group-id 决定消费进度归属。

同一个业务应用多个实例,应该使用同一个 group:

1
2
3
4
spring:
kafka:
consumer:
group-id: coupon-service

如果你本地调试不想影响测试环境公共 group,可以临时改成:

1
2
3
4
spring:
kafka:
consumer:
group-id: coupon-service-local-yourname

否则你本地启动服务可能抢走测试环境消息,师兄师姐会很快注意到。

8.2 enable.auto.commit

Kafka consumer 默认可以自动提交 offset。但公司业务通常更喜欢业务处理成功后再提交。

建议:

1
2
3
4
spring:
kafka:
consumer:
enable-auto-commit: false

Spring Kafka listener 配合 ack mode 处理 offset。

8.3 auto.offset.reset

当没有历史 offset,或者 offset 已经过期时,从哪里开始消费。

  • latest:从最新消息开始。线上常见,避免新 group 一启动就扫历史海量消息。
  • earliest:从最早消息开始。补偿、重建索引、测试常用。
  • none:没有 offset 直接报错,适合强控制场景。

新人本地调试常见疑惑:

1
我启动消费者了,为什么收不到以前发过的消息?

很可能是:

  • group 已经有 offset。
  • auto-offset-reset=latest
  • 消息已经过了 retention。
  • 你监听错 topic 或环境。

8.4 max.poll.records

一次 poll 最多拉多少条。

如果单条处理慢,不要拉太多,否则处理时间过长可能导致 rebalance。

1
2
3
4
spring:
kafka:
consumer:
max-poll-records: 100

8.5 max.poll.interval.ms

消费者两次 poll 之间允许的最大处理时间。业务处理非常慢时要注意这个配置,否则 Kafka 会认为消费者挂了,把 partition 分配给别人。

但更推荐优化消费逻辑:

  • 单条消息处理尽量短。
  • 慢任务拆出去。
  • 批量写库。
  • 控制 max.poll.records
  • 必要时增加 partition 和消费者实例。

9. Topic 设计:架构构思时先问这些问题

9.1 一个 topic 放一种事件,还是多种事件?

方案 A:一个 topic 一种事件

1
2
3
trade.order.created
trade.order.paid
trade.order.cancelled

优点:

  • 语义清晰。
  • 消费者订阅简单。
  • 权限、保留时间、监控容易做。

缺点:

  • topic 数量多。

方案 B:一个 topic 放同一聚合的多种事件

1
trade.order.events

消息里用 eventType 区分:

1
2
3
4
{
"eventType": "ORDER_CREATED",
"payload": {}
}

优点:

  • topic 少。
  • 同一个订单生命周期事件更集中。

缺点:

  • 消费者要过滤不关心的事件。
  • schema 管理更复杂。

新手建议:优先理解公司现有规范。没有规范时,核心业务更倾向“一类业务事实一个 topic”,不要过度合并。

9.2 分区数怎么估?

分区数影响:

  • 并行消费能力。
  • 单 partition 顺序。
  • broker 资源。
  • 后续扩容复杂度。

粗略思路:

1
2
3
4
需要消费吞吐 = 峰值消息量 / 可接受延迟
单消费者实例吞吐 = 压测得出
需要并发数 = 需要消费吞吐 / 单实例吞吐
partition 数 >= 需要并发数

例如:

1
2
3
4
峰值每秒 3000 条订单事件
单消费者实例每秒处理 500 条
至少需要 6 个并发消费者
topic partition 可以设置为 8 或 12

别随意设置 1000 个 partition。partition 太多会增加 broker、controller、文件句柄、rebalance 等成本。

9.3 retention 保留多久?

Topic 默认按时间保留消息。Kafka 4.2 文档中 retention.ms 默认是 7 天。

业务思考:

  • 普通业务事件:保留 3 到 7 天?
  • 关键交易事件:保留更久?
  • 日志埋点:看成本,可能 1 到 3 天。
  • compacted 状态 topic:可能用 cleanup.policy=compact 保留每个 key 最新值。

9.4 delete 还是 compact?

cleanup.policy=delete

1
按时间或大小删除旧消息

适合事件流:

1
订单创建、支付成功、用户点击

cleanup.policy=compact

1
同一个 key 只保留最新值

适合状态流:

1
2
3
商品当前价格
用户当前等级
门店当前营业状态

10. 微服务架构里 Kafka 放在哪一层

一个比较健康的事件驱动微服务结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
Controller
-> Application Service
-> Domain Service
-> Repository
-> Domain Event / Outbox

Kafka Publisher
-> 只负责把事件可靠发布出去

Kafka Consumer
-> 接收事件
-> 做幂等
-> 调用本服务 Application Service

10.1 不要在 Controller 里直接发 Kafka

不推荐:

1
2
3
4
5
6
@PostMapping("/orders")
public Long createOrder(@RequestBody CreateOrderRequest request) {
Order order = orderService.create(request);
kafkaTemplate.send("trade.order.created", order.getId().toString(), event);
return order.getId();
}

更推荐:

1
2
3
4
@PostMapping("/orders")
public Long createOrder(@RequestBody CreateOrderRequest request) {
return orderApplicationService.create(request.toCommand());
}

事件发布逻辑放在 application service、domain event、outbox publisher 中。

10.2 Consumer 不要写成巨大的业务垃圾桶

不推荐:

1
2
3
4
5
6
7
8
@KafkaListener(topics = "trade.order.created")
public void onMessage(OrderCreatedEvent event) {
// 200 行业务逻辑
// 查数据库
// 调 RPC
// 发短信
// 写缓存
}

更推荐:

1
2
3
4
@KafkaListener(topics = "trade.order.created", groupId = "coupon-service")
public void onMessage(OrderCreatedEvent event) {
couponApplicationService.handleOrderCreated(event);
}

Listener 只做:

  • 日志。
  • 参数基本校验。
  • trace 上下文。
  • 调用 application service。
  • 抛出异常让统一 error handler 处理。

10.3 Consumer 处理外部副作用时要小心

消费消息后可能会:

  • 写本地数据库。
  • 调下游 RPC。
  • 更新缓存。
  • 发送另一条 Kafka 消息。

每一步都可能失败。设计时要明确:

  • 哪一步必须成功?
  • 哪一步可以补偿?
  • 重复执行会怎样?
  • 失败后是否重试?
  • 重试是否会放大下游压力?
  • 是否需要死信?

11. 一套完整小例子:订单创建后给用户发券

11.1 架构图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
用户
|
v
Order Service
| 1. 写 orders 表
| 2. 写 outbox_event 表
v
Outbox Publisher
|
| send trade.order.created
v
Kafka
|
| groupId=coupon-service
v
Coupon Service
| 1. 根据 eventId 幂等
| 2. 创建优惠券
| 3. 记录 coupon_issue_record
v
MySQL

11.2 topic 定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package com.example.kafka.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class KafkaTopicConfig {

@Bean
public NewTopic orderCreatedTopic() {
return new NewTopic("trade.order.created", 12, (short) 3);
}

@Bean
public NewTopic orderCreatedDltTopic() {
return new NewTopic("trade.order.created.DLT", 12, (short) 3);
}
}

说明:

  • 12 是 partition 数。
  • 3 是 replication factor。
  • 生产环境 topic 可能由平台统一创建,不一定允许业务代码自动创建。

11.3 事件对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
package com.example.order.event;

import java.math.BigDecimal;
import java.time.OffsetDateTime;

public record OrderCreatedEvent(
String eventId,
String eventVersion,
Long orderId,
Long userId,
BigDecimal amount,
OffsetDateTime occurredAt
) {
}

11.4 订单服务写 outbox

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package com.example.order.application;

import com.example.order.domain.Order;
import com.example.order.event.OrderCreatedEvent;
import com.example.order.outbox.OutboxEvent;
import java.time.OffsetDateTime;
import java.util.UUID;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class OrderApplicationService {

private final OrderRepository orderRepository;
private final OutboxEventRepository outboxEventRepository;

public OrderApplicationService(
OrderRepository orderRepository,
OutboxEventRepository outboxEventRepository
) {
this.orderRepository = orderRepository;
this.outboxEventRepository = outboxEventRepository;
}

@Transactional
public Long createOrder(CreateOrderCommand command) {
Order order = Order.create(command.userId(), command.amount());
orderRepository.save(order);

OrderCreatedEvent event = new OrderCreatedEvent(
UUID.randomUUID().toString(),
"1.0",
order.getId(),
order.getUserId(),
order.getAmount(),
OffsetDateTime.now()
);

outboxEventRepository.save(OutboxEvent.pending(
event.eventId(),
"trade.order.created",
event.orderId().toString(),
event
));

return order.getId();
}
}

11.5 Outbox 发布器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package com.example.order.outbox;

import java.util.List;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

@Component
public class OutboxPublisher {

private static final Logger log = LoggerFactory.getLogger(OutboxPublisher.class);

private final OutboxEventRepository outboxEventRepository;
private final KafkaTemplate<String, Object> kafkaTemplate;

public OutboxPublisher(
OutboxEventRepository outboxEventRepository,
KafkaTemplate<String, Object> kafkaTemplate
) {
this.outboxEventRepository = outboxEventRepository;
this.kafkaTemplate = kafkaTemplate;
}

@Scheduled(fixedDelay = 1000)
@Transactional
public void publish() {
List<OutboxEvent> events = outboxEventRepository.findReadyToPublish(100);

for (OutboxEvent event : events) {
try {
kafkaTemplate.send(event.getTopic(), event.getMessageKey(), event.getPayload())
.get(3, TimeUnit.SECONDS);
event.markSent();
} catch (Exception ex) {
log.error("publish outbox event failed, eventId={}", event.getEventId(), ex);
event.markRetryLater();
}
}
}
}

真实项目中还会考虑:

  • 多实例并发扫描时的抢锁。
  • 失败退避。
  • 最大发送次数。
  • 发送中的状态。
  • 批量更新。
  • CDC 替代轮询。

11.6 优惠券服务消费

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package com.example.coupon.consumer;

import com.example.order.event.OrderCreatedEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class OrderCreatedConsumer {

private static final Logger log = LoggerFactory.getLogger(OrderCreatedConsumer.class);

private final CouponApplicationService couponApplicationService;

public OrderCreatedConsumer(CouponApplicationService couponApplicationService) {
this.couponApplicationService = couponApplicationService;
}

@KafkaListener(
topics = "trade.order.created",
groupId = "coupon-service",
containerFactory = "kafkaListenerContainerFactory"
)
public void onOrderCreated(OrderCreatedEvent event) {
log.info("consume order created event, orderId={}, userId={}, eventId={}",
event.orderId(), event.userId(), event.eventId());

couponApplicationService.issueCouponForOrder(event);
}
}

11.7 幂等发券

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package com.example.coupon.application;

import com.example.order.event.OrderCreatedEvent;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class CouponApplicationService {

private final CouponRepository couponRepository;
private final CouponIssueRecordRepository issueRecordRepository;

public CouponApplicationService(
CouponRepository couponRepository,
CouponIssueRecordRepository issueRecordRepository
) {
this.couponRepository = couponRepository;
this.issueRecordRepository = issueRecordRepository;
}

@Transactional
public void issueCouponForOrder(OrderCreatedEvent event) {
if (issueRecordRepository.existsByEventId(event.eventId())) {
return;
}

Coupon coupon = Coupon.newUserOrderCoupon(event.userId(), event.orderId());
couponRepository.save(coupon);

issueRecordRepository.save(CouponIssueRecord.of(
event.eventId(),
event.userId(),
event.orderId(),
coupon.getId()
));
}
}

数据库唯一约束兜底:

1
2
ALTER TABLE coupon_issue_record
ADD UNIQUE KEY uk_event_id (event_id);

不要只靠 existsByEventId,并发下可能两个线程同时判断不存在。唯一键是最终防线。


12. 手动提交 offset:什么时候需要

大部分 Spring Kafka 项目不需要你手写 Acknowledgment。但你可能会在代码里看到:

1
2
3
4
5
6
7
8
9
@KafkaListener(topics = "trade.order.created", groupId = "coupon-service")
public void onMessage(OrderCreatedEvent event, Acknowledgment ack) {
try {
couponApplicationService.issueCouponForOrder(event);
ack.acknowledge();
} catch (Exception ex) {
throw ex;
}
}

这表示业务显式确认后才提交 offset。

对应配置类似:

1
2
3
4
spring:
kafka:
listener:
ack-mode: manual

要注意:

  • 成功处理后再 ack。
  • 不要 catch 异常后直接 ack,否则失败消息会被跳过。
  • 复杂批量消费时,ack 粒度要特别谨慎。

13. 批量消费:吞吐更高,但复杂度也更高

批量消费适合:

  • 写数仓。
  • 批量入库。
  • 日志处理。
  • 对单条实时性要求不高的场景。

配置:

1
2
3
4
5
6
spring:
kafka:
listener:
type: batch
consumer:
max-poll-records: 500

代码:

1
2
3
4
@KafkaListener(topics = "user.behavior.events", groupId = "behavior-writer")
public void onMessages(List<UserBehaviorEvent> events) {
behaviorRepository.batchInsert(events);
}

风险:

  • 一批里一条失败,整批怎么处理?
  • 如何定位失败消息?
  • 如何部分提交?
  • 是否会导致重复写入更多?

新人优先掌握单条消费。批量消费读懂即可,改动时要谨慎。


14. 顺序性:Kafka 能保证什么,不能保证什么

Kafka 能保证:

  • 同一个 partition 内有序。
  • 同一个 key 通常进入同一个 partition。
  • 同一个消费者处理某个 partition 时按 offset 顺序读取。

Kafka 不能天然保证:

  • 整个 topic 全局有序。
  • 跨 topic 有序。
  • 下游处理完成顺序一定和读取顺序一致,尤其你自己开线程池时。

如果业务要求“同一个订单事件严格有序”,你要:

  • producer 用 orderId 作为 key。
  • 不随意增加 partition。
  • consumer 不要把同一订单事件扔进无序线程池。
  • 用订单状态机兜底,例如不能从 CREATED 直接跳到 SHIPPED

状态机兜底非常重要,因为分布式系统里只靠消息顺序很脆。


15. 事务和 Exactly Once:先理解边界,不要迷信

Kafka 支持事务,Spring Kafka 也支持 Exactly Once Semantics。它主要适合:

1
read from Kafka -> process -> write to Kafka

例如流处理应用从 topic A 读,处理后写 topic B,并把 offset 和写入放进同一个 Kafka 事务。

但很多业务是:

1
read from Kafka -> write MySQL / call RPC / update Redis

这时 Kafka 事务不能自动保证外部系统 exactly once。你仍然需要:

  • 数据库唯一键。
  • 幂等表。
  • 状态机。
  • outbox。
  • 补偿任务。

所以公司里更常见的落地原则是:

1
Kafka 提供 at least once + 业务幂等 = 最终一致

新人不必一上来追求 exactly once。先把幂等、重试、死信、监控做好。


16. Schema 演进:消息字段怎么改才不炸下游

公司里 Kafka topic 可能被多个服务消费。你随便改字段,可能让别的团队消费者挂掉。

16.1 兼容性原则

推荐:

  • 新增字段:给默认值,消费者不依赖也能跑。
  • 废弃字段:先保留,标注 deprecated,等所有消费者升级后再删。
  • 枚举新增:消费者要有 unknown 分支。
  • 字段改名:不要直接改,先新增字段双写。

危险:

  • 删除字段。
  • 改字段类型,比如 LongString
  • 改语义但不改版本。
  • 复用老字段表达新含义。

16.2 eventVersion

简单项目可以加:

1
2
3
{
"eventVersion": "1.0"
}

消费者按版本处理:

1
2
3
4
5
if ("1.0".equals(event.eventVersion())) {
handleV1(event);
} else {
throw new IllegalArgumentException("unsupported event version: " + event.eventVersion());
}

更成熟的公司可能使用 Avro / Protobuf / Schema Registry。你实习时先看项目里是否有:

  • .proto
  • .avsc
  • schema-registry
  • SpecificRecord
  • KafkaAvroSerializer
  • ProtobufSerializer

17. 监控和排障:线上出问题时看什么

17.1 消费积压 lag

Lag 大致表示:

1
某个 partition 最新 offset - 当前 group 已提交 offset

lag 上升说明消费跟不上生产。

常见原因:

  • 下游数据库慢。
  • 消费者异常反复重试。
  • 消费者实例数不够。
  • partition 数不够。
  • 某个 key 热点导致单 partition 堵住。
  • 消费者频繁 rebalance。

17.2 生产失败率

关注:

  • send error 数。
  • timeout。
  • record too large。
  • authorization failed。
  • unknown topic。

17.3 消费失败率

关注:

  • listener exception。
  • retry 次数。
  • DLT 消息数。
  • 反序列化失败。

17.4 Rebalance

Rebalance 是消费者组重新分配 partition。偶尔发生正常,频繁发生就有问题。

常见原因:

  • 服务频繁重启。
  • 单条消息处理太久。
  • max.poll.interval.ms 太小。
  • GC 停顿。
  • 网络抖动。

17.5 日志里一定要带关键信息

Producer 日志建议带:

1
topic, key, eventId, businessId, traceId

Consumer 日志建议带:

1
topic, partition, offset, key, groupId, eventId, businessId, traceId

Spring Kafka listener 可以接收 ConsumerRecord

1
2
3
4
5
6
7
8
9
10
11
12
13
@KafkaListener(topics = "trade.order.created", groupId = "coupon-service")
public void onMessage(ConsumerRecord<String, OrderCreatedEvent> record) {
OrderCreatedEvent event = record.value();

log.info("consume event, topic={}, partition={}, offset={}, key={}, eventId={}",
record.topic(),
record.partition(),
record.offset(),
record.key(),
event.eventId());

couponApplicationService.issueCouponForOrder(event);
}

18. 本地怎么练习

18.1 Docker Compose 启动 Kafka

下面是一个单节点 Kafka 示例,适合本地学习。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
services:
kafka:
image: apache/kafka:4.0.0
container_name: kafka
ports:
- "9092:9092"
environment:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1

启动:

1
docker compose up -d

18.2 创建 topic

1
2
3
4
5
6
docker exec -it kafka /opt/kafka/bin/kafka-topics.sh \
--bootstrap-server localhost:9092 \
--create \
--topic trade.order.created \
--partitions 3 \
--replication-factor 1

查看 topic:

1
2
3
4
docker exec -it kafka /opt/kafka/bin/kafka-topics.sh \
--bootstrap-server localhost:9092 \
--describe \
--topic trade.order.created

18.3 命令行生产和消费

生产:

1
2
3
4
5
docker exec -it kafka /opt/kafka/bin/kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic trade.order.created \
--property parse.key=true \
--property key.separator=:

输入:

1
10086:{"eventId":"e1","orderId":10086,"userId":9527,"amount":35.50}

消费:

1
2
3
4
5
docker exec -it kafka /opt/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic trade.order.created \
--group coupon-service-local \
--from-beginning

18.4 看 consumer group

1
2
3
4
docker exec -it kafka /opt/kafka/bin/kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--describe \
--group coupon-service-local

重点看:

1
2
3
CURRENT-OFFSET
LOG-END-OFFSET
LAG

19. Spring Boot 测试怎么写

Spring 官方提供 @EmbeddedKafka,可以在测试中启动嵌入式 Kafka。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package com.example.coupon.consumer;

import com.example.order.event.OrderCreatedEvent;
import java.math.BigDecimal;
import java.time.OffsetDateTime;
import java.util.UUID;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.context.EmbeddedKafka;

@SpringBootTest
@EmbeddedKafka(
topics = "trade.order.created",
partitions = 3,
bootstrapServersProperty = "spring.kafka.bootstrap-servers"
)
class OrderCreatedConsumerTest {

@Autowired
private KafkaTemplate<String, OrderCreatedEvent> kafkaTemplate;

@Test
void shouldIssueCouponWhenOrderCreated() {
OrderCreatedEvent event = new OrderCreatedEvent(
UUID.randomUUID().toString(),
"1.0",
10086L,
9527L,
new BigDecimal("35.50"),
OffsetDateTime.now()
);

kafkaTemplate.send("trade.order.created", event.orderId().toString(), event);

// 实际项目中可以用 Awaitility 等待异步消费完成,然后断言数据库记录。
}
}

异步测试建议用 Awaitility:

1
2
3
4
await().atMost(Duration.ofSeconds(5))
.untilAsserted(() -> {
assertThat(issueRecordRepository.existsByEventId(event.eventId())).isTrue();
});

20. 接手公司 Kafka 代码时的阅读路线

20.1 先找配置

搜索:

1
2
3
4
5
6
7
spring.kafka
bootstrap-servers
group-id
KafkaProperties
ConcurrentKafkaListenerContainerFactory
DefaultErrorHandler
DeadLetterPublishingRecoverer

看清楚:

  • 连接哪个 Kafka 集群。
  • consumer group 是什么。
  • 序列化方式是什么。
  • 是否自动提交 offset。
  • 重试和死信怎么配置。
  • 并发数是多少。

20.2 再找生产者

搜索:

1
2
3
KafkaTemplate
.send(
ProducerRecord

每个发送点问:

  • 发送到哪个 topic?
  • key 是什么?
  • 消息体是什么?
  • 发送失败怎么处理?
  • 是否在事务里?
  • 是否有 outbox 或可靠消息机制?

20.3 再找消费者

搜索:

1
2
3
4
@KafkaListener
MessageListener
ConsumerRecord
Acknowledgment

每个消费者问:

  • topic 是什么?
  • groupId 是什么?
  • 是否批量消费?
  • 是否手动 ack?
  • 是否幂等?
  • 失败后重试还是进死信?
  • 处理逻辑是否调用外部系统?

20.4 最后看运维和平台约束

问师兄师姐:

  • topic 由谁创建?
  • 分区数能不能改?
  • 消息保留多久?
  • 有没有统一 MQ 平台?
  • 死信在哪里看?
  • 消息能不能重投?
  • 消费积压在哪看?
  • 是否有 topic 命名规范?
  • 是否有 schema 规范?

21. 常见坑位清单

坑 1:本地消费者抢了测试环境消息

原因:本地用了和测试服务一样的 group-id

解决:

1
2
3
4
spring:
kafka:
consumer:
group-id: coupon-service-local-yourname

坑 2:以为 Kafka 不会重复消费

Kafka 业务落地通常按至少一次处理。消费者必须幂等。

坑 3:消费失败后 catch 住异常不抛

错误:

1
2
3
4
5
try {
handle(event);
} catch (Exception ex) {
log.error("failed", ex);
}

这样 listener 可能认为处理成功并提交 offset,消息就丢给业务了。

更合理:

1
2
3
4
5
6
try {
handle(event);
} catch (Exception ex) {
log.error("failed", ex);
throw ex;
}

让统一错误处理器决定重试和死信。

坑 4:没有 key,导致同一业务实体消息乱序

订单事件尽量用 orderId 作为 key,用户事件尽量用 userId

坑 5:把慢 RPC 放在消费线程里无限重试

这样会阻塞 partition,导致积压。应该有限重试,失败进 retry topic 或 DLT。

坑 6:改消息字段不通知下游

Kafka topic 可能有很多消费者。字段变更要兼容,最好有 schema 规范和版本管理。

坑 7:partition 数小于期望并发

同一个 consumer group 内,消费并发上限受 partition 数限制。只有 3 个 partition,部署 20 个实例也不会有 20 个实例同时消费这个 topic。

坑 8:死信队列无人处理

DLT 不是终点。要有告警、排查、修复、重投机制。


22. 面试和实习中常被问的问题

22.1 Kafka 为什么快?

可以从这些角度答:

  • 顺序写磁盘。
  • Page Cache。
  • 批量发送和批量拉取。
  • 零拷贝相关优化。
  • 分区并行。
  • 消费者按 offset 顺序读取,状态维护轻。

不用一上来背源码,先讲清楚设计思路。

22.2 Kafka 如何保证消息不丢?

生产侧:

  • acks=all
  • retries > 0
  • enable.idempotence=true
  • 关键业务使用 outbox 或事务消息方案

Broker 侧:

  • replication factor 通常大于 1
  • min.insync.replicas 合理配置
  • 禁止不安全 leader 选举

消费侧:

  • 业务处理成功后再提交 offset
  • 消费失败重试
  • 死信兜底

22.3 Kafka 如何保证顺序?

  • Kafka 只保证单 partition 内顺序。
  • 相同 key 通常进入同一 partition。
  • 如果要同一订单有序,用 orderId 做 key。
  • 不要在消费者内部破坏顺序。
  • 用业务状态机兜底。

22.4 Kafka 会不会重复消费?

会。按至少一次理解。消费者必须幂等。

22.5 消息积压怎么办?

先定位:

  • 是所有 partition 都积压,还是某个 partition 积压?
  • 消费者是否报错重试?
  • 下游 DB/RPC 是否慢?
  • 消费者实例数是否不足?
  • partition 数是否不足?
  • 是否有热点 key?

再处理:

  • 修复异常。
  • 扩消费者实例。
  • 提高批处理效率。
  • 优化下游。
  • 必要时扩 partition。
  • 对热点 key 做业务拆分。

22.6 Kafka 和 RabbitMQ 有什么区别?

粗略理解:

  • RabbitMQ 更像传统消息队列,强调队列、路由、确认、复杂路由模型。
  • Kafka 更像分布式事件日志,强调高吞吐、持久化、可回放、多订阅方、流处理。

公司选择不一定是技术绝对优劣,也和历史包袱、平台建设、团队经验有关。


23. 给入职前 2 周的学习路线

第 1 到 2 天:概念和命令行

  • 理解 topic、partition、offset、consumer group。
  • 用 Docker 启动 Kafka。
  • 用命令行生产消费消息。
  • 查看 consumer group lag。

第 3 到 5 天:Spring Boot 生产消费

  • 写一个 producer API。
  • 写一个 @KafkaListener
  • 用 JSON 序列化对象。
  • eventId 并做幂等。
  • 故意抛异常观察重试。

第 6 到 8 天:可靠消息和 outbox

  • 写订单表和 outbox 表。
  • 本地实现定时发布 outbox。
  • 模拟 Kafka 发送失败。
  • 理解“数据库事务”和“消息发送”之间的不一致问题。

第 9 到 11 天:重试、死信、补偿

  • 配置 DefaultErrorHandler
  • 配置 DLT。
  • 写一个死信重投接口或脚本。
  • 学会看失败日志里的 topic、partition、offset。

第 12 到 14 天:架构复盘

  • 画一个订单事件驱动架构图。
  • 练习回答:为什么要异步?为什么要幂等?为什么会重复?积压怎么办?
  • 看一个开源 Spring Kafka demo 或公司入职资料。

24. 你在公司里应该怎么问问题

刚入职时不要害怕问,但问题要尽量具体。

不太好的问法:

1
Kafka 这里我看不懂。

更好的问法:

1
2
3
我看到 coupon-service 监听 trade.order.created,groupId 是 coupon-service。
这个消费者里用 eventId 做了唯一键幂等,失败后会进 trade.order.created.DLT。
我想确认一下:这个 DLT 是平台自动告警,还是需要我们服务自己做重投入口?

这种问法会让别人觉得你已经认真读过代码,只是缺少上下文。


25. 最后总结:记住这张脑图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
Kafka 公司落地

核心概念
topic: 事件分类
partition: 并行和顺序单位
offset: 消费进度
group-id: 消费者组
key: 分区路由和局部顺序

生产者
KafkaTemplate
key 选择
acks=all
enable.idempotence=true
发送失败处理
outbox 保证数据库和消息最终一致

消费者
@KafkaListener
业务幂等
成功后提交 offset
有限重试
DLT 兜底
监控 lag

架构
事件表达事实
主链路做核心动作
下游异步订阅
最终一致
schema 兼容演进

排障
topic / partition / offset / key
group lag
retry count
DLT
traceId

你现在不需要把 Kafka 所有底层细节都背下来。实习接手业务代码,最重要的是形成这几个条件反射:

  1. 看到 producer,问:消息是否可靠发出?key 选得对不对?
  2. 看到 consumer,问:是否幂等?失败怎么处理?
  3. 看到 group-id,问:这是独立订阅还是同组扩容?
  4. 看到 partition,问:并发和顺序是否符合业务?
  5. 看到 offset,问:处理成功和提交进度之间有没有风险?
  6. 看到 DLT,问:有没有告警和重投机制?

把这些想清楚,你就已经比“只会调 API”的新手稳很多了。