首页 - 通讯 - RabbitMQ 如何避免丢失消息

RabbitMQ 如何避免丢失消息

2023-10-03 10:59

消息丢失

消息从生产到消费经历三个阶段,即生产、队列转发和消费。每个链接中的消息都可能丢失。

以下是RabbitMQ举个例子来说明每个阶段会出现的问题以及如何解决。在讲解之前,我们先回顾一下RabbitMQ的基本架构图

1。 Producer生产消息到RabbitMQ Server消息丢失场景

1。网络问题

外部环境问题原因:网络丢包、网络故障等导致RabbitMQ Server收不到消息,因为生产环境的网络非常复杂,网络抖动、丢包的情况很常见。下面我们就来说说这个问题是如何解决的。

2。代码层面和配置层面考虑不足导致消息丢失

生产者一般使用Confirm模式来传递消息。如果解决方案不够严谨,比如RabbitMQ Server接收消息失败后会发送nack消息通知生产者。生产者未能监控消息或不执行任何操作,存在消息丢失的风险。 ;
生产者将消息发送到交换机后,发送的路由没有绑定到队列,消息将会丢失。下面会提到具体的例子,以确保意外情况发生时,即使发生也在可控范围内。 。

解决方法:开启确认模式

首先,生产者通过调用channel.confirmSelect方法将频道设置为确认模式。一旦通道进入确认模式,该通道上发布的所有消息都会被分配一个唯一的ID(从1开始)。一旦消息投递到达所有匹配队列后,RabbitMQ就会向生产者发送一个确认(Basic.Ack)(包含消息的唯一deliveryTag和多个参数)。

其实Confirm模式有三种实现方式:
串行confirm模式:生产者发送消息后,调用waitForConfirms()方法,等待broker确认。如果服务器返回 false 或者在超时时间内没有返回,则客户端重传消息。

对于int=0;i <50;i++){频道.基本发布(交换, routingKey,强制,立即 messageProperties消息获取内容( ));如果  { 系统.out.println("发送成功" );} else {//这里发消息失败 重新发送逻辑 系统.out .println(“发送失败”) ;}
}

批量确认模式:生产者发送一批消息后,调用waitForConfirms()方法,等待broker确认。
问题:一旦confirm返回false或者超时,客户端需要重新发送本批次的所有消息,这会带来明显数量的重复消息,并且当消息经常丢失时,批量confirm性能会受到影响。下降而不是上升。

对于int=0;i <50;i++){频道.基本发布(交换, routingKey,强制,立即 messageProperties消息获取内容( ));
}
if 通道等待确认() )){系统out.println("发送成功");
}其他{系统.println("发送失败");
}

异步确认模式:提供回调方法。当broker确认一条或多条消息后,生产者会回调该方法。我们分别来看看这三种确认模式。

 公共 void 发送队列字符串)  appId, StringhandleUserId, 列表<字符串>deviceIds) {列表<对象> 列表 =  ArrayList<>();JSONObject jsonObject =  JSONObject() ;jsonObject.put(设备常量.命令,删除);jsonObject.放置设备常数BODY,列表);字符串主题交换 = RabbitMqConstant.EXCHANGE_TOPIC_DATA ;StringroutingKey =RabbitMqConstant.ROUTING_KEY_ LOCAL_DATA;//rabbitTemplate.convertAndSend(topicExchange,routingKey,jsonObject.toJSONString() );尝试{频道频道=rabbitTemplate .getConnectionFactory().创建连接().创建通道( );频道.确认选择();频道.基本发布(topicExchange,routingKey, null, jsonObject.toJSONString( ).getBytes() );频道.添加确认监听器(确认监听器() { //消息失败处理@Overridepublic voidhandleNack(发货标签, 布尔值多个) 抛出 IOException{log.信息("sendQueue-ack-confirm-fail==>exchange:{}--routingkey:{}--deliveryTag:{}--multiple:{}--message:{}", topicExchange ,routingKey,交付标签,多个,  jsonObject);尝试  {线程.睡眠(3000l);} catch InterruptedException e {抛出运行时异常e);}//重发频道.basicPublish (topicExchange,routingKey , null, jsonObject.toJSONString().获取字节( ));}//消息成功处理@OverridepublicvoidhandleAck(交付标签,布尔值多个) 抛出 IOException {log .info("sendQueue-ack-confirm-successs==>交换:{}--routingkey:{}--deliveryTag:{}--multiple:{}", topicExchange,routingKey,deliveryTag, 多个);}} );} catch (异常 e ){log错误("sendQueue-ack-发送消息失败:{}",ExceptionUtils. getStackTrace(e));}} 

2。队列本身可能会丢失消息

1。该消息未完全保留。当机器重启后,所有消息都会丢失,甚至Queue也没有了

仅持久化Message,不持久化Exchange和Queue。这种坚持是无效的。

解决方案:

开关持久性:声明开关时将持久性设置为 true。

//参数1:开关名称
//参数2:交换机类型,topic/direct/fanout/headers
//参数3:是否持久
通道.exchangeDeclare(exchangeName,exchangeType, 真);

队列持久化:声明队列时将durable参数设置为true。

消息持久化:

如果希望消息持久化,需要修改消息生产者推送消息的方法中的参数。将此属性添加到 MessageProperties.PERSISTENT_TEXT_PLAIN。

2。单节点模式问题,节点挂了,消息只存在于当前节点上。如果硬盘损坏,消息将无法恢复

如果实现消息持久化方案,消息会持久化到硬盘,机器重启后不会丢失;但也有一种极端的情况,服务器磁盘突然坏了(公司遇到过很多磁盘问题),消息无法持久化,不处于高可用状态。这种模式在生产环境中要慎重考虑。

3。默认集群模式下,消息只会存在于当前节点,不会同步到其他节点。其他节点只会同步本节点的队列结构


上图中的三个节点分别是创建了 RabbitMQ 集群。其中,交换是交换。其元数据信息(交换名称、交换属性、绑定键等)在所有节点上都是一致的,队列中的实际消息数据只会存在于所有节点上。在创建的节点上,其他节点只知道这个队列的元数据信息和一个指向拥有这个消息的队列的节点指针
RabbitMQ 集群将同步四种类型的内部元数据:队列元数据(队列名称和属性)、交换元数据(交换机名称和属性)、绑定密钥和虚拟机。当用户访问任意一个rabbitmq节点时,查询到的queue、user、exchange、vhost等信息都是一致的。

那为什么普通集群只同步元数据而不同步消息内容呢?这涉及到存储空间和性能问题。如果每个节点都保留一条消息,就会导致每个节点的空间非常大,消息的积压会增加,而积压问题无法通过扩展节点容量来解决。另外,如果每个节点都存储消息的副本,对于持久化消息来说,内存和磁盘的同步复制机制会对性能造成很大的影响。

工作原理


上图中的三个节点,节点1为数据节点(即实际存储消息内容的节点)。
如果客户端(生产者或消费者)与节点1建立连接,那么消息的发送和接收只会在节点1上进行(可以理解为简单的单机模式);

如果客户端(消费者)与节点2或节点3建立连接,由于数据在节点1上,节点2或节点3只会起到消息转发的作用,比如这个客户端是消费者,那么消息会被节点2或节点3从节点1拉取,然后通过自己的节点路由到消费者;
如果是客户端(生产者),那么消息会先发送到节点2或节点3,然后路由到节点1的队列并存储。

节点可以是磁盘节点和内存节点。磁盘节点将元数据存储在磁盘上,内存节点将元数据存储在内存中。
这里需要注意的是,内存节点在内存中只存储元数据(如队列名称和属性、交换机名称和属性、虚拟机等),所以资源管理(创建和删除队列、交换机、虚拟机等)机)等)得到了改善,但发布和订阅的消息速率并没有得到改善。
RabbitMQ 需要集群中至少有一个磁盘节点。当节点加入和离开集群时,必须通知磁盘节点(如果集群中唯一的磁盘节点崩溃,则无法创建队列、创建交换机、创建绑定、添加用户、更改权限、添加和删除集群节点)。如果唯一磁盘的磁盘节点崩溃了,集群可以继续运行,但无法改变任何事情。因此,建议集群中设置两个磁盘节点。只有一台可以正常运行。总之,在无法知道如何使用磁盘节点的情况下,建议使用磁盘节点,以保证最佳效果。

总结:普通集群模式无法保证服务的高可用,因为其他节点只复制队列、交换机等元数据信息,并不复制实际的消息内容到自己的节点。这种部署模型只解决了单个节点的压力问题,但是当数据节点宕机后,就无法提供服务,消息路由线路被阻塞,客户端无法继续与服务交互。为了解决这个问题,需要将消息数据复制到集群中的其他节点,因此rabbitmq引入了镜像部署模式。

解决方案:镜像部署,消息会同步到其他节点,可以设置同步节点数量,但吞吐量会下降。


Rabbitmq 的镜子cluster实际上是在普通集群的基础上增加了一个策略。需要和普通集群一样部署。部署完成后,镜像队列创建完成。该策略实现了主备节点之间的消息同步。也就是说,每个备节点都有与主节点相同的队列。这个队列是主节点通过创建镜像队列来生成的,这些备用节点可以及时同步主节点中的排队消息。当消息设置持久化后,每个节点都有自己本地的消息持久化存储机制。当消息入队和出队时,主节点上的所有操作都会同步到备节点进行更新。在这种集群模式下,主节点宕机后,备节点保留的消息与主节点完全相同,从而实现高可用性。

工作原理


上图为镜像集群模式实现过程包括三个节点(主节点、备份节点1、备份节点2)和三个镜像队列(备份节点队列on是由主节点镜像生成的)。需要注意的是,这里的主节点和备份节点是针对某个队列的,不能认为一个节点是所有队列的主节点,因为在整个镜像集群模式中,会存在多个节点和多个队列。队列。此时任意节点都可以作为某个队列的镜像主节点,其他节点成为镜像备份节点(例如:有A、B、C三个节点和Q1、Q2、Q3三个队列,如果A作为Q1的镜像主节点,则B和C作为Q1的镜像备份节点,在此基础上,如果B作为Q2的镜像主节点,则A和C成为Q2的镜像备份节点.节点)。

每个队列由两部分组成,一是队列,用于接收消息和发布消息,二是BackingQueue,用于本地消息持久化。客户端发送到主节点队列的消息和ack响应将同步到其他备用节点。

镜像主队列(mirror_queue_master)上的所有操作都会通过组播GM同步到其他备份节点。这里的GM负责广播消息,mirror_queue_slave负责回调处理(更新这个同步内容),所以当有消息发送到备份节点时,mirror_queue_slave做实际的处理,并将消息存储到队列中。如果是持久化消息,也存储在BackingQueue中。 master上的回调由coordinator处理(发布同步内容)。在主节点中,BackingQueue的存储是通过Queue来调用的。对于生产者来说,消息发送到队列后,再调用mirror_queue_master进行持久化处理,然后通过GM广播将同步消息发送到备份节点。备份节点通过回调mirror_queue_slave将消息同步到队列和BackingQueue;对于消费来说,比如从队列中获取消息后,消息队列会等待消费者的ack响应。收到ack响应后,队列和BackingQueue中的消息将被删除,并且此ack的内容将通过GM广播发送到备份节点进行同步。这个操作。如果从站出现故障,不会影响客户端的服务提供。如果master宕机,其他备份节点将提升为master,继续服务消息而不会丢失。那么这多个备份节点如何选择其中一个作为主节点呢?这里选择“最老”的节点作为主节点,因为这个备用节点是与其他节点相比同步时间最长、同步状态最好的节点。然而,如果没有从机与主机完全同步的情况,那么主机中未同步的消息将会丢失。

GM

GM模块实现的可靠组播通信协议。该协议可以保证组播消息的原子性,即保证组内所有活节点要么收到消息,要么不收到消息。
其实现大致如下:所有节点组成一个循环链表。每个节点都会监视其左侧和右侧的节点。当添加节点时,相邻节点确保当前广播消息将被复制到新节点。在节点上;当一个节点发生故障时,相邻节点将接管,以确保广播消息被复制到下一个节点。主节点和从节点上的这些GM组成一个组,组(gm_group)信息会记录在mnesia中。不同的镜像队列组成不同的组。消息从主节点对应的GM发出后,沿着链表依次传输到所有节点。由于所有节点形成循环链表,因此主节点对应的GM最终会收到自己发送的消息。此时,主节点就知道该消息已被复制到所有从节点。还需要注意的是,每个新节点的添加都会首先清除该节点的原有数据。下图是新节点加入集群的简单模型:
消息同步:
将新节点添加到现有镜像队列中。默认情况下 ha-sync-mode=manual,镜像队列中的消息不会主动同步到新节点,除非显式调用同步命令。当调用同步命令时,队列开始阻塞,直到同步完成后才能进行操作。

总结

镜像集群模式使所有节点通过从主节点复制消息来保留数据的副本。一旦主节点崩溃,备份节点可以完成替换并继续对外提供服务。这样解决了节点宕机带来的问题,提高了服务稳定性,但无法实现负载均衡,因为每个操作都必须在所有节点上完成,这无疑降低了系统性能。此外,当大量消息加入队列时,同步通信会极大消耗集群内的网络带宽。因此更适合可靠性要求高、性能要求低、消息量小的场景。如果对高可用和负载均衡都有要求,则需要与HAProxy(实现节点间负载均衡)和keepalived(实现HAproxy的主备模式)中间件配合使用。下面我们就过一遍这个场景的整个部署流程。概述。

3。消费者可能会丢失消息

消费端采用自动ack机制。在处理完成之前,消费者端宕机了。

消费者可能需要一段时间才能完成任务。如果其中一个消费者正在处理一项很长的任务,只完成了其中的一部分,然后突然挂起,消息就会丢失。因为一旦 RabbitMQ 将消息传递给消费者,它就会立即将该消息标记为删除。

解决方案:改为手动ack,消息正确处理后再通知mq。消费者处理消息异常后,返回nack,以便mq将消息投递给另一个消费者。

如何回复消息

  1. Channel.basicAck(long DeliveryTag, boolean multiple):用于肯定确认。 RabbitMQ 已经知道该消息并成功处理它,因此它可以丢弃它

deliveryTag:消息的索引
multiple:是否批量。 true:所有小于deliveryTag的消息都会被立即确认。

多参数分析
true表示批量响应
例如,页面上有标签为5,6,7,8的消息频道。当前标签为 8
那么此时消息 5-8 将确认收到消息回复
false 与上述相比,
只会回复消息 5、6 ,和 7,标签=8。这三个消息仍然不会 消息响应的确认{IMG_10: Ahr0chm6ly9pbwctymxvzy5jc2RMNul2M5yzq4mzm0NDQ0NTI5MJGXOGIWMRIMJNDQ1LNBUZW ==/}

  1. Channel.void basicNack(long DeliveryTag, boolean multiple, boolean requeue):用于否定确认

deliveryTag:消息的索引。
multiple:是否批量。 true:所有小于deliveryTag的消息将被立即拒绝。
requeue:被拒绝的是否重新排队。

  1. Channel.basicReject(long DeliveryTag, boolean requeue):用于否定确认(推荐)

deliveryTag:消息的索引。
requeue:被拒绝的是否重新排队。

basicNack() 和 basicReject() 的区别在于 basicNack() 可以批量拒绝消息,而 basicReject() 一次只能拒绝一条消息。

演示

 @RabbitHandler@RabbitListener队列= RabbitMqConstant.xxx ,并发= "1-1")公共 void  receiveQueueCommonLocal通道通道,消息消息){字符串   messageBody =  字符串(消息.getBody());  //System.out.println("messageBody===> "+ messageBody);try {//todo业务逻辑/*手动确认成功*参数:*deliveryTag:消息的索引* multiple:是否批量process.true: 将立即确认小于deliveryTag的所有消息* **/channel.basicAck(message.  getMessageProperties() . } catch 异常)  e) {e.打印堆栈跟踪();日志.错误(“receiveQueueCommonLocal=====>错误:{} --josn:{}", ExceptionUtil.getMessage(e), 消息正文);尝试 {//手动确认回滚,拒绝deliveryTag对应的消息。第二个参数是是否重新排队。如果为true,则重新进入队列,否则将被丢弃或进入死信队列。 通道基本拒绝消息getMessageProperties().获取送货标签() ),true);}抓住 (IOException ex ) {抛出  运行时异常 (ex);} } }

文章来源:https://www.gsm-guard.net/u_15840568/5784352
https://www.gsm-guard.net/p/79545722
集群:https://www.gsm-guard.net/ weixin_43498985/article/details/122185972
消费者确认:https://www.gsm-guard.net/m0_64337991/article/details/122755297
https://www.gsm-guard.net/p/4832891 06 ?utm_id= 0