沉梦听雨的编程指南 沉梦听雨的编程指南
首页
  • 基础篇
  • 集合篇
  • 并发篇
  • JVM
  • 新特性
  • 计算机网络
  • 操作系统
  • 数据结构与算法
  • 基础篇
  • MySql
  • Redis
  • 达梦数据库
  • Spring
  • SpringBoot
  • Mybatis
  • Shiro
  • 设计须知
  • UML画图
  • 权限校验
  • 设计模式
  • API网关
  • RPC
  • 消息队列
  • SpringCloud
  • 分布式事务
  • 云存储
  • 搜索引擎
  • 多媒体框架
  • 虚拟机
  • 开发工具篇
  • 工具库篇
  • 开发技巧篇
  • 工具类系列
  • 随笔
  • 前端环境搭建
  • HTML与CSS
  • JS学习
  • Axios入门
  • Vue Router入门
  • Pinia入门
  • Vue3入门
  • Vue3进阶
  • 黑马Vue3
  • 脚手架搭建
  • 瑞吉外卖
  • 黑马点评
  • vue-blog
  • 沉梦接口开放平台
  • 用户中心
  • 聚合搜索平台
  • 仿12306项目
  • 壁纸小程序项目
  • RuoYi-Vue
  • 博客搭建
  • 网站收藏箱
  • 断墨寻径摘录
  • 费曼学习法
Github (opens new window)

沉梦听雨

时间是最好的浸渍剂,而沉淀是最好的提纯器🚀
首页
  • 基础篇
  • 集合篇
  • 并发篇
  • JVM
  • 新特性
  • 计算机网络
  • 操作系统
  • 数据结构与算法
  • 基础篇
  • MySql
  • Redis
  • 达梦数据库
  • Spring
  • SpringBoot
  • Mybatis
  • Shiro
  • 设计须知
  • UML画图
  • 权限校验
  • 设计模式
  • API网关
  • RPC
  • 消息队列
  • SpringCloud
  • 分布式事务
  • 云存储
  • 搜索引擎
  • 多媒体框架
  • 虚拟机
  • 开发工具篇
  • 工具库篇
  • 开发技巧篇
  • 工具类系列
  • 随笔
  • 前端环境搭建
  • HTML与CSS
  • JS学习
  • Axios入门
  • Vue Router入门
  • Pinia入门
  • Vue3入门
  • Vue3进阶
  • 黑马Vue3
  • 脚手架搭建
  • 瑞吉外卖
  • 黑马点评
  • vue-blog
  • 沉梦接口开放平台
  • 用户中心
  • 聚合搜索平台
  • 仿12306项目
  • 壁纸小程序项目
  • RuoYi-Vue
  • 博客搭建
  • 网站收藏箱
  • 断墨寻径摘录
  • 费曼学习法
Github (opens new window)
  • API网关

  • RPC

  • 消息队列

    • 内存

    • Redis

    • RocketMQ

      • RocketMQ入门
      • SpringBoot整合RocketMQ
        • 1、引入依赖
        • 2、编写配置文件
        • 3、编写生产者发送普通消息
        • 4、编写单元测试发送消息
        • 5、创建消费者程序
        • 6、发送事务消息
        • 7、编写单元测试发送事务消息
      • RocketMQ源码解析
    • Kafka

  • Spring Cloud

  • 分布式事务

  • 云存储

  • 搜索引擎

  • 多媒体框架

  • 虚拟机

  • 微服务
  • 消息队列
  • RocketMQ
沉梦听雨
2023-07-11
目录

SpringBoot整合RocketMQ

# SpringBoot 整合 RocketMQ

SpringBoot 提供了快捷操作 RocketMQ 的 RocketMQTemplate 对象。

# 1、引入依赖

注意依赖的版本需要和 RocketMQ 的版本相同。

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.1.1</version>
        </dependency>
1
2
3
4
5

# 2、编写配置文件

application.properties 版本:

# 应用名称
spring.application.name=demo11-message-spring-rocketmq
# 应⽤服务 WEB 访问端⼝
server.port=9111
# nameserver地址
rocketmq.name-server=192.168.207.129:9876
# 配置⽣产者组
rocketmq.producer.group=producer-demo-group1
1
2
3
4
5
6
7
8

application.yml 版本:

spring:
  application:
    name: demo11-message-spring-rocketmq
server:
  port: 9111

rocketmq:
  # nameserver地址
  name-server: 192.168.207.129:9876
  producer:
    # 配置⽣产者组
    group: producer-demo-group1
1
2
3
4
5
6
7
8
9
10
11
12

# 3、编写生产者发送普通消息

@Slf4j
@Component
public class MyProducer {

    @Resource
    private RocketMQTemplate rocketMQTemplate;

    /**
     * 发送消息
     *
     * @param topic   主题
     * @param message 消息
     */
    public void sendMessage(String topic, String message) {
        rocketMQTemplate.convertAndSend(topic, message);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

# 4、编写单元测试发送消息

@Slf4j
@SpringBootTest
class MyProducerTest {

    @Resource
    private MyProducer myProducer;

    @Test
    void testSendMessage() {
        String topic = "my-boot-topic";
        String message = "hello rocketmq springboot message!";
        myProducer.sendMessage(topic, message);
        log.info("消息发送成功!");
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

如果遇到报错:

  • sendDefaultImpl 呼叫超时
Caused by: org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout
	at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:717)
	at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1426)
	at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:370)
	at org.apache.rocketmq.spring.core.RocketMQTemplate.syncSend(RocketMQTemplate.java:688)
1
2
3
4
5

原因分析:

  • 启动 namesrv,broke 没有指定 ip 或者是没开启。
  • 或者 ip 配置错误

image

运行成功后,可视化控制台:

image

# 5、创建消费者程序

新建一个模块,在里面编写消费者代码(也可以在生产者模块中编写)

  1. 引入依赖

  2. 编写配置文件

    #(如果生产者和消费者在同一个模块,这块不用写)
    # 应用名称
    spring.application.name=my-boot-consumer-demo
    # 应⽤服务 WEB 访问端⼝
    server.port=8081
    # nameserver地址
    rocketmq.name-server=192.168.194.134:9876
    
    1
    2
    3
    4
    5
    6
    7
  3. 编写消费者类

    @Slf4j
    @Component
    @RocketMQMessageListener(consumerGroup = "my-boot-consumer-group1",topic = "my-boot-topic")
    public class MyConsumer implements RocketMQListener<String> {
    
        @Override
        public void onMessage(String message) {
            log.info("收到的消息:{}", message);
        }
    
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11

运行结果:

image

# 6、发送事务消息

  1. 在生产者类里编写方法

        /**
         * 发送事务消息
         *
         * @param topic 主题
         * @param msg   消息
         * @throws InterruptedException 中断异常
         */
        public void sendMessageInTransaction(String topic, String msg) throws InterruptedException {
            String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
            for (int i = 0; i < 10; i++) {
                // 创建一个消息对象,并通过调用withPayload()方法,向消息对象中添加了一个负载,即要发送的字符串类型的数据
                Message<String> message = MessageBuilder.withPayload(msg).build();
                // topic 和 tag 整合成一个字符串
                String destination = topic + ":" + tags[i % tags.length];
                // 第一个destination是消息要发送的目的地topic,第二个destination是消息携带的业务数据
                TransactionSendResult sendResult = rocketMQTemplate.sendMessageInTransaction(destination, message, destination);
                log.info("[sendMessageInTransaction]sendResult: {}", sendResult);
                // 暂停10毫秒,以模拟消息的发送和处理过程所需要的时间
                Thread.sleep(10);
            }
        }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
  2. 重写事务监听器

    @Slf4j
    @RocketMQTransactionListener(rocketMQTemplateBeanName = "rocketMQTemplate")
    public class MyTransactionListener implements RocketMQLocalTransactionListener {
    
        @Override
        public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {
            String destination = (String) arg;
            // 把spring的message转换成RocketMQ的message
            org.apache.rocketmq.common.message.Message message1 = RocketMQUtil.convertToRocketMessage(
                    new StringMessageConverter(),
                    "utf-8",
                    destination,
                    message
            );
            // 获取message1上的tag对内容
            String tags = message1.getTags();
            if (StringUtils.contains(tags, "TagA")) {
                log.info("执行本地事务,提交事务");
                // 返回提交事务状态,表示允许消费者消费该消息
                return RocketMQLocalTransactionState.COMMIT;
            } else if (StringUtils.contains(tags, "TagB")) {
                log.info("执行本地事务,回滚事务");
                // 返回回滚事务状态,表示该消息将被删除,不允许消费
                return RocketMQLocalTransactionState.ROLLBACK;
            } else {
                log.info("执行本地事务,中间状态");
                // 返回中间状态,表示需要回查才能确定状态
                return RocketMQLocalTransactionState.UNKNOWN;
            }
        }
    
        @Override
        public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
            return null;
        }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36

# 7、编写单元测试发送事务消息

    @Test
    void testSendMessageInTransaction() throws InterruptedException {
        String topic = "my-boot-topic";
        String message = "hello rocketmq transaction springboot message";
        myProducer.sendMessageInTransaction(topic, message);
        log.info("事务消息发送成功!");
    }
1
2
3
4
5
6
7

运行结果:

2025-05-20 11:42:51.071  INFO 17900 --- [           main] c.c.project.mq.MyTransactionListener     : 执行本地事务,提交事务
2025-05-20 11:42:51.074  INFO 17900 --- [           main] com.chenmeng.project.mq.MyProducer       : [sendMessageInTransaction]sendResult: SendResult [sendStatus=SEND_OK, msgId=240E06B001C00812000000000000085645EC61064425645C4CEA0000, offsetMsgId=null, messageQueue=MessageQueue [topic=my-boot-topic, brokerName=broker-a, queueId=0], queueOffset=100]
2025-05-20 11:42:51.095  INFO 17900 --- [           main] c.c.project.mq.MyTransactionListener     : 执行本地事务,回滚事务
2025-05-20 11:42:51.096  INFO 17900 --- [           main] com.chenmeng.project.mq.MyProducer       : [sendMessageInTransaction]sendResult: SendResult [sendStatus=SEND_OK, msgId=240E06B001C00812000000000000085645EC61064425645C4ECD0002, offsetMsgId=null, messageQueue=MessageQueue [topic=my-boot-topic, brokerName=broker-a, queueId=1], queueOffset=101]
2025-05-20 11:42:51.118  INFO 17900 --- [           main] c.c.project.mq.MyTransactionListener     : 执行本地事务,中间状态
2025-05-20 11:42:51.118  INFO 17900 --- [MessageThread_1] com.chenmeng.project.mq.MyConsumer       : 收到的消息:hello rocketmq transaction springboot message
2025-05-20 11:42:51.120  INFO 17900 --- [           main] com.chenmeng.project.mq.MyProducer       : [sendMessageInTransaction]sendResult: SendResult [sendStatus=SEND_OK, msgId=240E06B001C00812000000000000085645EC61064425645C4EE30006, offsetMsgId=null, messageQueue=MessageQueue [topic=my-boot-topic, brokerName=broker-a, queueId=2], queueOffset=102]
2025-05-20 11:42:51.164  INFO 17900 --- [           main] c.c.project.mq.MyTransactionListener     : 执行本地事务,中间状态
2025-05-20 11:42:51.165  INFO 17900 --- [           main] com.chenmeng.project.mq.MyProducer       : [sendMessageInTransaction]sendResult: SendResult [sendStatus=SEND_OK, msgId=240E06B001C00812000000000000085645EC61064425645C4EFF000A, offsetMsgId=null, messageQueue=MessageQueue [topic=my-boot-topic, brokerName=broker-a, queueId=3], queueOffset=103]
2025-05-20 11:42:51.180  INFO 17900 --- [           main] c.c.project.mq.MyTransactionListener     : 执行本地事务,中间状态
2025-05-20 11:42:51.181  INFO 17900 --- [           main] com.chenmeng.project.mq.MyProducer       : [sendMessageInTransaction]sendResult: SendResult [sendStatus=SEND_OK, msgId=240E06B001C00812000000000000085645EC61064425645C4F27000F, offsetMsgId=null, messageQueue=MessageQueue [topic=my-boot-topic, brokerName=broker-a, queueId=0], queueOffset=104]
2025-05-20 11:42:51.203  INFO 17900 --- [           main] c.c.project.mq.MyTransactionListener     : 执行本地事务,提交事务
2025-05-20 11:42:51.204  INFO 17900 --- [           main] com.chenmeng.project.mq.MyProducer       : [sendMessageInTransaction]sendResult: SendResult [sendStatus=SEND_OK, msgId=240E06B001C00812000000000000085645EC61064425645C4F390012, offsetMsgId=null, messageQueue=MessageQueue [topic=my-boot-topic, brokerName=broker-a, queueId=1], queueOffset=105]
2025-05-20 11:42:51.219  INFO 17900 --- [           main] c.c.project.mq.MyTransactionListener     : 执行本地事务,回滚事务
2025-05-20 11:42:51.220  INFO 17900 --- [           main] com.chenmeng.project.mq.MyProducer       : [sendMessageInTransaction]sendResult: SendResult [sendStatus=SEND_OK, msgId=240E06B001C00812000000000000085645EC61064425645C4F4E0015, offsetMsgId=null, messageQueue=MessageQueue [topic=my-boot-topic, brokerName=broker-a, queueId=2], queueOffset=106]
2025-05-20 11:42:51.233  INFO 17900 --- [MessageThread_2] com.chenmeng.project.mq.MyConsumer       : 收到的消息:hello rocketmq transaction springboot message
2025-05-20 11:42:51.235  INFO 17900 --- [           main] c.c.project.mq.MyTransactionListener     : 执行本地事务,中间状态
2025-05-20 11:42:51.236  INFO 17900 --- [           main] com.chenmeng.project.mq.MyProducer       : [sendMessageInTransaction]sendResult: SendResult [sendStatus=SEND_OK, msgId=240E06B001C00812000000000000085645EC61064425645C4F5E0018, offsetMsgId=null, messageQueue=MessageQueue [topic=my-boot-topic, brokerName=broker-a, queueId=3], queueOffset=107]
2025-05-20 11:42:51.249  INFO 17900 --- [           main] c.c.project.mq.MyTransactionListener     : 执行本地事务,中间状态
2025-05-20 11:42:51.548  INFO 17900 --- [           main] com.chenmeng.project.mq.MyProducer       : [sendMessageInTransaction]sendResult: SendResult [sendStatus=SEND_OK, msgId=240E06B001C00812000000000000085645EC61064425645C4F6E001E, offsetMsgId=null, messageQueue=MessageQueue [topic=my-boot-topic, brokerName=broker-a, queueId=0], queueOffset=108]
2025-05-20 11:42:51.562  INFO 17900 --- [           main] c.c.project.mq.MyTransactionListener     : 执行本地事务,中间状态
2025-05-20 11:42:51.563  INFO 17900 --- [           main] com.chenmeng.project.mq.MyProducer       : [sendMessageInTransaction]sendResult: SendResult [sendStatus=SEND_OK, msgId=240E06B001C00812000000000000085645EC61064425645C50A70021, offsetMsgId=null, messageQueue=MessageQueue [topic=my-boot-topic, brokerName=broker-a, queueId=1], queueOffset=109]
2025-05-20 11:42:51.573  INFO 17900 --- [           main] com.chenmeng.project.mq.MyProducerTest   : 事务消息发送成功!
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
上次更新: 2025/5/20 17:42:11
RocketMQ入门
RocketMQ源码解析

← RocketMQ入门 RocketMQ源码解析→

Theme by Vdoing | Copyright © 2023-2025 沉梦听雨 | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式