消息队列的事务消息
分布式事务消息
2PC
2PC 就是二阶段提交,分别有协调者和参与者两个角色,二阶段分别是准备阶段和提交阶段。
准备阶段就是协调者向各参与者发送准备命令,这个阶段参与者除了事务的提交啥都做了,而提交阶段就是协调者看看各个参与者准备阶段都 o 不 ok,如果有 ok 那么就向各个参与者发送提交命令,如果有一个不 ok 那么就发送回滚命令。
这里的重点就是 2PC 只适用于数据库层面的事务,就是你想在数据库里面写一条数据同时又要上传一张图片,这两个操作 2PC 无法保证两个操作满足事务的约束。
而且 2PC 是一种强一致性的分布式事务,它是同步阻塞的,即在接收到提交或回滚命令之前,所有参与者都是互相等待,特别是执行完准备阶段的时候,此时的资源都是锁定的状态,假如有一个参与者卡了很久,其他参与者都得等它,产生长时间资源锁定状态下的阻塞。
总体而言效率低,并且存在单点故障问题,协调者是就是那个单点,并且在极端条件下存在数据不一致的风险,例如某个参与者未收到提交命令,此时宕机了,恢复之后数据是回滚的,而其他参与者其实都已经执行了提交事务的命令了。
TCC
TCC 能保证业务层面的事务,也就是说它不仅仅是数据库层面,上面的上传图片这种操作它也能做。
TCC 分为三个阶段 try - confirm - cancel,简单的说就是每个业务都需要有这三个方法,先都执行 try方法,这一阶段不会做真正的业务操作,只是先占个坑,比如打算加 10 个积分,那先在
预添加字段加上这 10 积分,这个时候用户账上的积分其实是没有增加的。
然后如果都 try 成功了那么就执行 confirm 方法,大家都来做真正的业务操作,如果有一个 try 失败了那么大家都执行 cancel 操作,来撤回刚才的修改。
可以看到 TCC 其实对业务的耦合性很大,因为业务上需要做一定的改造才能完成这三个方法,这其实就是 TCC 的缺点,并且 confirm 和 cancel 操作要注意幂等,因为到执行这两步的时候没有退路,是务必要完成的,因此需要有重试机制,所以需要保证方法幂等。
事务消息
事务消息主要是适用于异步更新的场景,并且对数据实时性要求不高的地方。它的目的是为了解决消息生产者与消息消费者的数据一致性问题
RocketMQ事务消息
RocketMQ 的事务消息也可以被认为是一个两阶段提交,简单的说就是在事务开始的时候会先发送一个半消息给 Broker。
半消息的意思就是这个消息此时对 Consumer 是不可见的,而且也不是存在真正要发送的队列中,而是一个特殊队列。
发送完半消息之后再执行本地事务,再根据本地事务的执行结果来决定是向 Broker 发送提交消息,还是发送回滚消息。
如果这一步发送提交或者回滚消息失败了怎么办,Broker会定时的向 Producer 来反查这个事务是否成功,具体的就是 Producer 需要暴露一个接口,通过这个接口 Broker 可以得知事务到底有没有执行成功,没成功就返回未知,因为有可能事务还在执行,会进行多次查询。
如果成功那么就将半消息恢复到正常要发送的队列中,这样消费者就可以消费这条消息了
看一下代码:
可以看到使用起来还是很简便直观的,无非就是多加个反查事务结果的方法,然后把本地事务执行的过程写在 TransationListener 里面。
整体流程如下:
分析一下源码:
流程也就是我们上面分析的,将消息塞入一些属性,标明此时这个消息还是半消息,然后发送至
Broker,然后执行本地事务,然后将本地事务的执行状态发送给 Broker
Broker如何处理:
在 Broker 的 SendMessageProcessor#sendMessage 中会处理这个半消息请求, sendMessage 中查到接受来的消息的属性里面MessageConst.PROPERTY_TRANSACTION_PREPARED 是 true ,那么可以得知这个消息是事务消息,然后再判断一下这条消息是否超过最大消费次数,是否要延迟,Broker 是否接受事务消息等操作后,将这条消息真正的 topic 和队列存入属性中,然后重置消息的 topic 为 RMQ_SYS_TRANS_HALF_TOPIC ,并且队列是 0 的队列中,使得消费者无法读取这个消息。
其实延时消息也是这么实现的,最终将换了皮的消息入盘
Broker 处理提交或者回滚消息的处理方法是 EndTransactionProcessor#processRequest
可以看到,如果是提交事务就是把皮再换回来写入真正的topic所属的队列中,供消费者消费,如果是回滚则是将半消息记录到一个 half_op 主题下,到时候后台服务扫描半消息的时候就依据其来判断这个消息已经处理过了。
那个后台服务就是 TransactionalMessageCheckService 服务,它会定时的扫描半消息队列,去请
求反查接口看看事务成功了没,具体执行的就是 TransactionalMessageServiceImpl#check 方法
首先取半消息 topic 即 RMQ_SYS_TRANS_HALF_TOPIC 下的所有队列,如果还记得上面内容的话,就知道半消息写入的队列是 id 是 0 的这个队列,然后取出这个队列对应的 half_op 主题下的队列,即RMQ_SYS_TRANS_OP_HALF_TOPIC 主题下的队列。
这个 half_op 主要是为了记录这个事务消息已经被处理过,也就是说已经得知此事务消息是提交的还是回滚的消息会被记录在 half_op 中。
然后调用 fillOpRemoveMap 方法,从 half_op 取一批已经处理过的消息来去重,将那些没有记录在
half_op 里面的半消息调用 putBackHalfMsgQueue 又写入了 commitlog 中,然后发送事务反查请
求,这个反查请求也是 oneWay,即不会等待响应。当然此时的半消息队列的消费 offset 也会推进。
然后producer中的 ClientRemotingProcessor#processRequest 会处理这个请求,会把任务扔到
TransactionMQProducer 的线程池中进行,最终会调用上面我们发消息时候定义的
checkLocalTransactionState 方法,然后将事务状态发送给 Broker,也是用 oneWay 的方式
为什么已经有一个半消息队列,坏会有一个half_op
首先 RocketMQ 的设计就是顺序追加写入,所以说不会更改已经入盘的消息,那事务消息又需要更新反查的次数,超过一定反查失败就判定事务回滚。
因此每一次要反查的时候就将以前的半消息再入盘一次,并且往前推进消费进度。而 half_op 又会记录每一次反查的结果,不论是提交还是回滚都会记录,因此下一次还循环到处理此半消息的时候,可以从half_op 得知此事务已经结束了,因此就被过滤掉不需要处理了。
如果得到的反查的结果是 UNKNOW,那 half_op 中也不会记录此结果,因此还能再次反查,并且更新反查次数。
Broker处理流程:
Kafka事务消息
Kafka 的事务消息和 RocketMQ 的事务消息又不一样了,RocketMQ 解决的是本地事务的执行和发消息这两个动作满足事务的约束。
而 Kafka 事务消息则是用在一次事务中需要发送多个消息的情况,保证多个消息之间的事务约束,即多条消息要么都发送成功,要么都发送失败
Kafka 的事务基本上是配合其幂等机制来实现 Exactly Once 语义的,所以说 Kafka 的事务消息不是我们想的那种事务消息,RocketMQ 的才是。
和我们在业务上实现的一样通过一个唯一 Id, 然后记录下来,如果已经记录过了就不写入,这样来保证恰好一次。
所以说 Kafka 实现的是在特定场景下的恰好一次,不是我们所想的利用 Kafka 来发送消息,那么这条消息只会恰巧被消费一次
Kafka 的事务有事务协调者角色,事务协调者其实就是 Broker 的一部分。
在开始事务的时候,生产者会向事务协调者发起请求表示事务开启,事务协调者会将这个消息记录到特殊的日志-事务日志中,然后生产者再发送真正想要发送的消息,这里 Kafka 和 RocketMQ 处理不一样,Kafka 会像对待正常消息一样处理这些事务消息,由消费端来过滤这个消息。
然后发送完毕之后生产者会向事务协调者发送提交或者回滚请求,由事务协调者来进行两阶段提交,如果是提交那么会先执行预提交,即把事务的状态置为预提交然后写入事务日志,然后再向所有事务有关的分区写入一条类似事务结束的消息,这样消费端消费到这个消息的时候就知道事务好了,可以把消息放出来了。
比 RocketMQ 更好的事务消息实现
在 RocketMQ 中,事务消息的实现方案是先发半消息(半消息对消费者不可见),待半消息发送成功之后,才能执行本地事务,等本地事务执行成功之后,再向 Broker 发送请求将半消息转成正常消息,这样消费者就可以消费此消息。
这种顺序等于先得成功写入 mq,然后再写入数据库,这样的模式会出现一个问题:即 mq 集群挂了,事务就无法继续进行了,等于整个应用无法正常执行了
本地消息事务表
先看一下本地消息事务表的原理
本地消息就是利用了关系型数据库的事务能力,会在数据库中存放一张本地事务消息表,在进行本地事务操作中加入了本地消息表的插入,即将业务的执行和将消息放入到消息表中的操作放在同一个事务中提交。
这样本地事务执行成功的话,消息肯定也插入成功,然后再调用其他服务,如果其他服务调用成功就修改这条本地消息的状态。
如果失败也不要紧,会有一个后台线程扫描,发现这些状态的消息,会一直调用相应的服务,一般会设置重试的次数,如果一直不行则特殊记录,待人工介入处理
qmq消息事务
qmq的核心思想就是本地事务表
利用关系型数据库的事务能力,将业务的写入和消息表的写入融在一个事务中,这样业务成功则消息表肯定写入成功。
然后在事务提交之后,立刻发送事务消息,如果发送成功,则删除本地消息表中的记录
如果消息发送失败,也就是比如 mq 集群挂了,并不会影响事务的执行,业务的执行和事务消息的插入都已经成功了,那此时待消息已经安安静静的在消息库里等着,后台能会有一个补偿任务,会将这些消息捞出来重新发送,直到发送成功。