沉梦听雨的编程指南 沉梦听雨的编程指南
首页
  • 基础篇
  • 集合篇
  • 并发篇
  • JVM
  • 新特性
  • 计算机网络
  • 操作系统
  • 数据结构与算法
  • 基础篇
  • MySql
  • Redis
  • 达梦数据库
  • Spring
  • SpringBoot
  • Mybatis
  • Shiro
  • 设计须知
  • UML画图
  • 权限校验
  • 设计模式
  • API网关
  • RPC
  • 消息队列
  • SpringCloud
  • 分布式事务
  • 云存储
  • 搜索引擎
  • 多媒体框架
  • 虚拟机
  • 开发工具篇
  • 工具库篇
  • 开发技巧篇
  • 工具类系列
  • 随笔
  • 前端环境搭建
  • HTML与CSS
  • JS学习
  • Vue3入门
  • Vue3进阶
  • 黑马Vue3
  • 脚手架搭建
  • 瑞吉外卖
  • 黑马点评
  • vue-blog
  • 沉梦接口开放平台
  • 用户中心
  • 聚合搜索平台
  • 仿12306项目
  • 壁纸小程序项目
  • RuoYi-Vue
  • 博客搭建
  • 网站收藏箱
  • 断墨寻径摘录
  • 费曼学习法
Github (opens new window)

沉梦听雨

时间是最好的浸渍剂,而沉淀是最好的提纯器🚀
首页
  • 基础篇
  • 集合篇
  • 并发篇
  • JVM
  • 新特性
  • 计算机网络
  • 操作系统
  • 数据结构与算法
  • 基础篇
  • MySql
  • Redis
  • 达梦数据库
  • Spring
  • SpringBoot
  • Mybatis
  • Shiro
  • 设计须知
  • UML画图
  • 权限校验
  • 设计模式
  • API网关
  • RPC
  • 消息队列
  • SpringCloud
  • 分布式事务
  • 云存储
  • 搜索引擎
  • 多媒体框架
  • 虚拟机
  • 开发工具篇
  • 工具库篇
  • 开发技巧篇
  • 工具类系列
  • 随笔
  • 前端环境搭建
  • HTML与CSS
  • JS学习
  • Vue3入门
  • Vue3进阶
  • 黑马Vue3
  • 脚手架搭建
  • 瑞吉外卖
  • 黑马点评
  • vue-blog
  • 沉梦接口开放平台
  • 用户中心
  • 聚合搜索平台
  • 仿12306项目
  • 壁纸小程序项目
  • RuoYi-Vue
  • 博客搭建
  • 网站收藏箱
  • 断墨寻径摘录
  • 费曼学习法
Github (opens new window)
  • API网关

  • RPC

  • 消息队列

    • 内存

    • Redis

    • RocketMQ

    • Kafka

      • Kafka基础小结
      • Kafka的下载安装以及使用
      • SpringBoot使用Kafka示例
      • SpringCloud使用Kafka
      • Kafka实现延迟消息
        • 1. 使用不同的 Topic 分区(最常见)
        • 2. 采用定时任务 + 数据库存储
        • 3. 结合 Redis 的 ZSet(有序集合)
        • 4. 使用 Kafka Streams + State Store
        • 5. 结合 Flink 处理
        • 结论:如何选型?
        • 最佳实践
  • Spring Cloud

  • 分布式事务

  • 云存储

  • 搜索引擎

  • 多媒体框架

  • 虚拟机

  • 微服务
  • 消息队列
  • Kafka
沉梦听雨
2025-03-20
目录

Kafka实现延迟消息

# Kafka 实现延迟消息

Kafka 本身不支持原生的延迟消息(不像 RocketMQ 内置了延迟队列),但可以通过多种方式来实现延迟消息。常见的方案如下:

# 1. 使用不同的 Topic 分区(最常见)

思路:

  • 创建多个延迟队列 Topic,比如 delay-5s、delay-10s、delay-30s,代表不同延迟时间的队列。
  • 生产者按延迟时间把消息发送到对应的 Topic。
  • 消费者监听并处理这些 Topic,延迟对应的秒数之后【线程睡眠 Thread.sleep(delay)】转发到真正的业务主题 real-topic。

示例:

// 生产
String topic = "delay-10s"; // 发送到 10 秒延迟的队列
ProducerRecord<String, String> record = new ProducerRecord<>(topic, "message");
producer.send(record);

// 消费
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Received delayed message: " + record.value());

                // 3. 模拟延迟(等待 10 秒)
                try {
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                // 4. 发送到真正的业务 Topic
                ProducerRecord<String, String> forwardRecord = new ProducerRecord<>(REAL_TOPIC, record.key(), record.value());
                producer.send(forwardRecord);
                System.out.println("Message forwarded to real-topic: " + record.value());
            }
        }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
  • 适用于:固定延迟时间的场景,比如 5s、10s、30s。
  • 优点:简单易用,不需要额外组件。
  • 缺点:如果延迟时间种类很多,Topic 可能会很多,管理复杂。

# 2. 采用定时任务 + 数据库存储

思路:

  • 生产者把消息存到 数据库(如 MySQL)或者 Redis,并记录目标执行时间。
  • 定时任务(如 Quartz、XXL-JOB)轮询数据库,判断是否到时间,然后再投递到 Kafka 进行消费。

示例:

  1. 生产者:

    // 先存到数据库,设置目标消费时间
    saveToDatabase("message1", System.currentTimeMillis() + 10000); // 10秒后发送
    
    1
    2
  2. 定时任务:

    List<Message> messages = queryReadyMessages(); // 查询到期的消息
    for (Message msg : messages) {
        producer.send(new ProducerRecord<>("real-topic", msg.getContent()));
        deleteFromDatabase(msg.getId()); // 发送后删除
    }
    
    1
    2
    3
    4
    5
  • 适用于:延迟时间不固定、消息量不大的情况。
  • 优点:可以精准控制延迟时间,灵活性强。
  • 缺点:依赖数据库或 Redis,效率受限。

# 3. 结合 Redis 的 ZSet(有序集合)

思路:

  • 生产者将消息存入 Redis 的 ZSet,score 设为 目标消费时间戳。
  • 消费者轮询 Redis,取出时间到期的消息,然后发送到 Kafka 进行消费。

示例:

  1. 生产者:

    // message:消息体,delayMillis:延迟时间
    public void addDelayTask(String message, long delayMillis) {
        long executeTime = System.currentTimeMillis() + delayMillis;
        redisTemplate.opsForZSet().add("delay-queue", message, executeTime);
        System.out.println("任务添加:" + message + ",执行时间:" + executeTime);
    }
    
    
    1
    2
    3
    4
    5
    6
    7
  2. 消费者(定时轮询取出到期消息):

    @Scheduled(fixedRate = 1000) // 每秒执行一次
    public void consumeDelayTask() {
        long now = System.currentTimeMillis();
        // 返回一个 Set<String> 集合,包含所有 score 在 [0, now] 之间的元素。
        Set<String> messages = redisTemplate.opsForZSet().rangeByScore("delay-queue", 0, now);
    
        for (String msg : messages) {
            producer.send(new ProducerRecord<>("real-topic", msg)); // 可根据场景选择发送到目标主题 或者 直接处理消息
            redisTemplate.opsForZSet().remove("delay-queue", msg); // 处理完删除
        }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
  • 适用于:高并发场景,且对小量延迟消息有需求。
  • 优点:基于内存操作,性能高。
  • 缺点:需要额外维护 Redis,消息量过大可能导致性能问题。

# 4. 使用 Kafka Streams + State Store

思路:

  • Kafka Streams 允许我们使用时间窗口,可以让消息在流处理中等一段时间,然后再投递到目标 Topic。

示例:

KStream<String, String> stream = builder.stream("input-topic");
stream
    .transform(() -> new DelayTransformer(10 * 1000)) // 延迟 10 秒
    .to("output-topic");
1
2
3
4
  • 适用于:流式计算场景,如订单超时处理。
  • 优点:适合与 Kafka 生态集成,流式处理友好。
  • 缺点:学习成本较高,对 Kafka Streams 有一定要求。

# 5. 结合 Flink 处理

如果你的架构里用到了 Flink,可以使用 Flink Timer 进行定时延迟处理:

  • 接收 Kafka 消息 → Flink 设置定时器 → 到时间后发送回 Kafka。
ctx.timerService().registerProcessingTimeTimer(timestamp + 10000); // 10s后触发
1
  • 适用于:大数据、实时计算场景。
  • 优点:可以结合流处理做复杂逻辑。
  • 缺点:需要 Flink 支持,架构要求高。

# 结论:如何选型?

方案 适用场景 优点 缺点
多个 Topic 固定延迟(5s、10s、30s) 简单易用 Topic 过多不好管理
数据库 + 定时任务 低吞吐、灵活时间 可靠性高 依赖数据库,效率受限
Redis ZSet 高并发、小量延迟消息 低延迟、性能高 Redis 容量受限
Kafka Streams 实时流处理 适配 Kafka 生态 学习成本较高
Flink Timer 大数据流处理 适配 Flink 需要 Flink

# 最佳实践

  • 如果是常见的业务(延迟 5s、10s):多 Topic 方案(简单易用)。
  • 如果消息量大且时间灵活:Redis ZSet(高效)。
  • 如果业务可靠性要求高:数据库 + 定时任务(可落地)。
  • 如果是流式计算:Kafka Streams / Flink(强大但复杂)。
上次更新: 2025/3/20 16:42:28
SpringCloud使用Kafka
基础知识篇

← SpringCloud使用Kafka 基础知识篇→

Theme by Vdoing | Copyright © 2023-2025 沉梦听雨 | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式