admin 管理员组文章数量: 1087649
RabbitMQ 保证消息可靠性传输
RabbitMQ 保证消息可靠性传输
- 消息可靠性问题
- 解决消息可靠性问题
- 生产者到 Queue 阶段消息丢失
- Exchange 到 Queue 阶段
- 消息进入消费者
我们知道,RabbltMQ 不能够保证消息不会丢失,那么在高并发场景下,如何保证消息的可靠性传输?
消息可靠性问题
Publisher 发送消息到 Exchange 阶段:
- Publisher 发送的消息未到达 Exchange。
- 消息到达 Exchange 未到达 Queue。
Exchange 发送消息到 Queue 阶段:MQ 宕机,Queue 将消息丢失。
Queue 发送消息到 Consumer 阶段:Consmer 接收消息后未消费就宕机。
解决消息可靠性问题
RabbitMQ 提供了解决消息可靠性问题的方案。
生产者到 Queue 阶段消息丢失
生产者消息确认:
RabbitMQ 提供了 publisher confirm 机制来避免消息发送到 MQ 过程中丢失。消息发送到 MQ 后,会返回一个结果,表示消息是否成功。
publisher-confirm,发送者确认。
- 消息成功投递到交换机,返回 ack
- 消息未投递到交换机,返回 nack
publisher-return,发送者回执。
- 消息投递到交换机了,但是没有路由到队列。返回 ACK,以及路由失败原因。
注意:确认机制发送消息时,需要给每个消息设置一个全局唯一 id,以区分不同消息,避免 ack 冲突。
ReturnCallback 和 ConfimCallback 进行生产者确认。ReturnCallback 只能够重写一个,而 ConfimCallback 可以每个生产者都进行重写。
# 通过配置开启 生产者消息确认。
spring:rabbitmq:listener:simple:publisher-confirm-type: correlated # 异步回调,定义 ConfirmCallback,MQ 返回结果时会回调这个 ConfirmCallback。publisher-returns: true # 开启 publisher-confirm 功能,同样基于 callback 机制,不过是定义 ReturnCallback。template:mandatory: true # 定义消息路由失败的策略。true 调用 ReturnCallback;false 丢弃消息。
重写 ReturnCallback 方法:
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {// 获取RabbitTemplate对象RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);// 配置ReturnCallbackrabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {// 判断是否是延迟消息Integer receivedDelay = message.getMessageProperties().getReceivedDelay();if (receivedDelay != null && receivedDelay > 0) {// 是一个延迟消息,忽略这个错误提示return;}// 记录日志log.error("消息发送到队列失败,响应码:{}, 失败原因:{}, 交换机: {}, 路由key:{}, 消息: {}",replyCode, replyText, exchange, routingKey, message.toString());// 如果有需要的话,重发消息// rabbitTemplate.convertAndSend(exchange, routingKey, message);});}
}
重写 ConfimCallback 方法:
@Testpublic void testSendMessage2SimpleQueue() throws InterruptedException {// 消息String msg = "hello, spring amqp";// correlationData// 1. 准备 idCorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());// 2. 准备 ConfirmCallbackcorrelationData.getFuture().addCallback( result -> { // 成功// 判断结果if (result.isAck()) {// ACKlog.debug("消息 {} 成功投递到交换机。", correlationData.getId());}else {// NACKlog.error("消息投递到交换机失败!消息 ID:{}", correlationData.getId());}}, ex -> { // 失败// 记录日志log.error("消息发送失败,失败原因是:" + ex);// 也可以重发消息});// 发送消息rabbitTemplate.convertAndSend("amq.topic", "simple.test", msg, correlationData);}
Exchange 到 Queue 阶段
消息传递到 Queue 中,而此时 Queue 宕机,消息丢失。
针对这种情况,可以采取消息持久化的方式:
- 交换机持久化(默认持久化)。
- 队列持久化(默认持久化)。
- 消息持久化,SpringAMQP中的消息默认持久化。
// 声明持久化交换器 @Beanpublic DirectExchange simpleDirect(){return new DirectExchange("simple.direct", true, false);}// 声明持久化队列并绑定@Beanpublic Queue simpleQueue(){return QueueBuilder.durable("simple.queue").build();}// 消息持久化// 1. 准备消息Message message = MessageBuilder.withBody("hello, spring".getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.PERSISTENT) // 持久化消息设置.build();
消息进入消费者
消费者消息确认:
消费者处理消息后,向 MQ 发送 ack 回执, MQ 收到 ack 回执后才会删除该消息。
SpringAMQP 允许配置三种模式:
manual:手动 ack,需要在业务代码结束后,调用 api 发送 ack。
auto:自动 ack,由 spring 监测 listener 代码是否出现异常,没有异常则返回 ack;抛出异常则返回nack(推荐)。
none:关闭 ack,MQ 假定消费者获取消息后会成功处理,因此消息投递后立即删除。
消费失败重试机制:
消费者消息确认机制中,消费者消费失败就永无停止的重发,浪费资源。
可以利用 Spring 的 retry 机制,在消费者出现异常时利用本地重试,而不是无限制的 requeue 到队列。
消费者失败消息处理策略
开启重试模式后,重试次数耗尽,如果消息依然失败,则需要 MessageRecoverer 接口来处理,有三种实现:
RejectAndDontRequeueRecoverer:重试耗尽后,直接 reject,丢弃消息(默认)。
ImmediateRequeueMessageRecoverer:重试耗尽后,返回 nack,消息重新入队。
RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定交换机(异常交换机)。
通过 yaml 文件配置消费者确认机制和 Spring 重试机制。
spring:rabbitmq:listener:simple:acknowledge-mode: auto # 消费者确认机制 自动 ack,由 spring 监测 listener 代码是否出现异常,没有异常则返回 ack;抛出异常则返回nack(推荐)。retry: # 启用 spring 的 retry 机制enabled: true # 开启消费者失败重试initial-interval: 1000 # 初始的失败等待时长为 1 smultiplier: 3 # 下次失败的等等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 4 # 最大尝试次数stateless: true # true 无状态;false 有状态。如果业务包含事务,改为 false。
配置异常队列与失败消息处理策略:
@Beanpublic DirectExchange errorMessageExchange(){return new DirectExchange("error.direct");}@Beanpublic Queue errorQueue(){return new Queue("error.queue");}@Beanpublic Binding errorMessageBinding(){return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("error");}/*** 消费者失败消息处理策略* @param rabbitTemplate* @return*/@Beanpublic MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");}
本文标签: RabbitMQ 保证消息可靠性传输
版权声明:本文标题:RabbitMQ 保证消息可靠性传输 内容由网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://www.roclinux.cn/p/1700323891a396972.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论