最新消息:XAMPP默认安装之后是很不安全的,我们只需要点击左方菜单的 "安全"选项,按照向导操作即可完成安全设置。

Java线程池_深入剖析线程池实现原理

XAMPP相关 admin 777浏览 0评论

这周我们来聊聊线程池,线程池在我们的开发日常中,是经常用到的,因此我对他们不陌生,马上进入正题。

 线程池概念

什么是线程池呢?

从字面上面可以看出,其实它就是一个容器,类似于我们spring的IOC容器管理对象生命周期,线程池就是管理线程的生命周期,负责线程的创建和销毁。

 线程池的好处

一个工具或者组件的设计,必定是为了解决一个开发痛点或者提高性能方面而设计出来的。在没有线程池之前,我们都是直接使用new Thread()的方式去创建的,每次使用后就会销毁。但是这样子有个弊端,频繁的创建和销毁线程,是对资源的耗费,线程上下文频繁切换对系统的性能存在弊端,同时频繁创建和销毁线程也需要时间,造成系统的效率更低,还有就是无限创建线程的会引发资源耗尽的风险。

所以设计出线程池,就是为了解决上面一系列问题,可以控制创建线程的数量避免系统资源耗尽,和维护线程的创建和销毁,达到线程的复用,提高系统的稳定性和健壮性。

Java中的ThreadPoolExecutor类

java.uitl.concurrent.ThreadPoolExecutor类是线程池中最核心的一个类,如果要透彻的了解线程池,必须要了解ThreadPoolExecutor该类,我们从源码层面来剖析:

drb00019

上面我列举了父子类之间的关系,一看ThreadPoolExecutor类实现和继承了    AbstractExecutorService,ExecutorService,Executor。我们来看看ThreadPoolExecutor四个的构造方法:

public class ThreadPoolExecutor extends AbstractExecutorService {
  .....
 // 这个构造方法,不需要传线程创建工厂,以及拒绝策略,系统已经帮我们设置成默认的
  public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

   // 这个构造方法,要我们传入线程创建工厂实例
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }
// 这个构造方法要我们传入线程拒绝策略模式
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }
    // 这个构造方法需要我们传入线程创建工厂和任务拒绝策略模式,这个
    // 这个构造方法是上面三个构造方法统一调用的
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
 
           maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
    .....
}

可以看出上面的构造方法有很多参数,下面我们对这些参数进行讲解:

1 corePoolSize

核心池的大小。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;

2 maximumPoolSize

线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程;

3 keepAliveTime

表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;

4 unit

参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性:

TimeUnit.DAYS;               //天
TimeUnit.HOURS;             //小时
TimeUnit.MINUTES;           //分钟
TimeUnit.SECONDS;           //秒
TimeUnit.MILLISECONDS;      //毫秒
TimeUnit.MICROSECONDS;      //微妙

TimeUnit.NANOSECONDS; //纳秒

5 workQueue

一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下几种选择:

ArrayBlockingQueue; // 数组阻塞队列
LinkedBlockingQueue; // 链表阻塞队列
SynchronousQueue;    // 同步阻塞队列
PriorityBlockingQueue;// 优先级阻塞队列

ArrayBlockingQueue和PriorityBlockingQueue使用较少,一般使用LinkedBlockingQueue和Synchronous。线程池的排队策略与BlockingQueue有关。

6 threadFactory

线程工厂,主要用来创建线程;

7 handler

表示当拒绝处理任务时的策略,有以下四种取值:

ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任

接下来,我们看下它们四个类继承之间的源码,ThreadPoolExecutor继承了AbstractExecutorService,我们来看一下AbstractExecutorService的实现:

public abstract class AbstractExecutorService implements ExecutorService {

    // 封装任务,返回默认值
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }
    // 封装任务,回调方法
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }

  // 提交任务
    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

   // 提交任务
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }


    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }
    // 执行任务,timed是否是设置了超时时间,nanos是超时时间
    private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                              boolean timed, long nanos)
        throws InterruptedException, ExecutionException, TimeoutException {

    }

   // 执行一些任务
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException {

    }
    // 执行一些任务,并设置超时时间
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                           long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        return doInvokeAny(tasks, true, unit.toNanos(timeout));
    }
   // 执行所有任务
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException {

    }
    // 执行所有任务,并设置超时时间
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                         long timeout, TimeUnit unit)
        throws InterruptedException {
    }
}

上面的方法中,主要是封装任务方法,提交任务执行,以及执行任务的方法,我们再来看看ExecutorService类:

 void shutdown();
 List<Runnable> shutdownNow() ;
 boolean isShutdown();
 boolean isTerminated();
 boolean awaitTermination(long timeout, TimeUnit unit)
         throws InterruptedException;  
 <T> Future<T> submit(Runnable task, T result);
 <T> Future<T> submit(Callable<T> task);
  Future<?> submit(Runnable task);
 <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
         throws InterruptedException;
  <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;
 <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;

ExecutorService类主要定义了一些方法,由AbstractExecutorService方法去实现,我们最后来看下顶级父类Executor的源码,它的源码就只有一个execute方法:

 void execute(Runnable command);

Executor类只是定义了一个执行任务的方法,委托于子类去实现如何执行。

通过上的源码,很清晰的知道ThreadPoolExecutor、AbstractExecutorService、ExecutorService和Executor几个之间的关系了。

Executor是一个顶层接口,在它里面只声明了一个方法execute(Runnable),返回值为void,参数为Runnable类型,从字面意思可以理解,就是用来执行传进去的任务的;

然后ExecutorService接口继承了Executor接口,并声明了一些方法:submit、invokeAll、invokeAny以及shutDown等;

抽象类AbstractExecutorService实现了ExecutorService接口,基本实现了ExecutorService中声明的所有方法;

然后ThreadPoolExecutor继承了类AbstractExecutorService

深入剖析线程池实现原理

首先,我们要先了解线程池整个大体的执行逻辑是怎样的:新建线程池的时候,线程池状态此时是运行状态,此时是没有线程的(前提是没有调用预先生成线程的方法prestartCoreThread()和prestartAllCoreThreads()方法),此时当提交任务的时候,先判断当前工作线程的数目是否小于核心线程数目,如果小于则创建新的线程去执行任务,如果等于核心线程数目的话,此时如果没有空闲的线程,则将任务放在任务队列中去进行排队等候执行,当任务队列已经满的时候,则判断核心线程跟创建新的线程数目的和是否小于设置好的最大线程数目,如果是,则创建新的线程去执行任务,如果大于的话,则走拒绝策略。以上就是线程池接受任务执行的整个逻辑流程。下面我们细分探讨下。

1 线程池执行状态

在ThreadPoolExecutor中定义了一个volatile变量,另外定义了几个static final变量表示线程池的各个状态:

volatile int runState;
static final int RUNNING    = 0;
static final int SHUTDOWN   = 1;
static final int STOP       = 2;
static final int TERMINATED = 3;

上面的变量值中:runState表示当前线程池的状态,它是一个volatile变量用来保证线程之间的可见性;

几个static final变量表示runState可能的几个取值。

当创建线程池后,初始时,线程池处于RUNNING状态;

如果调用了shutdown()方法,则线程池处于SHUTDOWN状态,此时线程池不能够接受新的任务,它会等待所有任务执行完毕;

如果调用了shutdownNow()方法,则线程池处于STOP状态,此时线程池不能接受新的任务,并且会去尝试终止正在执行的任务;

当线程池处于SHUTDOWN或STOP状态,并且所有工作线程已经销毁,任务缓存队列已经清空或执行结束后,线程池被设置为TERMINATED状态。

2 任务执行原理

这里将的任务执行原理,是线程池的核心,下面我们一一探讨下,直接跟着源码走。

public void execute(Runnable command) { 
// 判空
    if (command == null) 
        throw new NullPointerException(); 
       int c = ctl.get(); 
    //1.当前池中线程比核心数少,新建一个线程执行任务 
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true)) 
            return; 
        c = ctl.get(); 
    } 
    //2.核心池已满,但任务队列未满,添加到队列中 
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get(); 
     //任务成功添加到队列以后,再次检查是否需要添加新的线程,因为已存在的线程可能被销毁了 
        if (! isRunning(recheck) && remove(command))  
         //如果线程池处于非运行状态,并且把当前的任务从任务队列中移除成功,则拒绝该任务 
            reject(command);
        //如果之前的线程已被销毁完,新建一个线程 
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false); 
    } 
    //3.核心池已满,队列已满,试着创建一个新线程 
    else if (!addWorker(command, false)) 
     //如果创建新线程失败了,说明线程池被关闭或者线程池完全满了,拒绝任务
        reject(command);  
 }

execute方法是执行任务的方法,上面的方法一进来就是判空,判断任务是否是空,为空,则直接抛空指针异常。接着继续判断是判断当前工作的线程数目大小是否小于核心线程数,如果是,则执行addWorker(command, true)方法。如果不是,则继续往下判断if (isRunning(c) && workQueue.offer(command)),isRunning方法是判断当前线程池是否是在运行状态,如果是,则加锁尝试将任务放入队列中,放入队列的方法:

public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        final AtomicInteger count = this.count;
        // 判断队列是否已满,已满直接返回false
        if (count.get() == capacity)
            return false;
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
        // 加锁后再次判断队列数
            if (count.get() < capacity) {
            // 放入队列中
                enqueue(node);
                // 获取新的队列任务数
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }

上面放入任务队列的方法逻辑很简单,就是判断当前队列是否满了,如果满了直接返回false,放入队列失败,如果没有满,则进行加锁放入队列,之后再次判断队列是否满了(这里为啥要再次判断,是因为防止并发问题,防止其他线程已经放任务进行了导致没有空位置了就会报错),如果没有满,则将任务放入队列中,下面两个方法notFull.signal()和signalNotEmpty()方法是通知没有满和没有空的消息。

接下来,我们继续往下走,上面判断如果都通过之后,就走下面这个判断if (! isRunning(recheck) && remove(command)),这里第一个判断是判断当前线程池是否在运行状态,如果不是在运行状态,则走后门的将任务移除队列,如果两者判断成功,则走reject(command)拒绝策略方法,下面(workerCountOf(recheck) == 0)这个判断是如果之前的线程已被销毁完,新建一个线程。

如果execute方法的判断走到(!addWorker(command, false))这个判断,代表是核心线程数已满,队列已满,这里就尝试创建新的线程去执行,如果创建新的线程失败,则走拒绝策略模式reject(command);

下面我们看看addWorker方法源码:


private boolean addWorker(Runnable firstTask, boolean core) { 
    retry:  //goto 语句,避免死循环 
    for (;;) { 
        int c = ctl.get(); 
        int rs = runStateOf(c); 

        // Check if queue empty only if necessary. 
如果线程处于非运行状态,并且 rs不等于 SHUTDOWN且 firstTask不等于空且且workQueue为空,直接返回 false (表示不可添加 work状态)

1. 线程池已经 shutdown后,还要添加新的任务,拒绝

2. (第二个判断) SHUTDOWN状态不接受新任务,但仍然会执行已经加入任务队列的任务,
所以当进入 SHUTDOWN状态,而传进来的任务为空,并且任务队列不为空的时候,是允许添加新线程的 , 如果把这个条件取反,就表示不允许添加 worker 
        if (rs >= SHUTDOWN && 
            ! (rs == SHUTDOWN && 
               firstTask == null && 
               ! workQueue.isEmpty())) 
            return false; 

        for (;;) { //自旋 
            int wc = workerCountOf(c);//获得 Worker 工作线程数 
//如果工作线程数大于默认容量大小或者大于核心线程数大小,则直接返回 false 表示不能再添加 worker。
            if (wc >= CAPACITY ||                  wc >= (core ? corePoolSize : maximumPoolSize)) 
                return false;             //通过 cas 来增加工作线程数,如果 cas 失败,则直接重试 
            if (compareAndIncrementWorkerCount(c))
                break retry; 
            c = ctl.get();  // Re-read ctl // 再次获取 ctl
的值

            if (runStateOf(c) != rs) //这里如果不想等,说明线程的状态发生了变化,
继续重试 
                continue retry; 
            // else CAS failed due to workerCount change; retry inner loop 
        } 
    } 
   //上面这段代码主要是对 worker 数量做原子+1 操作,下面的逻辑才是正式构建一个 worker 
    boolean workerStarted = false; //工作线程是否启动的标识 
    boolean workerAdded = false;  //工作线程是否已经添加成功的标识 
    Worker w = null;  
    try { 
        w = new Worker(firstTask); //构建一个 Worker,这个 worker 是什么呢?我们
可以看到构造方法里面传入了一个 Runnable 对象 
        final Thread t = w.thread; //从 worker 对象中取出线程 
        if (t != null) { 
            final ReentrantLock mainLock = this.mainLock; 
            mainLock.lock(); //这里有个重入锁,避免并发问题 
            try { 
                // Recheck while holding lock. 
                // Back out on ThreadFactory failure or if 
                // shut down before lock acquired. 
                int rs = runStateOf(ctl.get()); 
                //只有当前线程池是正在运行状态,[或是 SHUTDOWN 且 firstTask 为空],才能添加到 workers 集合中 
                if (rs < SHUTDOWN || 
                    (rs == SHUTDOWN && firstTask == null)) {  
//任务刚封装到 work 里面,还没 start,你封装的线程就是 alive,几
个意思?肯定是要抛异常出去的 
                    if (t.isAlive()) // precheck that t is startable 
                        throw new IllegalThreadStateException(); 
                    workers.add(w); //将新创建的 Worker 添加到 workers 集合中 
                    int s = workers.size(); 
//如果集合中的工作线程数大于最大线程数,这个最大线程数表示线程池曾经出现过的最大线程数 
                    if (s > largestPoolSize)  
                        largestPoolSize = s; //更新线程池出现过的最大线程数 
                    workerAdded = true;//表示工作线程创建成功了 
                } 
            } finally { 
                mainLock.unlock(); //释放锁 
            } 
            if (workerAdded) {//如果 worker 添加成功 
                t.start();//启动线程 
                workerStarted = true; 
            } 
        } 
    } finally { 
        if (! workerStarted) 
            addWorkerFailed(w); //如果添加失败,就需要做一件事,就是递减实际工作线
程数(还记得我们最开始的时候增加了工作线程数吗) 
    } 
    return workerStarted;//返回结果 
}

分析已经在代码中表明了,大家可以对着看,应该很清楚。

上面的源码只是分析了主要的核心方法的源代码,其他一些方法可自行去探讨。

今天就记录到此,我们下期再会!

你知道的越多,你不知道的越多!

转载请注明:XAMPP中文组官网 » Java线程池_深入剖析线程池实现原理

您必须 登录 才能发表评论!