Java充电社
专辑
博文
联系我
本人继续续收门徒,亲手指导
RocketMQ专题
-> 应用篇-普通消息
1、MQ概述
2、RocketMQ概述
3、基本概念
4、系统架构
5、安装篇-单机安装
6、安装篇-dashboard控制台的安装与启动
7、安装篇-集群搭建理论
8、安装篇-磁盘阵列RAID(补充)
9、安装篇-集群搭建实践 (多主多从)
10、原理篇-消息的生产
11、原理篇-消息存储
12、原理篇-indexfile
13、原理篇-消息消费
14、原理篇-订阅关系的一致性
15、原理篇-offset管理
16、原理篇-消息的清理
17、原理篇-消费幂等
18、原理篇-消息堆积与消费延迟
19、应用篇-普通消息
20、应用篇-顺序消息
21、应用篇-延时消息
22、应用篇-事务消息
23、应用篇-批量消息
24、应用篇-消息过滤
25、应用篇-消息发送重试
26、应用篇-消息消费重试机制
27、应用篇-死信队列
28、高级功能汇总
29、消息发送案例汇总
30、RocketMQ整合SpringBoot
上一篇:原理篇-消息堆积与消费延迟
下一篇:应用篇-顺序消息
<div style="display:none"></div> ## 1、消息发送分类 Producer对于消息的发送方式也有多种选择,不同的方式会产生不同的系统效果。 ### 1.1、同步发送消息 同步发送消息是指,Producer发出⼀条消息后,会在收到MQ返回的ACK之后才发下⼀条消息。该方式的消息可靠性最高,但消息发送效率太低。 ![](https://itsoku.oss-cn-hangzhou.aliyuncs.com/itsoku/blog/article/367/23d7dba5-0b81-4aac-967d-e056cc90bdd7.png) ### 1.2、异步发送消息 异步发送消息是指,Producer发出消息后无需等待MQ返回ACK,直接发送下⼀条消息。该方式的消息可靠性可以得到保障,消息发送效率也可以。 ![](https://itsoku.oss-cn-hangzhou.aliyuncs.com/itsoku/blog/article/367/97120bb1-d688-4fe5-993f-f903a79ce5c5.png) ### 1.3、单向发送消息 单向发送消息是指,Producer仅负责发送消息,不等待、不处理MQ的ACK。该发送方式时MQ也不返回ACK。该方式的消息发送效率最高,但消息可靠性较差。 ![](https://itsoku.oss-cn-hangzhou.aliyuncs.com/itsoku/blog/article/367/1ecf6276-84d5-4891-ba41-72fd1f61ea02.png) ## 2、代码举例 ### 2.1、创建工程 创建一个Maven的Java工程rocketmq-test。 ### 2.2、导入依赖 导入rocketmq的client依赖。 ```xml <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.8.0</version> </dependency> </dependencies> ``` ### 2.3、定义同步消息发送生产者 ```java public class SyncProducer { public static void main(String[] args) throws Exception { // 创建一个producer,参数为Producer Group名称 DefaultMQProducer producer = new DefaultMQProducer("pg"); // 指定nameServer地址 producer.setNamesrvAddr("rocketmqOS:9876"); // 设置当发送失败时重试发送的次数,默认为2次 producer.setRetryTimesWhenSendFailed(3); // 设置发送超时时限为5s,默认3s producer.setSendMsgTimeout(5000); // 开启生产者 producer.start(); // 生产并发送100条消息 for (int i = 0; i < 100; i++) { byte[] body = ("Hi," + i).getBytes(); Message msg = new Message("someTopic", "someTag", body); // 为消息指定key msg.setKeys("key-" + i); // 发送消息 SendResult sendResult = producer.send(msg); System.out.println(sendResult); } // 关闭producer producer.shutdown(); } } ``` ```java // 消息发送的状态 public enum SendStatus { // 发送成功 SEND_OK, // 刷盘超时。当Broker设置的刷盘策略为同步刷盘时才可能出现这种异常状态。异步刷盘不会出现 FLUSH_DISK_TIMEOUT, // Slave同步超时。当Broker集群设置的Master-Slave的复制方式为同步复制时才可能出现这种异常状态。异步复制不会出现 FLUSH_SLAVE_TIMEOUT, // 没有可用的Slave。当Broker集群设置为Master-Slave的复制方式为同步复制时才可能出现这种异常状态。异步复制不会出现 SLAVE_NOT_AVAILABLE, } ``` ### 2.4、定义异步消息发送生产者 ```java public class AsyncProducer { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("pg"); producer.setNamesrvAddr("rocketmqOS:9876"); // 指定异步发送失败后不进行重试发送 producer.setRetryTimesWhenSendAsyncFailed(0); // 指定新创建的Topic的Queue数量为2,默认为4 producer.setDefaultTopicQueueNums(2); producer.start(); for (int i = 0; i < 100; i++) { byte[] body = ("Hi," + i).getBytes(); try { Message msg = new Message("myTopicA", "myTag", body); // 异步发送。指定回调 producer.send(msg, new SendCallback() { // 当producer接收到MQ发送来的ACK后就会触发该回调方法的执行 @Override public void onSuccess(SendResult sendResult) { System.out.println(sendResult); } @Override public void onException(Throwable e) { e.printStackTrace(); } }); } catch (Exception e) { e.printStackTrace(); } } // end-for // sleep一会儿 // 由于采用的是异步发送,所以若这里不sleep, // 则消息还未发送就会将producer给关闭,报错 TimeUnit.SECONDS.sleep(3); producer.shutdown(); } } ``` ### 2.5、定义单向消息发送生产者 ```java public class OnewayProducer { public static void main(String[] args) throws Exception{ DefaultMQProducer producer = new DefaultMQProducer("pg"); producer.setNamesrvAddr("rocketmqOS:9876"); producer.start(); for (int i = 0; i < 10; i++) { byte[] body = ("Hi," + i).getBytes(); Message msg = new Message("single", "someTag", body); // 单向发送 producer.sendOneway(msg); } p roducer.shutdown(); System.out.println("producer shutdown"); } } ``` ### 2.6、定义消息消费者 ```java public class SomeConsumer { public static void main(String[] args) throws MQClientException { // 定义一个pull消费者 // DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("cg"); // 定义一个push消费者 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg"); // 指定nameServer consumer.setNamesrvAddr("rocketmqOS:9876"); // 指定从第一条消息开始消费 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); // 指定消费topic与tag consumer.subscribe("someTopic", "*"); // 指定采用“广播模式”进行消费,默认为“集群模式” // consumer.setMessageModel(MessageModel.BROADCASTING); // 注册消息监听器 consumer.registerMessageListener(new MessageListenerConcurrently() { // 一旦broker中有了其订阅的消息就会触发该方法的执行, // 其返回值为当前consumer消费的状态 @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { // 逐条消费消息 for (MessageExt msg : msgs) { System.out.println(msg); } // 返回消费状态:消费成功 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 开启消费者消费 consumer.start(); System.out.println("Consumer Started"); } } ``` <a style="display:none" target="_blank" href="https://mp.weixin.qq.com/s/_S1DD2JADnXvpexxaBwLLg" style="color:red; font-size:20px; font-weight:bold">继续收门徒,亲手带,月薪 4W 以下的可以来找我</a> ## 最新资料 1. <a href="https://mp.weixin.qq.com/s?__biz=MzkzOTI3Nzc0Mg==&mid=2247484964&idx=2&sn=c81bce2f26015ee0f9632ddc6c67df03&scene=21#wechat_redirect" target="_blank">尚硅谷 Java 学科全套教程(总 207.77GB)</a> 2. <a href="https://mp.weixin.qq.com/s?__biz=MzkwOTAyMTY2NA==&mid=2247484192&idx=1&sn=505f2faaa4cc911f553850667749bcbb&scene=21#wechat_redirect" target="_blank">2021 最新版 Java 微服务学习线路图 + 视频</a> 3. <a href="https://mp.weixin.qq.com/s?__biz=MzkwOTAyMTY2NA==&mid=2247484573&idx=1&sn=7f3d83892186c16c57bc0b99f03f1ffd&scene=21#wechat_redirect" target="_blank">阿里技术大佬整理的《Spring 学习笔记.pdf》</a> 4. <a href="https://mp.weixin.qq.com/s?__biz=MzkwOTAyMTY2NA==&mid=2247484544&idx=2&sn=c1dfe907cfaa5b9ae8e66fc247ccbe84&scene=21#wechat_redirect" target="_blank">阿里大佬的《MySQL 学习笔记高清.pdf》</a> 5. <a href="https://mp.weixin.qq.com/s?__biz=MzkwOTAyMTY2NA==&mid=2247485167&idx=1&sn=48d75c8e93e748235a3547f34921dfb7&scene=21#wechat_redirect" target="_blank">2021 版 java 高并发常见面试题汇总.pdf</a> 6. <a href="https://mp.weixin.qq.com/s?__biz=MzkwOTAyMTY2NA==&mid=2247485664&idx=1&sn=435f9f515a8f881642820d7790ad20ce&scene=21#wechat_redirect" target="_blank">Idea 快捷键大全.pdf</a> ![](https://itsoku.oss-cn-hangzhou.aliyuncs.com/itsoku/blog/article/1/2883e86e-3eff-404a-8943-0066e5e2b454.png)
#custom-toc-container