专栏名称: 爬蜥
目录
相关文章推荐
51好读  ›  专栏  ›  爬蜥

java 中的 Executors 简介与多线程在网站上逐步优化的运用案例

爬蜥  · 掘金  ·  · 2018-09-26 05:58

正文

阅读 18

java 中的 Executors 简介与多线程在网站上逐步优化的运用案例

Executors简介

提供Executor的工厂类

忽略了自定义的ThreadFactory、callable和unconfigurable相关的方法

  • newFixedxxx:在任意时刻,最多有nThreads个线程在处理task;如果所有线程都在运行时来了新的任务,它会被扔入队列;如果有线程在执行期间因某种原因终止了运行,如果需要执行后续任务,新的线程将取代它

       return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
    复制代码
  • newCachedxxx:新任务到来如果线程池中有空闲的线程就复用,否则新建一个线程。如果一个线程超过60秒没有使用,它就会被关闭移除线程池

     return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
    复制代码
  • newSingleThreadExecutor:仅使用一个线程来处理任务,如果这线程挂了,会产生一个新的线程来代替它。每一个任务被保证按照顺序执行,而且一次只执行一个

      public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
        }
    复制代码

    使用newFixedxxx方法也能实现类似的作用,但是ThreadPoolExecutor会提供修改线程数的方法,FinalizableDelegatedExecutorService则没有修改的途径,它在DelegatedExecutorService的基础 上仅提供了执行finalize时候去关闭线程,而DelegatedExecutorService仅暴漏ExecutorService自身的方法

  • newScheduledThreadPool:提供一个线程池来延迟或者定期执行任务

      public ScheduledThreadPoolExecutor(int corePoolSize) {
            super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
                  new DelayedWorkQueue());
        }
    复制代码
  • newSingleThreadScheduledExecutor:提供单个线程来延迟或者定期执行任务,如果执行的线程挂了,会生成新的。

      return new DelegatedScheduledExecutorService
                (new ScheduledThreadPoolExecutor(1));
    复制代码

    同样,它保证返回的Executor自身的线程数不可修改

从上述的实现可以看出,核心在于三个部分

  • ThreadPoolExecutor:提供线程数相关的控制
  • DelegatedExecutorService:仅暴露ExecutorService自身的方法,保证线程数不变来实现语义场景
  • ScheduledExecutorService:提供延迟或者定期执行的功能

对应的,相应也有不同的队列去实现不同的场景

  • LinkedBlockingQueue:无界阻塞队列
  • SynchronousQueue:没有消费者消费时,新的任务就会被阻塞
  • DelayQueue:队列中的任务过期之后才可以执行,否则无法查询到队列中的元素

DelegatedExecutorService

它仅仅是包装了ExecutorService的方法,交由传入的ExecutorService来执行,所谓的UnConfigurable实际也就是它没有暴漏配置各种参数调整的方法

  static class DelegatedExecutorService extends AbstractExecutorService {
        private final ExecutorService e;
        DelegatedExecutorService(ExecutorService executor) { e = executor; }
        public void execute(Runnable command) { e.execute(command); }
        public void shutdown() { e.shutdown(); }
        public List<Runnable> shutdownNow() { return e.shutdownNow(); }
        public boolean isShutdown() { return e.isShutdown(); }
        public boolean isTerminated() { return e.isTerminated(); }
        public boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException {
            return e.awaitTermination(timeout, unit);
        }
        public Future<?> submit(Runnable task) {
            return e.submit(task);
        }
        public <T> Future<T> submit(Callable<T> task) {
            return e.submit(task);
        }
        public <T> Future<T> submit(Runnable task, T result) {
            return e.submit(task, result);
        }
        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
            throws InterruptedException {
            return e.invokeAll(tasks);
        }
        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                             long timeout, TimeUnit unit)
            throws InterruptedException {
            return e.invokeAll(tasks, timeout, unit);
        }
        public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
            throws InterruptedException, ExecutionException {
            return e.invokeAny(tasks);
        }
        public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                               long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
            return e.invokeAny(tasks, timeout, unit);
        }
    }
复制代码

ScheduledExecutorService

提供一系列的schedule方法,使得任务可以延迟或者周期性的执行,对应schedule方法会返回ScheduledFuture以供确认是否执行以及是否要取消。它的实现ScheduledThreadPoolExecutor也支持立即执行由submit提交的任务

仅支持相对延迟时间,比如距离现在5分钟后执行。类似Timer也可以管理延迟任务和周期任务,但是存在一些缺陷:

  • 所有的定时任务只有一个线程,如果某个任务执行时间长,将影响其它TimerTask的精确性。 ScheduledExecutorService的多线程机制可弥补
  • TimerTask抛出未检查的异常,将终止线程执行,此时会错误的认为任务都取消了。 1:可以使用try-catch-finally对相应执行快处理;2:通过execute执行的方法可以设置UncaughtExceptionHandler来接收未捕获的异常,并作出处理;3:通过submit执行的,将被封装层ExecutionException重新抛出

ThreadPoolExecutor

 public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if






请到「今天看啥」查看全文