多线程-线程池

线程池概括与总结

Posted by Kang on September 11, 2019

基础知识

相关创建类

  Executors、ThreadPoolExecutor、ExecutorService

核心参数

  corePoolSize、maximumPoolSize、keepAliveTime、unit、workQueue等待队列、handler拒绝策略

参数的使用

  • 初始阶段:刚创建是,工作线程为0,此时当任务加入时,线程数不断增加,任务被直接执行;
  • 达到核心阶段:若当前线程数为corePoolSize,此时线程队列将起作用,任务先不断的先进入到阻塞队列中,等待有空闲线程去执行;
  • 队列满:当线程队列数量达到最大值时,则触发线程池中工作线程的继续创建,直到达到maxiumSize;
  • 达到maxiumSize:若达到maxiumSize时,仍然有任务进来,则将触发采取拒绝策略。

线程池类别

  newFixedThreadPool、newCachedThreadPool、newSingleThreadExecutor、newScheduledThreadPool
  这四种方式均不推荐使用,而直接使用ThreadPoolExecutor创建线程池,这样创建者可以更精确的掌控线程池的参数。
  ForkJoinPool,使用任务窃取算法,即自己工作完成了可以去忙碌的工作线程窃取任务执行。

Queue说明

  • SynchronousQueue:直接提交,当有新的任务到来时,必须能够提交成功否则会抛出异常,所以要求maximumPoolSize无限大,可能存在执行任务量过多。
  • LinkedBlockingQueue:链表队列,这种方式下,执行线程数永远不会超过corePoolSize,可能排队过多。
  • ArrayBlockingQueue:数组队列,有助于防止资源耗尽,但是可能较难调整和控制。队列大小和最大池大小可能需要相互折衷。若设置不合理,可能造成资源浪费或者大量任务被丢弃。

四种拒绝策略

  • ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
  • ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
  • ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
  • ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

线程池原理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public void execute(Runnable command) {
     if (command == null)
         throw new NullPointerException();
     //如果线程数小于corePoolSize,则执行addWorker方法创建新的线程执行任务
     int c = ctl.get();
     if (workerCountOf(c) < corePoolSize) {
         if (addWorker(command, true))
             return;
         c = ctl.get();
     }
     //如果线程池处于RUNNING状态,且把提交的任务成功放入阻塞队列中(offer添加失败时返回false)
     if (isRunning(c) && workQueue.offer(command)) {
         int recheck = ctl.get();
         if (!isRunning(recheck) && remove(command))
             reject(command);
         else if (workerCountOf(recheck) == 0)
             addWorker(null, false);
     }
     else if (!addWorker(command, false))
         reject(command);
 }

  我们看看addWorker()方法:其为将任务包装成一个工作节点,作为线程池中的一个基本工作单位。最主要的是理解其构造方法:

1
2
3
4
5
6
7
8
9
Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}

public void run() {
    runWorker(this);
}

  线程工厂在创建线程thread时,将Woker实例本身this作为参数传入,当执行start方法启动线程thread时,本质是执行了Worker的runWorker方法。
所以本质上最后启动的是Worker的runWorker方法,下面为简化后核心处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
       // task当前创建时传入了值,或者从等待队列中能取到值
        while (task != null || (task = getTask()) != null) {
            w.lock();
            ......
            try {
              beforeExecute(wt, task);
              task.run();
              afterExecute(task, thrown);
              ...
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

  可以看出,整个流程也是比较清晰的,在while中不停循环处理当前传入值或者从getTask()中获取任务去执行。最后看看getTask():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private Runnable getTask() {
    // 紫旋+队列取值方式的巧妙运用
    for (;;) {
        ...
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

  整个getTask操作是一个自旋,其主要完成:

  • 1、workQueue.take:如果阻塞队列为空,当前线程会被挂起等待;当队列中有任务加入时,线程被唤醒,take方法返回任务,并执行;
  • 2、workQueue.poll:如果在keepAliveTime时间内,阻塞队列还是没有任务,则返回null;