Java充电社
专辑
博文
联系我
本人继续续收门徒,亲手指导
如何实现一个通用的延迟队列?
相关专辑:
Java高并发教程
<div style="display:none"></div> 电商大家都用过吧,下单后若未支付,通常都有一段支付倒计时,比如15分钟,若时间到了之后,还未支付的,订单将被关闭,库存将被释放。 这种业务就需要用到延迟队列的功能,将任务丢到延迟队列、设置一个延迟时间、回调函数,到了时间之后,延迟队列将回调指定的函数消费指定的任务。 下面代码是一个通用的延迟队列的实现,大家可以直接拿去用。 **代码还是比较简单的,技术要点:** - 调用addTask方法将任务丢到延迟队列中,主要参数(延迟时间、任务信息、回调【任务到期后会进行回调】) - 使用到了java中的延迟队列DelayQueue来存放延迟任务 - 下面的构造方法会自动调用一个start方法,start方法中会自动启动一个线程,线程轮询从延迟队列中拉取到期的任务,然后丢到线程池executorService.submit中进行处理,会自动调用创建延迟任务中指定的回调函数 - main方法中有使用步骤 ```java import java.util.concurrent.*; import java.util.function.Consumer; import java.util.logging.Logger; public class DelayQueueService<T> { Logger logger = Logger.getLogger(DelayQueueService.class.getName()); //延迟队列名称 private String delayQueueName; private DelayQueue<DelayedTask> delayQueue = new DelayQueue<>(); //处理队列中任务的线程池 private ExecutorService executorService; public DelayQueueService(String delayQueueName) { this(delayQueueName, Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 4)); } public DelayQueueService(String delayQueueName, ExecutorService executorService) { this.delayQueueName = delayQueueName; this.executorService = executorService; //启动队列消费 this.start(); } /** * 添加任务 * * @param delayedTimeUnit 延迟时间单位 * @param delayedTime 延迟时间 * @param task 任务 * @param consumer 任务消费者(到期了会回调) */ public void addTask(TimeUnit delayedTimeUnit, long delayedTime, T task, Consumer<T> consumer) { this.delayQueue.offer(new DelayedTask(delayedTimeUnit, delayedTime, task, consumer)); } private void start() { //轮询从延迟队列中拉取任务,然后调用线程池进行处理 Thread pollThread = new Thread(() -> { while (true) { try { DelayedTask delayedTask = this.delayQueue.poll(100, TimeUnit.MILLISECONDS); if (this.executorService.isShutdown()) { break; } if (delayedTask != null) { executorService.submit(() -> { delayedTask.consumer.accept(delayedTask.task); }); } } catch (InterruptedException e) { logger.warning(e.getMessage()); } } }); pollThread.setDaemon(Thread.currentThread().isDaemon()); pollThread.setName(this.getClass().getName() + "-pollThread-" + this.delayQueueName); pollThread.start(); } public void close() { if (!this.executorService.isShutdown()) { this.executorService.shutdown(); } } public class DelayedTask implements Delayed { //延迟时间单位 private TimeUnit delayedTimeUnit; //延迟时间 private long delayedTime; //到期时间(毫秒) private long endTime; //延迟任务信息 private T task; //消费者 private Consumer<T> consumer; public DelayedTask(TimeUnit delayedTimeUnit, long delayedTime, T task, Consumer<T> consumer) { this.delayedTimeUnit = delayedTimeUnit; this.delayedTime = delayedTime; this.task = task; this.endTime = System.currentTimeMillis() + delayedTimeUnit.toMillis(delayedTime); this.consumer = consumer; } @Override public long getDelay(TimeUnit unit) { return unit.convert(this.endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); } @Override public int compareTo(Delayed o) { DelayedTask task = (DelayedTask) o; return Long.compare(this.endTime, task.endTime); } } public static void main(String[] args) { //创建一个延迟队列:用来对超过支付日期的订单进行关闭 String delayQueueName = "orderCloseDelayQueue"; //1、创建延迟队列 DelayQueueService<String> orderCloseDelayQueue = new DelayQueueService<String>(delayQueueName); for (int i = 1; i <= 10; i++) { //2、调用addTask将延迟任务加入延迟队列 orderCloseDelayQueue.addTask(TimeUnit.SECONDS, i, "订单" + i, new Consumer<String>() { @Override public void accept(String s) { System.out.println(System.currentTimeMillis() + "," + Thread.currentThread() + ",关闭订单:" + s); } }); } //3、系统关闭的时候,调用延迟队列的close方法 //orderCloseDelayQueue.close(); } } ``` main方法中模拟了10个延迟任务,运行看看效果,输出 ```java 1614346780438,Thread[pool-1-thread-1,5,main],关闭订单:订单1 1614346781437,Thread[pool-1-thread-2,5,main],关闭订单:订单2 1614346782436,Thread[pool-1-thread-3,5,main],关闭订单:订单3 1614346783437,Thread[pool-1-thread-4,5,main],关闭订单:订单4 1614346784437,Thread[pool-1-thread-5,5,main],关闭订单:订单5 1614346785437,Thread[pool-1-thread-6,5,main],关闭订单:订单6 1614346786437,Thread[pool-1-thread-7,5,main],关闭订单:订单7 1614346787436,Thread[pool-1-thread-8,5,main],关闭订单:订单8 1614346788437,Thread[pool-1-thread-9,5,main],关闭订单:订单9 1614346789437,Thread[pool-1-thread-10,5,main],关闭订单:订单10 ``` <a style="display:none" target="_blank" href="https://mp.weixin.qq.com/s/_S1DD2JADnXvpexxaBwLLg" style="color:red; font-size:20px; font-weight:bold">继续收门徒,亲手带,月薪 4W 以下的可以来找我</a> ## 最新资料 1. <a href="https://mp.weixin.qq.com/s?__biz=MzkzOTI3Nzc0Mg==&mid=2247484964&idx=2&sn=c81bce2f26015ee0f9632ddc6c67df03&scene=21#wechat_redirect" target="_blank">尚硅谷 Java 学科全套教程(总 207.77GB)</a> 2. <a href="https://mp.weixin.qq.com/s?__biz=MzkwOTAyMTY2NA==&mid=2247484192&idx=1&sn=505f2faaa4cc911f553850667749bcbb&scene=21#wechat_redirect" target="_blank">2021 最新版 Java 微服务学习线路图 + 视频</a> 3. <a href="https://mp.weixin.qq.com/s?__biz=MzkwOTAyMTY2NA==&mid=2247484573&idx=1&sn=7f3d83892186c16c57bc0b99f03f1ffd&scene=21#wechat_redirect" target="_blank">阿里技术大佬整理的《Spring 学习笔记.pdf》</a> 4. <a href="https://mp.weixin.qq.com/s?__biz=MzkwOTAyMTY2NA==&mid=2247484544&idx=2&sn=c1dfe907cfaa5b9ae8e66fc247ccbe84&scene=21#wechat_redirect" target="_blank">阿里大佬的《MySQL 学习笔记高清.pdf》</a> 5. <a href="https://mp.weixin.qq.com/s?__biz=MzkwOTAyMTY2NA==&mid=2247485167&idx=1&sn=48d75c8e93e748235a3547f34921dfb7&scene=21#wechat_redirect" target="_blank">2021 版 java 高并发常见面试题汇总.pdf</a> 6. <a href="https://mp.weixin.qq.com/s?__biz=MzkwOTAyMTY2NA==&mid=2247485664&idx=1&sn=435f9f515a8f881642820d7790ad20ce&scene=21#wechat_redirect" target="_blank">Idea 快捷键大全.pdf</a> ![](https://itsoku.oss-cn-hangzhou.aliyuncs.com/itsoku/blog/article/1/2883e86e-3eff-404a-8943-0066e5e2b454.png)
相关专辑:
Java高并发教程