Java充电社
专辑
博文
联系我
本人继续续收门徒,亲手指导
RocketMQ专题
-> 应用篇-事务消息
1、MQ概述
2、RocketMQ概述
3、基本概念
4、系统架构
5、安装篇-单机安装
6、安装篇-dashboard控制台的安装与启动
7、安装篇-集群搭建理论
8、安装篇-磁盘阵列RAID(补充)
9、安装篇-集群搭建实践 (多主多从)
10、原理篇-消息的生产
11、原理篇-消息存储
12、原理篇-indexfile
13、原理篇-消息消费
14、原理篇-订阅关系的一致性
15、原理篇-offset管理
16、原理篇-消息的清理
17、原理篇-消费幂等
18、原理篇-消息堆积与消费延迟
19、应用篇-普通消息
20、应用篇-顺序消息
21、应用篇-延时消息
22、应用篇-事务消息
23、应用篇-批量消息
24、应用篇-消息过滤
25、应用篇-消息发送重试
26、应用篇-消息消费重试机制
27、应用篇-死信队列
28、高级功能汇总
29、消息发送案例汇总
30、RocketMQ整合SpringBoot
上一篇:应用篇-延时消息
下一篇:应用篇-批量消息
<div style="display:none"></div> ## 1、问题引入 这里的一个需求场景是:工行用户A向建行用户B转账1万元。 我们可以使用同步消息来处理该需求场景: ![](https://itsoku.oss-cn-hangzhou.aliyuncs.com/itsoku/blog/article/370/94d59b30-b6d3-4243-8944-f52ebf60683f.png) 1. 工行系统发送一个给B增款1万元的同步消息M给Broker 2. 消息被Broker成功接收后,向工行系统发送成功ACK 3. 工行系统收到成功ACK后从用户A中扣款1万元 4. 建行系统从Broker中获取到消息M 5. 建行系统消费消息M,即向用户B中增加1万元 > 这其中是有问题的:若第3步中的扣款操作失败,但消息已经成功发送到了Broker。对于MQ来说,只要消息写入成功,那么这个消息就可以被消费。此时建行系统中用户B增加了1万元。出现了数据不一致问题。 ## 2、解决思路 解决思路是,让第1、2、3步具有原子性,要么全部成功,要么全部失败。即消息发送成功后,必须要保证扣款成功。如果扣款失败,则回滚发送成功的消息。而该思路即使用事务消息。这里要使用分布式事务解决方案。 ![](https://itsoku.oss-cn-hangzhou.aliyuncs.com/itsoku/blog/article/370/5f43d972-da7c-4037-a1ef-248d8f748d4f.png) 使用事务消息来处理该需求场景: 1. 事务管理器TM向事务协调器TC发起指令,开启全局事务 2. 工行系统发一个给B增款1万元的事务消息M给TC 3. TC会向Broker发送半事务消息prepareHalf,将消息M预提交到Broker。此时的建行系统是看不到Broker中的消息M的 4. Broker会将预提交执行结果Report给TC。 5. 如果预提交失败,则TC会向TM上报预提交失败的响应,全局事务结束;如果预提交成功,TC会调用工行系统的回调操作,去完成工行用户A的预扣款1万元的操作 6. 工行系统会向TC发送预扣款执行结果,即本地事务的执行状态 7. TC收到预扣款执行结果后,会将结果上报给TM >预扣款执行结果存在三种可能性: > >```java >// 描述本地事务执行状态 >public enum LocalTransactionState { > COMMIT_MESSAGE, // 本地事务执行成功 > ROLLBACK_MESSAGE, // 本地事务执行失败 > UNKNOW, // 不确定,表示需要进行回查以确定本地事务的执行结果 >} >``` 8. TM会根据上报结果向TC发出不同的确认指令 - 若预扣款成功(本地事务状态为COMMIT_MESSAGE),则TM向TC发送Global Commit指令 - 若预扣款失败(本地事务状态为ROLLBACK_MESSAGE),则TM向TC发送Global Rollback指令 - 若现未知状态(本地事务状态为UNKNOW),则会触发工行系统的本地事务状态回查操作。回查操作会将回查结果,即COMMIT_MESSAGE或ROLLBACK_MESSAGE Report给TC。TC将结果上报给TM,TM会再向TC发送最终确认指令Global Commit或Global Rollback 9. TC在接收到指令后会向Broker与工行系统发出确认指令 - TC接收的若是Global Commit指令,则向Broker与工行系统发送Branch Commit指令。此时Broker中的消息M才可被建行系统看到;此时的工行用户A中的扣款操作才真正被确认 - TC接收到的若是Global Rollback指令,则向Broker与工行系统发送Branch Rollback指令。此时Broker中的消息M将被撤销;工行用户A中的扣款操作将被回滚 > 以上方案就是为了确保 消息投递与 扣款操作 能够在一个事务中,要成功都成功,有一个失败,则全部回滚。 > > 以上方案并不是一个典型的XA模式。因为XA模式中的分支事务是异步的,而事务消息方案中的消息预提交与预扣款操作间是同步的。 ## 3、基础 ### 3.1、分布式事务 对于分布式事务,通俗地说就是,一次操作由若干分支操作组成,这些分支操作分属不同应用,分布在不同服务器上。分布式事务需要保证这些分支操作要么全部成功,要么全部失败。分布式事务与普通事务一样,就是为了保证操作结果的一致性。 ### 3.2、事务消息 RocketMQ提供了类似X/Open XA的分布式事务功能,通过事务消息能达到分布式事务的最终一致。XA是一种分布式事务解决方案,一种分布式事务处理模式。 ### 3.3、半事务消息 暂不能投递的消息,发送方已经成功地将消息发送到了Broker,但是Broker未收到最终确认指令,此时该消息被标记成“暂不能投递”状态,即不能被消费者看到。处于该种状态下的消息即半事务消息。 ### 3.4、本地事务状态 Producer回调操作执行的结果为本地事务状态,其会发送给TC,而TC会再发送给TM。TM会根据TC发送来的本地事务状态来决定全局事务确认指令。 ```java // 描述本地事务执行状态 public enum LocalTransactionState { COMMIT_MESSAGE, // 本地事务执行成功 ROLLBACK_MESSAGE, // 本地事务执行失败 UNKNOW, // 不确定,表示需要进行回查以确定本地事务的执行结果 } ``` ### 3.5、消息回查 ![](https://itsoku.oss-cn-hangzhou.aliyuncs.com/itsoku/blog/article/370/027861cf-3ce5-4e1c-aec5-716118b2ec1f.png) 消息回查,即重新查询本地事务的执行状态。本例就是重新到DB中查看预扣款操作是否执行成功。 > 注意,消息回查不是重新执行回调操作。回调操作是进行预扣款操作,而消息回查则是查看预扣款操作执行的结果。 > > 引发消息回查的原因最常见的有两个: > > 1)回调操作返回UNKNWON > > 2)TC没有接收到TM的最终全局事务确认指令 ### 3.5、RocketMQ中的消息回查设置 关于消息回查,有三个常见的属性设置。它们都在broker加载的配置文件中设置,例如: - transactionTimeout=20,指定TM在20秒内应将最终确认状态发送给TC,否则引发消息回查。默认为60秒 - transactionCheckMax=5,指定最多回查5次,超过后将丢弃消息并记录错误日志。默认15次。 - transactionCheckInterval=10,指定设置的多次消息回查的时间间隔为10秒。默认为60秒 ## 4、XA模式三剑客 ### 4.1、XA协议 XA(Unix Transaction)是一种分布式事务解决方案,一种分布式事务处理模式,是基于XA协议的。XA协议由Tuxedo(Transaction for Unix has been Extended for Distributed Operation,分布式操作扩展之后的Unix事务系统)首先提出的,并交给X/Open组织,作为资源管理器与事务管理器的接口标准。 XA模式中有三个重要组件:TC、TM、RM。 ### 4.2、TC Transaction Coordinator,事务协调者。维护全局和分支事务的状态,驱动全局事务提交或回滚。 > RocketMQ中Broker充当着TC。 ### 4.3、TM Transaction Manager,事务管理器。定义全局事务的范围:开始全局事务、提交或回滚全局事务。它实际是全局事务的发起者。 > RocketMQ中事务消息的Producer充当着TM。 ### 4.4、RM Resource Manager,资源管理器。管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。 > RocketMQ中事务消息的Producer及Broker均是RM。 ## 5、XA模式架构 ![](https://itsoku.oss-cn-hangzhou.aliyuncs.com/itsoku/blog/article/370/daa3d124-e831-4553-89cb-127db62a7bb7.png) XA模式是一个典型的2PC,其执行原理如下: 1. TM向TC发起指令,开启一个全局事务。 2. 根据业务要求,各个RM会逐个向TC注册分支事务,然后TC会逐个向RM发出预执行指令。 3. 各个RM在接收到指令后会在进行本地事务预执行。 4. RM将预执行结果Report给TC。当然,这个结果可能是成功,也可能是失败。 5. TC在接收到各个RM的Report后会将汇总结果上报给TM,根据汇总结果TM会向TC发出确认指令。 - 若所有结果都是成功响应,则向TC发送Global Commit指令。 - 只要有结果是失败响应,则向TC发送Global Rollback指令。 6. TC在接收到指令后再次向RM发送确认指令。 > 事务消息方案并不是一个典型的XA模式。因为XA模式中的分支事务是异步的,而事务消息方案中的消息预提交与预扣款操作间是同步的。 ## 6、注意 - 事务消息不支持延时消息 - 对于事务消息要做好幂等性检查,因为事务消息可能不止一次被消费(因为存在回滚后再提交的情况) ## 7、代码举例 ### 7.1、定义工行事务监听器 ```java import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.producer.LocalTransactionState; import org.apache.rocketmq.client.producer.TransactionListener; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageExt; public class ICBCTransactionListener implements TransactionListener { // 回调操作方法 // 消息预提交成功就会触发该方法的执行,用于完成本地事务 @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { System.out.println("预提交消息成功:" + msg); // 假设接收到TAGA的消息就表示扣款操作成功,TAGB的消息表示扣款失败, // TAGC表示扣款结果不清楚,需要执行消息回查 if (StringUtils.equals("TAGA", msg.getTags())) { return LocalTransactionState.COMMIT_MESSAGE; } else if (StringUtils.equals("TAGB", msg.getTags())) { return LocalTransactionState.ROLLBACK_MESSAGE; } else if (StringUtils.equals("TAGC", msg.getTags())) { return LocalTransactionState.UNKNOW; } return LocalTransactionState.UNKNOW; } // 消息回查方法 // 引发消息回查的原因最常见的有两个: // 1)回调操作返回UNKNWON // 2)TC没有接收到TM的最终全局事务确认指令 @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { System.out.println("执行消息回查" + msg.getTags()); return LocalTransactionState.COMMIT_MESSAGE; } } ``` ### 7.2、定义事物消息生产者 ```java import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.TransactionMQProducer; import org.apache.rocketmq.common.message.Message; import java.util.concurrent.*; public class TransactionProducer { public static void main(String[] args) throws Exception { TransactionMQProducer producer = new TransactionMQProducer("tpg"); producer.setNamesrvAddr("192.168.216.128:9876"); /** * 定义一个线程池 * @param corePoolSize 线程池中核心线程数量 * @param maximumPoolSize 线程池中最多线程数 * @param keepAliveTime 这是一个时间,当线程池中线程数量大于核心线程数量是多余空闲线程的存活时长 * @param unit 时间单位 * @param workQueue 临时存放任务的队列,其参数就是队列的长度 * @param threadFactory 线程工厂 */ ExecutorService executorService = new ThreadPoolExecutor( 2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setName("client-transaction-msg-check-thread"); return thread; } }); // 为生产者指定一个线程池 producer.setExecutorService(executorService); // 为生产者添加事务监听器 producer.setTransactionListener(new ICBCTransactionListener()); producer.start(); String[] tags = {"TAGA", "TAGB", "TAGC"}; for (int i = 0; i < 3; i++) { byte[] body = ("Hi," + i).getBytes(); Message msg = new Message("TTopic", tags[i], body); // 发送事务消息 // 第二个参数用于指定在执行本地事务时要使用的业务参数 SendResult sendResult = producer.sendMessageInTransaction(msg, null); System.out.println("发送结果为:" + sendResult.getSendStatus()); } } } ``` ### 7.3、定义消费者 直接使用普通消息的SomeConsumer作为消费者即可 ```javascript import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; public class SomeConsumer { public static void main(String[] args) throws MQClientException { // 定义一个pull消费者 // DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("cg"); // 定义一个push消费者 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg"); // 指定nameServer consumer.setNamesrvAddr("rocketmqOS:9876"); // 指定从第一条消息开始消费 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET ); // 指定消费topic与tag consumer.subscribe("TTopic", "*"); // 指定采用“广播模式”进行消费,默认为“集群模式” // consumer.setMessageModel(MessageModel.BROADCASTING); // 注册消息监听器 consumer.registerMessageListener(new MessageListenerConcurrently() { // 一旦broker中有了其订阅的消息就会触发该方法的执行,其返回值为当前consumer消费的状态 @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { // 逐条消费消息 for (MessageExt msg : msgs) { System.out.println(msg); } //返回消费状态:消费成功 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 开启消费者消费 consumer.start(); System.out.println("Consumer Started"); } } ``` <a style="display:none" target="_blank" href="https://mp.weixin.qq.com/s/_S1DD2JADnXvpexxaBwLLg" style="color:red; font-size:20px; font-weight:bold">继续收门徒,亲手带,月薪 4W 以下的可以来找我</a> ## 最新资料 1. <a href="https://mp.weixin.qq.com/s?__biz=MzkzOTI3Nzc0Mg==&mid=2247484964&idx=2&sn=c81bce2f26015ee0f9632ddc6c67df03&scene=21#wechat_redirect" target="_blank">尚硅谷 Java 学科全套教程(总 207.77GB)</a> 2. <a href="https://mp.weixin.qq.com/s?__biz=MzkwOTAyMTY2NA==&mid=2247484192&idx=1&sn=505f2faaa4cc911f553850667749bcbb&scene=21#wechat_redirect" target="_blank">2021 最新版 Java 微服务学习线路图 + 视频</a> 3. <a href="https://mp.weixin.qq.com/s?__biz=MzkwOTAyMTY2NA==&mid=2247484573&idx=1&sn=7f3d83892186c16c57bc0b99f03f1ffd&scene=21#wechat_redirect" target="_blank">阿里技术大佬整理的《Spring 学习笔记.pdf》</a> 4. <a href="https://mp.weixin.qq.com/s?__biz=MzkwOTAyMTY2NA==&mid=2247484544&idx=2&sn=c1dfe907cfaa5b9ae8e66fc247ccbe84&scene=21#wechat_redirect" target="_blank">阿里大佬的《MySQL 学习笔记高清.pdf》</a> 5. <a href="https://mp.weixin.qq.com/s?__biz=MzkwOTAyMTY2NA==&mid=2247485167&idx=1&sn=48d75c8e93e748235a3547f34921dfb7&scene=21#wechat_redirect" target="_blank">2021 版 java 高并发常见面试题汇总.pdf</a> 6. <a href="https://mp.weixin.qq.com/s?__biz=MzkwOTAyMTY2NA==&mid=2247485664&idx=1&sn=435f9f515a8f881642820d7790ad20ce&scene=21#wechat_redirect" target="_blank">Idea 快捷键大全.pdf</a> ![](https://itsoku.oss-cn-hangzhou.aliyuncs.com/itsoku/blog/article/1/2883e86e-3eff-404a-8943-0066e5e2b454.png)
#custom-toc-container