1 Spring-Kafka快速入门
在 Spring 生态中,提供了 Spring-Kafka 项目,让我们更简便的使用 Kafka 。其官网介绍如下:
The Spring for Apache Kafka (spring-kafka) project applies core Spring concepts to the development of Kafka-based messaging solutions.
Spring for Apache Kafka (spring-kafka) 项目将 Spring 核心概念应用于基于 Kafka 的消息传递解决方案的开发。It provides a “template” as a high-level abstraction for sending messages.
它提供了一个“模板”作为发送消息的高级抽象。It also provides support for Message-driven POJOs with @KafkaListener annotations and a “listener container”.
它还通过 @KafkaListener 注解和“侦听器容器(listener container)”为消息驱动的 POJO 提供支持。These libraries promote the use of dependency injection and declarative.
这些库促进了依赖注入和声明的使用。In all of these cases, you will see similarities to the JMS support in the Spring Framework and RabbitMQ support in Spring AMQP.
在所有这些用例中,你将看到 Spring Framework 中的 JMS 支持,以及和 Spring AMQP 中的 RabbitMQ 支持的相似之处。
- 注意,Spring-Kafka 是基于 Spring Message 来实现 Kafka 的发送端和接收端。
Features(功能特性)
- KafkaTemplate
- KafkaMessageListenerContainer
- @KafkaListener
- KafkaTransactionManager
spring-kafka-test
jar with embedded kafka server(带嵌入式 Kafka 服务器的spring-kafka-test
jar 包)
1.1 引入依赖
1 | <!-- 引入 Spring-Kafka 依赖 --> |
1.2 应用配置文件
1 | spring: |
在
spring.kafka
配置项,设置 Kafka 的配置,对应 KafkaProperties 配置类。Spring Boot 提供的 KafkaAutoConfiguration 自动化配置类,实现 Kafka 的自动配置,创建相应的 Producer 和 Consumer 。
spring.kafka.bootstrap-servers
配置项,设置 Kafka Broker 地址。如果多个,使用逗号分隔。spring.kafka.producer
配置项value-serializer
配置,使用了 Spring-Kafka 提供的 JsonSerializer 序列化类- 其它配置,一般默认即可。
spring.kafka.consumer
配置项value-serializer
配置,使用了 Spring-Kafka 提供的 JsonDeserializer 反序列化类properties.spring.json.trusted.packages
配置,配置信任com.example.kafka.message
包下的 Message 类们。因为 JsonDeserializer 在反序列化消息时,考虑到安全性,只反序列化成信任的 Message 类。
1.3 Demo01Message
在com.example.kafka.message
包下创建Demo01Message类提供给当前示例使用。代码如下:
1 | // Demo01Message.java |
TOPIC
静态属性,设置该消息类对应 Topic 为"DEMO_01"
。
1.4 Demo01Producer
在com.example.kafka.producer
包下创建Demo01Producer,使用 Kafka-Spring 封装提供的 KafkaTemplate ,实现三种发送消息的方式。代码如下:
1 | // Demo01Producer.java |
#syncSend(...)
方法,同步发送消息。在方法内部,也是调用KafkaTemplate#send(topic, data)
方法,异步发送消息。不过,因为我们后面调用了 ListenableFuture 对象的#get()
方法,阻塞等待发送结果,从而实现同步的效果。#asyncSend(...)
方法,异步发送消息。在方法内部,会调用KafkaTemplate#send(topic, data)
方法,异步发送消息,返回 Spring ListenableFuture 对象,一个可以通过监听执行结果的 Future 增强。- 暂时未提供 oneway 发送消息的方式。因为需要配置 Producer 的
acks = 0
,才可以使用这种发送方式,先忽略。
对于胖友来说,可能最关心的是,消息 Message 是怎么序列化的。
- 在序列化时,我们使用了 JsonSerializer 序列化 Message 消息对象,它会在 Kafka 消息 Headers 的
__TypeId__
上,值为 Message 消息对应的类全名。 - 在反序列化时,我们使用了 JsonDeserializer 序列化出 Message 消息对象,它会根据 Kafka 消息 Headers 的
__TypeId__
的值,反序列化消息内容成该 Message 对象。
1.5 Demo01Consumer
在com.example.kafka.consumer
包下创建Demo01Consumer消费消息。代码如下:
1 |
|
- 在方法上,添加了
@KafkaListener
注解,声明消费的 Topic 是"DEMO_01"
,消费者分组是"demo01-consumer-group-DEMO_01"
。一般情况下,我们建议一个消费者分组,仅消费一个 Topic 。这样做会有个好处:每个消费者分组职责单一,只消费一个 Topic 。 - 方法参数,使用消费 Topic 对应的消息类即可。这里,我们使用了
Demo01Message
。 - 虽然
@KafkaListener
注解是方法级别的,但是建议一个类对应一个方法消费消息。
1.6 Demo01AConsumer
在com.example.kafka.consumer
包下创建Demo01AConsumer
类,消费消息。代码如下:
1 | // Demo01AConsumer.java |
- 整体和
Demo01Consumer
是一致的,主要有两个差异点,也是为什么又额外创建了这个消费者的原因。
差异一,在方法上,添加了 @KafkaListener
注解,声明消费的 Topic 还是 "DEMO_01"
,消费者分组修改成了 "demo01-A-consumer-group-DEMO_01"
。这样,我们就可以测试 Kafka 集群消费的特性。
集群消费(Clustering):集群消费模式下,相同 Consumer Group 的每个 Consumer 实例平均分摊消息。
- 也就是说,如果发送一条 Topic 为
"DEMO_01"
的消息,可以分别被"demo01-A-consumer-group-DEMO_01"
和"demo01-consumer-group-DEMO_01"
都消费一次。 - 但是,如果启动两个该示例的实例,则消费者分组
"demo01-A-consumer-group-DEMO_01"
和"demo01-consumer-group-DEMO_01"
都会有多个 Consumer 示例。此时,我们再发送一条 Topic 为"DEMO_01"
的消息,只会被"demo01-A-consumer-group-DEMO_01"
的一个 Consumer 消费一次,也同样只会被"demo01-A-consumer-group-DEMO_01"
的一个 Consumer 消费一次。
通过集群消费的机制,我们可以实现针对相同 Topic ,不同消费者分组实现各自的业务逻辑。例如说:用户注册成功时,发送一条 Topic 为 "USER_REGISTER"
的消息。然后,不同模块使用不同的消费者分组,订阅该 Topic ,实现各自的拓展逻辑:
- 积分模块:判断如果是手机注册,给用户增加 20 积分。
- 优惠劵模块:因为是新用户,所以发放新用户专享优惠劵。
- 站内信模块:因为是新用户,所以发送新用户的欢迎语的站内信。
- … 等等
这样,我们就可以将注册成功后的业务拓展逻辑,实现业务上的解耦,未来也更加容易拓展。同时,也提高了注册接口的性能,避免用户需要等待业务拓展逻辑执行完成后,才响应注册成功。
差异二,方法参数,设置消费的消息对应的类不是 Demo01Message 类,而是 Kafka 内置的 ConsumerRecord 类。通过 ConsumerRecord 类,我们可以获取到消费的消息的更多信息,例如说消息的所属队列、创建时间等等属性,不过消息的内容(value
)就需要自己去反序列化。当然,一般情况下,不会使用 ConsumerRecord 类。
1.7 简单测试
创建 Demo01ProducerTest 测试类,编写二个单元测试方法,调用 Demo01Producer 二个发送消息的方式。代码如下:
1 |
|
执行 #testSyncSend()
方法,测试同步发送消息。控制台输出如下:
1 | # Producer 同步发送消息成功。注意 __TypeId__ |
- 通过日志我们可以看到,我们发送的消息,分别被
Demo01AConsumer
和Demo01Consumer
两个消费者(消费者分组)都消费了一次。 - 同时,两个消费者在不同的线程中,消费了这条消息。
不关闭 #testSyncSend()
单元测试方法,模拟每个消费者集群有多个 Consumer 节点。
执行 #testASyncSend()
方法,测试异步发送消息。控制台输出如下:
1 | #这条日志出现在testASyncSend窗口 |
- 和
#testSyncSend()
方法执行的结果,是一致的。此时,我们打开#testASyncSend()
方法所在的控制台,不会看到有消息消费的日志。说明,符合集群消费的机制:集群消费模式下,相同 Consumer Group 的每个 Consumer 实例平均分摊消息。。 - 不过如上的日志,也可能出现在
#testASyncSend()
方法所在的控制台,而不在#testSyncSend()
方法所在的控制台。
1.8 @KafkaListener
在Demo01Consumer
中使用了@KafkaListener
注解设置每个 Kafka 消费者 Consumer 的消息监听器的配置。
@KafkaListener
注解的常用属性如下:
1 | /** |
@KafkaListener
注解的不常用属性如下:
1 | /** |
2 批量发送消息
Kafka 提供了一个 RecordAccumulator 消息收集器,将发送给相同 Topic 的相同 Partition 分区的消息们,“偷偷”收集在一起,当满足条件时候,一次性批量发送提交给 Kafka Broker 。如下是三个条件,满足任一即会批量发送:
- 【数量】
batch-size
:超过收集的消息数量的最大条数。 - 【空间】
buffer-memory
:超过收集的消息占用的最大内存。 - 【时间】
linger.ms
:超过收集的时间的最大等待时长,单位:毫秒。
2.1 增加配置项
1 | spring: |
额外三个参数,就是我们说的 Producer 批量发送的三个条件:
spring.kafka.producer.batch-size
spring.kafka.producer.buffer-memory
spring.kafka.producer.properties.linger.ms
2.2 Demo02Message
在 com.example.kafka.message
包下,创建Demo02Message
消息类,提供给当前示例使用。代码如下:
1 | package com.example.kafka.message; |
TOPIC
静态属性,我们设置该消息类对应 Topic 为"DEMO_02"
。
2.3 Demo02Producer
在 com.example.kafka.producer
包下,创建Demo02Producer 类
,它会使用 Kafka-Spring 封装提供的 KafkaTemplate ,实现一个异步发送消息的方法。代码如下:
1 | // Demo02Producer.java |
- 看起来和我们在
1.4 Demo01Producer
提供的异步发送消息的方法,除了换成了 Demo02Message 消息对象,其它都是一模一样的。 - 因为我们发送的消息 Topic 是自动创建的,所以其 Partition 分区大小是 1 。这样,就能保证我每次调用这个方法,满足批量发送消息的一个前提,相同 Topic 的相同 Partition 分区的消息们。
2.4 Demo02Consumer
在com.example.kafka.consumer
包下,创建Demo02Consumer
类,消费消息。代码如下:
1 | // Demo02Consumer.java |
- 和
1.5 Demo01Consumer
基本一致,除了是不同的消费者分组,消费了不同的 Topic 。
2.5 测试
创建 Demo02ProducerTest
测试类,编写单元测试方法,测试 Producer 批量发送消息的效果。代码如下:
1 | // Demo02ProducerTest.java |
- 异步发送三条消息,每次发送消息之间,都故意 sleep 10 秒。目的是,恰好满足配置的
linger.ms
最大等待时长。
执行 #testASyncSend()
方法,测试批量发送消息。控制台输出如下:
1 | # 打印 testASyncSend 方法开始执行的日志 |
3 定时消息
Kafka 并未提供定时消息的功能,需要自行拓展。
可以考虑基于 MySQL 存储定时消息,Job 扫描到达时间的定时消息,发送给 Kafka 。
4 消费重试
Spring-Kafka 提供消费重试的机制。在消息消费失败的时候,Spring-Kafka 会通过消费重试机制,重新投递该消息给 Consumer ,让 Consumer 有机会重新消费消息,实现消费成功。
Spring-Kafka 并不会无限重新投递消息给 Consumer 重新消费,而是在默认情况下,达到 N 次重试次数时,Consumer 还是消费失败时,该消息就会进入到死信队列。
死信队列用于处理无法被正常消费的消息。当一条消息初次消费失败,Spring-Kafka 会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,Spring-Kafka 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。
Spring-Kafka 将这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),将存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。后续,我们可以通过对死信队列中的消息进行重发,来使得消费者实例再次进行消费。
每条消息的失败重试,是可以配置一定的间隔时间。参考下面示例
4.1 KafkaConfiguration
在包com.example.kafka.config
下创建KafkaConfiguration
配置类如下
1 | // KafkaConfiguration.java |
Spring-Kafka 的消费重试功能,通过实现自定义的
SeekToCurrentErrorHandler
在 Consumer 消费消息异常的时候,进行拦截处理:- 在重试小于最大次数时,重新投递该消息给 Consumer ,让 Consumer 有机会重新消费消息,实现消费成功。
- 在重试到达最大次数时,Consumer 还是消费失败时,该消息就会发送到死信队列。
- 例如,Topic 是
"DEMO_04"
,则其对应的死信队列的 Topic 就是"DEMO_04.DLT"
,即在原有 Topic 加上.DLT
后缀,就是其死信队列的 Topic 。
<1>
处,创建 DeadLetterPublishingRecoverer 对象,它负责实现,在重试到达最大次数时,Consumer 还是消费失败时,该消息就会发送到死信队列。<2>
处,创建 FixedBackOff 对象。这里,我们配置了重试 3 次,每次固定间隔 30 秒。当然,胖友可以选择 BackOff 的另一个子类 ExponentialBackOff 实现,提供指数递增的间隔时间。<3>
处,创建 SeekToCurrentErrorHandler 对象,负责处理异常,串联整个消费重试的整个过程。一方面,在消息消费失败时,
SeekToCurrentErrorHandler<3>
会将 调用 Kafka Consumer 的#seek(TopicPartition partition, long offset)
方法,将 Consumer 对于该消息对应的 TopicPartition 分区的本地进度设置成该消息的位置。这样,Consumer 在下次从 Kafka Broker 拉取消息的时候,又能重新拉取到这条消费失败的消息。另一方面,Spring-Kafka 使用 FailedRecordTracker 对每个 Topic 的每个 TopicPartition 消费失败次数进行计数,这样相当于对该 TopicPartition 的第一条消费失败的消息的消费失败次数进行计数。
- 在 FailedRecordTracker 中,会调用 BackOff
<2>
来进行计算,该消息的下一次重新消费的时间,通过Thread#sleep(...)
方法,实现重新消费的时间间隔。
- 在 FailedRecordTracker 中,会调用 BackOff
有一点需要注意,FailedRecordTracker 提供的计数是客户端级别的,重启 JVM 应用后,计数是会丢失的。所以,如果想要计数进行持久化,需要自己自行实现下 FailedRecordTracker 类,通过 ZooKeeper 存储计数。
4.2 Demo04Message
在包com.example.kafka.message
下创建Demo04Message如下
1 | // Demo04Message.java |
TOPIC
静态属性,我们设置该消息类对应 Topic 为"DEMO_04"
。
4.3 Demo04Producer
在com.example.kafka.producer
包下创建Demo04Producer
如下
1 | // Demo04Producer.java |
4.4 Demo04Consumer
在com.example.kafka.producer
包下创建Demo04Consumer
如下
1 | // Demo04Consumer.java |
4.5 简单测试
创建 Demo04ProducerTest
测试类,编写一个单元测试方法,调用 Demo04Producer 同步发送消息。代码如下:
1 | // Demo04ProducerTest.java |
运行testSyncSend
方法结果如下
1 | #发送成功 |
5 广播消费
广播消费模式下,相同 Consumer Group 的每个 Consumer 实例都接收全量的消息。
- 不过 Kafka 并不直接提供内置的广播消费的功能。使每个 Consumer 独有一个 Consumer Group ,从而保证都能接收到全量的消息。
例如说,在应用中,缓存了数据字典等配置表在内存中,可以通过 Kafka 广播消费,实现每个应用节点都消费消息,刷新本地内存的缓存。
又例如说,基于 WebSocket 实现了 IM 聊天,在给用户主动发送消息时,因为不知道用户连接的是哪个提供 WebSocket 的应用,所以可以通过 Kafka 广播消费,每个应用判断当前用户是否是和自己提供的 WebSocket 服务连接,如果是,则推送消息给用户。
5.1 修改应用配置
和2.1 增加配置项
中修改配置项 spring.kafka.consumer.auto-offset-reset=latest
。因为在广播订阅下,一般情况下,无需消费历史的消息,而是从订阅的 Topic 的队列的尾部开始消费即可,所以配置为 latest
。
5.2 Demo05Message
在 com.example.kafka.message
包下,创建 Demo05Message
消息类,提供给当前示例使用。代码如下:
1 | // Demo05Message.java |
TOPIC
静态属性,设置该消息类对应 Topic 为"DEMO_05"
。
5.3 Demo05Producer
在 com.example.kafka.producer
包下,创建 Demo05Producer
类,它会使用 Kafka-Spring 封装提供的 KafkaTemplate ,同步发送消息。代码如下:
1 | // Demo04Producer.java |
- 和
1.4 Demo01Producer
的同步发送消息的代码是一致的,就是换成了Demo05Message
。
5.4 Demo05Consumer
在 cn.iocoder.springboot.lab03.kafkademo.consumer
包下,创建 Demo05Consumer
类,消费消息。代码如下:
1 | // Demo04Consumer.java |
- 在
<X>
处,我们通过 Spring EL 表达式,在每个消费者分组的名字上,使用 UUID 生成其后缀。这样,我们就能保证每个项目启动的消费者分组不同,以达到广播消费的目的。
5.5 简单测试
创建 Demo05ProducerTest
测试类,用于测试广播消费。代码如下:
1 |
|
首先,执行 #test()
测试方法,先启动一个消费者分组 "demo05-consumer-group-DEMO_05-${UUID1}"
的 Consumer 节点。
然后,执行 #testSyncSend()
测试方法,再启动一个消费者分组 "demo05-consumer-group-DEMO_05-${UUID2}"
的 Consumer 节点。同时,该测试方法,调用 Demo05ProducerTest#syncSend(id)
方法,同步发送了一条消息。控制台输出如下:
1 | #### testSyncSend 方法对应的控制台 #### |
- 消费者分组
"demo05-consumer-group-DEMO_05-${UUID1}"
和demo05-consumer-group-DEMO_05-${UUID2}
的两个 Consumer 节点,都消费了这条发送的消息。符合广播消费的预期
6 并发消费
在上述的示例中,我们配置的每一个 Spring-Kafka @KafkaListener
,都是串行消费的。显然,这在监听的 Topic 每秒消息量比较大的时候,会导致消费不及时,导致消息积压的问题。
虽然说,我们可以通过启动多个 JVM 进程,实现多进程的并发消费,从而加速消费的速度。但是问题是,否能够实现多线程的并发消费呢?答案是有。
在1.8 @KafkaListener
小节中,我们可以看到该注解有 concurrency
属性,它可以指定并发消费的线程数。例如说,如果设置 concurrency=4
时,Spring-Kafka 就会为该 @KafkaListener
创建 4 个线程,进行并发消费。
例如:
- 创建一个 Topic 为
"DEMO_06"
,并且设置其 Partition 分区数为 10 。 - 创建一个 Demo06Consumer 类,并在其消费方法上,添加
@KafkaListener(concurrency=2)
注解。 - 启动项目。Spring-Kafka 会根据
@KafkaListener(concurrency=2)
注解,创建 2 个 Kafka Consumer 。每个 Kafka Consumer 会被单独分配到一个线程中,进行拉取消息,消费消息。 - 之后,Kafka Broker 会将 Topic 为
"DEMO_06"
分配给创建的 2 个 Kafka Consumer 各 5 个 Partition 。 - 因为
@KafkaListener(concurrency=2)
注解,创建 2 个 Kafka Consumer ,在各自的线程中,拉取各自的 Topic 为"DEMO_06"
的 Partition 的消息,各自串行消费。从而,实现多线程的并发消费。
6.1 引入依赖
和 1.1 引入依赖
一致
6.2 应用配置文件
和 1.2 应用配置文件
一致
实际上,可以通过 spring.kafka.listener.concurrency
配置项,全局设置每个 @KafkaListener
的并发消费的线程数。不过个人建议,还是每个 @KafkaListener
各自配置,毕竟每个 Topic 的 Partition 的数量,都是不同的。当然,也可以结合使用。
6.3 Demo06Message
在 com.example.kafka.message
包下,创建 Demo06Message
消息类,提供给当前示例使用。代码如下:
1 | // Demo06Message.java |
注意,记得手动创建一个 "DEMO_06"
的 Partition 大小为 10 。可执行如下命令:
1 | ./kafka-topics.sh --create --topic DEMO_06 --bootstrap-server localhost:9094 --replication-factor 1 --partitions 10 |
6.4 Demo06Producer
在 com.example.kafka.producer
包下,创建 Demo06Producer
类,它会使用 Kafka-Spring 封装提供的 KafkaTemplate ,同步发送消息。代码如下:
1 | // Demo06Producer.java |
- 和
1.1 Demo01Producer
的同步发送消息的代码是一致的,就是换成了 Demo06Message 。
6.5 Demo06Consumer
在 com.example.kafka.consumer
类,消费消息。代码如下:
1 | // Demo06Consumer.java |
- 在
<X>
处,我们在@KafkaListener
注解上,添加了concurrency = "2"
属性,创建 2 个线程消费"DEMO_06"
下的消息。
6.6 简单测试
创建 Demo06ProducerTest
测试类,编写一个单元测试方法,发送 10 条消息,观察并发消费情况。代码如下:
1 |
|
运行 testSyncSend
测试方法结果如下
1 | 2023-01-16 13:19:26.220 INFO 6548 --- [ntainer#5-1-C-1] c.example.kafka.consumer.Demo06Consumer : [onMessage][线程编号:25 消息内容:Demo06Message(id=1673846366)] |
可以看到有两个线程在消费消息,此时,使用 Kafka Manager 来查看 "DEMO_06"
的消费者列表:也可以看到有两个消费者各负责消费了5个分区
7 顺序消息
我们先来一起了解下顺序消息的顺序消息的定义:
- 普通顺序消息 :Producer 将相关联的消息发送到相同的消息队列。
- 完全严格顺序 :在【普通顺序消息】的基础上,Consumer 严格顺序消费。
在上述的示例中,我们看到 Spring-Kafka 在 Consumer 消费消息时,天然就支持按照 Topic 下的 Partition 下的消息,顺序消费。即使在6.并发消费
时,也能保证如此。
那么此时,我们只需要考虑将 Producer 将相关联的消息发送到 Topic 下的相同的 Partition 即可。只要发送消息时,指定了消息的 key ,Producer 则会根据 key 的哈希值取模来获取到其在 Topic 下对应的 Partition 。分区策略参考 《Kafka 发送消息分区选择策略详解》 文章。
下面示例,我们根据6. 并发消费
项目改造。
7.1 Demo06Producer
修改 Demo06Producer
类,增加顺序发送消息方法。代码如下:
1 | // Demo06Producer.java |
- 调用 KafkaTemplate 同步发送消息方法时,我们多传入了
id
作为消息的 key ,从而实现发送到DEMO_06
这个 Topic 下的相同 Partition 中。
7.2 简单测试
修改Demo06ProducerTest
测试类,新增一个单元测试方法,顺序发送 10 条消息,观察消费情况。代码如下:
1 | // Demo06ProducerTest.java |
执行 #testSyncSendOrderly()
单元测试,输出日志如下:
1 | 2023-01-18 14:44:40.410 INFO 51476 --- [ main] com.example.kafka.Demo06ProducerTest : [testSyncSend][发送编号:[1] 发送队列:[9]] |
8 事务消息
Kafka 内置提供事务消息的支持。对事务消息的概念参考《事务消息组件的套路》 文章。
不过 Kafka 提供的并不是完整的的事务消息的支持,缺少了回查机制。关于这一点,刚推荐的文章也有讲到。目前,常用的分布式消息队列,只有 RocketMQ 提供了完整的事务消息的支持,参考《Spring Boot 消息队列 RocketMQ 入门》 的「9. 事务消息」小节。
8.1 引入依赖
和1.1 引入依赖
一致
8.2 应用配置文件
application.yaml
配置内容如下:
1 | spring: |
- 相比
1.2 应用配置文件
来说,修改或增加如下三个参数:- 修改
spring.kafka.producer.acks=all
配置,不然在启动时会报"Must set acks to all in order to use the idempotent producer. Otherwise we cannot guarantee idempotence."
错误。因为,Kafka 的事务消息需要基于幂等性来实现,所以必须保证所有节点都写入成功。 - 增加
transaction-id-prefix=demo.
配置,事务编号的前缀。需要保证相同应用配置相同,不同应用配置不同。具体可以看看《How to choose Kafka transaction id for several applications》的讨论。 - 增加
spring.kafka.consumer.properties.isolation.level=read_committed
配置,Consumer 仅读取已提交的消息。
- 修改
8.3 Demo07Producer
在 com.example.kafka.producer
包下,创建Demo07Producer
类,它会使用 Kafka-Spring 封装提供的KafkaTemplate ,实现发送事务消息。代码如下:
1 | // Demo07Producer.java |
- 使用 kafkaTemplate 提交的
#executeInTransaction(OperationsCallback<K, V, T> callback)
模板方法,实现在 Kafka 事务中,执行自定义KafkaOperations.OperationsCallback操作。- 在
#executeInTransaction(...)
方法中,我们可以通过KafkaOperations
来执行发送消息等 Kafka 相关的操作,也可以执行自己的业务逻辑。 - 在
#executeInTransaction(...)
方法的开始,它会自动动创建 Kafka 的事务;然后执行我们定义的 KafkaOperations 的逻辑;如果成功,则提交 Kafka 事务;如果失败,则回滚 Kafka 事务。
- 在
- 另外,我们定义了一个
runner
参数,用于表示本地业务逻辑~
注意,如果 Kafka Producer 开启了事务的功能,则所有发送的消息,都必须处于 Kafka 事务之中,否则会抛出 " No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a record"
异常。
所以,如果业务中,即存在需要事务的情况,也存在不需要事务的情况,需要分别定义两个 KafkaTemplate(Kafka Producer)。
8.4 Demo07Consumer
在 cn.iocoder.springboot.lab03.kafkademo.consumer
包下,创建 Demo07Consumer
类,消费消息。代码如下:
1 | // Demo07Consumer.java |
- 和
1.6 Demo1Consumer.java
一致,差别只在于消费 Topic 为DEMO_07
的消息。
8.5 简单测试
创建 Demo07ProducerTest
测试类,编写单元测试方法,调用 Demo07Producer 发送事务消息的方式。代码如下:
1 | // Demo07ProducerTest.java |
- 故意创建一个执行逻辑为 sleep 10 秒的 Runnable 对象,来让我们测试验证,事务消息在提交后,才会被 Consumer 消费到。
执行 #testSyncSendInTransaction()
单元测试,输出日志如下:
1 | # Producer 同步发送消息成功。 |
- 成功观察到,发送的事务消息,在提交之后才被 Consumer 消费到。