Kafka实现延迟消息
# Kafka 实现延迟消息
Kafka 本身不支持原生的延迟消息(不像 RocketMQ 内置了延迟队列),但可以通过多种方式来实现延迟消息。常见的方案如下:
# 1. 使用不同的 Topic 分区(最常见)
思路:
- 创建多个延迟队列 Topic,比如
delay-5s
、delay-10s
、delay-30s
,代表不同延迟时间的队列。 - 生产者按延迟时间把消息发送到对应的 Topic。
- 消费者监听并处理这些 Topic,延迟对应的秒数之后【线程睡眠
Thread.sleep(delay)
】转发到真正的业务主题real-topic
。
示例:
// 生产
String topic = "delay-10s"; // 发送到 10 秒延迟的队列
ProducerRecord<String, String> record = new ProducerRecord<>(topic, "message");
producer.send(record);
// 消费
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.println("Received delayed message: " + record.value());
// 3. 模拟延迟(等待 10 秒)
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 4. 发送到真正的业务 Topic
ProducerRecord<String, String> forwardRecord = new ProducerRecord<>(REAL_TOPIC, record.key(), record.value());
producer.send(forwardRecord);
System.out.println("Message forwarded to real-topic: " + record.value());
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
- 适用于:固定延迟时间的场景,比如
5s、10s、30s
。 - 优点:简单易用,不需要额外组件。
- 缺点:如果延迟时间种类很多,Topic 可能会很多,管理复杂。
# 2. 采用定时任务 + 数据库存储
思路:
- 生产者把消息存到 数据库(如 MySQL)或者 Redis,并记录目标执行时间。
- 定时任务(如 Quartz、XXL-JOB)轮询数据库,判断是否到时间,然后再投递到 Kafka 进行消费。
示例:
生产者:
// 先存到数据库,设置目标消费时间 saveToDatabase("message1", System.currentTimeMillis() + 10000); // 10秒后发送
1
2定时任务:
List<Message> messages = queryReadyMessages(); // 查询到期的消息 for (Message msg : messages) { producer.send(new ProducerRecord<>("real-topic", msg.getContent())); deleteFromDatabase(msg.getId()); // 发送后删除 }
1
2
3
4
5
- 适用于:延迟时间不固定、消息量不大的情况。
- 优点:可以精准控制延迟时间,灵活性强。
- 缺点:依赖数据库或 Redis,效率受限。
# 3. 结合 Redis 的 ZSet(有序集合)
思路:
- 生产者将消息存入 Redis 的 ZSet,
score
设为 目标消费时间戳。 - 消费者轮询 Redis,取出时间到期的消息,然后发送到 Kafka 进行消费。
示例:
生产者:
// message:消息体,delayMillis:延迟时间 public void addDelayTask(String message, long delayMillis) { long executeTime = System.currentTimeMillis() + delayMillis; redisTemplate.opsForZSet().add("delay-queue", message, executeTime); System.out.println("任务添加:" + message + ",执行时间:" + executeTime); }
1
2
3
4
5
6
7消费者(定时轮询取出到期消息):
@Scheduled(fixedRate = 1000) // 每秒执行一次 public void consumeDelayTask() { long now = System.currentTimeMillis(); // 返回一个 Set<String> 集合,包含所有 score 在 [0, now] 之间的元素。 Set<String> messages = redisTemplate.opsForZSet().rangeByScore("delay-queue", 0, now); for (String msg : messages) { producer.send(new ProducerRecord<>("real-topic", msg)); // 可根据场景选择发送到目标主题 或者 直接处理消息 redisTemplate.opsForZSet().remove("delay-queue", msg); // 处理完删除 } }
1
2
3
4
5
6
7
8
9
10
11
- 适用于:高并发场景,且对小量延迟消息有需求。
- 优点:基于内存操作,性能高。
- 缺点:需要额外维护 Redis,消息量过大可能导致性能问题。
# 4. 使用 Kafka Streams + State Store
思路:
- Kafka Streams 允许我们使用时间窗口,可以让消息在流处理中等一段时间,然后再投递到目标 Topic。
示例:
KStream<String, String> stream = builder.stream("input-topic");
stream
.transform(() -> new DelayTransformer(10 * 1000)) // 延迟 10 秒
.to("output-topic");
1
2
3
4
2
3
4
- 适用于:流式计算场景,如订单超时处理。
- 优点:适合与 Kafka 生态集成,流式处理友好。
- 缺点:学习成本较高,对 Kafka Streams 有一定要求。
# 5. 结合 Flink 处理
如果你的架构里用到了 Flink,可以使用 Flink Timer 进行定时延迟处理:
- 接收 Kafka 消息 → Flink 设置定时器 → 到时间后发送回 Kafka。
ctx.timerService().registerProcessingTimeTimer(timestamp + 10000); // 10s后触发
1
- 适用于:大数据、实时计算场景。
- 优点:可以结合流处理做复杂逻辑。
- 缺点:需要 Flink 支持,架构要求高。
# 结论:如何选型?
方案 | 适用场景 | 优点 | 缺点 |
---|---|---|---|
多个 Topic | 固定延迟(5s、10s、30s) | 简单易用 | Topic 过多不好管理 |
数据库 + 定时任务 | 低吞吐、灵活时间 | 可靠性高 | 依赖数据库,效率受限 |
Redis ZSet | 高并发、小量延迟消息 | 低延迟、性能高 | Redis 容量受限 |
Kafka Streams | 实时流处理 | 适配 Kafka 生态 | 学习成本较高 |
Flink Timer | 大数据流处理 | 适配 Flink | 需要 Flink |
# 最佳实践
- 如果是常见的业务(延迟 5s、10s):多 Topic 方案(简单易用)。
- 如果消息量大且时间灵活:Redis ZSet(高效)。
- 如果业务可靠性要求高:数据库 + 定时任务(可落地)。
- 如果是流式计算:Kafka Streams / Flink(强大但复杂)。
上次更新: 2025/3/20 16:42:28