文章目录
- 引言
- 阻塞式中断的哲学
- 线程停止准则
- 强制关闭线程池的弊端
- 基于原子化标识优雅关闭
- 无界队列与毒丸消费
- 用后即焚的线程池
- 利用实现shutdownNow线程中断与取消可监控
- 设计与实现思路
- 功能落地注意事项
- 使用注意事项
- 小结
- 参考
引言
我们大部分情况下并发任务都是交由提前设置好的线程池统一管理,这其中对于池化技术的优雅关闭就涉及任务的终止和资源兜底,所以本文将针对这一话题展开探讨。
我是SharkChili,Java 开发者,Java Guide开源项目维护者。欢迎关注我的公众号:写代码的SharkChili,也欢迎您了解我的开源项目 mini-redis:https://github.com/shark-ctrl/mini-redis。
为方便与读者交流,现已创建读者群。关注上方公众号获取我的联系方式,添加时备注加群即可加入。
阻塞式中断的哲学
线程停止准则
对于线程的生命周期的管理,按照并发的哲学:
除非拥有某个线程,否则不能对该线程进行操控
所以只有线程所属的线程池才具备对其生命周期的管理,所以在Java应用程序的维度,它是不具备直接管理线程池线程的权限,即非所属线程池维度的线程关闭是需要通过服务于线程池生命周期方法间接关闭线程:
例如应用程序关闭时,对应服务ExecutorService这个服务的关闭可直接通过shutdown或者shutdownNow中断所有的线程:
//等待所有任务结束后关闭threadPool.shutdown();//即刻关闭所有的任务,返回已提交但是还未开始的任务threadPool.shutdownNow();这里也补充一句shutdown和shutdownNow的区别:
shutdown会等待所有任务执行完成再关闭,所以关闭的响应可能会有些许延迟。shutdownNow会直接强行关闭执行任务,同时将未启动的任务返回。
强制关闭线程池的弊端
如果采用强制关闭的方式将线程池直接关闭,就可能导致一些资源未能及时处理而丢失,例如:我们的有一个日志线程,它会不断轮询外部线程投递到阻塞队列的信息并将其写入磁盘。如下图,可以看到如果日志线程在队列未消费完过程中直接强制打断,就会导致一些数据未能及时消费而丢失:
对应的我们给出这个日志工具的示例,可以看到笔者的所编写的Logwritter,它可以通过构造方法日志队列和工作线程写入日志的文件路径logPath,使得logThread可以通过阻塞轮询队列完成日志异步写入:
publicclassLogwritter{privatefinalBlockingQueue<String>queue;privatefinalLogThreadlogThread;//基于外部入参完成消费者线程初始化publicLogwritter(StringlogPath){this.queue=newArrayBlockingQueue<>(100);this.logThread=newLogThread(logPath,queue);}//启动消费者线程publicvoidstart(){logThread.start();}//外部线程可通过log方法将消息存入队列中,让logThread写入本地publicvoidlog(Stringmsg)throwsInterruptedException{queue.put(msg);}//......}对应的我们也给出日志线程的代码,可以看到在interrupted没有被设置为true之前这段该线程就会不断轮询队列数据:
privatestaticclassLogThreadextendsThread{privatefinalBlockingQueue<String>queue;privatefinalBufferedOutputStreamoutputStream;//基于外部入参初始化日志写入路径和消费日志消息的阻塞队列publicLogThread(StringlogPath,BlockingQueue<String>queue){this.outputStream=FileUtil.getOutputStream(logPath);this.queue=queue;}privatebooleaninterrupted=false;//中断当前线程publicvoidinterrupt(){this.interrupted=true;}publicvoidrun(){try{//标识非中断则继续阻塞获取日志数据while(!interrupted){Console.log("阻塞获取日志......");outputStream.write(queue.take().getBytes(StandardCharsets.UTF_8));}}catch(Exceptione){thrownewRuntimeException(e);}finally{if(outputStream!=null){try{outputStream.close();}catch(IOExceptione){//......}}}}}对应的我们给出基础调测代码:
Logwritterlogwritter=newLogwritter("/tmp/logwritter.log");logwritter.start();logwritter.log("hello");logwritter.log("hello");newThread(()->{logwritter.interrupt();}).start();输出结果如下,可以看到代码理想情况下会因为标识的设置而中断:
阻塞获取日志......线程中断......所以这段代码存在一个很严重的缺陷,试想一下如果阻塞队列没有元素的情况下,我们尝试打断日志线程,此时日志线程就会因为阻塞等待队列元素而无法轮询查看中断标识,进而处于长时间阻塞等待的一种状态:
如下代码所示,我们不添加任何元素的情况下直接异步打断线程:
Logwritterlogwritter=newLogwritter("/tmp/logwritter.log");logwritter.start();newThread(()->{logwritter.interrupt();}).start();从输出结果就可以看出,此时代码就处于一个阻塞状态,必须等到获取完一个元素后才能中断循环·:
阻塞获取日志......基于原子化标识优雅关闭
所以,要想解决上述问题,我们必须做到以下几点:
- 中断日志线程时,其他线程再次调用log写入日志时会告知日志线程已停止,不可进行消息写入
- 中断要尽可能及时日志线程感知,避免阻塞等待下一个元素到来时才检查标识完成中断
- 日志线程收到中断信号会确保当前日志写入到文件后再中断停止
基于这种思路,笔者给出优化后的代码,我们先从顶层的Logwritter开始,可以看到笔者将中断操作做了如下改动:
- 声明一个中断标识和记录阻塞队列容量的变量
remaining - 调用log写入日志前,检查是否中断,若没中断则累加计数并写入日志到队列,反之直接返回
- 中断时设置中断标识,并调用
logThread中断方法让其中断
privatefinalBlockingQueue<String>queue;privatefinalBlockingQueue<String>queue;privatefinalLogThreadlogThread;privatebooleaninterrupted=false;privateintremaining;//基于外部入参完成消费者线程初始化publicLogwritter(StringlogPath,intsize){this.queue=newArrayBlockingQueue<>(size);this.logThread=newLogThread(logPath,queue);}//启动消费者线程publicvoidstart(){logThread.start();}//外部线程可通过log方法将消息存入队列中,让logThread写入本地publicvoidlog(Stringmsg)throwsInterruptedException{synchronized(this){//上实例锁检查中断,若中断则输出日志直接返回,反之累加remainingif(interrupted){Console.log("日志线程已中断,消息:{}无法写入",msg);return;}remaining++;}//将阻塞操作放在锁外部,避免因为队列阻塞等待导致所有线程锁住queue.put(msg);Console.log("写入消息成功,消息:{}",msg);}//中断当前线程publicvoidinterrupt(){synchronized(this){interrupted=true;}logThread.interrupt();}//......}重点来了,LogThread逻辑调整为,通过异常感知到中断信号,基于interrupted保留中断状态,并通过remaining数值完成剩余日志写入:
privateclassLogThreadextendsThread{privatefinalBlockingQueue<String>queue;privatefinalBufferedOutputStreamoutputStream;//基于外部入参初始化日志写入路径和消费日志消息的阻塞队列publicLogThread(StringlogPath,BlockingQueue<String>queue){this.outputStream=FileUtil.getOutputStream(logPath);this.queue=queue;}publicvoidrun(){try{//线程未中断或者remaining不为0则继续执行循环,知道被中断且remaining为0时退出while(!interrupted||remaining!=0){try{Stringmsg=queue.take();outputStream.write((msg+"\r\n").getBytes(StandardCharsets.UTF_8));Console.log("写入日志{}成功",msg);remaining--;}catch(InterruptedExceptione){//收到中断后,保存中断状态,继续完成队列中元素消费后退出Console.log("线程中断......");interrupted=true;}}Console.log("线程处理结束");}catch(IOExceptione){//处理io异常}finally{if(outputStream!=null){try{outputStream.close();}catch(IOExceptione){//......}}}}}最后我们给出测试代码,大体逻辑:
- 是启动日志线程后阻塞等待
- 生产者投递日志
- 在日志线程消费者将其打断,查看日志线程是否会在收到中断后完成日志消费再退出
Logwritterlogwritter=newLogwritter("/tmp/logwritter.log",100);//启动日志线程,阻塞等待小肥logwritter.start();//写入日志for(inti=0;i<10;i++){logwritter.log("msg"+i);}//中断newThread(()->logwritter.interrupt()).start();这里笔者基于IDEA的线程模式调试出这段逻辑,对应的输出结果如下,可以看到即使收到中断信号,线程也会将队列中的消息消费完成再退出循环:
写入消息成功,消息:msg0 写入消息成功,消息:msg1 写入消息成功,消息:msg2 写入消息成功,消息:msg3 写入消息成功,消息:msg4 写入消息成功,消息:msg5 写入消息成功,消息:msg6 写入消息成功,消息:msg7 写入消息成功,消息:msg8 写入消息成功,消息:msg9 写入日志msg0成功 线程中断...... 写入日志msg1成功 写入日志msg2成功 写入日志msg3成功 写入日志msg4成功 写入日志msg5成功 写入日志msg6成功 写入日志msg7成功 写入日志msg8成功 写入日志msg9成功 线程处理结束无界队列与毒丸消费
对于传统的生产者消费者模式,面对不会阻塞的无界队列,我们完全可以使用毒丸(poison pill)即特定的元素作为中断标识,确保生产者可以在适当的时机将其放在队列上,当消费者消费到这个对象时立即退出:
对应的我们给出毒丸的定义,因为笔者演示的阻塞队列是字符串类型所以协定好的毒丸就是字符串对象
//协定好的结束标识publicstaticfinalStringPOISON_PILL="POISON_PILL";消费者的代码逻辑也很简单,直接轮询读取队列数据,如果碰到的元素是毒丸则直接退出循环:
publicclassConsumerimplementsRunnable{privatefinalBlockingQueue<String>queue;privatefinalThreadthread;publicConsumer(BlockingQueue<String>queue){this.queue=queue;thread=newThread(this);}publicvoidstart(){Console.log("消费者启动");thread.start();}@Overridepublicvoidrun(){while(true){try{//利用毒丸感知异常中断退出Stringelement=queue.take();if(element.equals(POISON_PILL)){Console.log("消费到毒丸,消费者立即停止");break;}Console.log("消费元素{}成功",element);}catch(InterruptedExceptione){//......}}}}测试代码如下,可以看到笔者尝试插入100w个元素,并利用另外一个线程随机插入毒丸将生产者停止:
BlockingQueue<String>queue=newArrayBlockingQueue<>(1);Consumerconsumer=newConsumer(queue);consumer.start();newThread(()->{try{//随机插入毒丸ThreadUtil.sleep(RandomUtil.randomInt(5000));queue.put(POISON_PILL);}catch(InterruptedExceptione){//......}}).start();IntStream.range(0,100_0000).forEach(i->{//轮询插入100w个元素try{queue.offer(String.valueOf(i),5,TimeUnit.SECONDS);}catch(InterruptedExceptione){//.....}});可以看到,在消费了84w左右的元素时,消费者看到毒丸立即停止退出了:
毒丸在已知的生产者消费者模式下,可以注入有限的毒丸标识中断线程,所以使用这种方式控制线程就必须保证消费者数目已知,可以通过声明有限的毒丸停止线程。
用后即焚的线程池
对于只需要使用一次,注意笔者所强调的只需要使用到一次的异步线程池,我们可以直接通过juc流程控制工具CountDownLatch确保线程池中所有任务完成后销毁线程池,将线程池限制在当前函数的生命周期。
例如我们现在希望执行一批数据的乘2运算,我们希望并行执行这批数据的运算再累加起来,此时我们就可以遍历这批数据并将其提交到线程池中完成计算并累加,然后使用shutdown销毁线程池:
对应的我们给出一次性线程池示例和演示代码:
publicstaticvoidmain(String[]args)throwsInterruptedException{Console.log("并发计算和:{}",calculateInParallel(4));//并发计算和:20}/** * 从1开始遍历入参闭集,并提交到线程池中执行*2运算,累加返回 * * @param rangeClosed * @return 并行运算的最终结果 */publicstaticintcalculateInParallel(intrangeClosed){LongAdderadder=newLongAdder();CountDownLatchcountDownLatch=newCountDownLatch(rangeClosed);//声明闭集数一致的线程池ExecutorServiceexecutorService=Executors.newFixedThreadPool(rangeClosed);for(inti=1;i<=rangeClosed;i++){//并行运算每个数值的双数倍并利用原子类累加intnum=i;executorService.execute(()->{adder.add(num<<1);countDownLatch.countDown();});}try{//等待所有线程执行完成countDownLatch.await();}catch(InterruptedExceptione){//......}//用后即焚一次性线程池executorService.shutdown();returnadder.intValue();}最后笔者要特别说明一下,这个方案有一定的局限性,使用时必须保线程池对应的函数仅被使用少次,如果单位时间内并发调用这个函数尽可能导致独立线程池飙升,进而打爆内存:
对于此类线程池管理的使用案例,感兴趣的读者可以关注笔者这篇文章:
Java线程池知识点小结:https://mp.weixin.qq.com/s/O8MLoni3QE9UA1tLid6C5w
利用实现shutdownNow线程中断与取消可监控
设计与实现思路
从微观的角度了解了关于线程池中的线程的优雅关闭几种技巧之后,我们再来聊聊线程池维度对于取消和中断任务的监控。
上文我们了解到shutdown是优雅关闭,确保所有的任务都执行完成之后销毁线程池。而shutdownNow是一种能够实时关闭正在执行任务,同时还能够取消还未执行任务并返回的函数。所以,如果对于实时性要求较高的场景,我们更推荐使用shutdownNow。
但shutdownNow也存在一定的局限性,即它只能知晓那些数取消的任务,却不知道那些是中断的任务,所以shutdownNow对于需要监控异或者恢复中断的任务的场景就有些力不从心了。
对此,我们可以自行继承线程池框架,并对shutdownNow进行改造,大体思路为:
- 调用
shutdown关闭线程池时,内部调用shutdownNow获取已提交未执行的任务,保存到任务取消列表。 shutdownNow会线程池会将正在执行的任务中断,利用这个中断判断当前线程池状态是否被设置为关闭且当前线程状态是否中断,如果则将其存入中断列表。
对应的我们给出落地代码,整体实现思路如下:
自定义线程池继承
AbstractExecutorService获取线程池基本行为函数声明取消队列
cancelledTaskList和中断队列interruptedTaskList实现
stop方法,内部调用shutdownNow将已提交未执行的取消任务存入取消队列cancelledTaskListexecute函数重写,将外部任务提交到我们内部聚合的线程池中,并保证线程池关闭且当前线程执行被中断的情况下,将该任务存入中断队列
publicclassResumableThreadPoolExecutorextendsAbstractExecutorService{/** * 记录已提交但未执行就被取消的任务 */privatefinalList<Runnable>cancelledTaskList=newArrayList<>();/** * 记录正在执行然后被中断的任务 */privatefinalList<Runnable>interruptedTaskList=newArrayList<>();privateExecutorServiceexecutor;publicResumableThreadPoolExecutor(intsize){executor=Executors.newFixedThreadPool(size);}@Overridepublicvoidexecute(Runnablecommand){executor.execute(()->{try{Console.log("{}执行任务",Thread.currentThread().getName());command.run();}finally{if(isShutdown()&&Thread.currentThread().isInterrupted()){//将线程池关闭后中断的任务存入中断队列interruptedTaskList.add(command);}}});}publicList<Runnable>getCancelledTaskList(){if(!executor.isTerminated()){thrownewRuntimeException("线程池未关闭");}//安全发布取消队列,避免对内部取消列表的不安全修改returnnewArrayList<>(cancelledTaskList);}publicList<Runnable>getInterruptedTaskList(){//安全发布取消队列,避免对内部中断列表的不安全修改returnnewArrayList<>(interruptedTaskList);}@Overridepublicvoidshutdown(){executor.shutdown();}@OverridepublicList<Runnable>shutdownNow(){returnexecutor.shutdownNow();}publicvoidstop(){cancelledTaskList.addAll(executor.shutdownNow());//help gcexecutor=null;}//......}测试代码如下,因为笔者声明的线程池数为1,所以关闭线程池之后所得到的中断任务和取消任务数都为1:
ResumableThreadPoolExecutorthreadPool=newResumableThreadPoolExecutor(1);threadPool.execute(()->{try{TimeUnit.DAYS.sleep(1);}catch(InterruptedExceptione){Console.log("task-0被中断,保留中断状态");//保留中断状态,避免catch后中断状态被清除,进而导致中断任务无法存入中断队列Thread.currentThread().interrupt();}});threadPool.execute(()->{ThreadUtil.sleep(1,TimeUnit.DAYS);});threadPool.stop();threadPool.awaitTermination(5,TimeUnit.SECONDS);Console.log("中断的任务数:{}",threadPool.getInterruptedTaskList().size());Console.log("取消的任务数:{}",threadPool.getCancelledTaskList().size());功能落地注意事项
这段代码逻辑比较简单,唯一需要注意的是task-0对于终端状态的保留,默认情况下shutdown或者shutdownNow关闭线程池时正在执行的线程就会被中断,对应的我们可以查看ThreadPoolExecutor的shutdownNow方法印证:
publicList<Runnable>shutdownNow(){List<Runnable>tasks;finalReentrantLockmainLock=this.mainLock;mainLock.lock();try{//......//中断正在运行的线程interruptWorkers();tasks=drainQueue();}finally{//......}//......returntasks;}重点来了,被中断的线程一旦被catch块捕获,对应的中断状态就会被清除,如果我们不保留这个状态的话,那么这个被中断的任务就会b因为状态被清除而导致无法被存入中断队列,这也就是为什么笔者的测试代码在捕获到中断之后又手动处理执行一下中断,就是为了保证execute的finally语句块能够感知到线程中断状态保证任务能够正确的被存入中断队列:
更多关于线程中断的管理,感兴趣的可以参考笔者这篇文章:
如何优雅的中断java线程:https://mp.weixin.qq.com/s/GWP9qf5W_O1HJ7UMIzwJvQ
使用注意事项
该线程虽然保证线程中断与取消状态保留,但读者在基于该线程池恢复启动任务时还是需要注意一下任务处理的幂等性,因为线程池仅仅保留的中断的状态,对于任务的状态并没有做相应的处理。
小结
我们来简单概括一下本文的内容:
- 线程池的几种关闭方式
- 线程池关闭的几个准则和实践
- 一次性线程池的使用技巧
- 如何实现状态可监控的线程池
- 线程池中断与恢复的注意事项
我是SharkChili,Java 开发者,Java Guide开源项目维护者。欢迎关注我的公众号:写代码的SharkChili,也欢迎您了解我的开源项目 mini-redis:https://github.com/shark-ctrl/mini-redis。
为方便与读者交流,现已创建读者群。关注上方公众号获取我的联系方式,添加时备注加群即可加入。
参考
《Java并发编程实战》