About

本サイトについて

趣味で開発したプログラムや開発メモを載せています。
ソースコードはGithubで公開しつつ、なるべく後から分かるように解説に努めてますので、
誰かのお役に立てれば嬉しいです。

プロフィール

kght6123

佐賀県出身で1985年生まれ。
三重県四日市市在住のシステムエンジニア。家庭を大事にしたい2児の父。

kght6123.page

ExecuterServiceを強制終了できるようにした

2022-10-01T11:54:24.867Z

解説

JavaのExecutorServiceに対して

死なないタスクにInterruptedExceptionを発生させて、

任意のタイミングで強制終了させることができるExecutorServiceWrapperとそのWrapperを簡単に作るExecutorServiceUtilを作りました

並行処理が中途半端でも良いので完全に停止させたい時や、常駐処理を停止したいときに使います。

どなたかの参考になれば、、、、と思います。

(loggerは任意のものに置き換えて使ってください!)

ソースコード

import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ExecutorServiceUtil {
    
    /**
     * ユニットテスト
     * @param args
     */
    public static void main(final String... args) {
        
        final Logger logger = /* 任意のloggerに置き換える */;
        
        final int THROW = 1;
        final int TRY_CATCH = 2;
        final int TRY_CATCH_NOT_RTN = 3;
        
        class C1 implements Callable<Boolean> {
            
            private final long milliseconds;
            private final int callType;
            
            private C1(final long milliseconds, final int callType) {
                this.milliseconds = milliseconds;
                this.callType = callType;
            }
            
            @Override
            public Boolean call() throws Exception {
                
                switch(callType) {
                    case THROW :
                        return callForThrow();
                    case TRY_CATCH :
                        return callForTryCatch();
                    case TRY_CATCH_NOT_RTN :
                        return callForTryCatchNotReturn();
                    default :
                        return false;
                }
            }
            
            /**
             * エラーにしてすぐ死ぬ
             * @return
             * @throws InterruptedException
             */
            private Boolean callForThrow() throws InterruptedException {
                logger.info("callForThrow -start-");
                Thread.sleep(this.milliseconds);
                logger.info("callForThrow -finish-");
                return true;
            }
            
            /**
             * catchして正常返して死ぬ
             * @return
             */
            private Boolean callForTryCatch() {
                try {
                    logger.info("callForTryCatch -start-");
                    Thread.sleep(this.milliseconds);
                    logger.info("callForTryCatch -finish-");
                    return true;
                } catch (InterruptedException e) {
                    logger.error("callForTryCatch -error-");
                    return false;
                }
            }
            
            /**
             * catchして絶対死なない
             * @return
             */
            private Boolean callForTryCatchNotReturn() {
                while(true) {
                    try {
                        logger.info("callForTryCatchNotReturn -start-");
                        Thread.sleep(this.milliseconds);
                        logger.info("callForTryCatchNotReturn -finish-");
                        return true;
                    } catch (InterruptedException e) {
                        logger.error("callForTryCatchNotReturn -error-");
                        continue;
                    }
                }
            }
        }
        
        logger.info("main -start-");
        
        // 下記のデフォルト時間を弄って検証する
        try (final ExecutorServiceWrapper esw = 
                newFixedThreadPool(3, new ExecutorServiceWrapper.ShutdownInfo(3, TimeUnit.SECONDS, logger));) {
            
            logger.info("main submit -start-");
            esw.submit(new C1(10 * 1000, THROW));
            esw.submit(new C1(10 * 1000, TRY_CATCH));
            esw.submit(new C1(10 * 1000, TRY_CATCH_NOT_RTN));
            
            esw.submit(new C1(10 * 1000, THROW));
            esw.submit(new C1(10 * 1000, THROW));
            
            logger.info("main sleep -start-");
            Thread.sleep(60 * 1000);
            
            logger.info("main shutdown -start-");
            
        } catch (final InterruptedException e) {
            logger.error(e.getClass().getName(), e);
        }
    }
    
    /**
     * Executors.newFixedThreadPool+ExecutorServiceWrapper生成を行う簡易メソッド
     * 
     * @param maxSize
     * @param shutdownInfo
     * @return
     */
    public static ExecutorServiceWrapper newFixedThreadPool(final int maxSize, final ExecutorServiceWrapper.ShutdownInfo shutdownInfo) {
        // XXX JDK8以降 のExecutors.newWorkStealingPool(20);を使いたいが、並列実行の競合を減らすためなので、意味合いがnewFixedThreadPoolとちょっと違う。
        return new ExecutorServiceWrapper(Executors.newFixedThreadPool(maxSize), shutdownInfo);
    }
    
    /**
     * Executors.newFixedThreadPool+DeamonThread化+ ExecutorServiceWrapper生成を行う簡易メソッド
     * 
     * @param maxSize
     * @param shutdownInfo
     * @return
     */
    public static ExecutorServiceWrapper newFixedDeamonThreadPool(final int maxSize, final ExecutorServiceWrapper.ShutdownInfo shutdownInfo) {
        return new ExecutorServiceWrapper(Executors.newFixedThreadPool(maxSize, new DaemonThreadFactory()), shutdownInfo);
    }
    
    /**
     * Executors.newCachedThreadPool+DeamonThread化+ ExecutorServiceWrapper生成を行う簡易メソッド
     * 
     * @param shutdownInfo
     * @return
     */
    public static ExecutorServiceWrapper newCachedDeamonThreadPool(final ExecutorServiceWrapper.ShutdownInfo shutdownInfo) {
        return new ExecutorServiceWrapper(Executors.newCachedThreadPool(new DaemonThreadFactory()), shutdownInfo);
    }
    
    /**
     * ExecutorServiceをWrappし、AutoCloseableで終了処理を実装したクラス
     * 
     */
    public static class ExecutorServiceWrapper implements AutoCloseable, ExecutorService {
        
        private final ExecutorService es;
        private final ExecutorServiceWrapper.ShutdownInfo sdi;
        
        private ExecutorServiceWrapper(final ExecutorService es, final Logger logger) {
            super();
            this.es = es;
            this.sdi = new ExecutorServiceWrapper.ShutdownInfo(3, TimeUnit.SECONDS, logger);
        }
        
        private ExecutorServiceWrapper(final ExecutorService es, final ExecutorServiceWrapper.ShutdownInfo shutdownInfo) {
            super();
            this.es = es;
            this.sdi = shutdownInfo;
        }
        
        // -- ExecutorServiceの委譲メソッド -- start --
        
        @Override public boolean awaitTermination(long arg0, TimeUnit arg1) throws InterruptedException {
            return es.awaitTermination(arg0, arg1);
        }
        @Override public void execute(Runnable arg0) {
            es.execute(arg0);
        }
        @Override public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> arg0, long arg1, TimeUnit arg2)
                throws InterruptedException {
            return es.invokeAll(arg0, arg1, arg2);
        }
        @Override public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> arg0) throws InterruptedException {
            return es.invokeAll(arg0);
        }
        @Override public <T> T invokeAny(Collection<? extends Callable<T>> arg0, long arg1, TimeUnit arg2)
                throws InterruptedException, ExecutionException, TimeoutException {
            return es.invokeAny(arg0, arg1, arg2);
        }
        @Override public <T> T invokeAny(Collection<? extends Callable<T>> arg0) throws InterruptedException, ExecutionException {
            return es.invokeAny(arg0);
        }
        @Override public boolean isShutdown() {
            return es.isShutdown();
        }
        @Override public boolean isTerminated() {
            return es.isTerminated();
        }
        @Override public void shutdown() {
            es.shutdown();
        }
        @Override public List<Runnable> shutdownNow() {
            return es.shutdownNow();
        }
        @Override public <T> Future<T> submit(Callable<T> arg0) {
            return es.submit(arg0);
        }
        @Override public <T> Future<T> submit(Runnable arg0, T arg1) {
            return es.submit(arg0, arg1);
        }
        @Override public Future<?> submit(Runnable arg0) {
            return es.submit(arg0);
        }
        
        // -- ExecutorServiceの委譲メソッド -- end --
        
        @Override public void close() throws InterruptedException {
            ExecutorServiceUtil.shutdown(es, sdi.waitTime, sdi.waitTimeUnit, sdi.logger);
        }
        
        public static class ShutdownInfo {
            
            private final int waitTime;
            private final TimeUnit waitTimeUnit;
            private final Logger logger;
            
            public ShutdownInfo(final int waitTime, final TimeUnit waitTimeUnit, final Logger logger) {
                super();
                this.waitTime = waitTime;
                this.waitTimeUnit = waitTimeUnit;
                this.logger = logger;
            }
        }
    }
    
    
    /**
     * DaemonThreadでExecutorServiceを作成するためのFactory
     */
    public static class DaemonThreadFactory implements ThreadFactory {
        
        public Thread newThread(final Runnable r){
            final Thread t = new Thread(r);
            t.setDaemon(true);
            return t;
        }
    }
    
    /**
     * ExecutorServiceをシャットダウンするための共通処理
     * 
     * @param es
     * @param waitTime
     * @param waitTimeUnit
     * @param logger
     */
    public static void shutdown(final ExecutorService es, final long waitTime, final TimeUnit waitTimeUnit, final Logger logger) {
        
        if(es == null) return;
        
        logger.info("shutdown ExecutorService.");
        es.shutdown();  // すべて終了してるとき、ココですぐ死ぬ
        
        try {
            if (!es.isTerminated()) {
                // 死んでないときに実行
                logger.info("awaitTermination ExecutorService.");
                
                // 死なないタスクにInterruptedExceptionを発生させて、指定時間待つ。
                final boolean exit = 
                        es.awaitTermination(waitTime, waitTimeUnit);
                
                if(!exit) {
                    // それでも死ななかったとき、強制シャットダウン
                    logger.warn("shutdownNow ExecutorService.");
                    final List<Runnable> notDieList = 
                            es.shutdownNow();   // 実行されてなかったタスクは返され、実行中のタスクは処理続行される
                    
                    logger.warn("shutdownNow not Die size = "+notDieList.size());
                    for (final Runnable notDie : notDieList) {
                        logger.warn("shutdownNow not Die = "+notDie);
                    }
                }
            }
        } catch (final InterruptedException e) {
            logger.error("InterruptedException.", e);
        }
    }
}
関連タグ