ThreadPoolExecutor执行策略

java.util.concurrent.ThreadPoolExecutor构造函数如下:

public ThreadPoolExecutor(int corePoolSize,
      int maximumPoolSize,
      long keepAliveTime,
      TimeUnit unit,
      BlockingQueue<Runnable> workQueue,
      RejectedExecutionHandler handler);
参数 描述

corePoolSize

核心线程数,指保留的线程池大小(不超过maximumPoolSize值时,线程池中最多有corePoolSize 个线程工作)

maximumPoolSize

指的是线程池的最大大小(线程池中最大有corePoolSize 个线程可运行)。

keepAliveTime

指的是空闲线程结束的超时时间(当一个线程不工作时,过keepAliveTime时间后将停止该线程).所以如果任务很多,并且每个任务执行的时间比较短,可以调大这个时间,提高线程的利用率。

TimeUnit

线程活动保持时间的单位):可选的单位有天(DAYS),小时(HOURS),分钟(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。

workQueue

表示存放任务的队列(存放需要被线程池执行的线程队列)

handler

当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。以下是JDK1.5提供的四种策略。

线程池中任务执行策略

向线程池添加一个任务时会触发execute方法,线程池的主要工作流程如下图

20150406105323 48488
  1. 如果正在运行的线程数量小于 corePoolSize,那么马上创建线程运行这个任务;

  2. 如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入队列。

  3. 如果这时候队列满了,而且正在运行的线程数量小于 maximumPoolSize,那么还是要创建线程运行这个任务;

  4. 如果队列满了,而且正在运行的线程数量大于或等于 maximumPoolSize,那么线程池会抛出异常,告诉调用者“我不能再接受任务了”。

具体分析如下

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    //当前线程数量poolSize小于corePoolSize,addIfUnderCorePoolSize函数添加线程处理任务
    if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
        //线程池处于正常运行状态,并且当前线程数量poolSize大于等于corePoolSize,工作队列未满,将任务成功放入到工作队列
        if (runState == RUNNING &amp;&amp; workQueue.offer(command)) {
            if (runState != RUNNING || poolSize == 0)
                ensureQueuedTaskHandled(command);
        }
        //工作队列已满,如果正在运行的线程数量poolSize大于或等于 maximumPoolSize,那么线程池会抛出异常,否则创建新的线程处理任务
        else if (!addIfUnderMaximumPoolSize(command))
            reject(command); // is shutdown or saturated
    }
}
private boolean addIfUnderCorePoolSize(Runnable firstTask) {
    Thread t = null;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //当前线程数量poolSize小于corePoolSize,添加线程处理任务
        if (poolSize <corePoolSize &amp;&amp; runState == RUNNING)
            t = addThread(firstTask);
    } finally {
        mainLock.unlock();
    }
    return t != null;
}
private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {
    Thread t = null;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //如果正在运行的线程数量poolSize小于maximumPoolSize,创建新的线程处理任务
        if (poolSize <maximumPoolSize &amp;&amp; runState == RUNNING)
            t = addThread(firstTask);
    } finally {
        mainLock.unlock();
    }
    return t != null;
}
//创建新的线程处理任务
private Thread addThread(Runnable firstTask) {
    //使用Worker类的run方法处理任务
    Worker w = new Worker(firstTask);
    Thread t = threadFactory.newThread(w);
    boolean workerStarted = false;
    if (t != null) {
        if (t.isAlive()) // precheck that t is startable
            throw new IllegalThreadStateException();
        w.thread = t;
        workers.add(w);
        int nt = ++poolSize;
        if (nt > largestPoolSize)
            largestPoolSize = nt;
        try {
            t.start();
            workerStarted = true;
        }
        finally {
            if (!workerStarted)
                workers.remove(w);
        }
    }
    return t;
}

2、当一个线程完成任务时,它会从队列中取下一个任务来执行;当一个线程无事可做,超过一定的时间(keepAliveTime)时,线程池会判断,如果当前运行的线程数大于 corePoolSize,那么这个线程就被停掉。所以线程池的所有任务完成后,它最终会收缩到 corePoolSize 的大小。

//使用实现Runnable接口的内部Worker类处理任务
public void run() {
    try {
        hasRun = true;
        Runnable task = firstTask;
        firstTask = null;
        //主run循环处理任务, 获取工作队列中的任务执行,如果getTask获取的任务为空(即工作队列为空的时候),当前线程退出
        while (task != null || (task = getTask()) != null) {
            runTask(task);
            task = null;
        }
    } finally {
        workerDone(this);
    }
}

其中获取任务方法getTask内部实现如下

Runnable getTask() {
    for (;;) {
        try {
            int state = runState;
            if (state > SHUTDOWN)
                return null;
            Runnable r;
            if (state == SHUTDOWN)  // Help drain queue
                r = workQueue.poll();
            //如果当前运行的线程数poolSize大于corePoolSize,或者运行核心线程超时
            else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
                //获取工作队列中的任务,没有任务的话,在keepAliveTime后超时返回null,当前线程处于可退出状态,线程池最终会收缩到corePoolSize的大小
                r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
            else
            //获取工作队列中的任务,没有任务的话,一直阻塞
                r = workQueue.take();
            if (r != null)
                return r;
            //当前工作线程是否能够退出
            if (workerCanExit()) {
                //线程池执行退出,中断所有空闲线程
                if (runState >= SHUTDOWN) // Wake up others
                    interruptIdleWorkers();
                return null;
            }
            // Else retry
        } catch (InterruptedException ie) {
            // On interruption, re-check runState
        }
    }
}
文章目录
  1. 线程池中任务执行策略