SpringBoot整合RocketMQ
# SpringBoot 整合 RocketMQ
SpringBoot 提供了快捷操作 RocketMQ 的 RocketMQTemplate
对象。
# 1、引入依赖
注意依赖的版本需要和 RocketMQ 的版本相同。
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.1</version>
</dependency>
1
2
3
4
5
2
3
4
5
# 2、编写配置文件
application.properties
版本:
# 应用名称
spring.application.name=demo11-message-spring-rocketmq
# 应⽤服务 WEB 访问端⼝
server.port=9111
# nameserver地址
rocketmq.name-server=192.168.207.129:9876
# 配置⽣产者组
rocketmq.producer.group=producer-demo-group1
1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
application.yml
版本:
spring:
application:
name: demo11-message-spring-rocketmq
server:
port: 9111
rocketmq:
# nameserver地址
name-server: 192.168.207.129:9876
producer:
# 配置⽣产者组
group: producer-demo-group1
1
2
3
4
5
6
7
8
9
10
11
12
2
3
4
5
6
7
8
9
10
11
12
# 3、编写生产者发送普通消息
@Slf4j
@Component
public class MyProducer {
@Resource
private RocketMQTemplate rocketMQTemplate;
/**
* 发送消息
*
* @param topic 主题
* @param message 消息
*/
public void sendMessage(String topic, String message) {
rocketMQTemplate.convertAndSend(topic, message);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 4、编写单元测试发送消息
@Slf4j
@SpringBootTest
class MyProducerTest {
@Resource
private MyProducer myProducer;
@Test
void testSendMessage() {
String topic = "my-boot-topic";
String message = "hello rocketmq springboot message!";
myProducer.sendMessage(topic, message);
log.info("消息发送成功!");
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
2
3
4
5
6
7
8
9
10
11
12
13
14
15
如果遇到报错:
sendDefaultImpl
呼叫超时
Caused by: org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:717)
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1426)
at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:370)
at org.apache.rocketmq.spring.core.RocketMQTemplate.syncSend(RocketMQTemplate.java:688)
1
2
3
4
5
2
3
4
5
原因分析:
- 启动 namesrv,broke 没有指定 ip 或者是没开启。
- 或者 ip 配置错误
运行成功后,可视化控制台:
# 5、创建消费者程序
新建一个模块,在里面编写消费者代码(也可以在生产者模块中编写)
引入依赖
编写配置文件
#(如果生产者和消费者在同一个模块,这块不用写) # 应用名称 spring.application.name=my-boot-consumer-demo # 应⽤服务 WEB 访问端⼝ server.port=8081 # nameserver地址 rocketmq.name-server=192.168.194.134:9876
1
2
3
4
5
6
7编写消费者类
@Slf4j @Component @RocketMQMessageListener(consumerGroup = "my-boot-consumer-group1",topic = "my-boot-topic") public class MyConsumer implements RocketMQListener<String> { @Override public void onMessage(String message) { log.info("收到的消息:{}", message); } }
1
2
3
4
5
6
7
8
9
10
11
运行结果:
# 6、发送事务消息
在生产者类里编写方法
/** * 发送事务消息 * * @param topic 主题 * @param msg 消息 * @throws InterruptedException 中断异常 */ public void sendMessageInTransaction(String topic, String msg) throws InterruptedException { String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"}; for (int i = 0; i < 10; i++) { // 创建一个消息对象,并通过调用withPayload()方法,向消息对象中添加了一个负载,即要发送的字符串类型的数据 Message<String> message = MessageBuilder.withPayload(msg).build(); // topic 和 tag 整合成一个字符串 String destination = topic + ":" + tags[i % tags.length]; // 第一个destination是消息要发送的目的地topic,第二个destination是消息携带的业务数据 TransactionSendResult sendResult = rocketMQTemplate.sendMessageInTransaction(destination, message, destination); log.info("[sendMessageInTransaction]sendResult: {}", sendResult); // 暂停10毫秒,以模拟消息的发送和处理过程所需要的时间 Thread.sleep(10); } }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21重写事务监听器
@Slf4j @RocketMQTransactionListener(rocketMQTemplateBeanName = "rocketMQTemplate") public class MyTransactionListener implements RocketMQLocalTransactionListener { @Override public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) { String destination = (String) arg; // 把spring的message转换成RocketMQ的message org.apache.rocketmq.common.message.Message message1 = RocketMQUtil.convertToRocketMessage( new StringMessageConverter(), "utf-8", destination, message ); // 获取message1上的tag对内容 String tags = message1.getTags(); if (StringUtils.contains(tags, "TagA")) { log.info("执行本地事务,提交事务"); // 返回提交事务状态,表示允许消费者消费该消息 return RocketMQLocalTransactionState.COMMIT; } else if (StringUtils.contains(tags, "TagB")) { log.info("执行本地事务,回滚事务"); // 返回回滚事务状态,表示该消息将被删除,不允许消费 return RocketMQLocalTransactionState.ROLLBACK; } else { log.info("执行本地事务,中间状态"); // 返回中间状态,表示需要回查才能确定状态 return RocketMQLocalTransactionState.UNKNOWN; } } @Override public RocketMQLocalTransactionState checkLocalTransaction(Message message) { return null; } }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# 7、编写单元测试发送事务消息
@Test
void testSendMessageInTransaction() throws InterruptedException {
String topic = "my-boot-topic";
String message = "hello rocketmq transaction springboot message";
myProducer.sendMessageInTransaction(topic, message);
log.info("事务消息发送成功!");
}
1
2
3
4
5
6
7
2
3
4
5
6
7
运行结果:
2025-05-20 11:42:51.071 INFO 17900 --- [ main] c.c.project.mq.MyTransactionListener : 执行本地事务,提交事务
2025-05-20 11:42:51.074 INFO 17900 --- [ main] com.chenmeng.project.mq.MyProducer : [sendMessageInTransaction]sendResult: SendResult [sendStatus=SEND_OK, msgId=240E06B001C00812000000000000085645EC61064425645C4CEA0000, offsetMsgId=null, messageQueue=MessageQueue [topic=my-boot-topic, brokerName=broker-a, queueId=0], queueOffset=100]
2025-05-20 11:42:51.095 INFO 17900 --- [ main] c.c.project.mq.MyTransactionListener : 执行本地事务,回滚事务
2025-05-20 11:42:51.096 INFO 17900 --- [ main] com.chenmeng.project.mq.MyProducer : [sendMessageInTransaction]sendResult: SendResult [sendStatus=SEND_OK, msgId=240E06B001C00812000000000000085645EC61064425645C4ECD0002, offsetMsgId=null, messageQueue=MessageQueue [topic=my-boot-topic, brokerName=broker-a, queueId=1], queueOffset=101]
2025-05-20 11:42:51.118 INFO 17900 --- [ main] c.c.project.mq.MyTransactionListener : 执行本地事务,中间状态
2025-05-20 11:42:51.118 INFO 17900 --- [MessageThread_1] com.chenmeng.project.mq.MyConsumer : 收到的消息:hello rocketmq transaction springboot message
2025-05-20 11:42:51.120 INFO 17900 --- [ main] com.chenmeng.project.mq.MyProducer : [sendMessageInTransaction]sendResult: SendResult [sendStatus=SEND_OK, msgId=240E06B001C00812000000000000085645EC61064425645C4EE30006, offsetMsgId=null, messageQueue=MessageQueue [topic=my-boot-topic, brokerName=broker-a, queueId=2], queueOffset=102]
2025-05-20 11:42:51.164 INFO 17900 --- [ main] c.c.project.mq.MyTransactionListener : 执行本地事务,中间状态
2025-05-20 11:42:51.165 INFO 17900 --- [ main] com.chenmeng.project.mq.MyProducer : [sendMessageInTransaction]sendResult: SendResult [sendStatus=SEND_OK, msgId=240E06B001C00812000000000000085645EC61064425645C4EFF000A, offsetMsgId=null, messageQueue=MessageQueue [topic=my-boot-topic, brokerName=broker-a, queueId=3], queueOffset=103]
2025-05-20 11:42:51.180 INFO 17900 --- [ main] c.c.project.mq.MyTransactionListener : 执行本地事务,中间状态
2025-05-20 11:42:51.181 INFO 17900 --- [ main] com.chenmeng.project.mq.MyProducer : [sendMessageInTransaction]sendResult: SendResult [sendStatus=SEND_OK, msgId=240E06B001C00812000000000000085645EC61064425645C4F27000F, offsetMsgId=null, messageQueue=MessageQueue [topic=my-boot-topic, brokerName=broker-a, queueId=0], queueOffset=104]
2025-05-20 11:42:51.203 INFO 17900 --- [ main] c.c.project.mq.MyTransactionListener : 执行本地事务,提交事务
2025-05-20 11:42:51.204 INFO 17900 --- [ main] com.chenmeng.project.mq.MyProducer : [sendMessageInTransaction]sendResult: SendResult [sendStatus=SEND_OK, msgId=240E06B001C00812000000000000085645EC61064425645C4F390012, offsetMsgId=null, messageQueue=MessageQueue [topic=my-boot-topic, brokerName=broker-a, queueId=1], queueOffset=105]
2025-05-20 11:42:51.219 INFO 17900 --- [ main] c.c.project.mq.MyTransactionListener : 执行本地事务,回滚事务
2025-05-20 11:42:51.220 INFO 17900 --- [ main] com.chenmeng.project.mq.MyProducer : [sendMessageInTransaction]sendResult: SendResult [sendStatus=SEND_OK, msgId=240E06B001C00812000000000000085645EC61064425645C4F4E0015, offsetMsgId=null, messageQueue=MessageQueue [topic=my-boot-topic, brokerName=broker-a, queueId=2], queueOffset=106]
2025-05-20 11:42:51.233 INFO 17900 --- [MessageThread_2] com.chenmeng.project.mq.MyConsumer : 收到的消息:hello rocketmq transaction springboot message
2025-05-20 11:42:51.235 INFO 17900 --- [ main] c.c.project.mq.MyTransactionListener : 执行本地事务,中间状态
2025-05-20 11:42:51.236 INFO 17900 --- [ main] com.chenmeng.project.mq.MyProducer : [sendMessageInTransaction]sendResult: SendResult [sendStatus=SEND_OK, msgId=240E06B001C00812000000000000085645EC61064425645C4F5E0018, offsetMsgId=null, messageQueue=MessageQueue [topic=my-boot-topic, brokerName=broker-a, queueId=3], queueOffset=107]
2025-05-20 11:42:51.249 INFO 17900 --- [ main] c.c.project.mq.MyTransactionListener : 执行本地事务,中间状态
2025-05-20 11:42:51.548 INFO 17900 --- [ main] com.chenmeng.project.mq.MyProducer : [sendMessageInTransaction]sendResult: SendResult [sendStatus=SEND_OK, msgId=240E06B001C00812000000000000085645EC61064425645C4F6E001E, offsetMsgId=null, messageQueue=MessageQueue [topic=my-boot-topic, brokerName=broker-a, queueId=0], queueOffset=108]
2025-05-20 11:42:51.562 INFO 17900 --- [ main] c.c.project.mq.MyTransactionListener : 执行本地事务,中间状态
2025-05-20 11:42:51.563 INFO 17900 --- [ main] com.chenmeng.project.mq.MyProducer : [sendMessageInTransaction]sendResult: SendResult [sendStatus=SEND_OK, msgId=240E06B001C00812000000000000085645EC61064425645C50A70021, offsetMsgId=null, messageQueue=MessageQueue [topic=my-boot-topic, brokerName=broker-a, queueId=1], queueOffset=109]
2025-05-20 11:42:51.573 INFO 17900 --- [ main] com.chenmeng.project.mq.MyProducerTest : 事务消息发送成功!
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
上次更新: 2025/5/20 17:42:11