简述线程池

基本概念

  线程池顾名思义线程的池子。
  工作者线程和任务队列2个概念组成。
  工作者线程主要线程就是一个循环,循环从对列中接受任务执行,任务队列则是保存待执行的任务。
  线程池其优点是可以重用线程,避免创建大量的线程,也避免了创建线程的开销。

线程池的基本属性

  java中线程池的实现类是ThreadPoolExecutor.
  线程池的大小和以下四个参数相关。
  1.corePoolSize:核心线程数。
  2.maximumPoolSize:最大线程个数。
  3.keepAliveTime和unit:空闲线程存活时间。
  逻辑是这样的:创建一个线程池后,里面没有一个线程,当新任务到来时,如果当前线程个数小于corePoolSize,无论是否有空闲线程,都创建线程,直到=corePoolSize,而大于corePoolSize后就会进入队列里面排队,如果队列满了,那么就会创建新的线程直到数量达到maximumPoolSize。而keepAliveTime则是当当前线程数目大于corePoolSize时,空闲线程时间达到了这个值,那么这个线程就会被终结掉。
  任务拒绝策略:就是上述中maximumPoolSize的任务队列都满了。新任务来了,如何处理?
  默认会抛出异常,类型是RejectedExecutionException。
  ThreadPoolExecutor.AbortPolicy:这就是默认的方式,抛出异常
  ThreadPoolExecutor.DiscardPolicy:静默处理,忽略新任务,不抛异常,也不执行
  ThreadPoolExecutor.DiscardOldestPolicy:将等待时间最长的任务扔掉,然后自己排队
  ThreadPoolExecutor.CallerRunsPolicy:在任务提交者线程中执行任务,而不是交给线程池中的线程执行
  预配置好线程池。
  工厂类Executor提供了一些预制好的线程池


public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue(),
threadFactory));
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue());
}
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue(),
threadFactory);
}

  等还有些。这里只是说出几个,看看其实现就知道他的意思了,


package threadPool;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Test {
    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(5));

        for (int i = 0; i < 15; i++) {
            MyTask myTask = new MyTask(i);
            executor.execute(myTask);
            System.out.println("线程池中线程数目:" + executor.getPoolSize() + ",队列中等待执行的任务数目:" + executor.getQueue().size()
                    + ",已执行玩别的任务数目:" + executor.getCompletedTaskCount());
        }
        executor.shutdown();
    }
    }
class MyTask implements Runnable {

    private int taskNum;

    public MyTask(int num) {
        this.taskNum = num;
    }

    @Override
    public void run() {
        System.out.println("正在执行task " + taskNum);
        try {
            Thread.currentThread().sleep(4000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("task " + taskNum + "执行完毕");
    }
}


  结果如下:

正在执行task 0
线程池中线程数目:1,队列中等待执行的任务数目:0,已执行玩别的任务数目:0
线程池中线程数目:2,队列中等待执行的任务数目:0,已执行玩别的任务数目:0
正在执行task 1
线程池中线程数目:3,队列中等待执行的任务数目:0,已执行玩别的任务数目:0
正在执行task 2
线程池中线程数目:4,队列中等待执行的任务数目:0,已执行玩别的任务数目:0
正在执行task 3
线程池中线程数目:5,队列中等待执行的任务数目:0,已执行玩别的任务数目:0
正在执行task 4
线程池中线程数目:5,队列中等待执行的任务数目:1,已执行玩别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:2,已执行玩别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:3,已执行玩别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:4,已执行玩别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:5,已执行玩别的任务数目:0
线程池中线程数目:6,队列中等待执行的任务数目:5,已执行玩别的任务数目:0
正在执行task 10
线程池中线程数目:7,队列中等待执行的任务数目:5,已执行玩别的任务数目:0
线程池中线程数目:8,队列中等待执行的任务数目:5,已执行玩别的任务数目:0
正在执行task 12
线程池中线程数目:9,队列中等待执行的任务数目:5,已执行玩别的任务数目:0
正在执行task 13
正在执行task 11
线程池中线程数目:10,队列中等待执行的任务数目:5,已执行玩别的任务数目:0
正在执行task 14
task 0执行完毕
正在执行task 5
task 4执行完毕
task 3执行完毕
正在执行task 7
task 2执行完毕
task 1执行完毕
正在执行task 8
正在执行task 6
task 13执行完毕
task 14执行完毕
task 12执行完毕
task 11执行完毕
task 10执行完毕
正在执行task 9
task 5执行完毕
task 7执行完毕
task 8执行完毕
task 6执行完毕
task 9执行完毕

  网上直接找了个例子跑了一下,只要理解了上面的原理,这些就很好理解了。

附上一个简单线程池实现代码:

实现思路很简单

创建多个不断循环的线程,不断监听是否有任务,没有则进入wait状态。有任务塞入队列时则唤醒之。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package tutorials;

import java.util.concurrent.LinkedBlockingQueue;

public class ThreadPool {
private final int nThreads;
private final PoolWorker[] threads;
private final LinkedBlockingQueue queue;

public ThreadPool(int nThreads) {
this.nThreads = nThreads;
queue = new LinkedBlockingQueue();
threads = new PoolWorker[nThreads];

for (int i = 0; i < nThreads; i++) {
threads[i] = new PoolWorker();
threads[i].start();
}
}

public void execute(Runnable task) {
synchronized (queue) {
queue.add(task); // 向队列中添加任务
queue.notify(); // 唤醒一个线程
}
}

private class PoolWorker extends Thread {
public void run() {
Runnable task;

while (true) {
synchronized (queue) {
while (queue.isEmpty()) {
try {
queue.wait(); // 线程没有任务,进入睡眠
} catch (InterruptedException e) {
System.out.println("An error occurred while queue is waiting: " + e.getMessage());
}
}
// 线程被唤醒之后,会顺利执行到这里
task = queue.poll(); // 获取任务
}

// If we don't catch RuntimeException,
// the pool could leak threads
try {
task.run(); // 执行任务
} catch (RuntimeException e) {
System.out.println("Thread pool is interrupted due to an issue: " + e.getMessage());
}
}
}
}
}