Java充电社
专辑
博文
联系我
本人继续续收门徒,亲手指导
RocketMQ专题
-> 应用篇-延时消息
1、MQ概述
2、RocketMQ概述
3、基本概念
4、系统架构
5、安装篇-单机安装
6、安装篇-dashboard控制台的安装与启动
7、安装篇-集群搭建理论
8、安装篇-磁盘阵列RAID(补充)
9、安装篇-集群搭建实践 (多主多从)
10、原理篇-消息的生产
11、原理篇-消息存储
12、原理篇-indexfile
13、原理篇-消息消费
14、原理篇-订阅关系的一致性
15、原理篇-offset管理
16、原理篇-消息的清理
17、原理篇-消费幂等
18、原理篇-消息堆积与消费延迟
19、应用篇-普通消息
20、应用篇-顺序消息
21、应用篇-延时消息
22、应用篇-事务消息
23、应用篇-批量消息
24、应用篇-消息过滤
25、应用篇-消息发送重试
26、应用篇-消息消费重试机制
27、应用篇-死信队列
28、高级功能汇总
29、消息发送案例汇总
30、RocketMQ整合SpringBoot
上一篇:应用篇-顺序消息
下一篇:应用篇-事务消息
<div style="display:none"></div> ## 1、什么是延时消息 当消息写入到Broker后,在指定的时长后才可被消费处理的消息,称为延时消息。 采用RocketMQ的延时消息可以实现定时任务的功能,而无需使用定时器。典型的应用场景是,电商交易中超时未支付关闭订单的场景,12306平台订票超时未支付取消订票的场景。 > 在电商平台中,订单创建时会发送一条延迟消息。这条消息将会在30分钟后投递给后台业务系统(Consumer),后台业务系统收到该消息后会判断对应的订单是否已经完成支付。如果未完成,则取消订单,将商品再次放回到库存;如果完成支付,则忽略。 > > 在12306平台中,车票预订成功后就会发送一条延迟消息。这条消息将会在45分钟后投递给后台业务系统(Consumer),后台业务系统收到该消息后会判断对应的订单是否已经完成支付。如果未完成,则取消预订,将车票再次放回到票池;如果完成支付,则忽略。 ## 2、延时等级 延时消息的延迟时长不支持随意时长的延迟,是通过特定的延迟等级来指定的。延时等级定义在RocketMQ服务端的MessageStoreConfig类中的如下变量中: ![](https://itsoku.oss-cn-hangzhou.aliyuncs.com/itsoku/blog/article/369/50150fb4-a326-456b-9b6c-85419da1cdf4.png) 即,若指定的延时等级为3,则表示延迟时长为10s,即延迟等级是从1开始计数的。 当然,如果需要自定义的延时等级,可以通过在broker加载的配置中新增如下配置(例如下面增加了1天这个等级1d)。配置文件在RocketMQ安装目录下的conf目录中。 ```properties messageDelayLevel = 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 1d ``` ## 3、延时消息实现原理 ![](https://itsoku.oss-cn-hangzhou.aliyuncs.com/itsoku/blog/article/369/b00c1114-f5d7-4563-9f65-10088533d013.png) 具体实现方案是: ### 3.1、修改消息 ![](https://itsoku.oss-cn-hangzhou.aliyuncs.com/itsoku/blog/article/369/119368d6-c494-487f-adde-16a9673467bd.png) Producer将消息发送到Broker后,Broker会首先将消息写入到commitlog文件,然后需要将其分发到相应的consumequeue。不过,在分发之前,系统会先判断消息中是否带有延时等级。若没有,则直接正常分发;若有则需要经历一个复杂的过程: - 修改消息的Topic为SCHEDULE_TOPIC_XXXX - 根据延时等级,在consumequeue目录中SCHEDULE_TOPIC_XXXX主题下创建出相应的queueId目录与consumequeue文件(如果没有这些目录与文件的话)。 > 延迟等级delayLevel与queueId的对应关系为queueId = delayLevel -1 > > 需要注意,在创建queueId目录时,并不是一次性地将所有延迟等级对应的目录全部创建完毕,而是用到哪个延迟等级创建哪个目录 ![](https://itsoku.oss-cn-hangzhou.aliyuncs.com/itsoku/blog/article/369/42109cad-0d43-458c-86a4-09c1f8c35258.png) - 修改消息索引单元内容。索引单元中的Message Tag HashCode部分原本存放的是消息的Tag的Hash值。现修改为消息的投递时间。投递时间是指该消息被重新修改为原Topic后再次被写入到commitlog中的时间。投递时间 = 消息存储时间 + 延时等级时间。消息存储时间指的是消息被发送到Broker时的时间戳。 - 将消息索引写入到SCHEDULE_TOPIC_XXXX主题下相应的consumequeue中 > SCHEDULE_TOPIC_XXXX目录中各个延时等级Queue中的消息是如何排序的? > > 是按照消息投递时间排序的。一个Broker中同一等级的所有延时消息会被写入到consumequeue目录中SCHEDULE_TOPIC_XXXX目录下相同Queue中。即一个Queue中消息投递时间的延迟等级时间是相同的。那么投递时间就取决于消息存储时间了。即按照消息被发送到Broker的时间进行排序的。 ### 3.2、投递延时消息 Broker内部有⼀个延迟消息服务类ScheuleMessageService,其会消费SCHEDULE_TOPIC_XXXX中的消息,即按照每条消息的投递时间,将延时消息投递到⽬标Topic中。不过,在投递之前会从commitlog中将原来写入的消息再次读出,并将其原来的延时等级设置为0,即原消息变为了一条不延迟的普通消息。然后再次将消息投递到目标Topic中。 > ScheuleMessageService在Broker启动时,会创建并启动一个定时器TImer,用于执行相应的定时任务。系统会根据延时等级的个数,定义相应数量的TimerTask,每个TimerTask负责一个延迟等级消息的消费与投递。每个TimerTask都会检测相应Queue队列的第一条消息是否到期。若第一条消息未到期,则后面的所有消息更不会到期(消息是按照投递时间排序的);若第一条消息到期了,则将该消息投递到目标Topic,即消费该消息。 ### 3.3、将消息重新写入commitlog 延迟消息服务类ScheuleMessageService将延迟消息再次发送给了commitlog,并再次形成新的消息索引条目,分发到相应Queue。 > 这其实就是一次普通消息发送。只不过这次的消息Producer是延迟消息服务类ScheuleMessageService。 ## 4、代码举例 ### 4.1、定义DelayProducer类 ```java public class DelayProducer { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("pg"); producer.setNamesrvAddr("rocketmqOS:9876"); producer.start(); for (int i = 0; i < 10; i++) { byte[] body = ("Hi," + i).getBytes(); Message msg = new Message("TopicB", "someTag", body); // 指定消息延迟等级为3级,即延迟10s msg.setDelayTimeLevel(3); SendResult sendResult = producer.send(msg); // 输出消息被发送的时间 System.out.print(new SimpleDateFormat("mm:ss").format(new Date())); System.out.println(" ," + sendResult); } producer.shutdown(); } } ``` ### 4.2、定义OtherConsumer类 ```java public class OtherConsumer { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg"); consumer.setNamesrvAddr("rocketmqOS:9876"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TopicB", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { // 输出消息被消费的时间 System.out.print(new SimpleDateFormat("mm:ss").format(new Date())); System.out.println(" ," + msg); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("Consumer Started"); } } ``` <a style="display:none" target="_blank" href="https://mp.weixin.qq.com/s/_S1DD2JADnXvpexxaBwLLg" style="color:red; font-size:20px; font-weight:bold">继续收门徒,亲手带,月薪 4W 以下的可以来找我</a> ## 最新资料 1. <a href="https://mp.weixin.qq.com/s?__biz=MzkzOTI3Nzc0Mg==&mid=2247484964&idx=2&sn=c81bce2f26015ee0f9632ddc6c67df03&scene=21#wechat_redirect" target="_blank">尚硅谷 Java 学科全套教程(总 207.77GB)</a> 2. <a href="https://mp.weixin.qq.com/s?__biz=MzkwOTAyMTY2NA==&mid=2247484192&idx=1&sn=505f2faaa4cc911f553850667749bcbb&scene=21#wechat_redirect" target="_blank">2021 最新版 Java 微服务学习线路图 + 视频</a> 3. <a href="https://mp.weixin.qq.com/s?__biz=MzkwOTAyMTY2NA==&mid=2247484573&idx=1&sn=7f3d83892186c16c57bc0b99f03f1ffd&scene=21#wechat_redirect" target="_blank">阿里技术大佬整理的《Spring 学习笔记.pdf》</a> 4. <a href="https://mp.weixin.qq.com/s?__biz=MzkwOTAyMTY2NA==&mid=2247484544&idx=2&sn=c1dfe907cfaa5b9ae8e66fc247ccbe84&scene=21#wechat_redirect" target="_blank">阿里大佬的《MySQL 学习笔记高清.pdf》</a> 5. <a href="https://mp.weixin.qq.com/s?__biz=MzkwOTAyMTY2NA==&mid=2247485167&idx=1&sn=48d75c8e93e748235a3547f34921dfb7&scene=21#wechat_redirect" target="_blank">2021 版 java 高并发常见面试题汇总.pdf</a> 6. <a href="https://mp.weixin.qq.com/s?__biz=MzkwOTAyMTY2NA==&mid=2247485664&idx=1&sn=435f9f515a8f881642820d7790ad20ce&scene=21#wechat_redirect" target="_blank">Idea 快捷键大全.pdf</a> ![](https://itsoku.oss-cn-hangzhou.aliyuncs.com/itsoku/blog/article/1/2883e86e-3eff-404a-8943-0066e5e2b454.png)
#custom-toc-container