SpringBoot使用Kafka示例
# SpringBoot 使用 Kafka 示例
# Spring-Kafka
在 Spring 生态中,提供了 Spring-Kafka (opens new window) 项目,让我们更简便的使用 Kafka 。
其官网介绍如下:
The Spring for Apache Kafka (spring-kafka) project applies core Spring concepts to the development of Kafka-based messaging solutions.
It provides a "template" as a high-level abstraction for sending messages.
It also provides support for Message-driven POJOs with
@KafkaListener
annotations and a "listener container".These libraries promote the use of dependency injection and declarative.
In all of these cases, you will see similarities to the JMS support in the Spring Framework and RabbitMQ support in Spring AMQP.
Spring for Apache Kafka (spring-kafka) 项目将 Spring 核心概念应用于基于 Kafka 的消息传递解决方案的开发。
它提供了一个 “模板” 作为发送消息的高级抽象。
它还通过
@KafkaListener
注解和“侦听器容器(listener container)”为消息驱动的 POJO 提供支持。这些库促进了依赖注入和声明的使用。
在所有这些用例中,你将看到 Spring Framework 中的 JMS 支持,以及和 Spring AMQP 中的 RabbitMQ 支持的相似之处。
- 注意,Spring-Kafka 是基于 Spring Message (opens new window) 来实现 Kafka 的发送端和接收端。
Features(功能特性)
- KafkaTemplate (opens new window)
- KafkaMessageListenerContainer (opens new window)
- @KafkaListener (opens new window)
- KafkaTransactionManager (opens new window)
spring-kafka-test
jar with embedded kafka server(带嵌入式 Kafka 服务器的spring-kafka-test
jar 包)
# 快速入门
# 引入依赖
<dependencies>
<!-- 引入 Spring-Kafka 依赖 -->
<!-- 已经内置 kafka-clients 依赖,所以无需重复引入 -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- 实现对 JSON 的自动化配置 -->
<!-- 因为,Kafka 对复杂对象的 Message 序列化时,我们会使用到 JSON -->
<!--
同时,spring-boot-starter-json 引入了 spring-boot-starter ,而 spring-boot-starter 又引入了 spring-boot-autoconfigure 。
spring-boot-autoconfigure 实现了 Spring-Kafka 的自动化配置
-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-json</artifactId>
</dependency>
<!-- 方便等会写单元测试 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 应用配置文件
核心内容:
spring:
# Kafka 配置项,对应 KafkaProperties 配置类
kafka:
bootstrap-servers: 127.0.0.1:9092 # 指定 Kafka Broker 地址,可以设置多个,以逗号分隔
# Kafka Producer 配置项
producer:
acks: 1 # 0-不应答。1-leader 应答。all-所有 leader 和 follower 应答。
retries: 3 # 发送失败时,重试发送的次数
key-serializer: org.apache.kafka.common.serialization.StringSerializer # 消息的 key 的序列化
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer # 消息的 value 的序列化
# Kafka Consumer 配置项
consumer:
# earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
auto-offset-reset: earliest # 设置消费者分组最初的消费进度为 earliest 。可参考博客 https://blog.csdn.net/lishuangzhe7047/article/details/74530417 理解
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
properties:
spring:
json:
trusted:
packages: com.chenmeng.project.message
# Kafka Consumer Listener 监听器配置
listener:
concurrency: 10 # 每个消费者监听器最大并发数,默认为 1 。可以通过设置 n ,这样对于每个监听器就会使用 n 个线程消费消息,提高整体消费速度。详细可参考博客 https://www.jianshu.com/p/ad0e5424edbd 理解。
logging:
level:
org:
springframework:
kafka: ERROR # spring-kafka INFO 日志太多了,所以我们限制只打印 ERROR 级别
apache:
kafka: ERROR # kafka INFO 日志太多了,所以我们限制只打印 ERROR 级别
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# 消息类
/**
* 示例 01 的 Message 消息
*
* @author chenmeng
*/
@Data
public class Demo01Message {
public static final String TOPIC = "DEMO_01";
/**
* 编号
*/
private Integer id;
}
/**
* 示例 04 的 Message 消息
*
* @author chenmeng
*/
@Data
public class Demo04Message {
public static final String TOPIC = "DEMO_04";
/**
* 编号
*/
private Integer id;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# 生产者
@Component
public class Demo01Producer {
@Resource
private KafkaTemplate<Object, Object> kafkaTemplate;
/**
* 同步发送消息
*
* @param id
* @return
* @throws ExecutionException
* @throws InterruptedException
*/
public SendResult<Object, Object> syncSend(Integer id) throws ExecutionException, InterruptedException {
// 创建 Demo01Message 消息
Demo01Message message = new Demo01Message();
message.setId(id);
// 同步发送消息
return kafkaTemplate.send(Demo01Message.TOPIC, message).get();
}
/**
* 异步发送消息
*
* @param id
* @return 一个可以通过监听执行结果的 Future 增强
*/
public ListenableFuture<SendResult<Object, Object>> asyncSend(Integer id) {
// 创建 Demo01Message 消息
Demo01Message message = new Demo01Message();
message.setId(id);
// 异步发送消息
return kafkaTemplate.send(Demo01Message.TOPIC, message);
}
}
@Component
public class Demo04Producer {
@Resource
private KafkaTemplate<Object, Object> kafkaTemplate;
public SendResult<Object, Object> syncSend(Integer id) throws ExecutionException, InterruptedException {
// 创建 Demo04Message 消息
Demo04Message message = new Demo04Message();
message.setId(id);
// 同步发送消息
return kafkaTemplate.send(Demo04Message.TOPIC, message).get();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# 消费者
@Component
public class Demo01Consumer {
private final Logger logger = LoggerFactory.getLogger(getClass());
@KafkaListener(topics = Demo01Message.TOPIC,
groupId = "demo01-com.chenmeng.project.consumer-group-" + Demo01Message.TOPIC)
public void onMessage(Demo01Message message) {
logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
}
}
@Component
public class Demo04Consumer {
private final Logger logger = LoggerFactory.getLogger(getClass());
@KafkaListener(topics = Demo04Message.TOPIC,
groupId = "demo04-com.chenmeng.project.consumer-group-" + Demo04Message.TOPIC)
public void onMessage(Demo04Message message) {
logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
// 注意,此处抛出一个 RuntimeException 异常,模拟消费失败
throw new RuntimeException("这里故意抛出一个异常");
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
代码解析:
@KafkaListener
注解,声明消费的 Topic 是"DEMO_01"
,消费者分组是"demo01-consumer-group-DEMO_01"
。- 一般情况下,我们建议一个
消费者分组
,仅消费一个Topic
。这样做会有个好处:每个消费者分组
职责单一,只消费一个Topic
。 - 虽然
@KafkaListener
注解是方法级别的,但还是建议一个类,对应一个方法(或者一个主题),消费消息。简单清晰。
# 测试类
@SpringBootTest
public class Demo01ProducerTest {
private final Logger logger = LoggerFactory.getLogger(getClass());
@Autowired
private Demo01Producer producer;
@Test
public void testSyncSend() throws ExecutionException, InterruptedException {
int id = (int) (System.currentTimeMillis() / 1000);
SendResult<Object, Object> result = producer.syncSend(id);
logger.info("[testSyncSend][发送编号:[{}] 发送结果:[{}]]", id, result);
// 阻塞等待,保证消费(测试场景下的阻塞等待)
new CountDownLatch(1).await();
}
@Test
public void testASyncSend() throws InterruptedException {
int id = (int) (System.currentTimeMillis() / 1000);
producer.asyncSend(id).addCallback(new ListenableFutureCallback<SendResult<Object, Object>>() {
@Override
public void onFailure(@NotNull Throwable e) {
logger.info("[testASyncSend][发送编号:[{}] 发送异常]]", id, e);
}
@Override
public void onSuccess(SendResult<Object, Object> result) {
logger.info("[testASyncSend][发送编号:[{}] 发送成功,结果为:[{}]]", id, result);
}
});
// 阻塞等待,保证消费
new CountDownLatch(1).await();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
testSyncSend()
执行结果
2025-02-24 16:25:48.684 INFO 20844 --- [ main] c.c.project.producer.Demo01ProducerTest : [testSyncSend][发送编号:[1740385548] 发送结果:[SendResult [producerRecord=ProducerRecord(topic=DEMO_01, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 99, 104, 101, 110, 109, 101, 110, 103, 46, 112, 114, 111, 106, 101, 99, 116, 46, 109, 101, 115, 115, 97, 103, 101, 46, 68, 101, 109, 111, 48, 49, 77, 101, 115, 115, 97, 103, 101])], isReadOnly = true), key=null, value=Demo01Message(id=1740385548), timestamp=null), recordMetadata=DEMO_01-0@1]]]
2025-02-24 16:25:48.729 INFO 20844 --- [ntainer#0-0-C-1] c.c.project.consumer.Demo01AConsumer : [onMessage][线程编号:49 消息内容:ConsumerRecord(topic = DEMO_01, partition = 0, leaderEpoch = 0, offset = 1, CreateTime = 1740385548614, serialized key size = -1, serialized value size = 17, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Demo01Message(id=1740385548))]
2025-02-24 16:25:48.736 INFO 20844 --- [ntainer#1-0-C-1] c.c.project.consumer.Demo01Consumer : [onMessage][线程编号:69 消息内容:Demo01Message(id=1740385548)]
2
3
testASyncSend()
执行结果
- 启动
testSyncSend()
后不关闭测试,可测集群消费模式
2025-02-24 16:30:50.063 INFO 28752 --- [ad | producer-1] c.c.project.producer.Demo01ProducerTest : [testASyncSend][发送编号:[1740385849] 发送成功,结果为:[SendResult [producerRecord=ProducerRecord(topic=DEMO_01, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 99, 104, 101, 110, 109, 101, 110, 103, 46, 112, 114, 111, 106, 101, 99, 116, 46, 109, 101, 115, 115, 97, 103, 101, 46, 68, 101, 109, 111, 48, 49, 77, 101, 115, 115, 97, 103, 101])], isReadOnly = true), key=null, value=Demo01Message(id=1740385849), timestamp=null), recordMetadata=DEMO_01-0@2]]]
2025-02-24 16:30:50.125 INFO 28752 --- [ntainer#0-0-C-1] c.c.project.consumer.Demo01AConsumer : [onMessage][线程编号:49 消息内容:ConsumerRecord(topic = DEMO_01, partition = 0, leaderEpoch = 0, offset = 2, CreateTime = 1740385849959, serialized key size = -1, serialized value size = 17, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Demo01Message(id=1740385849))]
2025-02-24 16:30:50.131 INFO 28752 --- [ntainer#1-0-C-1] c.c.project.consumer.Demo01Consumer : [onMessage][线程编号:69 消息内容:Demo01Message(id=1740385849)]
2
3
# 集群消费
@Component
public class Demo01AConsumer {
private final Logger logger = LoggerFactory.getLogger(getClass());
@KafkaListener(topics = Demo01Message.TOPIC,
groupId = "demo01-A-com.chenmeng.project.consumer-group-" + Demo01Message.TOPIC)
public void onMessage(ConsumerRecord<Integer, String> record) {
logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), record);
}
// @KafkaListener(topics = Demo01Message.TOPIC,
// groupId = "demo01-B-com.chenmeng.project.consumer-group-" + Demo01Message.TOPIC)
// public void onMessage(ConsumerRecord<Integer, String> record) throws InterruptedException {
// logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), record.partition());
// Thread.sleep(10 * 1000L);
// Thread.sleep(1L);
// logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), record.partition());
// }
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
集群消费(Clustering):
集群消费模式下,相同 Consumer Group 的每个 Consumer 实例平均分摊消息。
- 也就是说,如果我们发送一条 Topic 为
"DEMO_01"
的消息,可以分别被"demo01-A-consumer-group-DEMO_01"
和"demo01-consumer-group-DEMO_01"
都消费一次。 - 但是,如果我们启动两个该示例的实例,则消费者分组
"demo01-A-consumer-group-DEMO_01"
和"demo01-consumer-group-DEMO_01"
都会有多个 Consumer 示例。此时,我们再发送一条 Topic 为"DEMO_01"
的消息,只会被"demo01-A-consumer-group-DEMO_01"
的一个 Consumer 消费一次,也同样只会被"demo01-A-consumer-group-DEMO_01"
的一个 Consumer 消费一次。
方法参数,设置消费的消息对应的类不是
Demo01Message
类,而是 Kafka 内置的ConsumerRecord
类。
- 通过
ConsumerRecord
类,我们可以获取到消费的消息的更多信息,例如说消息的所属队列、创建时间等等属性,不过消息的内容(value
)就需要自己去反序列化。- 当然,一般情况下,我们不会使用
ConsumerRecord
类。
# 消费重试
- 消费重试和死信队列,是
RocketMQ
自带的功能。- 而在
Kafka
中,消费重试和死信队列,是由Spring-Kafka
所封装提供的。
# 配置类
@Configuration
public class KafkaConfiguration {
/**
* 增加消费异常的 ErrorHandler 处理器
*
* @param template
* @return
*/
@Bean
@Primary // 标记为主要的错误处理器,Spring 在自动装配时会优先选择带有 @Primary 注解的 Bean
public DefaultErrorHandler kafkaErrorHandler(KafkaTemplate<?, ?> template) {
// 创建 DeadLetterPublishingRecoverer 对象:
// 该恢复器用于将处理失败的消息发送到一个死信队列(Dead Letter Queue),
// 这里使用 KafkaTemplate 作为参数,将失败的消息发送到另一个 Kafka 主题。
ConsumerRecordRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
// 创建 FixedBackOff 对象:
// FixedBackOff 定义了重试策略,重试的间隔和最大重试次数。
// 这里设置了每次重试间隔为 10 秒(10 * 1000L),
// 最大重试次数为 3 次,即如果消费失败,最多会尝试 3 次。
BackOff backOff = new FixedBackOff(10 * 1000L, 3L);
// 创建 DefaultErrorHandler 对象:
// DefaultErrorHandler 是 Spring Kafka 用来处理消息消费失败的错误处理器,
// 它结合了消息重试和消息发送到死信队列的机制。
// 它使用上面创建的 recoverer 和 backOff 对象来定义处理失败消息时的行为。
return new DefaultErrorHandler(recoverer, backOff);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
配置解析:
- Spring-Kafka 的消费重试功能,通过实现自定义的
ErrorHandler
处理器,在 Consumer 消费消息异常的时候,进行拦截处理:- 在重试小于最大次数时,重新投递该消息给 Consumer ,让 Consumer 有机会重新消费消息,实现消费成功。
- 在重试到达最大次数时,Consumer 还是消费失败时,该消息就会发送到死信队列。例如说,本小节我们测试的 Topic 是
"DEMO_04"
,则其对应的死信队列的 Topic 就是"DEMO_04.DLT"
,即在原有 Topic 加上.DLT
后缀,就是其死信队列的 Topic 。
- 创建
DeadLetterPublishingRecoverer
对象,它负责实现,在重试到达最大次数时,Consumer 还是消费失败时,该消息就会发送到死信队列。 - 一共会消费四次消息(包括首次消费),第三次重试消费,不会打印异常日志,直接发到死信队列。(消息头是异常信息)
# 消费进度的提交机制
原生 Kafka Consumer 消费端,有两种消费进度提交的提交机制:
- 【默认】自动提交,通过配置
spring.kafka.consumer.enable-auto-commit=true
,每过auto.commit.interval.ms
时间间隔,都会自动提交消费消费进度。而提交的时机,是在 Consumer 的#poll(...)
方法的逻辑里完成,在每次从 Kafka Broker 拉取消息时,会检查是否到达自动提交的时间间隔,如果是,那么就会提交上一次轮询拉取的位置。 - 手动提交,通过配置
spring.kafka.consumer.enable-auto-commit=false
,后续通过 Consumer 的#commitSync(...)
或#commitAsync(...)
方法,同步或异步提交消费进度。
ack 模式类:
public static enum AckMode {
// ========== 自动提交模式 ==========
/**
* RECORD:每处理完一条消息后,立即提交偏移量。
* 适用于需要精确控制每条消息处理结果的场景。
*/
RECORD, // 每条消息被消费完成后,自动提交
/**
* BATCH:(默认)在每次拉取消息后,处理完当前批次的所有消息后,统一提交偏移量。
* 适用于批量处理,能够提高性能,但可能导致部分消息处理失败时,已处理的消息仍被标记为已消费。
*/
BATCH, // 每一次消息被消费完成后,在下次拉取消息之前,自动提交
/**
* TIME:在处理完当前批次的所有消息后,若距离上次提交的时间超过设定的时间间隔,则提交偏移量。
* 适用于对延迟敏感的场景,能够在一定时间内提交偏移量。
*/
TIME, // 达到一定时间间隔后,自动提交
/**
* COUNT:在处理完当前批次的所有消息后,若已处理的消息数量达到设定的阈值,则提交偏移量。
* 适用于对吞吐量敏感的场景,能够在处理一定数量的消息后提交偏移量。
*/
COUNT, // 消费成功的消息数到达一定数量后,自动提交
/**
* COUNT_TIME:满足 TIME 或 COUNT 中任一条件时,提交偏移量。
* 适用于需要同时满足时间和数量条件的场景。
*/
COUNT_TIME, // TIME 和 COUNT 的结合体,满足任一都会自动提交。
// ========== 手动提交模式 ==========
/**
* MANUAL:消费者需要显式调用 Acknowledgment.acknowledge() 方法来提交偏移量。
* 适用于需要精确控制消息处理结果的场景,能够确保消息处理的可靠性。
*/
MANUAL, // 调用时,先标记提交消费进度。等到当前消息被消费完成,然后在提交消费进度。
/**
* MANUAL_IMMEDIATE:消费者调用 Acknowledgment.acknowledge() 方法后,立即提交偏移量。
* 适用于对延迟敏感的场景,能够在处理完当前消息后立即提交偏移量。
*/
MANUAL_IMMEDIATE; // 调用时,立即提交消费进度。
private AckMode() {
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
那么,既然现在存在原生 Kafka 和 Spring-Kafka 提供的两种消费进度的提交机制,我们应该怎么配置呢?
- 使用原生 Kafka 的方式,通过配置
spring.kafka.consumer.enable-auto-commit=true
。然后,通过spring.kafka.consumer.auto-commit-interval
设置自动提交的频率。 - 使用 Spring-Kafka 的方式,通过配置
spring.kafka.consumer.enable-auto-commit=false
。然后通过spring.kafka.listener.ack-mode
设置具体模式。- 另外,还有
spring.kafka.listener.ack-time
和spring.kafka.listener.ack-count
可以设置自动提交的时间间隔和消息条数。
- 另外,还有
默认什么都不配置的情况下,使用 Spring-Kafka 的 BATCH 模式:每一次消息被消费完成后,在下次拉取消息之前,自动提交。
- 在原生 Kafka 客户端中,
enable.auto.commit
的默认值为true
,即启用自动提交偏移量。- 然而,在 Spring Kafka 2.3 版本及以上,框架默认将
enable.auto.commit
设置为false
,除非在配置中显式设置。
# 更多示例
批量发送和消费消息
在一些业务场景下,我们希望使用 Consumer 批量消费消息,提高消费速度。
以下是加上了批量发送和批量消费相关的配置项
spring: # Kafka 配置项,对应 KafkaProperties 配置类 kafka: bootstrap-servers: 127.0.0.1:9092 # 指定 Kafka Broker 地址,可以设置多个,以逗号分隔 # Kafka Producer 配置项 producer: acks: 1 # 0-不应答。1-leader 应答。all-所有 leader 和 follower 应答。 retries: 3 # 发送失败时,重试发送的次数 key-serializer: org.apache.kafka.common.serialization.StringSerializer # 消息的 key 的序列化 value-serializer: org.springframework.kafka.support.serializer.JsonSerializer # 消息的 value 的序列化 # 批量发送消息相关配置 batch-size: 16384 # 每次批量发送消息的最大数量 buffer-memory: 33554432 # 每次批量发送消息的最大内存 properties: linger: ms: 30000 # 批处理延迟时间上限。这里配置为 30 * 1000 ms 过后,不管是否消息数量是否到达 batch-size 或者消息大小到达 buffer-memory 后,都直接发送一次请求。 # Kafka Consumer 配置项 consumer: # earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 auto-offset-reset: earliest # 设置消费者分组最初的消费进度为 earliest 。可参考博客 https://blog.csdn.net/lishuangzhe7047/article/details/74530417 理解 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer # 批量消费消息相关配置 fetch-max-wait: 10000 # poll 一次拉取的阻塞的最大时长,单位:毫秒。这里指的是阻塞拉取需要满足至少 fetch-min-size 大小的消息 fetch-min-size: 10 # poll 一次消息拉取的最小数据量,单位:字节 max-poll-records: 100 # poll 一次消息拉取的最大数量 properties: spring: json: trusted: packages: com.chenmeng.project.message # Kafka Consumer Listener 监听器配置 listener: type: BATCH # 监听器类型,默认为 SINGLE ,只监听单条消息。这里我们配置 BATCH ,监听多条消息,批量消费 concurrency: 10 # 每个消费者监听器最大并发数,默认为 1 。可以通过设置 n ,这样对于每个监听器就会使用 n 个线程消费消息,提高整体消费速度。详细可参考博客 https://www.jianshu.com/p/ad0e5424edbd 理解。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
定时消息
- Kafka 并未提供定时消息的功能,需要我们自行拓展。
- 可以考虑基于 MySQL 存储定时消息,Job 扫描到达时间的定时消息,发送给 Kafka 。
广播消费
- 广播消费模式下,相同 Consumer Group 的每个 Consumer 实例都接收全量的消息。
- 不过 Kafka 并不直接提供内置的广播消费的功能!!!此时,我们只能退而求其次,每个 Consumer 独有一个 Consumer Group ,从而保证都能接收到全量的消息。
- 例如说,在应用中,缓存了数据字典等配置表在内存中,可以通过 Kafka 广播消费,实现每个应用节点都消费消息,刷新本地内存的缓存。
- 又例如说,我们基于 WebSocket 实现了 IM 聊天,在我们给用户主动发送消息时,因为我们不知道用户连接的是哪个提供 WebSocket 的应用,所以可以通过 Kafka 广播消费,每个应用判断当前用户是否是和自己提供的 WebSocket 服务连接,如果是,则推送消息给用户。
并发消费
- 我们配置的每一个
@KafkaListener
,都是串行消费的。显然,这在监听的 Topic 每秒消息量比较大的时候,会导致消费不及时,导致消息积压的问题。(可以通过启动多个 JVM 进程,实现多进程的并发消费,从而加速消费的速度) - 添加
@KafkaListener(concurrency=2)
注解,会创建 2 个 Kafka Consumer 。后续,每个 Kafka Consumer 会被单独分配到一个线程中,进行拉取消息,消费消息。
- 我们配置的每一个
顺序消息
- 普通顺序消息 :Producer 将相关联的消息发送到相同的消息队列。
- 完全严格顺序 :在【普通顺序消息】的基础上,Consumer 严格顺序消费。
事务消息
Kafka 内置提供事务消息的支持。(不过 Kafka 提供的并不是完整的的事务消息的支持,缺少了回查机制)
增加相关配置项
spring: # Kafka 配置项,对应 KafkaProperties 配置类 kafka: bootstrap-servers: 127.0.0.1:9092 # 指定 Kafka Broker 地址,可以设置多个,以逗号分隔 # Kafka Producer 配置项 producer: # ... acks: all # 0-不应答。1-leader 应答。all-所有 leader 和 follower 应答。 transaction-id-prefix: demo. # 事务编号前缀 # Kafka Consumer 配置项 consumer: # ... properties: # ... isolation: level: read_committed # 读取已提交的消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16- 修改
spring.kafka.producer.acks=all
配置,不然在启动时会报"Must set acks to all in order to use the idempotent producer. Otherwise we cannot guarantee idempotence."
错误。因为,Kafka 的事务消息需要基于幂等性来实现,所以必须保证所有节点都写入成功。 - 增加
transaction-id-prefix=demo.
配置,事务编号的前缀。需要保证相同应用配置相同,不同应用配置不同。具体可以看看《How to choose Kafka transaction id for several applications》 (opens new window)的讨论。 - 增加
spring.kafka.consumer.properties.isolation.level=read_committed
配置,Consumer 仅读取已提交的消息**。一定要配置!!!**
- 修改
使用 kafkaTemplate 提交的
#executeInTransaction(OperationsCallback<K, V, T> callback)
模板方法,实现在 Kafka 事务中,执行自定义 KafkaOperations.OperationsCallback 操作。- 在
#executeInTransaction(...)
方法中,我们可以通过 KafkaOperations (opens new window) 来执行发送消息等 Kafka 相关的操作,也可以执行自己的业务逻辑。 - 在
#executeInTransaction(...)
方法的开始,它会自动动创建 Kafka 的事务;然后执行我们定义的 KafkaOperations 的逻辑;如果成功,则提交 Kafka 事务;如果失败,则回滚 Kafka 事务。
- 在
如果 Kafka Producer 开启了事务的功能,则所有发送的消息,都必须处于 Kafka 事务之中,否则会抛出常。如果业务中,即存在需要事务的情况,也存在不需要事务的情况,需要分别定义两个 KafkaTemplate(Kafka Producer)或者其他写法。
Spring-Kafka 提供了对 Spring Transaction 的集成,所以在实际开发中,我们只需要配合使用
@Transactional
注解,来声明事务即可,而无需使用 KafkaTemplate 提供的#executeInTransaction(...)
模板方法。