消息队列的消息处理
消息处理阶段
一共有三个阶段,分别是生产消息、存储消息和消费消息
如何保证消息不丢失
生产消息
生产者发送消息至 Broker ,需要处理 Broker 的响应,不论是同步还是异步发送消息,同步和异步回调都需要做好 try-catch ,妥善的处理响应,如果 Broker 返回写入失败等错误消息,需要重试发送。当多次发送失败需要作报警,日志记录等。
这样就能保证在生产消息阶段消息不会丢失。
存储消息
存储消息阶段需要在消息刷盘之后再给生产者响应,假设消息写入缓存中就返回响应,那么机器突然断电这消息就没了,而生产者以为已经发送成功了。
如果 Broker 是集群部署,有多副本机制,即消息不仅仅要写入当前 Broker ,还需要写入副本机中。那配置成至少写入两台机子后再给生产者响应。这样基本上就能保证存储的可靠了。
消费消息
应该在消费者真正执行完业务逻辑之后,再发送给 Broker 消费成功,这才是真正的消费成功。
所以只要我们在消息业务逻辑处理完成之后再给 Broker 响应,那么消费阶段消息就不会丢失
注意
消息可靠性增强,性能就会下降,等待消息刷盘、多副本同步后返回都会影响性能。
要看业务逻辑
如何保证消息不重复
什么情况下会出现重复消息
1、业务方因为网络抖动,发过来的消息,就是重复的,会要求处理重复
2、为了保证消息可靠性,我们的基本需求是消息至少得发到Broker 上,那就得等 Broker 的响应,那么就可能存在 Broker 已经写入了,当时响应由于网络原因生产者没有收到,然后生产者又重发了一次,此时消息就重复了
3、消费者消费的时候,假设我们消费者拿到消息消费了,业务逻辑已经走完了,事务提交了,此时需要更新 Consumer offset 了,然后这个消费者挂了,另一个消费者顶上,此时 Consumer offset 还没更新,于是又拿到刚才那条消息,业务又被执行了一遍。于是消息又重复
正常业务而言消息重复是不可避免的,因此我们只能从另一个角度来解决重复消息的问题。
关键点就是幂等。既然我们不能防止重复消息的产生,那么我们只能在业务上处理重复消息所带来的影响
幂等消息处理
保证同样的参数多次调用同一个接口和调用一次产生的结果是一致的,看具体的业务细节
如何保证消息有序
全局有序
如果要保证消息的全局有序,首先只能由一个生产者往 Topic 发送消息,并且一个 Topic 内部只能有一个队列(分区)。消费者也必须是单线程消费这个队列。这样的消息就是全局有序的
但是这样就会导致消费进程变慢,一般来说我们不需要全局有序,只需要保证部分有序即可
部分有序
部分有序我们就可以将 Topic 内部划分成我们需要的队列数,把消息通过特定的策略发往固定的队列中,然后每个队列对应一个单线程处理的消费者。这样即完成了部分有序的需求,又可以通过队列数量的并发来提高消息处理效率
如何保证消息不堆积
消息的堆积往往是因为生产者的生产速度与消费者的消费速度不匹配。有可能是因为消息消费失败反复重试造成的,也有可能就是消费者消费能力弱,渐渐地消息就积压了。
因此我们需要先定位消费慢的原因,如果是 bug 则处理 bug ,如果是因为本身消费能力较弱,我们可以优化下消费逻辑,比如之前是一条一条消息消费处理的,这次我们批量处理,比如数据库的插入,一条一条插和批量插效率是不一样的。
假如逻辑我们已经都优化了,但还是慢,那就得考虑水平扩容了,增加 Topic 的队列数和消费者数量,注意队列数一定要增加,不然新增加的消费者是没东西消费的。一个Topic中,一个队列只会分配给一个消费者。
当然你消费者内部是单线程还是多线程消费那看具体场景。不过要注意上面提高的消息丢失的问题,如果你是将接受到的消息写入内存队列之后,然后就返回响应给 Broker ,然后多线程向内存队列消费消息,假设此时消费者宕机了,内存队列里面还未消费的消息也就丢了。