RabbitMQ与RedisMQ

Redis队列

rpush,消息类型定义,不同业务定义不同消息类型,while true,一直循环等待取lpop,取不到time。sleep

两个队列对比

redis :没有相应的机制保证消息的可靠消费,如果发布者发布一条消息,而没有对应的订阅者的话,或者消费失败的话,这条消息将丢失,不会存在内存中

rabbitmq:具有消息消费确认机制,如果发布一条消息,还没有消费者消费该队列,那么这条消息将一直存放在队列中,直到有消费者消费了该条消息,以此可以保证消息的可靠消费

redis:实时性高,redis作为高效的缓存服务器,所有数据都存在内存中,所以它具有更高的实时性消费者负载均衡

rabbitmq:可以被多个消费者同时监控消费,但是每一条消息只能被消费一次,由于rabbitmq的消费确认机制,因此它能够根据消费者的消费能力而调整它的负载

redis:发布订阅模式,一个队列可以被多个消费者同时订阅,当有消息到达时,会将该消息依次发送给每个订阅者,她是一种消息的广播形式,redis本身不做消费者的负载均衡,因此消费效率存在瓶颈;

rabbitmq:队列,每条消息都可以选择性持久化,持久化粒度更小,更灵活

rabbitmq实现了后台监控平台,可以在该平台上看到所有创建的队列的详细情况,良好的后台管理平台可以方面我们更好的使用

redis没有所谓的监控平台

redis:轻量级,低延迟,高并发,低可靠性;
rabbitmq:重量级,高可靠,异步,不保证实时

RabbitMQ

原生支持AMQP协议,还支持STOMP, MQTT等多种消

AMQP

三层协议:高中底层

交换器 (Exchange):消息代理服务器中用于把消息路由到队列的组件。
队列 (Queue):用来存储消息的数据结构,位于硬盘或内存中。
绑定 (Binding): 一套规则,告知交换器消息应该将消息投递给哪个队列

基本概念

Publisher 生产者->exchange交换机发送消息

Consumer 消费者->从消息队列中取得消息

Broker 客户端连接,实体服务

Virtual host 一个Virtual Host里面可以有若干个Exchange和Queue,同一个Virtual Host里面不能有相同名称的Exchange和Queue

Message 消息 由Properties和Body组成(消息头和消息体)。Properties可以对消息进行修饰,比如消息的优先级、延迟等高级特性;Body这就是消息体内容

Exchange 交换机,接收生产者发送的消息,根据路由键转发消息到绑定的队列。

三种常见的交换机类型:

1、direct(发布与订阅,完全匹配)

2、fanout(广播)

3、topic(主题,规则匹配)

Routing key:路由键。一个路由规则,虚拟机可用它来确定如何路由一个特定消息
消息发送到MQ服务器时,消息将拥有一个路由键,即便是空的。RabbitMQ也会将其和绑定使用的路由键进行匹配。
如果匹配,消息将投递到该队列;如果不匹配,消息将会进入黑洞

Binding:绑定。Exchange和Queue之间的虚拟连接,binding中可以包含routing key

Channel:网络信道,是TCP里面的虚拟连接。几乎所有的操作都在Channel中进行, Channel是进行消息读写的通道

TCP一旦打开,就会创建AMQP信道。

无论是发布消息、接收消息、订阅队列,这些动作都是通过信道完成的。

为什么不直接通信:
1、TCP的创建和销毁开销特别大。创建需要3次握手,销毁需要4次分手
2、如果不用信道,那应用程序就会以TCP连接RabbitMQ,高峰时每秒成千上万条连接会造成资源巨大浪费,而且操作系统每秒处理TCP连接数也是有限制的,必定造成性能瓶颈
3、信道的原理是一条线程一条通道,多条线程多条通道同用一条TCP连接。一条TCP连接可以容纳无限的信道,即使每秒成千上万的请求也不会成为性能瓶颈

Queue:一个消息可以投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列上将其取走

流程

发消息的时候需要指定发往哪个Exchange,然后借助routing key发送到对应的消息队列queue,消费者从订阅的消息队列上取消息

exchange模式

direc:完全匹配RouteKey

topic:通配符模糊匹配RouteKey

Fanout:群发,不关心routekey

相关问题

如何保证可靠性

1、消息落库,消息持久化,消息标记, 在高并发场景下,数据库IO压力大,不适用

2、延迟投递:

发送first——send消息

同时发送一个延迟的检查消息(检查第一次发送消息消费情况)

消费端消费消息,消费后发送一个确认ack给broker

回调服务检测到消费端的确认消息,进行数据库的状态持久化(这样相当于数据库一次操作,异步持久化)回调服务检测到消费端的确认消息,进行数据库的状态持久化(这样相当于数据库一次操作,异步持久化)

回调服务响应第二个延时消息,确认消息成功消费,如果出现异常,回调服务调用RPC给生产者,再次发送

消息幂等性,避免重复消费

幂等性即对数据进行若干次操作,仍然保证正确

消息不会消费多次,只消费一次

唯一ID加上时间戳

MQ内部针对每条生产者发送的消息生成一个inner-msg-id,作为去重和幂等的依据

消息确认

消息的确认,是指生产者投递消息后,如果Broker收到消息,则会给我们生产者一个应答。

生产者进行接收应答,用来确定这个消息是否正常的发送到Broker,这种方式也是消息的可靠性

Return Listener用于处理一些不可路由的消息!

消息限流

RabbitMQ提供了一种QOS(服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息(通过基于consumer或者channel设置Qos的值)未被确认前,不进行消费新的消息

ACK机制

消费端进行消费的时候,如果由于业务异常我们可以进行日志的记录,然后进行补偿。如果由于服务器宕机等严重问题,那我们就需要手工进行ACK保障消费端消费成功。

消息确认ACK:如果在处理消息的过程中,消费者的服务器在处理消息时出现异常,那可能这条正在处理的消息就没有完成消息消费,数据就会丢失。为了确保数据不会丢失,RabbitMQ支持消息确认ACK。

ACK的消息确认机制是消费者从RabbitMQ收到消息并处理完成后,反馈给RabbitMQ,RabbitMQ收到反馈后才将此消息从队列中删除。

1、如果一个消费者在处理消息出现了网络不稳定、服务器异常等现象,那么就不会有ACK反馈,RabbitMQ会认为这个消息没有正常消费,会将消息重新放入队列中。

2、如果在集群的情况下:RabbitMQ会立即将这个消息推送给这个在线的其他消费者。这种机制保证了不丢失任何消息和任务。

3、消息永远不会从RabbitMQ中删除:只有当消费者正确发送ACK反馈,RabbitMQ确认收到后,消息才会删除

4、消息的ACK确认机制默认开

重回队列

消费失败的消息都会放回队列,一般都会关闭

TTL队列

消息过期机制

第一种通过对队列进行设置,这种设置后,该队列中所有的消息都存在相同的过期时间,第二种通过对消息本身进行设置,那么每条消息的过期时间都不一样。如果同时使用这2种方法,那么以过期时间小的那个数值为准

死信队列

当消息在一个队列中变成死信(dead message)之后,它能被重新publish到另一个Exchange,这个Exchange就是DLX

消息变成死信:

1、 消息被拒绝(basic.reject/basic.nack)并且request=false;
2、 消息TTL过期;
3、 队列达到最大长度

可以监听这个队列进行处理

生产消息流程

1.Producer先连接到Broker,建立连接Connection,开启一个信道(Channel)

2.Producer声明一个交换器并设置好相关属性

3.Producer声明一个队列并设置好相关属性

4.Producer通过路由键将交换器和队列绑定起来

5.Producer发送消息到Broker,其中包含路由键、交换器等信息

6.相应的交换器根据接收到的路由键查找匹配的队列

7.如果找到,将消息存入对应的队列,如果没有找到,会根据生产者的配置丢弃或者退回给生产者(两种处理方式)

8.关闭信道

9.管理连接

消费者接收消息过程

1.Producer先连接到Broker,建立连接Connection,开启一个信道(Channel)

2.向Broker请求消费响应的队列中消息,可能会设置响应的回调函数

3.等待Broker回应并投递相应队列中的消息,接收消息

4.消费者确认收到的消息,ack

5.RabbitMq从队列中删除已经确定的消息

6.关闭信道

7.关闭连接

事务,拒绝消息,延迟,优先处理

保证层级

最多一次,最少一次,恰好一次

队列结构

rabbit_amqqueue_process :负责协议相关的消息处理,即接收生产者发布的消息、向消费者交付消息、处理消息的确认(包括生产端的 confirm 和消费端的 ack) 等。
backing_queue:是消息存储的具体形式和引擎,并向 rabbit amqqueue process 提供相关的接口以供调用。

消息四种状态

alpha: 消息内容(包括消息体、属性和 headers) 和消息索引都存储在内存中 。
beta: 消息内容保存在磁盘中,消息索引保存在内存中。
gamma: 消息内容保存在磁盘中,消息索引在磁盘和内存中都有 。
delta: 消息内容和索引都在磁盘中 。

消息如何分发

若该队列至少有一个消费者订阅,消息将以循环(round-robin)的方式发送给消费者。每条消息只会分发给一个订阅的消费者(前提是消费者能够正常处理消息并进行确认)

如何确保消息正确发送

将信道设置成confirm模式(发送方确认模式),则所有在信道上发布的消息都会被指派一个唯一的ID。一旦消息被投递到目的队列后,或者消息被写入磁盘后(可持久化的消息),信道会发送一个确认给生产者(包含消息唯一ID)

消费方确认接收消息

消费者接收每一条消息后都必须进行确认(消息接收和消息确认是两个不同操作)。只有消费者确认了消息,RabbitMQ才能安全地把消息从队列中删除。这里并没有用到超时机制,RabbitMQ仅通过Consumer的连接中断来确认是否需要重新发送消息。也就是说,只要连接不中断,RabbitMQ给了Consumer足够长的时间来处理消息

如果消费者接收到消息,在确认之前断开了连接或取消订阅,RabbitMQ会认为消息没有被分发,然后重新分发给下一个订阅的消费者。(可能存在消息重复消费的隐患,需要根据bizId去重)
如果消费者接收到消息却没有确认消息,连接也未断开,则RabbitMQ认为该消费者繁忙,将不会给该消费者分发更多的消息

丢数据问题

生产者丢数据:

1、开启事务,但会降低吞吐量

2、 开启confirm模式,丢了可以进行重试

消费队列丢数据:

1、磁盘db持久化,持久化成功,返回个ack,将queue的持久化标识durable设置为true,则代表是一个持久的队列,deliveryMode=2

消费者丢数据:

自动确认,手动确认,不确认模式

实现延时队列

在rabbitmq中不存在延时队列,但是我们可以通过设置消息的过期时间和死信队列来模拟出延时队列。消费者监听死信交换器绑定的队列,而不要监听消息发送的队列

场景:用户在系统中创建一个订单,如果超过时间用户没有进行支付,那么自动取消订单

消息如何被优先消费

可以设置消息优先级

如何保证消息顺序性

每个队列有一个消费者

否则只能通过全局ID实现(每条消息都一个msgId,关联的消息拥有一个parentMsgId。可以在消费端实现前一条消息未消费,不处理下一条消息;也可以在生产端实现前一条消息未处理完毕,不发布下一条消息)

多个消费者监听一个队列

轮询: 默认的策略,消费者轮流,平均地接收消息
公平分发: 根据消费者的能力来分发消息,给空闲的消费者发送更多消息