一 概述
RabbitMQ 是由 LShift 提供的一个 Advanced Message Queuing Protocol (AMQP) 的开源实现,由以高性能、健壮以及可伸缩性出名的 Erlang 写成,因此也是继承了这些优点。
- AMQP 里主要要说两个组件:Exchange 和 Queue ,绿色的 X 就是 Exchange ,红色的是 Queue ,这两者都在 Server 端,又称作 Broker ,这部分是 RabbitMQ 实现的。
- 而蓝色的则是客户端,通常有 Producer 和 Consumer 两种类型(角色)。
二 Spring-AMQP
在 Spring-Boot
项目中,提供了 AMQP 和 RabbitMQ 的自动化配置,仅需引入 spring-boot-starter-amqp
依赖
1 Direct Exchange
Direct 类型的 Exchange 路由规则比较简单,它会把消息路由到那些 binding key 与 routing key 完全匹配的 Queue 中。以下图的配置为例:
- 以
routingKey="error"
发送消息到 Exchange ,则消息会路由到 Queue1(amqp.gen-S9b…
) 。 - 以
routingKey="info"
或routingKey="warning"
来发送消息,则消息只会路由到 Queue2(amqp.gen-Agl…
) 。 - 如果我们以其它 routingKey 发送消息,则消息不会路由到这两个 Queue 中。
- 总结来说,指定 Exchange + routing key ,有且仅会路由到至多一个 Queue 中。
- 极端情况下,如果没有匹配,消息就发送到“空气”中,不会进入任何 Queue 中。
下面创建一个 Direct Exchange 的使用示例
1.1 引入依赖
引入maven依赖
1 | <!-- 实现对 RabbitMQ 的自动化配置 --> |
1.2 配置
配置application.yaml
配置如下:
1 | spring: |
在启动类添加@EnableAsync
注解用于后续测试
1 |
|
1.3 创建消息类
1 |
|
- 消息类需要实现 Java Serializable 序列化接口。因为 RabbitTemplate 默认使用 Java 自带的序列化方式,进行序列化 POJO 类型的消息。
- 在消息类里,我们枚举了 Exchange、Queue、RoutingKey 的名字。
1.4 RabbitConfig
创建 RabbitConfig
配置类,添加 Direct Exchange 示例相关的 Exchange、Queue、Binding 的配置。代码如下:
1 | // RabbitConfig.java |
- 在 DirectExchangeDemoConfiguration 内部静态类中,创建了 Exchange、Queue、Binding 三个 Bean ,后续 RabbitAdmin 会自动创建交换器、队列、绑定器。具体查看
RabbitAdmin#initialize()
方法,或 《RabbitMQ 自动创建队列/交换器/绑定》 文章。
1.5 Demo01Producer
创建 Demo01Producer
类,它会使用 Spring-AMQP 封装提供的 RabbitTemplate ,实现发送消息。代码如下:
1 |
|
RabbitTemplate 是 AmqpTemplate 接口的实现类, RabbitTemplate 还实现了其它接口,操作会更为丰富。
#syncSend(Integer id)
方法,调用 RabbitTemplate 的同步发送消息方法。方法定义如下:1
2// AmqpTemplate.java
void convertAndSend(String exchange, String routingKey, Object message) throws AmqpException;- 指定 Exchange + RoutingKey ,从而路由到一个 Queue 中。
#syncSendDefault(Integer id)
方法,也调用 RabbitTemplate 的同步发送消息方法。方法定义如下:1
2// AmqpTemplate.java
void convertAndSend(String routingKey, Object message) throws AmqpException;- 这里传入的 RoutingKey 实际传入了队列名 因为 RabbitMQ 有一条默认的 Exchange: (AMQP default) 规则:
The default exchange is implicitly bound to every queue, with a routing key equal to the queue name. It is not possible to explicitly bind to, or unbind from the default exchange. It also cannot be deleted
。 - 默认交换器,隐式地绑定到每个队列,路由键等于队列名称。
- 所以,此处即使传入的 RoutingKey 为队列名,一样可以发到对应队列。
- 这里传入的 RoutingKey 实际传入了队列名 因为 RabbitMQ 有一条默认的 Exchange: (AMQP default) 规则:
#asyncSend(Integer id)
方法,通过@Async
注解,声明异步调用该方法,从而实现异步消息到 RabbitMQ 中。使用 AsyncRabbitTemplate 类,同样提供异步的 RabbitMQ 操作。
1.6 Demo01Consumer
创建 Demo01Consumer
类,消息消息,代码如下:
1 | // Demo01Consumer.java |
- 在类上,添加了
@RabbitListener
注解,声明了消费的队列是"QUEUE_DEMO_01"
。 - 在方法上,添加了
@RabbitHandler
注解,申明了处理消息的方法。同时,方法入参为消息的类型。
1.7 简单测试
创建 Demo01ProducerTest 测试类,编写三个单元测试方法,调用 Demo01Producer 三个发送消息的方式。代码如下:
1 |
|
执行testSyncSend方法
成功接收到消息
执行tesSyncSendDefault方法
成功接收到消息
执行testAsyncSend方法
成功接收到消息
2 Topic Exchange
前面讲到 Direct Exchange路由规则,是完全匹配 binding key 与routing key,但这种严格的匹配方式在很多情况下不能满足实际业务需求。
Topic Exchange 在匹配规则上进行了扩展,它与 Direct 类型的Exchange 相似,也是将消息路由到 binding key 与 routing key 相匹配的 Queue 中,但这里的匹配规则有些不同,它约定:
- routing key 为一个句点号
"."
分隔的字符串。我们将被句点号"."
分隔开的每一段独立的字符串称为一个单词,例如 “stock.usd.nyse”、”nyse.vmw”、”quick.orange.rabbit” - binding key 与 routing key 一样也是句点号
"."
分隔的字符串。 - binding key 中可以存在两种特殊字符
"*"
与"#"
,用于做模糊匹配。其中"*"
用于匹配一个单词,"#"
用于匹配多个单词(可以是零个)。
以下图中的配置为例:
routingKey="quick.orange.rabbit"
的消息会同时路由到 Q1 与 Q2 。routingKey="quick.orange.fox"
的消息会路由到 Q1 。routingKey="lazy.brown.fox"
的消息会路由到 Q2 。routingKey="lazy.pink.rabbit"
的消息会路由到Q2(只会投递给 Q2 一次,虽然这个 routingKey 与 Q2 的两个 bindingKey 都匹配)。routingKey="quick.brown.fox"
、routingKey="orange"
、routingKey="quick.orange.male.rabbit"
的消息将会被丢弃,因为它们没有匹配任何 bindingKey 。
2.1 Demo02Message
创建Demo02Message消息类如下
1 |
|
- 在消息类里,定义了 Exchange、Queue、RoutingKey 的名字。
- 新定义的路由键
ROUTING_KEY = "#.li.lei"
,匹配以"li.lei"
结尾,开头可以是任意个单词的路由键。
2.2 RabbitConfig
修改 RabbitConfig
配置类,添加 Topic Exchange 示例相关的 Exchange、Queue、Binding 的配置。代码如下
1 |
|
2.3 Demo02Producer
创建Demo02Producer,代码如下
1 |
|
routingKey由方法参数传入
2.4 Demo02Consumer
与1.6 Demo01Consumer类似 消费的队列是 "QUEUE_DEMO_02"
1 |
|
2.5 简单测试
创建 Demo02ProducerTest
测试类,编写两个单元测试方法,调用 Demo02Producer 发送消息的方法。代码如下:
1 |
|
方法testSyncSendSuccess()
发送消息的 RoutingKey 是 "da.li.lei"
,可以匹配到 "DEMO_QUEUE_02"
。
方法testSyncSendFailure()
发送消息的 RoutingKey 是 "yu.zhang.kai"
,无法匹配到 "DEMO_QUEUE_02"
3 Fanout Exchange
Fanout Exchange 路由规则非常简单,它会把所有发送到该 Exchange 的消息路由到所有与它绑定的 Queue 中。如下图:
- 生产者(P)发送到 Exchange(X)的所有消息都会路由到图中的两个 Queue,并最终被两个消费者(C1 与 C2)消费。
- 总结来说,指定 Exchange ,会路由到多个绑定的 Queue 中。
3.1 Demo03Message
创建Demo03Message
消息类,提供给当前示例使用。代码如下:
1 | // Demo03Message.java |
- 未定义 RoutingKey 的名字。因为,Fanout Exchange 仅需要 Exchange 即可。
- 定义两个 Queue 的名字。因为要测试 Fanout Exchange 投递到多个 Queue 的效果。
3.2 RabbitConfig4
修改 RabbitConfig 配置类,添加 Fanout Exchange 示例相关的 Exchange、Queue、Binding 的配置。代码如下:
1 |
|
- 在 FanoutExchangeDemoConfiguration 内部静态类中,也创建了 Exchange、Queue、Binding 三种 Bean 。其中Exchange创建的是本例中的
FanoutExchange
。 - 同时,因为要投递到两个 Queue 中,所以我们创建了两个 Binding 。
3.3 Demo03Producer
创建 Demo03Producer
类,使用 Spring-AMQP 封装提供的 RabbitTemplate ,实现发送消息。代码如下:
1 |
|
3.4 Demo03Consumer
创建 Demo03ConsumerA
和 Demo03ConsumerB
两个类,消费消息。代码如下:
1 | // Demo03ConsumerA.java |
1 | // Demo03ConsumerB.java |
3.5 简单测试
创建Demo03ProducerTest
类进行测试,代码如下
1 |
|
测试结果
可以看到两个队列都收到了消息
4 批量发送消息
在一些业务场景下,希望使用 Producer 批量发送消息,提高发送性能。RocketMQ 是提供了一个可以批量发送多条消息的 API 。而 Spring-AMQP 提供的批量发送消息,它提供了一个 MessageBatch 消息收集器,将发送给相同 Exchange + RoutingKey 的消息们,收集在一起,当满足条件时候,一次性批量发送提交给 RabbitMQ Broker 。
Spring-AMQP 通过 BatchingRabbitTemplate 提供批量发送消息的功能。如下是三个条件,满足任一即会批量发送:
- 【数量】
batchSize
:超过收集的消息数量的最大条数。 - 【空间】
bufferLimit
:超过收集的消息占用的最大内存。 - 【时间】
timeout
:超过收集的时间的最大等待时长,单位:毫秒。超时开始计时的时间,是以最后一次发送时间为起点。也就说,每调用一次发送消息,都以当前时刻开始计时,重新到达timeout
毫秒才算超时。
另外,BatchingRabbitTemplate 提供的批量发送消息的能力比较弱。对于同一个 BatchingRabbitTemplate 对象来说,**同一时刻只能有一个批次(保证 Exchange + RoutingKey 相同)**,否则会报错。
4.1 Demo05Message
创建Demo05Message,代码如下
1 |
|
修改配置类中的DirectExchangeDemoConfiguration
如下
1 |
|
配置batchRabbitTemplate
代码如下
1 |
|
batchRabbitTemplate(ConnectionFactory connectionFactory)
方法,创建 BatchingRabbitTemplate Bean 对象。具体的 batchSize
、bufferLimit
、timeout
数值配置多少,根据自己的应用来。这里故意将 timeout
配置成了 30 秒,主要为了演示之用。
4.5 Demo05Producer
创建 Demo05Producer
类,使用 Spring-AMQP 封装提供的 BatchingRabbitTemplate ,实现批量发送消息。代码如下:
1 |
|
4.6 Demo05Consumer
创建Demo05Consumer
,代码如下
1 |
|
4.7 简单测试
创建测试类,代码如下
1 |
|
- Producer 成功同步发送了 3 条消息,每条间隔 10 秒。
- Demo05Consumer 在最后一条消息发送成功后果的 30 秒,消费到这 3 条消息。
- 因为使用 BatchingRabbitTemplate 批量发送消息,所以在 Producer 成功发送完第一条消息后,Consumer 并未消费到这条消息。
- 又因为 BatchingRabbitTemplate 是按照每次发送后,都重新计时,所以在最后一条消息成功发送后的 30 秒,Consumer 才消费到批量发送的 3 条消息。
5 批量消费消息
在上一节批量发送消息中,已经实现批量发送消息到 RabbitMQ Broker 中。
实际上,RabbitMQ Broker 存储的是一条消息。又或者说,RabbitMQ 并没有提供批量接收消息的 API 接口。
上一节能批量发送消息是借助于 SimpleBatchingStrategy
所封装提供:
- 在 Producer 最终批量发送消息时,SimpleBatchingStrategy 会通过
#assembleMessage()
方法,将批量发送的多条消息组装成一条“批量”消息,然后进行发送。 - 在 Consumer 拉取到消息时,会根据
#canDebatch(MessageProperties properties)
方法,判断该消息是否为一条“批量”消息。如果是,则调用# deBatch(Message message, Consumer fragmentConsumer)
方法,将一条“批量”消息拆开,变成多条消息。
在一些业务场景下,会希望使用 Consumer 批量消费消息,提高消费速度。在 Spring-AMQP 中,提供了两种批量消费消息的方式。
5.1 方法一
在 SimpleBatchingStrategy 将一条“批量”消息拆开,变成多条消息后,直接批量交给 Consumer 进行消费处理。
5.1.1 RabbitConfig
修改 RabbitConfig
配置类,添加自定义的 SimpleRabbitListenerContainerFactory
Bean ,支持用于创建支持批量消费的 SimpleRabbitListenerContainer
。代码如下:
1 |
|
- 在
RabbitAnnotationDrivenConfiguration
自动化配置类中,它会默认创建一个名字为"rabbitListenerContainerFactory"
的 SimpleRabbitListenerContainerFactory Bean ,可用于消费者的监听器是单个消费的。 - 我们自定义创建的一个名字为
"consumerBatchContainerFactory"
的 SimpleRabbitListenerContainerFactory Bean ,可用于消费者的监听器是批量消费的。重点是<X>
处,配置消费者的监听器是批量消费消息的类型。
5.1.2 Demo06Message
创建Demo06Message
,代码如下。
1 |
|
5.1.3 Demo06Consumer
创建Demo06Consumer
,代码如下
1 |
|
- 在类上的
@RabbitListener
注解的containerFactory
属性,设置了在5.1.1 RabbitConfig
创建的SimpleRabbitListenerContainerFactory Bean ,表示它要批量消费消息。 - 在
#onMessage(...)
消费方法上,修改方法入参的类型为 List 数组。
绑定配置和Producer与上一节批量发送消息基本一致,消息换成Demo06Message
,代码如下
1 | //配置 |
1 | //Producer |
5.1.4 简单测试
和上一节类似,创建测试类
1 |
|
符合预期,Demo06Consumer 批量消费了 3 条消息。
5.2 方法二
在上一节学习了一种批量消费消息的方式。因为其依赖4 批量发送消息
的功能,不够灵活。所以,Spring-AMQP 提供了第二种批量消费消息的方式。
其实现方式是,阻塞等待最多 receiveTimeout
秒,拉取 batchSize
条消息,进行批量消费。
- 如果在
receiveTimeout
秒内已经成功拉取到batchSize
条消息,则直接进行批量消费消息。 - 如果上一条消息等待超过
receiveTimeout
秒还没拉取到batchSize
条消息,不再等待,而是进行批量消费消息。
注意 Spring-AMQP 的阻塞等待时长 receiveTimeout
的意义
- 它代表的是,每次拉取一条消息,最多阻塞等待
receiveTimeout
时长。如果等待不到下一条消息,则进入已获取到的消息的批量消费。也就是说,极端情况下,可能等待receiveTimeout * batchSize
时长,才会进行批量消费。 - 源码
SimpleMessageListenerContainer#doReceiveAndExecute(BlockingQueueConsumer consumer)
方法。
5.2.1 Demo07Message
创建Demo07Message
代码如下
1 |
|
5.2.2 配置
配置类添加配置如下
1 |
|
DirectExchangeDemoConfiguration3
配置类,用于定义 Queue、Exchange、Binding 的配置。- 相比
方法一
来说,额外增加了batchSize = 10
、receiveTimeout = 30 * 1000L
、consumerBatchEnabled = 30 * 1000L
属性。 - 删除上一节中
BatchingRabbitTemplate
的有关配置
5.2.3 Demo07Producer
创建Demo07Producer
,代码如下:
1 |
|
5.2.4 Demo07Consumer
创建 Demo07Consumer
代码如下
1 |
|
@RabbitListener
使用刚才配置的consumerBatchContainerFactory2
5.2.5 简单测试
创建Demo07ProducerTest
测试,代码如下:
1 |
|
运行testSyncSend01
- 可以发现在发送三次消息后等待30秒,即使没有拉取到10条消息也批量消费
运行testSyncSen02
- 可以发现在循环满10条消息后,不经等待就批量消费了10条消息
6 重试与死信队列
DLX, Dead-Letter-Exchange。利用DLX, 当消息在一个队列中变成死信(dead message)之后,它能被重新publish到另一个Exchange,这个Exchange就是DLX。消息变成死信一向有一下几种情况:
- 消息被拒绝(basic.reject/ basic.nack)并且requeue=false
- 消息TTL过期(参考:RabbitMQ之TTL(Time-To-Live 过期时间))
- 队列达到最大长度
DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性,当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列,可以监听这个队列中消息做相应的处理,这个特性可以弥补RabbitMQ 3.0以前支持的immediate参数(可以参考RabbitMQ之mandatory和immediate)的功能。
在消息消费失败的时候,Spring-AMQP 会通过消费重试机制,重新投递该消息给 Consumer ,让 Consumer 有机会重新消费消息,实现消费成功。
当然,Spring-AMQP 并不会无限重新投递消息给 Consumer 重新消费,而是在默认情况下,达到 N 次重试次数时,Consumer 还是消费失败时,该消息就会进入到死信队列。后续,我们可以通过对死信队列中的消息进行重发,来使得消费者实例再次进行消费。
- 而在 RabbitMQ 中,消费重试是由 Spring-AMQP 所封装提供的,死信队列是 RabbitMQ 自带的功能。
那么消费失败到达最大次数的消息,是怎么进入到死信队列的呢?Spring-AMQP 在消息到达最大消费次数的时候,会将该消息进行否定(basic.nack
),并且 requeue=false
,这样后续就可以利用 RabbitMQ 的死信队列的机制,将该消息转发到死信队列。
另外,每条消息的失败重试,是可以配置一定的间隔时间。具体,在示例的代码中,进行具体的解释。
下面 Consumer 消费重试的示例。新建一个项目。
6.1 配置
配置文件添加配置如下
1 | spring: |
相比之前的配置文件来说,新增
spring.rabbitmq.simple.retry.enable=true
配置项,来开启 Spring-AMQP 的消费重试的功能。同时,通过新增max-attempts
和initial-interval
配置项,设置重试次数和间隔。max-attempts
配置项要注意,是一条消息一共尝试消费总共max-attempts
次,包括首次的正常消费。另外,可以通过添加
spring.rabbitmq.listener.simple.retry.multiplier
配置项来实现递乘的时间间隔,添加spring.rabbitmq.listener.simple.retry.max-interval
配置项来实现最大的时间间隔。
在 Spring-AMQP 的消费重试机制中,在消费失败到达最大次数后,会自动抛出 AmqpRejectAndDontRequeueException 异常,从而结束该消息的消费重试。这意味着什么呢?如果我们在消费消息的逻辑中,主动抛出 AmqpRejectAndDontRequeueException 异常,也能结束该消息的消费重试。
另外,默认情况下,spring.rabbitmq.simple.retry.enable=false
,关闭 Spring-AMQP 的消费重试功能。但是实际上,消费发生异常的消息,还是会一直重新消费。这是为什么呢?Spring-AMQP 会将该消息通过 basic.nack
+ requeue=true
,重新投递回原队列的尾巴。如此,我们便会不断拉取到该消息,不断“重试”消费该消息。当然在这种情况下,我们一样可以主动抛出 AmqpRejectAndDontRequeueException 异常,也能结束该消息的消费重试。
Spring-AMQP 是如何提供消费重试的功能的?
- Spring-AMQP 基于 spring-retry 项目提供的 RetryTemplate ,实现重试功能。Spring-AMQP 在获取到消息时,会交给 RetryTemplate 来调用消费者 Consumer 的监听器 Listener(就是我们实现的),实现该消息的多次消费重试。
- 在该消息的每次消费失败后,RetryTemplate 会通过 BackOffPolicy 来进行计算,该消息的下一次重新消费的时间,通过
Thread#sleep(...)
方法,实现重新消费的时间间隔。到达时间间隔后,RetryTemplate 又会调用消费者 Consumer 的监听器 Listener 来消费该消息。 - 当该消息的重试消费到达上限后,RetryTemplate 会调用 MethodInvocationRecoverer 回调来实现恢复。而 Spring-AMQP 自定义实现了 RejectAndDontRequeueRecoverer 来自动抛出 AmqpRejectAndDontRequeueException 异常,从而结束该消息的消费重试。
- 有一点需要注意,Spring-AMQP 提供的消费重试的计数是客户端级别的,重启 JVM 应用后,计数是会丢失的。所以,如果想要计数进行持久化,需要自己重新实现下。
6.2 Demo08Message
创建Demo08Message,代码如下:
1 |
|
- 额外增加了死信队列会用到的 Queue 和 RoutingKey
6.3 RabbitConfig
创建RabbitConfig
,代码如下
1 |
|
- 创建的正常 Queue 额外设置了,当消息成为死信时,RabbitMQ 自动转发到 Exchange 为
Demo08Message.EXCHANGE
,RoutingKey 为Demo08Message.DEAD_ROUTING_KEY
的死信队列中。 - 通过
#demo08DeadQueue()
方法来创建死信队列的 Queue ,通过#demo08DeadBinding()
方法来创建死信队列的 Binding 。因为重用了 Exchange 为Demo08Message.EXCHANGE
,所以无需创建。当然,也可以根据自己的需要,创建死信队列的 Exchange 。
6.4 Demo08Producer
创建 Demo08Producer
代码如下:
1 |
|
使用 Spring-AMQP 封装提供的 RabbitTemplate ,实现发送消息。
6.5 Demo08Consumer
创建 Demo08Consumer
代码如下
1 |
|
- 在
<X>
处消费消息时候,抛出一个 RuntimeException 异常,模拟消费失败。
6.6 Demo08DeadConsumer
创建Demo08DeadConsumer
消费死信队列的消息,代码如下
1 |
|
在类上,添加了 @RabbitListener
注解,声明了消费的队列是 DEAD_QUEUE_DEMO_07
这个死信队列
6.7 简单测试
编写测试类,代码如下
1 |
|
- Demo08Consumer 重试消费消息 3 次,每次间隔 1 秒,全部都失败,最终该消息转发到死信队列中。
- Demo08DeadConsumer 消费死信队列中的该消息。
6.8 发送重试
在 Spring-AMQP 也提供了消息发送失败时的重试机制,也是基于 spring-retry 项目提供的 RetryTemplate 来实现。在 application.yaml
增加配置如下即可:
1 | spring: |
spring.rabbitmq.template.enable=true
配置项,来开启 Spring-AMQP 的发送重试的功能。同时,通过新增max-attempts
和initial-interval
配置项,设置重试次数和间隔。max-attempts
配置项要注意,是一条消息一共尝试消费总共max-attempts
次,包括首次的正常消费。另外,可以通过添加
spring.rabbitmq.template.retry.multiplier
配置项来实现递乘的时间间隔,添加
spring.rabbitmq.template.retry.max-interval
配置项来实现最大的时间间隔。
具体示例类似不再演示。
7 延时消息
rabbitMq延时消息的实现方案有两种,一种是通过死信队列实现,一种使用rabbitMq延时插件实现。
下面分别介绍
7.1 方法一 死信队列实现
在上一小节中,看到 Spring-AMQP 基于 RabbitMQ 提供的死信队列,通过 basic.nack
+ requeue=false
的方式,将重试消费到达上限次数的消息,投递到死信队列中。
本小节,还是基于 RabbitMQ 的死信队列,实现定时消息的功能。RabbitMQ 提供了过期时间 TTL 机制,可以设置消息在队列中的存活时长。在消息到达过期时间时,会从当前队列中删除,并被 RabbitMQ 自动转发到对应的死信队列中。
那么,如果创建消费者 Consumer ,来消费该死信队列,就实现了延迟队列的效果。
新建一个项目进行演示
7.1.1 依赖与配置
坐标
1 | <dependency> |
基本配置RabbitMQ
1 | spring: |
7.1.2 Demo09Message
创建 Demo09Message
代码如下:
1 |
|
7.1.3 RabbitConfig
创建 RabbitConfig
配置队列,代码如下:
1 |
|
- 在
#demo08Queue()
方法创建 Queue 时,设置了该队列的消息的默认过期时间为 10 秒。
7.1.4 Demo09Producer
创建 Demo09Producer
,代码如下:
1 |
|
- 调用
#syncSend(Integer id, Integer delay)
方法来发送消息时,如果传递了方法参数delay
,则会设置消息的 TTL 过期时间。
7.1.5 Demo09Consumer
创建 Demo09Consumer
代码如下:
1 |
|
- 在类上,添加了
@RabbitListener
注解,声明了消费的队列是"DELAY_QUEUE_DEMO_08"
这个延迟队列(死信队列) - 在消费逻辑中,正常消费该消息即可,实现自己需要的业务逻辑。
7.1.6 简单测试
创建 Demo09ProducerTest
进行测试
1 |
|
#testSyncSend01()
方法,不设置消息的过期时间,使用队列默认的消息过期时间10秒。#testSyncSend02()
方法,发送消息的过期时间为 5000 毫秒。
运行#testSyncSend01()
方法:
可以看到发送和消费的时间差为10秒,符合预期
运行#testSyncSend02()
方法:
可以看到发送和消费的时间差为5秒,符合预期
7.2 方法二 插件实现
7.2.1 安装插件
访问 Community Plugins — RabbitMQ 并搜索 rabbitmq_delayed_message_exchange
下载与 RabbitMQ
版本一致的插件
本文示例 RabbitMQ
为3.8.13 选择如下
下载完插件后,将其放置到RabbitMQ安装目录下的plugins
目录下,并使用如下命令启动这个插件:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
本文示例目录为
/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.13/plugins
如果启动成功会出现如下信息:
The following plugins have been enabled: rabbitmq_delayed_message_exchange
启动插件成功后,记得重启一下RabbitMQ,让其生效。
1 | systemctl restart rabbitmq-server |
下面新建一个项目进行演示,依赖与配置与上文方法一一致
7.2.2 Demo10Message
创建 Demo10Message
代码如下
1 |
|
7.2.3 RabbitConfig
创建 RabbitConfig
代码如下
1 |
|
这里要特别注意的是,使用的是CustomExchange
,不是DirectExchange
,另外CustomExchange
的类型必须是x-delayed-message
。
7.2.4 Demo10Producer
创建 Demo10Producer
代码如下
1 |
|
注意在发送的时候,必须加上一个header
并设置延迟时间
7.2.5 Demo10Consumer
创建 Demo10Consumer
代码如下
1 |
|
7.2.6 简单测试
创建测试类,代码如下。
1 |
|
#testSyncSend01()
方法,不设置消息的过期时间,默认不延时发送。#testSyncSend02()
方法,发送消息的过期时间为 5000 毫秒。
执行#testSyncSend01()
方法
可以看到发送和消费几乎同时,符合预期
执行#testSyncSend02()
方法
可以看到延迟了5秒,符合预期
8 消息模式
在消息队列中,有两种经典的消息模式:「点对点」和「发布订阅」。
如果胖友有使用过 RocketMQ 或者 Kafka 消息队列,可能比较习惯的叫法是:
- 「点对点」 消费模式下,相同 Consumer Group 的每个 Consumer 实例平均分摊消息。
- 「发布订阅」 消费模式下,相同 Consumer Group 的每个 Consumer 实例都接收全量的消息。
下面新建项目进行示例
8.1 点对点
在 RabbitMQ 中,如果多个 Consumer 订阅相同的 Queue ,那么每一条消息有且仅会被一个 Consumer 所消费。这个特性,就为我们实现点对点消费提供了基础。
在本示例中,我们会把一个 Queue 作为一个 Consumer Group ,同时创建消费该 Queue 的 Consumer 。这样,在我们启动多个 JVM 进程时,就会有多个 Consumer 消费该 Queue ,从而实现点对点消费的效果。
8.1.1 依赖与配置
maven坐标
1 | <dependency> |
配置
1 | spring: |
8.1.2 ClusteringMessage
创建 ClusteringMessage
代码如下
1 |
|
- 在这里,并没有定义 RoutingKey 的枚举,后面直接在消费者的注解中配置
8.1.3 RabbitConfig
创建配置类 RabbitConfig
代码如下
1 |
|
- 在这里,我们创建了 Exchange 类型是 Topic 。
考虑到点对点消费的模式会存在多 Consumer Group 消费的情况,显然要支持一条消息投递到多个 Queue 中,所以 Direct Exchange 基本就被排除了。
为什么不选择 Exchange 类型是 Fanout 或者 Headers 呢?实际是可以的, Spring Cloud Stream RabbitMQ 是默认是使用 Topic Exchange 的,所以这里也就使用 Topic Exchange 类型。
8.1.4 ClusteringProducer
创建生产者 ClusteringProducer
代码如下
1 |
|
传递的 routingKey
参数为 null
也是因为后面直接在消费者的注解中配置
8.1.5 ClusteringConsumer
创建 ClusteringConsumer
代码如下
1 |
|
- 相比其它 Consumer 示例来说,这里添加的
@RabbitListener
注解复杂很多。 - 在
bindings
属性,添加了@QueueBinding
注解,来定义了一个 Binding 。通过key
属性,设置使用的 RoutingKey 为#
,匹配所有。这就是为什么在ClusteringMessage
未定义 RoutingKey ,以及在ClusteringProducer
中使用routingKey = null
的原因。 - 在
exchange
属性,我们添加了@Exchange
注解,设置了对应EXCHANGE_CLUSTERING
这个 Exchange 。type
属性,设置是 TopicExchange 。declare
属性,因为8.1.2 RabbitConfig
中,已经配置创建这个 Exchange 了,表明这里并非创建交换机,而是使用配置的交换机
- 在
value
属性,我们添加了@Queue
注解,设置消费QUEUE_CLUSTERING-GROUP-01
这个 Queue 的消息。
使用 @Exchange
、@Queue
、@QueueBinding
注解时,如果未声明 declare="false"
时,会自动创建对应的 Exchange、Queue、Binding 。
8.1.6 简单测试
创建测试类,代码如下
1 |
|
首先,执行 #mock()
测试方法,先启动一个消费 "QUEUE_CLUSTERING-GROUP-01"
这个 Queue 的 Consumer 节点。
然后,执行 #testSyncSend()
测试方法,再启动一个消费 "QUEUE_CLUSTERING-GROUP-01"
这个 Queue 的 Consumer 节点。
同时调用 ClusteringProducer#syncSend(id)
方法,同步发送了 3 条消息。控制台输出如下:
testSyncSend
方法:
mock
方法:
可以看到一共发送了三条消息,其中一个节点消费了一条,另一个节点消费了两条。符合预期。
8.2 发布订阅
在点对点消息模式中,通过在 RabbitMQ 中,如果多个 Consumer 订阅相同的 Queue ,那么每一条消息有且仅会被一个 Consumer 所消费的特性,实现了点对点消费。
在发布订阅消息模式中,可以通过给每个 Consumer 创建一个其独有 Queue ,从而保证都能接收到全量的消息。
同时,RabbitMQ 支持队列的自动删除,所以可以在 Consumer 关闭的时候,通过该功能删除其独有的 Queue 。
下面示例在上一小节的项目中进行修改。
8.2.1 BroadcastMessage
创建 BroadcastMessage
代码如下
1 |
|
- 与上一节基本一致,队列名和交换机名不同。
8.2.2 RabbitConfig
在配置类中添加bean如下
1 | /** |
- 和上一节的
ClusteringConfiguration
配置类一致,只是创建了不同的 Exchange 。
9.2.3 BroadcastProducer
创建 BroadcastProducer
发送消息,代码如下
1 |
|
- 和上一节的
ClusteringProducer
基本一致,只是使用了不同的 Exchange 和消息 。
9.2.4 BroadcastConsumer
创建 BroadcastConsumer
消费消息,代码如下
1 |
|
- 总体和上一节
ClusteringConsumer
是一致,主要差异在两点。 - 第一点,在
@Queue
注解的name
属性,我们通过 Spring EL 表达式,在 Queue 的名字上,使用 UUID 生成其后缀。这样,我们就能保证每个项目启动的 Consumer 的 Queue 不同,以达到发布订阅消费的目的。 - 第二点,在
@Queue
注解的autoDelete
属性,我们设置为"true"
,这样在 Consumer 关闭时,该队列就可以被自动删除了。
9.2.5 简单测试
创建测试类,代码如下
1 |
|
首先,执行 #mock()
测试方法,先启动一个消费 "QUEUE_BROADCAST-${UUID1}"
这个 Queue 的 Consumer 节点。
然后,执行 #testSyncSend()
测试方法,再启动一个消费 "QUEUE_BROADCAST-${UUID2}"
这个 Queue 的 Consumer 节点。
同时调用 BroadcastProducer#syncSend(id)
方法,同步发送了 3 条消息。控制台输出如下:
mock
方法:
testSyncSend
方法:
可以看到两个节点都消费到了三条消息。符合预期
9 并发消费
在上述的示例中,我们配置的每一个 Spring-AMQP @RabbitListener
,都是串行消费的。显然,这在监听的 Queue 每秒消息量比较大的时候,会导致消费不及时,导致消息积压的问题。
在 @RabbitListener
注解中,有 concurrency
属性,它可以指定并发消费的线程数。例如说,如果设置 concurrency=4
时,Spring-AMQP 就会为该 @RabbitListener
创建 4 个线程,进行并发消费。
9.1 依赖与配置
maven坐标
1 | <dependency> |
yaml配置
1 | spring: |
- 增加三个参数:
spring.rabbitmq.listener.type
spring.rabbitmq.listener.simple.concurrency
spring.rabbitmq.listener.simple.max-concurrency
注意 spring.rabbitmq.listener.simple.max-concurrency
配置,是限制每个 @RabbitListener
的允许配置的 concurrency
最大大小。如果超过,则会抛出 IllegalArgumentException 异常。
9.2 Demo11Consumer
创建 Demo11Consumer
代码如下
1 |
|
通过配置 @RabbitListener
的 concurrency
属性开启多线程消费。
优先级最高,可覆盖配置文件中的 spring.rabbitmq.listener.simple.concurrency
配置项
Producer等类与第一节 Direct Exchange
示例类似,不再赘述。
9.3 简单测试
创建测试类,编写一个单元测试方法,发送 10 条消息,观察并发消费情况。代码如下:
1 |
|
可以看到有两个线程在消费,符合预期。
10 事务消息
考虑到不污染上述的示例,新建一个项目演示。
10.1 引入依赖
依赖坐标和相关配置与[1 Direct Exchange
] 一致
10.2 Demo12Message
创建Demo12Message
,代码如下
1 |
|
10.3 RabbitConfig
创建RabbitConfig
,代码如下。
1 |
|
- 在类上,添加
@EnableTransactionManagement
注解,开启Spring Transaction 的支持。 - 在
#rabbitTransactionManager()
方法,创建 RabbitTransactionManager 事务管理器 Bean 。 - 在
<Y>
处,标记创建的 RabbitMQ Channel 是事务性的,从而可以使用 RabbitMQ 的事务消息。
因为 Spring-AMQP 通过 RabbitTransactionManager 来实现对 Spring Transaction 的集成,所以在实际开发中,我们只需要配合使用 @Transactional
注解,来声明事务即可。
10.4 Demo12Producer
创建 Demo12Producer
代码如下
1 |
|
- 在发送消息方法上,添加了
@Transactional
注解,声明事务。因为创建了 RabbitTransactionManager 事务管理器,所以这里会创建 RabbitMQ 事务。 - 在
处,故意等待 10 秒,判断 RabbitMQ 事务是否生效。 - 如果同步发送消息成功后,Consumer 立即消费到该消息,说明未生效。
- 如果 Consumer 是 10 秒之后,才消费到该消息,说明已生效。
10.5 Demo12Consumer
创建 Demo12Consumer
代码如下
1 |
|
10.6 简单测试
创建测试类代码如下
1 |
|
可以看到消息发出10s后才消费消息,说明方法结束才事务提交导致消费,事务生效符合预期。
11 消费者的消息确认
在 RabbitMQ 中,Consumer 有两种消息确认的方式:
- 方式一,自动确认。
- 方式二,手动确认。
对于自动确认的方式,RabbitMQ Broker 只要将消息写入到 TCP Socket 中成功,就认为该消息投递成功,而无需 Consumer 手动确认。
对于手动确认的方式,RabbitMQ Broker 将消息发送给 Consumer 之后,由 Consumer 手动确认之后,才任务消息投递成功。
实际场景下,因为自动确认存在可能丢失消息的情况,所以在对可靠性有要求的场景下,我们基本采用手动确认。当然,如果允许消息有一定的丢失,对性能有更高的产经下,我们可以考虑采用自动确认。
在 Spring-AMQP 中,在 AcknowledgeMode 中,定义了三种消息确认的方式:
1 | // AcknowledgeMode.java |
- 实际上,就是将手动确认进一步细分,提供了由 Spring-AMQP 提供 Consumer 级别的自动确认。
在上述的示例中,都采用了 Spring-AMQP 默认的 AUTO
模式。下面搭建一个 MANUAL
模式,手动确认的示例。考虑到不污染上述的示例,新建一个项目。
11.1 依赖与有关类
maven依赖以及Message
、Config
、Producer
三个类与[1 Direct Exchange
]基本一致,编号顺延,不再赘述
11.2 Demo13Consumer
创建 Demo13Consumer
类,代码如下
1 |
|
- 在消费方法上,我们增加类型为 Channel 的方法参数,和
deliveryTag
。通过调用其Channel#basicAck(deliveryTag, multiple)
方法,可以进行消息的确认。 - 在
@RabbitListener
注解的ackMode
属性,我们可以设置自定义的 AcknowledgeMode 模式。 - 在消费逻辑中,我们故意只提交消费的消息的
Demo12Message.id
为奇数的消息。 这样,我们只需要发送一条id=1
,一条id=2
的消息,如果第二条的消费进度没有被提交,就可以说明手动提交消费进度成功。
11.3 简单测试
创建测试类,代码如下
1 |
|
可以看到有一条未确认的消息,符合预期
12 生产者的发送确认
在 RabbitMQ 中,默认情况下,Producer 发送消息的方法,只保证将消息写入到 TCP Socket 中成功,并不保证消息发送到 RabbitMQ Broker 成功,并且持久化消息到磁盘成功。
也就是说,上述示例,Producer 在发送消息都不是绝对可靠,是存在丢失消息的可能性。
担心,在 RabbitMQ 中,Producer 采用 Confirm 模式,实现发送消息的确认机制,以保证消息发送的可靠性。实现原理如下:
- 首先,Producer 通过调用
Channel#confirmSelect()
方法,将 Channel 设置为 Confirm 模式。 - 然后,在该 Channel 发送的消息时,需要先通过
Channel#getNextPublishSeqNo()
方法,给发送的消息分配一个唯一的 ID 编号(seqNo
从 1 开始递增),再发送该消息给 RabbitMQ Broker 。 - 之后,RabbitMQ Broker 在接收到该消息,并被路由到相应的队列之后,会发送一个包含消息的唯一编号(
deliveryTag
)的确认给 Producer 。
通过 seqNo
编号,将 Producer 发送消息的“请求”,和 RabbitMQ Broker 确认消息的“响应”串联在一起。
通过这样的方式,Producer 就可以知道消息是否成功发送到 RabbitMQ Broker 之中,保证消息发送的可靠性。不过要注意,整个执行的过程实际是异步,需要我们调用 Channel#waitForConfirms()
方法,同步阻塞等待 RabbitMQ Broker 确认消息的“响应”。
也因此,Producer 采用 Confirm 模式时,有三种编程方式:
【同步】普通 Confirm 模式:Producer 每发送一条消息后,调用
Channel#waitForConfirms()
方法,等待服务器端 Confirm 。【同步】批量 Confirm 模式:Producer 每发送一批消息后,调用
Channel#waitForConfirms()
方法,等待服务器端 Confirm 。本质上,和「普通 Confirm 模式」是一样的。
【异步】异步 Confirm 模式:Producer 提供一个回调方法,RabbitMQ Broker 在 Confirm 了一条或者多条消息后,Producer 会回调这个方法。
在 Spring-AMQP 中,在 ConfirmType 中,定义了三种消息确认的方式:
1 | // CachingConnectionFactory#ConfirmType.java |
在上面的示例中,我们都采用了 Spring-AMQP 默认的 NONE
模式。下面,我们来搭建两个示例,实现同步的 Confirm 模式和异步的 Confirm 模式。
12.1 同步 Confirm 模式
在本小节中,使用 ConfirmType.SIMPLE
类型,实现同步的 Confirm 模式。
这里的同步,指的是通过调用 Channel#waitForConfirms()
方法,同步阻塞等待 RabbitMQ Broker 确认消息的“响应”。
考虑到不污染上述的示例,新建一个项目演示。
12.1.1 依赖与有关类
maven依赖以及Message
、Config
、Consumer
三个类与[1 Direct Exchange
]基本一致,编号顺延,不再赘述
12.1.2 配置文件
1 | spring: |
新增 spring.rabbitmq.publisher-confirm-type=simple
配置项,设置 Confirm 类型为 ConfirmType.SIMPLE
。
在该类型下,Spring-AMQP 在创建完 RabbitMQ Channel 之后,会自动调用 Channel#confirmSelect()
方法,将 Channel 设置为 Confirm 模式。
12.1.3 Demo14Producer
创建 Demo14Producer
代码如下
1 |
|
在 RabbitTemplate 提供的 API 方法中,如果 Producer 要使用同步的 Confirm 模式,需要调用 #invoke(action, acks, nacks)
方法。代码如下:
1 | // RabbitOperations.java |
- 因为 Confirm 模式需要基于相同 Channel ,所以我们需要使用该方法。
- 在方法参数
action
中,我们可以自定义操作。 - 在方法参数
acks
中,定义接收到 RabbitMQ Broker 的成功“响应”时的成回调。 - 在方法参数
nacks
中,定义接收到 RabbitMQ Broker 的失败“响应”时的成回调。- 当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息。
- 如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息,生产者应用程序同样可以在回调方法中处理该 nack 消息。
12.1.4 简单测试
编写测试类代码如下
1 |
|
每条日志对应解释如下
主线程,Producer 发送 1 条消息完成。
AMQConnection 线程,Producer 确认收到 RabbitMQ Broker 对该消息的成功“响应” 。
主线程,Producer 等待该消息的 Confirm 完成。
单元测试,打印下日志,可以忽略
消费者的线程,Consumer 消费到该消息
符合预期
12.2 异步 Confirm 模式
在本小节中,使用 ConfirmType.SIMPLE
类型,实现异步的 Confirm 模式。
考虑到不污染上述的示例,新建一个项目演示。
12.2.1 依赖与有关类
maven依赖以及Message
、Config
、Consumer
三个类与[1 Direct Exchange
]基本一致,编号顺延,不再赘述
12.2.2 配置文件
1 | spring: |
新增 spring.rabbitmq.publisher-confirm-type=correlated
配置项,设置 Confirm 类型为 ConfirmType.CORRELATED
。
在该类型下,Spring-AMQP 在创建完 RabbitMQ Channel 之后,也会自动调用 Channel#confirmSelect()
方法,将 Channel 设置为 Confirm 模式。
12.2.3 RabbitProducerConfirmCallback
创建 RabbitProducerConfirmCallback
类,实现 RabbitTemplate.ConfirmCallback 接口
提供 Producer 收到 RabbitMQ 确认消息的“响应”的回调。代码如下:
1 |
|
- 在构造方法中,把自己设置到 RabbitTemplate 中,作为 Confirm 的回调。
- 在
#confirm(...)
方法中,根据是否ack
成功,打印不同的日志。
12.2.4 Demo15Producer
创建 Demo15Producer
代码如下
1 |
|
12.2.5 简单测试
编写测试类代码如下
1 |
|
第二条日志RabbitConnectionFactory 线程,Producer 确认收到 RabbitMQ Broker 对该消息的成功“响应” 。
在 Demo13Producer 发送消息的时候,并未传入 CorrelationData 参数,所以为 null 。
符合预期
13 MessageConverter
在 Spring-AMQP 中,通过 MessageConverter 来作为消息转换器:
默认情况下,RabbitTemplate 采用 SimpleMessageConverter 。而 SimpleMessageConverter 内部,采用 Java 自带序列化方式,实现对 Java POJO 对象的序列化和反序列化,所以官方目前不是很推荐。主要缺点如下:
- 无法跨语言
- 序列化后的字节数组太大
- 序列化性能太低
因此一般情况下,建议采用 Jackson2JsonMessageConverter ,使用 JSON 实现对 Java POJO 对象的序列化和反序列化。
- 在序列化时,我们使用了 Jackson2JsonMessageConverter 序列化 Message 消息对象,它会在 RabbitMQ 消息 MessageProperties 的
__TypeId__
上,值为 Message 消息对应的类全名。 - 在反序列化时,我们使用了 Jackson2JsonMessageConverter 序列化出 Message 消息对象,它会根据 RabbitMQ 消息 MessageProperties 的
__TypeId__
的值,反序列化消息内容成该 Message 对象。
下面,搭建一个 Jackson2JsonMessageConverter 的使用示例。
考虑到不污染上述的示例,新建一个演示。
13.1 依赖
maven依赖如下
1 | <dependencies> |
13.2 配置与有关类
配置以及Message
、Producer
两个类与[1 Direct Exchange
]基本一致,编号顺延,不再赘述
13.3 RabbitConfig
创建配置类,代码如下
1 |
|
在 #messageConverter()
方法,创建 Jackson2JsonMessageConverter Bean 对象。
后续,RabbitAutoConfiguration.RabbitTemplateConfiguration 在创建 RabbitTemplate Bean 时,会自动注入它。
13.4 Demo16Consumer
创建 Demo16Consumer
代码如下
1 |
|
- 因为希望通过查看具体消息内容,判断是不是真的使用 JSON 格式,所以采用 AMQP Message 接收消息。
14.5 简单测试
编写测试类,代码如下
1 |
|
可以看到JSON格式的消息,符合预期。
14 消费异常处理器
在[6 重试与死信队列]
中, Consumer 消费异常时,Spring-AMQP 提供消费重试机制。
除此之外,在 Spring-AMQP 中可以自定义消费异常时的处理器。目前有两个接口,可以实现对 Consumer 消费异常的处理:
org.springframework.amqp.rabbit.listener.api.RabbitListenerErrorHandler
接口org.springframework.util.ErrorHandler
接口
下面搭建一个 RabbitListenerErrorHandler 和 ErrorHandler 的使用示例。新建一个项目。
14.1 RabbitListenerErrorHandlerImpl
创建 RabbitListenerErrorHandlerImpl
类实现 RabbitListenerErrorHandler
接口,代码如下
1 |
|
- 在类上,添加
@Component
注解,并设置其 Bean 名为"rabbitListenerErrorHandler"
。稍后,我们会使用到该 Bean 名字。 - 在
#handleError(...)
方法中,我们先打印异常日志,并继续抛出 ListenerExecutionFailedException 异常。 - 要注意,如果此时不继续抛出异常,而是
return
结果,意味着 Consumer 消费成功。 - 如果结合
[6 重试与死信队列]
一起使用的时候,一定要继续抛出该异常,否则消费重试机制将失效。
14.2 RabbitLoggingErrorHandler
创建 RabbitLoggingErrorHandler
实现 ErrorHandler
接口代码如下
1 |
|
- 在构造方法中,把自己设置到 SimpleRabbitListenerContainerFactory 中,作为其 ErrorHandler 异常处理器。
- 在
#handleError(...)
方法中,打印错误日志。
在执行顺序上,RabbitListenerErrorHandler 先于 ErrorHandler 执行。
另外,RabbitListenerErrorHandler 需要每个 @RabbitListener
注解上,需要每个手动设置下 errorHandler
属性。而 ErrorHandler 是相对全局的,所有 SimpleRabbitListenerContainerFactory 创建的 SimpleMessageListenerContainer 都会生效。
14.3 配置
有关配置如下
1 | spring: |
配置了消费重试次数,因为后续测试会故意抛出异常
14.4 配置与有关类
配置以及Message
、Producer
、config
三个类与[1 Direct Exchange
]基本一致,编号顺延,不再赘述
14.5 Demo17Consumer
创建 Demo17Consumer
类,代码如下
1 |
|
- 在
@RabbitListener
注解上,我们通过设置errorHandler
属性为[14.1 RabbitListenerErrorHandlerImpl]
的名字。 - 在
#onMessage(...)
方法中,通过抛出 RuntimeException 异常,模拟消费异常。
14.6 简单测试
编写测试类,代码如下
1 |
|
- 在三次重试中,每次尝试消费失败时
RabbitListenerErrorHandlerImpl
处理异常 - 三次重试达到上限后
RabbitLoggingErrorHandler
处理了异常