消息队列
消息队列的概念:
应用间的通信方式,消息发送后可以立即返回,由消息系统来确保消息的可靠传递。
消息队列的特点
1.采用异步处理模式
发送者可以发送一个消息而无须等待响应,接收者无需对消息发送者做出同步回应
2.发送者和接收者不必了解对方、只需要确认消息;
3.发送者和接收者不必同时在线。
应用场景
1. 支付完成,统修改订单支付状态
2.短信通知、终端状态推送、App 推送、用户注册
常用的消息中间件
1. ActiveMQ:曝光率最高,但是可能会丢消息。
2. ZeroMQ:不支持消息持久化和崩溃恢复。
3. Kafka:定位是日志消息队列, 吞吐量最大。
4. RabbitMQ: 可靠。支持持久化,传输确认,发布确认等保证了MQ的可靠性。
RabbitMQ(3.7)
是一个开源的AMQP(高级消息队列协议)实现,服务器端用Erlang语言编写
官方文档:
https://www.rabbitmq.com/getstarted.html
参考文档:
https://www.sojson.com/blog/48.html
AMQP介绍:
https://blog.csdn.net/qq_33314107/article/details/80172042
RabbitMQ中的组成部分
Exchange:消息队列交换机。按一定的规则将消息路由转发到某个队列。
Queue:消息队列,存储消息的队列。
Producer:消息生产者。生产方客户端将消息同交换机路由发送到队列中。
Consumer:消息消费者。消费队列中存储的消息。
Exchange的类型
fanout:
把消息路由到与该交换机绑定的所有队列上。用于发布订阅
direct:
把消息路由到那些binding key与routing key完全匹配的Queue中。用于关键字发送
topic:
把消息路由到那些binding key与routing key按规则完全匹配的Queue中。用于模糊匹配
headers:
根据发送的消息内容中的headers属性进行匹配
有两种模式:
全部匹配
部分匹配
RabbitMQ分发(消息)模式:
1. 简单模式:
包含一个生产者、一个消费者和一个队列。
生产者向队列里发送消息,消费者从队列中获取消息并消费。
2. 工作模式:
包含一个生产者、两个消费者和一个队列
消费者取数据的类型分为:
1. 平均分配
每个消费者处理相同个数的任务
2. 循环调度
每个消息的处理时间不同,就有可能会导致某些消费者一直在忙,而另外一些消费者很快就处理完手头工作并一直空闲的情况。
设置prefetchCount=1,则Queue每次给每个消费者发送一条消息;消费者处理完这条消息后Queue会再给该消费者发送一条消息。
3.发布订阅模式:
一个生产者、两个消费者、两个队列和一个交换机。
两个消费者同时绑定到不同的队列上去,两个队列绑定到交换机上去,生产者通过发送消息到交换机,
所有消费者接收并消费消息。
交换类型 exchange_type='fanout’
4. 路由模式(关键字):
包含一个生产者、两个消费者、两个队列和一个交换机。
两个消费者同时绑定到不同的队列上去,两个队列通过路由键绑定到交换机上去,生产者发送消息到交换机,
交换机通过路由键转发到不同队列,队列绑定的消费者接收并消费消息。
交换类型:exchange_type='direct'
5.通配符模式(模糊匹配):
通过路由键匹配规则转发到不同队列
特殊匹配符号:
*:只能匹配一个单词;
#:可以匹配零个或多个单词
交换类型:exchange_type='topic'
6.RPC:双向通道
场景:客户端发送消息给服务端,服务端处理完后将结果返回,客户端获取到数据
处理:客户端往队列1送完数据后,建个新的队列2,服务端在队列1接收数据后,将处理后将结果放到队列2返回,客户端在队列2处接收
对于RPC请求,客户端发送带有两个属性的消息: reply_to,设置为回调队列,correlation_id,设置为每个请求的唯一值。
RPC的实现机制
作用:
消费者能了解到任务处理成功或者失败
实现:
1. 客户端发送请求(消息)时,在消息的属性中设置两个值replyTo (Queue名称,告诉服务器处理消息发送到这个Queue 中)和correlationId (此次请求的标识号)
2. 服务器端收到消息并处理
3. 服务器端处理完消息后,将生成一条应答消息到replyTo 指定的Queue ,同时带上correlationId 属性
4. 客户端之前已订阅replyTo 指定的Queue ,从中收到服务器的应答消息后,根据其中的correlationId 属性分析哪条请求被执行了,
根据执行结果进行后续业务处理
AMQP
AMQP组成
Producer: 消息生产者,就是投递消息的程序
Consumer: 消息消费者,就是接受消息的程序
Broker: 简单来说就是消息队列服务器实体
Connection:连接,应用程序与Broker的网络连接
Channel: 消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务
Message:服务器和应用程序之间传送的数据
Exchange: 消息交换机,它指定消息按什么规则,路由到哪个队列
Queue: 消息队列,保存消息并将它们转发给消费者
Binding: 绑定,它的作用就是把exchange和queue按照路由规则绑定起来
Routing Key: 路由关键字,exchange根据这个关键字进行消息投递
VHost: 虚拟 broker 一个Virtual Host里面可以有若干个Exchange和Queue
为什么使用AMQP
在分布式的系统中,子系统如果使用socket连接进行通讯,有很多问题需要解决。比如:
1.信息的发送者和接受者如何维持这个连接,如果一方中断,这期间的数据如何防止丢失?
2.如何降低发送者和接受者的耦合度?
3.如何让优先级高的接受者先接到数据?
4.如何将信息发送到相关的接收者,如果接受者订阅了不同的数据,如何正确的分发到接受者?
5.如何保证接受者接到了完整,正确或是有序的数据?
AMQP解决了这些问题
AMQP通信流程:
1. 建立连接Connection。由producer和consumer创建连接,连接到broker的物理节点上。
2. 建立消息Channel。
Channel是建立在Connection之上的,一个Connection可以建立多个Channel。
producer连接Virtual Host建立Channel,Consumer连接到相应的queue上建立Channel。
3. 发送消息
由Producer发送消息到Broker中的exchange中。
4. 路由转发
exchange收到消息后,根据一定的路由策略,将消息转发到相应的queue中去。
5. 消息接收
Consumer会监听相应的queue,一旦queue中有可以消费的消息,queue就将消息发送给Consumer端。
6. 消息确认
当Consumer完成某一条消息的处理之后,需要发送一条ACK消息给对应的Queue。
Queue收到ACK信息后,才会认为消息处理成功,并将消息从Queue中移除;
如果在对应的Channel断开后,Queue没有收到这条消息的ACK信息,该消息将被发送给另外的Channel。至此一个消息的发送接收流程走完了。消息的确认机制提高了通信的可靠性
Question
参考文档: https://juejin.cn/post/6844904125935665160#heading-14
https://juejin.cn/post/6991610913354678285#heading-31
https://juejin.cn/post/6844904192146931720#heading-28
消息的路由(routing/binding)
消息创建时设定一个路由键(routing key), 通过队列路由键(bind key),把队列绑定到交换器上。
RabbitMQ 会将消息的路由键与队列的路由键进行匹配(针对不同的交换器有不同的路由规则)
消息的传输(Channel)
由于 TCP 连接的创建和销毁开销较大,且并发数受系统资源限制,会造成性能瓶颈。
RabbitMQ使用信道的方式来传输数据。信道是建立在真实的TCP连接内的虚拟连接,且每条 TCP 连接上的信道数量没有限制。
保证消息发送至MQ(发送方确认模式)
1. 将信道设置成 confirm 模式(发送方确认模式),则所有在信道上发布的消息都会被指派一个唯一的 ID。
2. 一旦消息被投递到目的队列后,或者消息被写入磁盘后(可持久化的消息),
信道会发送一个确认给生产者(包含消息唯一 ID)
3. 如果RabbitMQ发生内部错误从而导致消息丢失,会发送一条 nack(notacknowledged,未确认)消息
4. 发送方确认模式是异步的,生产者应用程序在等待确认的同时,可以继续发送消息。
当确认消息到达生产者应用程序,生产者应用程序的回调方法就会被触发来处理确认消息
保证消息被消费(接收方确认机制)
丢失消息的原因:
消费者默认采用自动确认模式,消息在发送后立即被认为是发送成功。如果这时处理消息失败,就会丢失该消息;
手动确认:
消费者接收每一条消息后都必须进行确认,只有消费者确认了消息,RabbitMQ 才能安全地把消息从队列中删除。
无超时机制, 只要连接不中断,RabbitMQ 给了 Consumer 足够长的时间来处理消息。
特殊情况:
1.如果消费者接收到消息,在确认之前断开了连接或取消订阅,RabbitMQ会认为消息没有被分发,然后重新分
发给下一个订阅的消费者。
2.如果消费者接收到消息却没有确认消息,连接也未断开,则RabbitMQ认为该消费者繁忙,将不会给该消费者分发更多的消息。
保证消息不被重复消费
生产时消息重复
原因:
生产者发送消息给MQ,在MQ确认的时候出现了网络波动,生产者没有收到确认,实际上MQ已经接收到了消息。这时候生产者就会重新发送一遍这条消息。
解决方案:
MQ内部针对每条生产者发送的消息生成一个inner-msg-id,这个值全局唯一,且由MQ生成,与业务无关
消费时消息重复
原因:
消费者消费成功后,再给MQ确认的时候出现了网络波动,MQ没有接收到确认,为了保证消息被消费,MQ就会继续给消费者投递之前的消息。这时候消费者就接收到了两条一样的消息。
解决方案:
消息体中,必须有一个与业务相关的唯一id(,如支付ID、订单ID、帖子ID等)
由发送方放到消息体中,消费方对ID进行判重,保证相同id的消息只有1条消息被消费
保证消息的顺序性
乱序的原因:
业务上产生三条消息,分别是对数据的增加、修改、删除操作, 如果没有保证顺序消费,执行顺序可能变成删除、修改、增加,这就乱了。
多个消费者并发消费消息,获取的消息的速度、执行业务逻辑的速度快慢、执行异常等等原因都会导致消息顺序不一致。
解决消费顺序的问题:
将需要保证顺序的数据发到一个队列中,该队列只有一个消费者消费
RabbitMQ持久化
1. 交换机的持久化
2. 队列的持久化
channel.queue_declare(queue = 'hello',durable = True)
3. 消息的持久化
channel.basic_publish(exchange='',
routing_key="task_queue",
body=message,
properties=pika.BasicProperties(
delivery_mode = 2, # make message persistent
))
保证RabbitMQ消息的可靠传输
生产者丢失消息:
发送方确认模式
消息列表丢失消息:
队列的持久化
消息的持久化
消费者丢失消息:
消费者手动确认