Java

ExecutorServiceを強制終了できるようにした

kght6123

kght6123

ExecutorServiceを強制終了できるようにした

解説

JavaのExecutorServiceに対して

死なないタスクにInterruptedExceptionを発生させて、

任意のタイミングで強制終了させることができるExecutorServiceWrapperとそのWrapperを簡単に作るExecutorServiceUtilを作りました

並行処理が中途半端でも良いので完全に停止させたい時や、常駐処理を停止したいときに使います。

どなたかの参考になれば幸いです。

(loggerは任意のものに置き換えて使ってください!)

ソースコード

import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ExecutorServiceUtil {
	
	/**
	 * Manual test driver for the shutdown behavior.
	 * <p>
	 * Submits five tasks to a 3-thread pool wrapped in an {@code ExecutorServiceWrapper}
	 * configured with a 3-second shutdown wait, sleeps for 60 seconds, and then leaves the
	 * try-with-resources block, which closes the wrapper.
	 *
	 * @param args command-line arguments (not used)
	 */
	public static void main(final String... args) {
		
		final Logger logger = /* replace with a logger of your choice */;
		
		// Task flavors selecting how C1 reacts to an InterruptedException.
		final int THROW = 1;
		final int TRY_CATCH = 2;
		final int TRY_CATCH_NOT_RTN = 3;
		
		/**
		 * Test task that sleeps for a given time and handles interruption
		 * according to the selected call type.
		 */
		class C1 implements Callable<Boolean> {
			
			private final long milliseconds;
			private final int callType;
			
			private C1(final long milliseconds, final int callType) {
				this.milliseconds = milliseconds;
				this.callType = callType;
			}
			
			/**
			 * Dispatches to the variant selected by {@code callType};
			 * returns {@code false} for an unknown call type.
			 */
			@Override
			public Boolean call() throws Exception {
				
				switch(callType) {
					case THROW :
						return callForThrow();
					case TRY_CATCH :
						return callForTryCatch();
					case TRY_CATCH_NOT_RTN :
						return callForTryCatchNotReturn();
					default :
						return false;
				}
			}
			
			/**
			 * Lets the InterruptedException propagate, so the task ends with an error
			 * as soon as the sleep is interrupted.
			 * @return {@code true} when the sleep completes without interruption
			 * @throws InterruptedException if the sleep is interrupted
			 */
			private Boolean callForThrow() throws InterruptedException {
				logger.info("callForThrow -start-");
				Thread.sleep(this.milliseconds);
				logger.info("callForThrow -finish-");
				return true;
			}
			
			/**
			 * Catches the InterruptedException and returns normally, so the task still ends.
			 * @return {@code true} when the sleep completes, {@code false} when it was interrupted
			 */
			private Boolean callForTryCatch() {
				try {
					logger.info("callForTryCatch -start-");
					Thread.sleep(this.milliseconds);
					logger.info("callForTryCatch -finish-");
					return true;
				} catch (InterruptedException e) {
					logger.error("callForTryCatch -error-");
					return false;
				}
			}
			
			/**
			 * Catches the InterruptedException and starts the sleep over, so an interrupt
			 * never ends the task; it returns only after one sleep completes uninterrupted.
			 * @return always {@code true}
			 */
			private Boolean callForTryCatchNotReturn() {
				while(true) {
					try {
						logger.info("callForTryCatchNotReturn -start-");
						Thread.sleep(this.milliseconds);
						logger.info("callForTryCatchNotReturn -finish-");
						return true;
					} catch (InterruptedException e) {
						logger.error("callForTryCatchNotReturn -error-");
						// Swallow the interrupt and retry the sleep from the beginning.
						continue;
					}
				}
			}
		}
		
		logger.info("main -start-");
		
		// Tweak the wait time below to experiment with the shutdown behavior.
		try (final ExecutorServiceWrapper esw = 
				newFixedThreadPool(3, new ExecutorServiceWrapper.ShutdownInfo(3, TimeUnit.SECONDS, logger));) {
			
			logger.info("main submit -start-");
			// One task of each flavor for the three pool threads.
			esw.submit(new C1(10 * 1000, THROW));
			esw.submit(new C1(10 * 1000, TRY_CATCH));
			esw.submit(new C1(10 * 1000, TRY_CATCH_NOT_RTN));
			
			// Two more tasks than the pool has threads.
			esw.submit(new C1(10 * 1000, THROW));
			esw.submit(new C1(10 * 1000, THROW));
			
			logger.info("main sleep -start-");
			Thread.sleep(60 * 1000);
			
			// Leaving the try block closes esw right after this log line.
			logger.info("main shutdown -start-");
			
		} catch (final InterruptedException e) {
			logger.error(e.getClass().getName(), e);
		}
	}
	
	/**
	 * Convenience factory that creates a pool with {@link Executors#newFixedThreadPool(int)}
	 * and wraps it in an {@link ExecutorServiceWrapper}.
	 *
	 * @param maxSize number of threads handed to {@code Executors.newFixedThreadPool}
	 * @param shutdownInfo shutdown settings stored in the wrapper and used when it is closed
	 * @return a wrapper around the newly created fixed-size pool
	 */
	public static ExecutorServiceWrapper newFixedThreadPool(final int maxSize, final ExecutorServiceWrapper.ShutdownInfo shutdownInfo) {
		// XXX Executors.newWorkStealingPool(20) (JDK 8+) would be tempting, but it exists to reduce
		// contention in parallel execution, so its meaning differs slightly from newFixedThreadPool.
		final ExecutorService fixedPool = Executors.newFixedThreadPool(maxSize);
		return new ExecutorServiceWrapper(fixedPool, shutdownInfo);
	}
	
	/**
	 * Convenience factory that creates a fixed-size pool whose threads come from a
	 * {@link DaemonThreadFactory} and wraps it in an {@link ExecutorServiceWrapper}.
	 *
	 * @param maxSize number of threads handed to {@code Executors.newFixedThreadPool}
	 * @param shutdownInfo shutdown settings stored in the wrapper and used when it is closed
	 * @return a wrapper around the newly created daemon-thread pool
	 */
	public static ExecutorServiceWrapper newFixedDeamonThreadPool(final int maxSize, final ExecutorServiceWrapper.ShutdownInfo shutdownInfo) {
		final ThreadFactory daemonThreads = new DaemonThreadFactory();
		final ExecutorService fixedPool = Executors.newFixedThreadPool(maxSize, daemonThreads);
		return new ExecutorServiceWrapper(fixedPool, shutdownInfo);
	}
	
	/**
	 * Convenience factory that creates a cached pool whose threads come from a
	 * {@link DaemonThreadFactory} and wraps it in an {@link ExecutorServiceWrapper}.
	 *
	 * @param shutdownInfo shutdown settings stored in the wrapper and used when it is closed
	 * @return a wrapper around the newly created daemon-thread cached pool
	 */
	public static ExecutorServiceWrapper newCachedDeamonThreadPool(final ExecutorServiceWrapper.ShutdownInfo shutdownInfo) {
		final ThreadFactory daemonThreads = new DaemonThreadFactory();
		final ExecutorService cachedPool = Executors.newCachedThreadPool(daemonThreads);
		return new ExecutorServiceWrapper(cachedPool, shutdownInfo);
	}
	
	/**
	 * Wraps an {@link ExecutorService} and implements {@link AutoCloseable} so that the pool
	 * is shut down when a try-with-resources block exits.
	 * <p>
	 * Every {@code ExecutorService} method is delegated unchanged to the wrapped instance;
	 * only {@link #close()} adds behavior, by running the shared shutdown routine of
	 * {@code ExecutorServiceUtil} with the configured {@link ShutdownInfo}.
	 */
	public static class ExecutorServiceWrapper implements AutoCloseable, ExecutorService {
		
		/** The wrapped executor that receives every delegated call. */
		private final ExecutorService es;
		/** Wait time, unit and logger used by {@link #close()}. */
		private final ExecutorServiceWrapper.ShutdownInfo sdi;
		
		/**
		 * Creates a wrapper with the default shutdown wait of 3 seconds.
		 *
		 * @param es executor to wrap
		 * @param logger logger used while shutting down
		 */
		private ExecutorServiceWrapper(final ExecutorService es, final Logger logger) {
			this.es = es;
			this.sdi = new ExecutorServiceWrapper.ShutdownInfo(3, TimeUnit.SECONDS, logger);
		}
		
		/**
		 * Creates a wrapper with explicit shutdown settings.
		 *
		 * @param es executor to wrap
		 * @param shutdownInfo wait time, unit and logger used by {@link #close()}
		 */
		private ExecutorServiceWrapper(final ExecutorService es, final ExecutorServiceWrapper.ShutdownInfo shutdownInfo) {
			this.es = es;
			this.sdi = shutdownInfo;
		}
		
		// -- ExecutorService delegation methods -- start --
		
		@Override public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
			return es.awaitTermination(timeout, unit);
		}
		@Override public void execute(Runnable command) {
			es.execute(command);
		}
		@Override public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
				throws InterruptedException {
			return es.invokeAll(tasks, timeout, unit);
		}
		@Override public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
			return es.invokeAll(tasks);
		}
		@Override public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
				throws InterruptedException, ExecutionException, TimeoutException {
			return es.invokeAny(tasks, timeout, unit);
		}
		@Override public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
			return es.invokeAny(tasks);
		}
		@Override public boolean isShutdown() {
			return es.isShutdown();
		}
		@Override public boolean isTerminated() {
			return es.isTerminated();
		}
		@Override public void shutdown() {
			es.shutdown();
		}
		@Override public List<Runnable> shutdownNow() {
			return es.shutdownNow();
		}
		@Override public <T> Future<T> submit(Callable<T> task) {
			return es.submit(task);
		}
		@Override public <T> Future<T> submit(Runnable task, T result) {
			return es.submit(task, result);
		}
		@Override public Future<?> submit(Runnable task) {
			return es.submit(task);
		}
		
		// -- ExecutorService delegation methods -- end --
		
		/**
		 * Shuts down the wrapped executor through {@code ExecutorServiceUtil.shutdown},
		 * using the wait time, unit and logger held in the {@link ShutdownInfo}.
		 * <p>
		 * No checked exception is declared. Since Java 19 {@code ExecutorService} itself
		 * declares {@code close()} without a throws clause, so declaring
		 * {@code InterruptedException} here would not compile on those JDKs, and the
		 * delegated shutdown routine does not propagate that exception anyway.
		 */
		@Override public void close() {
			ExecutorServiceUtil.shutdown(es, sdi.waitTime, sdi.waitTimeUnit, sdi.logger);
		}
		
		/**
		 * Immutable holder for the settings used when the wrapper is closed.
		 */
		public static class ShutdownInfo {
			
			/** How long to wait for termination after a normal shutdown request. */
			private final int waitTime;
			/** Unit of {@link #waitTime}. */
			private final TimeUnit waitTimeUnit;
			/** Logger that receives the shutdown progress messages. */
			private final Logger logger;
			
			/**
			 * @param waitTime how long to wait for termination
			 * @param waitTimeUnit unit of {@code waitTime}
			 * @param logger logger that receives the shutdown progress messages
			 */
			public ShutdownInfo(final int waitTime, final TimeUnit waitTimeUnit, final Logger logger) {
				this.waitTime = waitTime;
				this.waitTimeUnit = waitTimeUnit;
				this.logger = logger;
			}
		}
	}
	
	
	/**
	 * {@link ThreadFactory} that marks every thread it creates as a daemon thread,
	 * for building ExecutorService instances backed by daemon threads.
	 */
	public static class DaemonThreadFactory implements ThreadFactory {
		
		/**
		 * Creates a new daemon thread for the given task.
		 *
		 * @param r task the new thread will run
		 * @return an unstarted thread with its daemon flag set
		 */
		@Override
		public Thread newThread(final Runnable r) {
			final Thread daemon = new Thread(r);
			daemon.setDaemon(true);
			return daemon;
		}
	}
	
	/**
	 * Shared routine for shutting down an {@link ExecutorService}.
	 * <p>
	 * Requests an orderly shutdown, waits up to the given time for the running and queued
	 * tasks to finish, and falls back to {@code shutdownNow()} when they do not. If the
	 * calling thread is interrupted while waiting, the pool is also shut down forcibly and
	 * the thread's interrupt status is restored so callers can still observe it.
	 *
	 * @param es executor to shut down; nothing happens when {@code null}
	 * @param waitTime maximum time to wait for termination after the orderly shutdown
	 * @param waitTimeUnit unit of {@code waitTime}
	 * @param logger logger that receives the progress messages
	 */
	public static void shutdown(final ExecutorService es, final long waitTime, final TimeUnit waitTimeUnit, final Logger logger) {
		
		if (es == null) {
			return;
		}
		
		logger.info("shutdown ExecutorService.");
		// Stops accepting new tasks; already submitted tasks keep running.
		es.shutdown();
		
		if (es.isTerminated()) {
			// Nothing was left to run, so the pool is already gone.
			return;
		}
		
		try {
			logger.info("awaitTermination ExecutorService.");
			
			// Only waits; running tasks are not interrupted at this point.
			if (es.awaitTermination(waitTime, waitTimeUnit)) {
				return;
			}
			
			// Tasks are still alive after the wait: force the shutdown.
			forceShutdown(es, logger);
			
		} catch (final InterruptedException e) {
			logger.error("InterruptedException.", e);
			// Do not leave the pool running just because the wait was cut short.
			forceShutdown(es, logger);
			// Preserve the interrupt for code further up the call stack.
			Thread.currentThread().interrupt();
		}
	}
	
	/**
	 * Calls {@code shutdownNow()} and logs the tasks that never started.
	 * {@code shutdownNow()} attempts to stop running tasks (typically by interrupting
	 * their threads) and returns the queued tasks that were still awaiting execution.
	 */
	private static void forceShutdown(final ExecutorService es, final Logger logger) {
		logger.warn("shutdownNow ExecutorService.");
		final List<Runnable> notDieList = es.shutdownNow();
		
		logger.warn("shutdownNow not Die size = "+notDieList.size());
		for (final Runnable notDie : notDieList) {
			logger.warn("shutdownNow not Die = "+notDie);
		}
	}
}