Kafka基础小结
# Kafka 基础小结
# Kafka 工作原理
# 生产
消息经过序列化后,通过不同的分区策略,找到对应的分区。
相同主题和分区的消息,会被存放在同一个批次里,然后由一个独立的线程负责把它们发到 Kafka Broker 上。
分区的策略包括顺序轮询、随机轮询和 key hash 这 3 种方式,那什么是分区呢?
分区是 Kafka 读写数据的最小粒度,比如主题 A 有 15 条消息,有 5 个分区,如果采用顺序轮询的方式,15 条消息会顺序分配给这 5 个分区,后续消费的时候,也是按照分区粒度消费。
由于分区可以部署在多个不同的机器上,所以可以通过分区实现 Kafka 的伸缩性,比如主题 A 的 5 个分区,分别部署在 5 台机器上,如果下线一台,分区就变为 4。
# 消费
- Kafka 消费是通过消费群组完成,【同一个】消费者群组,一个消费者可以消费多个【分区】,但是一个【分区】,只能被一个消费者消费。
- 如果消费者增加,会触发 Rebalance,也就是分区和消费者需要重新配对。
- 不同的消费群组互不干涉。
# 分区
# 消费概念
分区是最小的并行单位
- 一个消费者可以消费多个分区
- 一个分区可以被多个消费者组里的消费者消费
- 一个分区不能同时被同一个消费者组里的多个消费者消费
# 消息模型
1、发布-订阅模式
- 每个消费者都属于不同的消费者组 -- 这里 P0(partition-0) 和 P1(partition-1) 代表 分区一和分区二的意思
2、点对点(一对一)
- 所有消费者都属于同一个消费者组
# 分区与消息顺序
- 同一个生产者发送到同一分区的消息,先发送的 offset 比后发送的 offset 小
- 同一个生产者发送到不同分区的消息,消息顺序无法保证
消费者按照消息在分区里的存放顺序进行消费的 Kafka 只保证分区内的消息顺序,不能保证分区间的消息顺序
如何保证所有消息的顺序?
- 设置一个分区,这样就可以保证所有消息的顺序,但是失去了拓展性和性能(但一般不推荐)
- 支持通过设置消息的 key,相同 key 的消息会发送的同一个分区
# 消息传递语义
- 最多一次 -- 消息可能会丢失,永远不重复发送
- 最少一次 -- 消息不会丢失,但是可能会重复
- 精确一次 -- 保证消息被传递到服务端且在服务端不重复
- 在 Kafka 0.11.0 及之后的版本才实现
需要生产者和消费者共同来保证。
# ack 参数
required-acks
是 Kafka 生产者配置的一个参数,它定义了生产者等待确认消息已被接收的策略。
required-acks
的值可以是:
0
:生产者不需要等待任何确认,一旦消息被发送到 leader broker 就会返回成功。这是最快速但最不保证的消息传递。(至多一次)1
(默认):生产者等待 leader broker 确认收到消息。这是通常推荐的设置,因为它提供了单个副本的消息持久化保证。(至多一次)-1
或all
:生产者不仅等待 leader broker 的确认,还等待所有 ISR(In-Sync Replicas)的确认。这是最安全但最慢的设置。(至少一次)
# Kafka 如何保证消息不重复消费?
kafka 出现消息重复消费的原因:
- 服务端侧已经消费的数据没有成功提交 offset(根本原因)。
- Kafka 侧 由于服务端处理业务时间长或者网络链接等等原因让 Kafka 认为服务假死,触发了分区 rebalance。
解决方案:
消费消息服务做幂等校验,比如 Redis 的 set、MySQL 的主键等天然的幂等功能。这种方法最有效。
将
enable.auto.commit
参数设置为 false,关闭自动提交,开发者在代码中手动提交 offset。
那么这里会有个问题:什么时候提交 offset 合适?
- 处理完消息再提交:依旧有消息重复消费的风险(例如提交前服务挂掉了),和自动提交一样。
- 拉取到消息即提交:会有消息丢失的风险。允许消息延时的场景,一般会采用这种方式。然后,通过定时任务在业务不繁忙(比如凌晨)的时候做数据兜底。
通过 offset 来防止重复消费不是一个好的办法 通常在消息中加入 唯一ID (例如流水ID,订单ID),在处理业务时,通过判断 ID 来防止重复处理
# Kafka 的自动提交是怎么样的
Kafka 的自动提交(Automatic Offset Commit)是一种机制,用于确定 Kafka 消费者应该将其当前的消费位移(offset)提交到 Kafka 服务器。
消费位移是一个指示 Kafka 主题分区中消费者已读取到的位置的值。
自动提交可以帮助简化消费者的管理,但它也涉及一些注意事项和权衡。
以下是关于 Kafka 自动提交的一些重要信息:
默认行为:Kafka 消费者默认启用了自动提交。这意味着消费者会自动定期将当前的消费位移提交到 Kafka 服务器,而无需显式调用
commitSync
或commitAsync
方法。提交频率:自动提交的频率由消费者配置参数
auto.commit.interval.ms
决定,默认值为 5000 毫秒(5 秒)。这意味着每隔 5 秒,消费者将提交当前的位移。幂等性问题:自动提交可能导致幂等性问题。如果消息在处理过程中成功处理但在位移提交之前失败,那么消息可能会被重新处理,这可能导致副作用。为了解决这个问题,你可以采用幂等性的消息处理逻辑,或者使用手动位移提交来更精确地控制位移的提交时机。
配置禁用:如果你希望完全控制位移的提交,可以禁用自动提交。通过将
enable.auto.commit
配置为false
,你可以关闭自动提交功能,然后在适当的时候手动调用commitSync
或commitAsync
来提交位移。enable.auto.commit=false
1手动提交:手动提交允许你在消息处理成功后显式地提交位移。这可以确保只有在消息成功处理后才提交位移,从而避免重复处理消息的问题。
consumer.poll(Duration.ofMillis(100)); // 拉取消息 // 处理消息 consumer.commitSync(); // 手动提交位移
1
2
3
总之,自动提交是 Kafka 消费者的默认行为,但在一些情况下,特别是需要确保幂等性和消息处理成功后才提交位移的情况下,可能需要禁用自动提交并使用手动提交。自动提交的频率可以通过配置进行调整。
# 讲讲 kafka 的自动提交,是在处理完消息前提交还是处理完消息之后提交
Apache Kafka 的消费者有两种方式来提交消费的进度,也就是提交 offset,一种是自动提交,另一种是手动提交。
这里我们主要讨论自动提交。
在 Kafka 中,自动提交是指:消费者自动地定期提交已经拉取的消息的 offset。
如果启用了自动提交,那么 auto.commit.interval.ms
配置的时间到了,消费者就会提交最新的 offset,无论消息是否已经处理完毕。
这意味着,自动提交的 offset 是在拉取到消息后就可能提交,而不一定是在处理完消息之后提交。这将导致在消费者崩溃或者重新启动时,可能会出现消息重复处理或者消息丢失的情况。
例如,如果消费者已经拉取了一些消息,但还没来得及处理,这时候自动提交触发,offset 被提交。如果此时消费者崩溃,再次启动时,它会从提交的 offset 开始消费,导致之前拉取但未处理的消息丢失。
另一方面,如果消费者拉取了一些消息并处理了它们,但在自动提交提交触发之前消费者崩溃了,那么 offset 并没有被提交。当消费者再次启动时,它会从上次提交的 offset 开始消费,这将导致处理过的消息被重复处理。
因此,
- 如果你的应用对消息的处理具有幂等性(即处理多次和处理一次的效果一样),那么使用自动提交可能是一个简单且有效的选择。
- 但如果你的应用需要精确地处理每一条消息,即不能丢失消息也不能重复处理消息,那么你可能需要考虑使用手动提交 offset,这样可以更精确地控制何时提交 offset。
# 自动提交与手动提交
- 自动提交(Auto Commit):这是指 Kafka 消费者定期自动将当前已消费的消息位移(offset)提交到 Kafka 服务器,而无需显式的用户干预。自动提交的频率由配置参数控制,通常情况下是定期的。这种提交位移的方式可能会导致一些消息被处理但尚未确认已成功处理的情况。如果发生故障,这些消息可能会被重新处理,从而可能导致消息的重复消费。
- 位移/手动 提交(Offset Commit):这是指消费者在成功处理一条消息后,显式地将该消息的位移提交到 Kafka 服务器。位移提交是由用户代码控制的,通常在处理逻辑执行后,确认消息已被成功处理后执行。这种提交位移的方式确保了消息只有在成功处理后才会被视为已消费,从而避免了消息的重复处理。
一般来说,位移提交更可靠,因为它允许消费者完全控制位移何时提交,以确保消息被成功处理后才被标记为已消费。自动提交则更容易实现,但在某些情况下可能导致消息的重复消费,因此需要根据具体的需求和应用场景来选择使用哪种方式。
所以,自动提交是指消费者定期自动提交位移,而位移提交是指显式地提交位移,由用户代码控制。
# Kafka 重试机制
# 消费失败会怎样?
在默认配置下,当消费异常会进行重试,重试多次后会跳过当前消息,继续进行后续消息的消费,不会一直卡在当前消息。
因此,即使某个消息消费异常,Kafka 消费者仍然能够继续消费后续的消息,不会一直卡在当前消息,保证了业务的正常进行。
# 默认会重试多少次?
Kafka 消费者在默认配置下会进行最多 10 次 的重试,每次重试的时间间隔为 0,即立即进行重试。
如果在 10 次重试后仍然无法成功消费消息,则不再进行重试,消息将被视为消费失败。
# 如何在重试失败后进行告警?
自定义重试失败后逻辑,需要手动实现,可以通过【重写 DefaultErrorHandler
的 handleRemaining
函数,加上自定义的告警等操作来】实现。
@Slf4j
public class DelErrorHandler extends DefaultErrorHandler {
public DelErrorHandler(FixedBackOff backOff) {
super(null,backOff);
}
@Override
public void handleRemaining(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
super.handleRemaining(thrownException, records, consumer, container);
log.info("重试多次失败");
// 自定义操作
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
DefaultErrorHandler
只是默认的一个错误处理器,Spring Kafka 还提供了 CommonErrorHandler
接口。手动实现 CommonErrorHandler
就可以实现更多的自定义操作,有很高的灵活性。例如根据不同的错误类型,实现不同的重试逻辑以及业务逻辑等。
# 重试失败后的数据如何再次处理?
当达到最大重试次数后,数据会直接被跳过,继续向后进行。当代码修复后,如何重新消费这些重试失败的数据呢?
当达到最大重试次数后,如果仍然无法成功处理消息,消息会被发送到对应的死信队列中。对于死信队列的处理,既可以用 @DltHandler
处理,也可以使用 @KafkaListener
重新消费。
# 重试注解 @RetryableTopic
@RetryableTopic
是 Spring Kafka 中的一个注解,它用于配置某个 Topic 支持消息重试,更推荐使用这个注解来完成重试。
// 重试 5 次,重试间隔 100 毫秒,最大间隔 1 秒
@RetryableTopic(
attempts = "5",
backoff = @Backoff(delay = 100, maxDelay = 1000)
)
@KafkaListener(topics = {KafkaConst.TEST_TOPIC}, groupId = "apple")
private void customer(String message) {
log.info("kafka customer:{}", message);
Integer n = Integer.parseInt(message);
if (n % 5 == 0) {
throw new RuntimeException();
}
System.out.println(n);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
# 什么是死信队列?
**死信队列(Dead Letter Queue,简称 DLQ)**是消息中间件中的一种特殊队列。它主要用于处理无法被消费者正确处理的消息,通常是因为消息格式错误、处理失败、消费超时等情况导致的消息被"丢弃"或"死亡"的情况。
- 当消息进入队列后,消费者会尝试处理它。如果处理失败,或者超过一定的重试次数仍无法被成功处理,消息可以发送到死信队列中,而不是被永久性地丢弃。
- 在死信队列中,可以进一步分析、处理这些无法正常消费的消息,以便定位问题、修复错误,并采取适当的措施。