Java充电社
专辑
博文
联系我
本人继续续收门徒,亲手指导
RocketMQ第23篇:应用篇-批量消息
相关专辑:
RocketMQ专题
<div style="display:none"></div> ## 1、批量发送消息 ### 1.1、发送限制 生产者进行消息发送时可以一次发送多条消息,这可以大大提升Producer的发送效率。不过需要注意以下几点: - 批量发送的消息必须具有相同的Topic - 批量发送的消息必须具有相同的刷盘策略 - 批量发送的消息不能是延时消息与事务消息 ### 1.2、批量发送大小 默认情况下,一批发送的消息总大小不能超过4MB字节。如果想超出该值,有两种解决方案: - 方案一:将批量消息进行拆分,拆分为若干不大于4M的消息集合分多次批量发送 - 方案二:在Producer端与Broker端修改属性 - Producer端需要在发送之前设置Producer的maxMessageSize属性 - Broker端需要修改其加载的配置文件中的maxMessageSize属性 ### 1.3、生产者发送的消息大小 ![](https://itsoku.oss-cn-hangzhou.aliyuncs.com/itsoku/blog/article/371/ddaf82b4-a945-40f8-8dfd-cdbe1753d676.png) 生产者通过send()方法发送的Message,并不是直接将Message序列化后发送到网络上的,而是通过这个Message生成了一个字符串发送出去的。这个字符串由四部分构成:Topic、消息Body、消息日志(占20字节),及用于描述消息的一堆属性key-value。这些属性中包含例如生产者地址、生产时间、要发送的QueueId等。最终写入到Broker中消息单元中的数据都是来自于这些属性。 ## 2、批量消费消息 ### 2.1、修改批量属性 ![](https://itsoku.oss-cn-hangzhou.aliyuncs.com/itsoku/blog/article/371/403b2096-46c0-4bfc-b423-d26b548aa763.png) Consumer的MessageListenerConcurrently监听接口的consumeMessage()方法的第一个参数为消息列表,但默认情况下每次只能消费一条消息。若要使其一次可以消费多条消息,则可以通过修改Consumer的consumeMessageBatchMaxSize属性来指定。不过,该值不能超过32。因为默认情况下消费者每次可以拉取的消息最多是32条。若要修改一次拉取的最大值,则可通过修改Consumer的 pullBatchSize属性来指定。 ### 2.2、存在的问题 Consumer的pullBatchSize属性与consumeMessageBatchMaxSize属性是否设置的越大越好?当然不是。 - pullBatchSize值设置的越大,Consumer每拉取一次需要的时间就会越长,且在网络上传输出现问题的可能性就越高。若在拉取过程中若出现了问题,那么本批次所有消息都需要全部重新拉取。 - consumeMessageBatchMaxSize值设置的越大,Consumer的消息并发消费能力越低,且这批被消费的消息具有相同的消费结果。因为consumeMessageBatchMaxSize指定的一批消息只会使用一个线程进行处理,且在处理过程中只要有一个消息处理异常,则这批消息需要全部重新再次消费处理。 ## 3、代码举例 该批量发送的需求是,不修改最大发送4M的默认值,但要防止发送的批量消息超出4M的限制。 ### 3.1、定义消息列表分割器 ```java import org.apache.rocketmq.common.message.Message; import java.util.Iterator; import java.util.List; import java.util.Map; // 消息列表分割器:其只会处理每条消息的大小不超4M的情况。 // 若存在某条消息,其本身大小大于4M,这个分割器无法处理, // 其直接将这条消息构成一个子列表返回。并没有再进行分割 public class MessageListSplitter implements Iterator<List<Message>> { // 指定极限值为4M private final int SIZE_LIMIT = 4 * 1024 * 1024; // 存放所有要发送的消息 private final List<Message> messages; // 要进行批量发送消息的小集合起始索引 private int currIndex; public MessageListSplitter(List<Message> messages) { this.messages = messages; } @Override public boolean hasNext() { // 判断当前开始遍历的消息索引要小于消息总数 return currIndex < messages.size(); } @Override public List<Message> next() { int nextIndex = currIndex; // 记录当前要发送的这一小批次消息列表的大小 int totalSize = 0; for (; nextIndex < messages.size(); nextIndex++) { // 获取当前遍历的消息 Message message = messages.get(nextIndex); // 统计当前遍历的message的大小 int tmpSize = message.getTopic().length() + message.getBody().length; Map<String, String> properties = message.getProperties(); for (Map.Entry<String, String> entry : properties.entrySet()) { tmpSize += entry.getKey().length() + entry.getValue().length(); } tmpSize = tmpSize + 20; // 判断当前消息本身是否大于4M if (tmpSize > SIZE_LIMIT) { if (nextIndex - currIndex == 0) { nextIndex++; } break; } if (tmpSize + totalSize > SIZE_LIMIT) { break; } else { totalSize += tmpSize; } } // end-for // 获取当前messages列表的子集合[currIndex, nextIndex) List<Message> subList = messages.subList(currIndex, nextIndex); // 下次遍历的开始索引 currIndex = nextIndex; return subList; } } ``` ### 3.2、定义批量消息生产者 ```java import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.message.Message; import java.util.ArrayList; import java.util.List; public class BatchProducer { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("pg"); producer.setNamesrvAddr("rocketmqOS:9876"); // 指定要发送的消息的最大大小,默认是4M // 不过,仅修改该属性是不行的,还需要同时修改broker加载的配置文件中的 // maxMessageSize属性 // producer.setMaxMessageSize(8 * 1024 * 1024); producer.start(); // 定义要发送的消息集合 List<Message> messages = new ArrayList<>(); for (int i = 0; i < 100; i++) { byte[] body = ("Hi," + i).getBytes(); Message msg = new Message("someTopic", "someTag", body); messages.add(msg); } // 定义消息列表分割器,将消息列表分割为多个不超出4M大小的小列表 MessageListSplitter splitter = new MessageListSplitter(messages); while (splitter.hasNext()) { try { List<Message> listItem = splitter.next(); producer.send(listItem); } catch (Exception e) { e.printStackTrace(); } } producer.shutdown(); } } ``` ### 3.3、定义批量消息消费者 ```java import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; public class BatchConsumer { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg"); consumer.setNamesrvAddr("rocketmqOS:9876"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET ); consumer.subscribe("someTopicA", "*"); // 指定每次可以消费10条消息,默认为1 consumer.setConsumeMessageBatchMaxSize(10); // 指定每次可以从Broker拉取40条消息,默认为32 consumer.setPullBatchSize(40); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println(msg); } // 消费成功的返回结果 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 消费异常时的返回结果 // return ConsumeConcurrentlyStatus.RECONSUME_LATER; } }); consumer.start(); System.out.println("Consumer Started"); } } ``` <a style="display:none" target="_blank" href="https://mp.weixin.qq.com/s/_S1DD2JADnXvpexxaBwLLg" style="color:red; font-size:20px; font-weight:bold">继续收门徒,亲手带,月薪 4W 以下的可以来找我</a> ## 最新资料 1. <a href="https://mp.weixin.qq.com/s?__biz=MzkzOTI3Nzc0Mg==&mid=2247484964&idx=2&sn=c81bce2f26015ee0f9632ddc6c67df03&scene=21#wechat_redirect" target="_blank">尚硅谷 Java 学科全套教程(总 207.77GB)</a> 2. <a href="https://mp.weixin.qq.com/s?__biz=MzkwOTAyMTY2NA==&mid=2247484192&idx=1&sn=505f2faaa4cc911f553850667749bcbb&scene=21#wechat_redirect" target="_blank">2021 最新版 Java 微服务学习线路图 + 视频</a> 3. <a href="https://mp.weixin.qq.com/s?__biz=MzkwOTAyMTY2NA==&mid=2247484573&idx=1&sn=7f3d83892186c16c57bc0b99f03f1ffd&scene=21#wechat_redirect" target="_blank">阿里技术大佬整理的《Spring 学习笔记.pdf》</a> 4. <a href="https://mp.weixin.qq.com/s?__biz=MzkwOTAyMTY2NA==&mid=2247484544&idx=2&sn=c1dfe907cfaa5b9ae8e66fc247ccbe84&scene=21#wechat_redirect" target="_blank">阿里大佬的《MySQL 学习笔记高清.pdf》</a> 5. <a href="https://mp.weixin.qq.com/s?__biz=MzkwOTAyMTY2NA==&mid=2247485167&idx=1&sn=48d75c8e93e748235a3547f34921dfb7&scene=21#wechat_redirect" target="_blank">2021 版 java 高并发常见面试题汇总.pdf</a> 6. <a href="https://mp.weixin.qq.com/s?__biz=MzkwOTAyMTY2NA==&mid=2247485664&idx=1&sn=435f9f515a8f881642820d7790ad20ce&scene=21#wechat_redirect" target="_blank">Idea 快捷键大全.pdf</a> ![](https://itsoku.oss-cn-hangzhou.aliyuncs.com/itsoku/blog/article/1/2883e86e-3eff-404a-8943-0066e5e2b454.png)
相关专辑:
RocketMQ专题