SpringCloud Stream学习
# Spring Cloud Stream 学习
官方文档:
# 什么是 Spring Cloud Stream?
Spring Cloud Stream(SCS) 是一个用于构建消息驱动微服务的框架,它基于 Spring Boot,提供了一种简化的方式来处理消息和事件的传递。它旨在为不同消息代理(如 Kafka、RabbitMQ、Apache Kafka 等)提供统一的编程模型,使开发者能够更轻松地在微服务架构中使用消息通信。
以下是 Spring Cloud Stream 的一些关键概念和特性:
- Binder(绑定器):Binder 是 Spring Cloud Stream 的核心概念之一,它提供了与底层消息代理之间的连接和交互。通过 Binder,Spring Cloud Stream 能够与不同的消息代理集成,例如 Kafka、RabbitMQ 等。每个消息代理都有自己的 Binder 实现,使开发者可以在不同的消息代理之间切换而无需修改应用代码。
- 消息通道(Message Channels):Spring Cloud Stream 通过消息通道来实现消息的发送和接收。应用程序可以通过绑定到消息通道来与消息代理进行交互。消息通道可以是输入通道(用于接收消息)或输出通道(用于发送消息)。
- 消息转换(Message Conversion):Spring Cloud Stream 会自动进行消息的序列化和反序列化,将消息从 Java 对象转换为消息代理支持的格式,以及将从消息代理接收的消息转换回 Java 对象。
- 发布-订阅模式:Spring Cloud Stream 支持发布-订阅模式,可以让多个消费者订阅同一个主题(topic)的消息,实现了一对多的消息通信。
- 消息分组(Message Grouping):消息分组可以将一组消费者组织在一起,共同处理相同分组 ID 的消息。这对于实现负载均衡和消息去重非常有用。
- 函数式编程模型:Spring Cloud Stream 鼓励使用函数式编程模型,通过定义处理消息的函数来实现业务逻辑。这种方式使得编写简洁、可测试的消息处理逻辑变得更加容易。
- 实时数据处理:Spring Cloud Stream 不仅用于消息传递,还可以用于实时数据处理。您可以在消息到达之后立即对其进行处理,从而支持实时分析、转换和处理。
总体而言,Spring Cloud Stream 简化了在微服务架构中使用消息传递的复杂性,提供了一种与消息代理集成的高级抽象,让开发者能够更专注于业务逻辑的实现。它的灵活性使得您能够轻松地在不同的消息代理之间切换,同时提供了强大的工具来处理消息和事件的传递,从而使您的微服务系统更具可扩展性和弹性。
消息中间件的切换只需要 更换依赖 即可。
Binder (opens new window),跟消息中间件集成的组件,用来创建对应的 Binding。
各消息中间件都有自己的 Binder 具体实现。
Binding 在消息中间件与应用程序提供的 Provider 和 Consumer 之间提供了一个桥梁,实现了开发者只需使用应用程序的 Provider 或 Consumer 生产或消费数据即可,屏蔽了开发者与底层消息中间件的接触。
# 讲讲 Kafka
本文以 Spring Cloud Stream + Kafka 为例编写。
Kafka 是一个开源的分布式流数据平台,最初由 LinkedIn 开发并捐赠给 Apache 软件基金会。它被设计用于处理高吞吐量、可持久化的实时数据流。Kafka 的主要目标是提供一种高效、可扩展、持久化的消息传递系统,能够处理大规模的实时数据流,同时保证数据的可靠性和可用性。
以下是 Kafka 的一些关键特性和概念:
- 发布-订阅模式:Kafka 采用发布-订阅模式,生产者(Publisher)将消息发布到主题(Topic),而消费者(Consumer)可以订阅一个或多个主题来接收消息。这使得多个消费者能够独立地从同一个主题订阅消息,实现一对多的消息传递。
- 分区和副本:Kafka 将每个主题分为多个分区,每个分区可以在不同的服务器上进行副本复制,从而提高可用性和容错性。分区和副本的组合允许 Kafka 处理大规模的消息流并保证数据的持久性。
- 持久化:Kafka 将消息以持久化的方式存储在磁盘上,确保消息在生产者发送和消费者接收之间不会丢失。消息被保存在分区中,可以根据需要保留一段时间,甚至可以通过配置来保留特定时间段的历史消息。
- 高吞吐量:Kafka 在处理消息时具有高吞吐量的能力,它能够同时处理成千上万的消息,适用于大规模的实时数据处理场景。
- 水平扩展:Kafka 支持水平扩展,可以通过添加新的服务器节点来增加吞吐量和存储容量,从而适应不断增长的数据量。
- 消息保序性:Kafka 保证在同一分区内的消息保持顺序,这对于一些需要按照顺序处理的场景非常重要。
- 流处理:Kafka 不仅用于消息传递,还可以用于实时流数据处理。Kafka Streams 是一个用于处理和分析流数据的库,可以在 Kafka 上进行流式处理,支持流数据的转换、聚合和计算等操作。
- 社区生态系统:Kafka 拥有丰富的社区生态系统,提供了许多与 Kafka 集成的工具和库,如消费者和生产者客户端、连接器(Connectors)用于将 Kafka 与其他数据源集成、Kafka 管理工具等。
总体而言,Kafka 是一个强大的分布式消息流平台,适用于许多实时数据处理和消息传递的应用场景。它的可靠性、高性能和可扩展性使得它成为构建大规模实时数据处理系统的重要组件之一。
# Spring Cloud Stream 和 Kafka 之间的联系和区别
Spring Cloud Stream 是一个用于构建基于 Spring Boot 的消息驱动微服务的框架,它提供了统一的编程模型和抽象来处理消息流,而 Kafka 是 Spring Cloud Stream 支持的消息中间件之一。
下面我们来讲一下它们之间的联系和区别:
联系:
消息驱动架构:Spring Cloud Stream 和 Kafka 都支持消息驱动架构,通过将消息作为信息传递的核心来构建应用程序。它们都支持发布-订阅模式,允许不同的微服务之间通过消息进行通信。
微服务和云原生:Spring Cloud Stream 是 Spring Cloud 生态系统中的一部分,专注于帮助开发人员构建云原生的微服务应用程序。Kafka 作为 Spring Cloud Stream 的一种消息中间件实现,与 Spring Cloud Stream 一起可以支持在微服务架构中使用消息传递来解耦微服务之间的通信。
可插拔的消息中间件:Spring Cloud Stream 提供了一个抽象层,使得在不同的消息中间件之间进行切换变得容易。它支持多种消息中间件,包括 Kafka、RabbitMQ 等,使开发人员可以根据实际需求选择合适的消息中间件。
区别:
- 定位和用途:
- Spring Cloud Stream 是一个用于构建消息驱动微服务的框架,它提供了统一的编程模型和抽象来简化消息传递。
- 而 Kafka 是一个分布式流数据平台,专注于处理高吞吐量、可持久化的实时数据流。
- 功能广度:
- Kafka 是一个功能丰富的消息中间件,除了消息传递外,还提供了分区、副本、持久化、高吞吐量等特性。
- 而 Spring Cloud Stream 更加专注于在微服务架构中实现消息传递。
- 编程模型:
- Spring Cloud Stream 提供了更加抽象的编程模型,通过
Binder
将应用程序与消息中间件解耦。开发人员只需关注业务逻辑,而不必过多关注底层的消息传递细节。 - Kafka 则需要更多的配置和代码来实现消息的生产和消费。
- Spring Cloud Stream 提供了更加抽象的编程模型,通过
- 生态系统:
- Spring Cloud Stream 作为 Spring Cloud 生态系统的一部分,可以与其他 Spring Cloud 组件无缝集成,如服务发现、负载均衡等。
- Kafka 作为独立的消息中间件,可以在不同的技术栈中使用。
总的来说,Spring Cloud Stream 和 Kafka 都是用于处理消息的技术,但它们的定位和功能略有不同。
Spring Cloud Stream 提供了更加抽象和便捷的方式来构建消息驱动的微服务,
而 Kafka 提供了更丰富的特性来处理实时数据流。
在使用时,开发人员可以根据项目需求和技术栈的选择来决定是否使用 Spring Cloud Stream 以及选择哪种消息中间件。
# 实践
# 集成 Kafka
# 引入依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.7.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>org.example</groupId>
<artifactId>stream-demo</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencyManagement>
<dependencies>
<!-- SpringCloud版本兼容表:https://spring.io/projects/spring-cloud#overview -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Hoxton.SR3</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.26</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# 配置文件
- 这里是将【生产者】和【消费者】配置在了一个服务里面
- 实际开发也可以将【生产者】和【消费者】拆分成两个服务来配置
spring:
cloud:
stream:
# Spring Cloud Stream Kafka 配置项
kafka:
# Kafka Binder 配置项,对应 KafkaBinderConfigurationProperties 类
binder:
brokers: 127.0.0.1:9092 #Kafka的消息中间件服务器
auto-create-topics: true #设置自动创建Topic
required-acks: 1 # 0-不等待任何确认。1-等待 leader 应答(默认)。all-等待所有 leader 和 follower 应答。
# Binding 配置项,对应 BindingProperties Map
bindings:
output:
destination: demo_topic #消息发往的目的地
content-type: text/plain #消息发送的格式,这里使用 text。发送字符串消息的时候需要配置,默认是 json格式
input:
destination: demo_topic #目的地
group: demo_consumer_group #消费者分组
# 自定义output - 生产
face_output:
destination: face_topic
content-type: text/plain
human_output:
destination: human_topic
content-type: application/json #默认是 application/json(可能会因版本而异)
vehicle_output:
destination: vehicle_topic
# 自定义input - 消费
face_input:
destination: face_topic
group: face_consumer_group
human_input:
destination: human_topic
group: human_consumer_group
vehicle_input:
destination: vehicle_topic
group: vehicle_consumer_group
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# 自定义通道接口
/**
* 自定义(生产)通道 - 模仿source接口造轮子
*
* @author chenmeng
*/
public interface CustomSourceChannel {
String FACE_OUTPUT = "face_output";
String HUMAN_OUTPUT = "human_output";
String VEHICLE_OUTPUT = "vehicle_output";
@Output(FACE_OUTPUT)
MessageChannel faceOutput();
@Output(HUMAN_OUTPUT)
MessageChannel humanOutput();
@Output(VEHICLE_OUTPUT)
MessageChannel vehicleOutput();
}
/**
* 自定义(消费)通道 - 模仿sink接口造轮子
*
* @author chenmeng
*/
public interface CustomSinkChannel {
String FACE_INPUT = "face_input";
String HUMAN_INPUT = "human_input";
String VEHICLE_INPUT = "vehicle_input";
@Input(FACE_INPUT)
SubscribableChannel faceInput();
@Input(HUMAN_INPUT)
SubscribableChannel humanInput();
@Input(VEHICLE_INPUT)
SubscribableChannel vehicleInput();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
注意事项:
- 一般接口上不写
@Component
注解。 - 接口主要用于定义行为契约,而具体的实现通常由类来提供。
@Component
注解用于将类标识为 Spring 容器管理的组件,而不是接口。实际上,将@Component
注解放在接口上可能会引发问题。
踩坑小记:
有时候在指定类中使用 @Resource
注入的时候会报错 -- 创建不了这个接口 bean,但是使用 Spring 自带的 @Autowired
注解就会没问题。(可能是因为是两个注解默认注入 bean 的方式不一样引起的)
Autowired
默认的注入方式为byType
(根据类型进行匹配,Spring 原生注解),@Resource
默认注入方式为byName
(根据名称进行匹配,JSR-250 标准注解)
详细可参考:autowired-和-resource-的区别是什么 (opens new window)
# dto 类
@Data
public class FaceDTO {
private String faceId;
private String desc;
}
@Data
public class HumanDTO {
private String humanId;
private String desc;
}
@Data
public class VehicleDTO {
private String vehicleId;
private String desc;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 发送业务类(生产者)
source
- output
/**
* 发送业务类(生产者)
*
* @author chenmeng
*/
@EnableBinding(Source.class)
public class SendService {
@Resource
private Source source;
public void sendMsg(String msg) {
source.output()
.send(MessageBuilder
.withPayload(msg)
.build());
}
public void sendBody(Object object) {
source.output()
.send(MessageBuilder
.withPayload(object)
.build());
}
}
/**
* 自定义通道发送业务类(生产者)
*
* @author chenmeng
*/
@EnableBinding(value = {CustomSourceChannel.class})
public class CustomSendService {
@Resource
private CustomSourceChannel sourceChannel;
public void sendFaceMsg(String msg) {
sourceChannel.faceOutput()
.send(MessageBuilder
.withPayload(msg)
.build());
}
public void sendHumanMsg(HumanDTO msg) {
sourceChannel.humanOutput()
.send(MessageBuilder
.withPayload(msg)
.build());
}
public void sendVehicleMsg(VehicleDTO msg) {
sourceChannel.vehicleOutput()
.send(MessageBuilder
.withPayload(msg)
.build());
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# 接收业务类(消费者)
sink
- input
/**
* 接收业务类(消费者)
*
* @author chenmeng
*/
@EnableBinding(Sink.class)
public class ReceiveService {
@StreamListener(Sink.INPUT)
public void receive(Object payload) {
System.out.println(payload);
}
}
/**
* 自定义通道接收业务类(消费者)
*
* @author chenmeng
*/
@Slf4j
@EnableBinding(CustomSinkChannel.class)
public class CustomReceiveService {
@StreamListener(CustomSinkChannel.FACE_INPUT)
public void receiveFace(Object payload) {
System.out.println("Face payload = " + payload);
}
@StreamListener(CustomSinkChannel.HUMAN_INPUT)
public void receiveHuman(HumanDTO payload) {
System.out.println("Human payload = " + payload);
}
@StreamListener(CustomSinkChannel.VEHICLE_INPUT)
public void receiveVehicle(VehicleDTO payload) {
log.info("Vehicle payload = {}", payload);
// 无论设置了多少次重试,最终都只会打印一次异常
throw new RuntimeException("异常测试 -- 重试机制");
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# 控制器 Controller
@RestController
@RequestMapping("/send")
public class StreamController {
private final SendService sendService;
private final CustomSendService customSendService;
/**
* 普通字符串消息发送
* localhost:port/send/hello
*
* @param msg 消息
*/
@GetMapping("/{msg}")
public String send(@PathVariable("msg") String msg){
sendService.sendMsg(msg);
return "发送成功";
}
/**
* 自定义消息发送
* localhost:port/send/face/hello
*
* @param msg 消息
*/
@GetMapping("/face/{msg}")
public String sendFaceMsg(@PathVariable("msg") String msg){
customSendService.sendFaceMsg(msg);
return "发送成功-Face";
}
/**
* 自定义请求体发送
* localhost:port/send/human
*
* @param dto 消息
*/
@PostMapping("/human")
public String sendHumanMsg(@RequestBody HumanDTO dto){
customSendService.sendHumanMsg(dto);
return "发送成功-Human";
}
/**
* 消费重试测试
* localhost:port/send/vehicle
*
* @param dto 消息
*/
@PostMapping("/vehicle")
public String sendVehicleMsg(@RequestBody VehicleDTO dto){
customSendService.sendVehicleMsg(dto);
return "发送成功-vehicle";
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
其他写法参考
注意配置文件中 input-in-0
的写法
- https://blog.csdn.net/qq_42221396/article/details/128409521 (opens new window)
- Spring Cloud Stream 3.1以后的使用方法_org.springframework.cloud.stream.annotation.enable (opens new window)
# 消费重试
默认情况下,Spring Cloud Stream 遇到异常是不会自动发送到死信队列中(默认关闭),需要配置
# 配置文件
# 加上了死信队列配置
spring:
cloud:
stream:
# Spring Cloud Stream Kafka 配置项
kafka:
# Kafka Binder 配置项,对应 KafkaBinderConfigurationProperties 类
binder:
brokers: 127.0.0.1:9092 #Kafka的消息中间件服务器
auto-create-topics: true #设置自动创建Topic
# 开启死信队列,需要设置成 all
required-acks: all # 0-不等待任何确认。1-等待 leader 应答(默认)。all-等待所有 leader 和 follower 应答。
# Kafka Binding 配置项,对应 KafkaBindingProperties 类
bindings:
vehicle_input:
# Kafka Consumer 配置项,对应 KafkaConsumerProperties 类
consumer:
enable-dlq: true # 是否开启死信队列,默认为 false 关闭
dlq-name: # 死信队列名,默认为 `errors.{topicName}.{consumerGroup}`
# Binding 配置项,对应 BindingProperties Map
bindings:
output:
destination: demo_topic #消息发往的目的地
content-type: text/plain #消息发送的格式,这里使用 text。发送字符串消息的时候需要配置,默认是 json格式
input:
destination: demo_topic #目的地
group: demo_consumer_group #消费者分组
# 自定义output - 生产
face_output:
destination: face_topic
content-type: text/plain
human_output:
destination: human_topic
content-type: application/json #默认是 application/json(可能会因版本而异)
vehicle_output:
destination: vehicle_topic
# 自定义input - 消费
face_input:
destination: face_topic
group: face_consumer_group
human_input:
destination: human_topic
group: human_consumer_group
vehicle_input:
destination: vehicle_topic
group: vehicle_consumer_group
# Consumer 配置项,对应 ConsumerProperties 类
consumer:
max-attempts: 3 # 重试次数,默认为 3 次。包括第一次尝试,一共尝试消费 max-attempts 次。
back-off-initial-interval: 3000 # 重试间隔的初始值,单位毫秒,默认为 1000
back-off-multiplier: 2.0 # 重试间隔的递乘系数,默认为 2.0
back-off-max-interval: 10000 # 重试间隔的最大值,单位毫秒,默认为 10000
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# 代码调试
调用上面示例中的 sendVehicleMsg
的接口测试即可。
# 消费异常处理机制
在 Spring Cloud Stream 中,提供了通用的消费异常处理机制,可以拦截到消费者消费消息时发生的异常,进行自定义的处理逻辑。
我们有两种方式来实现异常处理:
- 局部的异常处理:通过订阅指定错误 Channel
- 全局的异常处理:通过订阅全局错误 Channel
在全局和局部异常处理都定义的情况下,
- 错误消息仅会被符合条件的局部错误异常处理。
- 如果没有符合条件的,错误消息才会被全局异常处理。
代码示例如下:
// /**
// * 写法测试无效,暂时不知道原因
// * 局部的异常处理:通过订阅指定错误 Channel
// *
// * @param errorMessage
// */
// @ServiceActivator(inputChannel = "error.vehicle_topic.vehicle_consumer_group")
// public void handleError(ErrorMessage errorMessage) {
// log.error("[handleError][订阅到错误Channel:{}]", "error.vehicle_topic.vehicle_consumer_group");
// log.error("[handleError][payload:{}]", errorMessage.getPayload().getMessage());
// log.error("[handleError][originalMessage:{}]", errorMessage.getOriginalMessage());
// log.error("[handleError][headers:{}]", errorMessage.getHeaders());
// }
/**
* 全局的异常处理:通过订阅全局错误 Channel
*
* @param errorMessage
*/
@StreamListener(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) // errorChannel
public void globalHandleError(ErrorMessage errorMessage) {
log.error("[globalHandleError][payload:{}]", errorMessage.getPayload().getMessage());
log.error("[globalHandleError][originalMessage:{}]", errorMessage.getOriginalMessage());
log.error("[globalHandleError][headers:{}]", errorMessage.getHeaders());
// 可存储到数据库,后续处理
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
一般如何处理死信队列的内容?
# 学习参考
视频地址:
博客地址:
Spring Cloud 系列之 Spring Cloud Stream - 风的姿态 - 博客园 (cnblogs.com) (opens new window)
Spring Cloud (十五)Stream 入门、主要概念与自定义消息发送与接收 - 东北小狐狸 - 博客园 (cnblogs.com) (opens new window)
Springcloud Stream详解及整合kafka - 简书 (jianshu.com) (opens new window)
芋道 Spring Cloud 消息队列 Kafka 入门 | 芋道源码 —— 纯源码解析博客 (opens new window)