记一次项目小改造-序列化保存线程池任务

Mars 2020年05月24日 14次浏览

记一次项目小改造

背景

  • 原因

    我们线上的老项目里面有着几个线程池,在测试环境重启项目的时候偶尔会上游系统收不到该项目的响应(成功或者失败都没有结果),即当前项目在关闭的时候出现了丢任务

  • 进一步分析

    由于该项目注册了jvm钩子,在钩子关闭时候会一次关闭线程池,进一步发现原来线程池继承自spring框架的,伪代码如下

    class MyPool extends ThreadPoolTaskExecutor{
    	//other method
    }
    
    //钩子线程如下
    MyPool myPool = new MyPool();
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
        myPool.shutdown();
    }));
    

    关闭线程池调用的也是spring的shutdown,spring只提供了这一个方法关闭线程池

  • 继续探究spring线程池的shutdown方法

    	private boolean waitForTasksToCompleteOnShutdown = false;
    
    	private int awaitTerminationSeconds = 0;
    
    
    	public void shutdown() {
    		if (logger.isInfoEnabled()) {
    			logger.info("Shutting down ExecutorService" + (this.beanName != null ? " '" + this.beanName + "'" : ""));
    		}
    		if (this.executor != null) {
    			if (this.waitForTasksToCompleteOnShutdown) {
    				this.executor.shutdown();
    			}
    			else {
    				for (Runnable remainingTask : this.executor.shutdownNow()) {
    					cancelRemainingTask(remainingTask);
    				}
    			}
    			awaitTerminationIfNecessary(this.executor);
    		}
    	}
    

    进入shutdown方法可以发现,spring线程池通过成员变量waitForTasksToCompleteOnShutdown来决定关闭线程池的方式,老项目没有设置该变量的值,因此注册的钩子线程如同虚设,线程池最终直接调用了executor.shutdownNow()方法关闭线程池,因此正在执行的任务和线程池队列堆积的任务都将丢失。

  • 接着进入awaitTerminationIfNecessary方法

    private void awaitTerminationIfNecessary(ExecutorService executor) {
    		if (this.awaitTerminationSeconds > 0) {
    			try {
    				if (!executor.awaitTermination(this.awaitTerminationSeconds, TimeUnit.SECONDS)) {
    					if (logger.isWarnEnabled()) {
    						logger.warn("Timed out while waiting for executor" +
    								(this.beanName != null ? " '" + this.beanName + "'" : "") + " to terminate");
    					}
    				}
    			}
    			catch (InterruptedException ex) {
    				if (logger.isWarnEnabled()) {
    					logger.warn("Interrupted while waiting for executor" +
    							(this.beanName != null ? " '" + this.beanName + "'" : "") + " to terminate");
    				}
    				Thread.currentThread().interrupt();
    			}
    		}
    	}
    

    即如果设置的awaitTerminationSeconds值,则会调用jdk自带的executor.awaitTermination()方法延时

初步改进

设置spring线程池属性来关闭线程池

  • 设置线程池属性
MyPool myPool = new MyPool();

//为true则会延时关闭
myPool.setWaitForTasksToCompleteOnShutdown(true);
myPool.setAwaitTerminationSeconds(180);

Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    myPool.shutdown();
}));
  • 当waitForTasksToCompleteOnShutdown为true时,shutdown方法会走if分支

    @Nullable
    private ExecutorService executor;
    
    
    if (this.waitForTasksToCompleteOnShutdown) {
        this.executor.shutdown();
    }
    

    此时相当于调用的jdk线程池的shutdown,shutdown会通知线程池不再接受任务,当前线程池已接受的任务会继续执行

  • 接着进入awaitTerminationSeconds()方法,延时关闭线程池

原生jdk优雅的关闭线程池

  • 从spring线程池获取到实际执行的线程池

    ThreadPoolExecutor poolExecutor = myPool.getThreadPoolExecutor();
    
  • 优雅关闭

    poolExecutor.shutdown();
    
    try {
        if (!poolExecutor.awaitTermination(3, TimeUnit.MINUTES)){
            //未完成任务
        List<Runnable> unfinishedTask= poolExecutor.shutdownNow();
        }
    } catch (InterruptedException e) {
        poolExecutor.shutdownNow();
    }
    

初步改进存在的问题

  • 关闭线程池方式较之前好了很多,但是当延时关闭线程池之后仍然存在正在执行的任务被中断的可能性,虽然这种情况无法避免,但是我们需要让正在执行的任务跑完,若是阻塞了再中断结束线程池
  • 线程池关闭之后未执行的任务仍然无法在下次启动继续执行,即未保存,仍会丢失

继续改进-保存未执行的任务

  • 原项目提交任务方式

    myPool.execute(new Runnable() {
        @Override
        public void run() {
            //todo
        }
    });
    
    myPool.execute(()->{
        //todo
    });