线程池与生产者/消费者模型

ULT&KLT



JVM在采用的是KLT模式,虽然并发程度高,但是可能会造成资源的浪费。

线程池重用资源

系统启动一个新线程的成本是比较高的,因为它涉及与操作系统交互。在这种情形下,使用线程池可以很好地提高性能,尤其是当程序中需要创建大量生存期很短暂的线程时,更应该考虑使用线程池

线程池的目的就是执行提交给线程池的任务,执行完一个任务后不会退出,而是继续等待或执行新任务。
线程池主要由两个概念组成:一个是任务队列;另一个是工作者线程。工作者线程主体就是一个循环,循环从队列中接受任务并执行,任务队列保存待执行的任务

线程池的优点:
第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
第二:提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
第三:提高线程的可管理性。线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。

线程池的处理流程如下:
1)线程池判断核心线程池里的线程是否都在执行任务。如果不是,则创建一个新的工作线程来执行任务。如果核心线程池里的线程都在执行任务,则进入下个流程。

2)线程池判断工作队列是否已经满。如果工作队列没有满,则将新提交的任务存储在这个工作队列里。如果工作队列满了,则进入下个流程。

3)线程池判断线程池的线程是否都处于工作状态。如果没有,则创建一个新的工作线程来执行任务。如果已经满了,则交给饱和策略来处理这个任务。

与数据库连接池类似的是,线程池在系统启动时即创建大量空闲的线程,程序将一个Runnable对象或Callable对象传给线程池,线程池就会启动一个线程来执行它们的run()或call()方法,当run()或call()方法执行结束后,该线程并不会死亡,而是再次返回线程池中成为空闲状态,等待执行下一个Runnable对象的run()或call()方法。

Executors实现

从Java 5开始,Java内建支持线程池。Java 5新增了一个Executors工厂类来产生线程池,该工厂类包含如下几个静态工厂方法来创建线程池。

  • newCachedThreadPool():创建一个具有缓存功能的线程池,系统根据需要创建线程,这些线程将会被缓存在线程池中。

  • newFixedThreadPool(int nThreads):创建一个可重用的、具有固定线程数的线程池。

  • newSingleThreadExecutor():创建一个只有单线程的线程池,它相当于调用newFixedThread Pool()方法时传入参数为1

  • newScheduledThreadPool(int corePoolSize):创建具有指定线程数的线程池,它可以在指定延迟后执行线程任务。corePoolSize指池中所保存的线程数,即使线程是空闲的也被保存在线程池内。

  • newSingleThreadScheduledExecutor():创建只有一个线程的线程池,它可以在指定延迟后执行线程任务。

    上面5个方法中的前3个方法返回一个ExecutorService对象,该对象代表一个线程池,它可以执行Runnable对象或Callable对象所代表的线程;而后2个方法返回一个ScheduledExecutorService线程池,它是ExecutorService的子类,它可以在指定延迟后执行线程任务。

当用完一个线程池后,应该调用该线程池的shutdown()方法,该方法将启动线程池的关闭序列,调用shutdown()方法后的线程池不再接收新任务,但会将以前所有已提交任务执行完成。当线程池中的所有任务都执行完成后,池中的所有线程都会死亡;另外也可以调用线程池的shutdownNow()方法来关闭线程池,该方法试图停止所有正在执行的活动任务,暂停处理正在等待的任务,并返回等待执行的任务列表。

使用线程池来执行线程任务的步骤如下。
(1)调用Executors类的静态工厂方法创建一个ExecutorService对象,该对象代表一个线程池。
(2)创建Runnable实现类或Callable实现类的实例,作为线程执行任务。
(3)调用ExecutorService对象的submit()方法来提交Runnable实例或Callable实例。
(4)当不想提交任何任务时,调用ExecutorService对象的shutdown()方法来关闭线程池。

测试代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class MyTread implements Runnable{
public void run(){
for(int i = 0; i < 100; i++){
System.out.println(Thread.currentThread().getName() + "i的值为:" + i);
}
}
}

public class TreadPoolTest{
public static void main(String[] args){
throws Exception{
ExecutorService pool = Executor.newFixedThreadPool(6);
pool.submit(new MyTread());
pool.submit(new MyTread());
pool.shutdown();
}
}
}

ThreadPoolExecutor


❑ Executor是一个接口,它是Executor框架的基础,它将任务的提交与任务的执行分离开来。
❑ ThreadPoolExecutor是线程池的核心实现类,用来执行被提交的任务。
❑ ScheduledThreadPoolExecutor是一个实现类,可以在给定的延迟后运行命令,或者定期执行命令。ScheduledThreadPoolExecutor比Timer更灵活,功能更强大。
❑ Future接口和实现Future接口的FutureTask类,代表异步计算的结果。
❑ Runnable接口和Callable接口的实现类,都可以被ThreadPoolExecutor或Scheduled-ThreadPoolExecutor执行。

IDEA TIPS:选中类或接口,右键查看类图,然后选中要看的类或接口右键选中要看的方法,即可出现一个继承图

ThreadPoolExecutor执行execute方法分下面4种情况。
1)如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(注意,执行这一步骤需要获取全局锁)。
2)如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue。
3)如果无法将任务加入BlockingQueue(队列已满),则创建新的线程来处理任务(注意,执行这一步骤需要获取全局锁)。
4)如果创建新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecution()方法。

ThreadPoolExecutor采取上述步骤的总体设计思路,是为了在执行execute()方法时,尽可能地避免获取全局锁(那将会是一个严重的可伸缩瓶颈)。在ThreadPoolExecutor完成预热之后(当前运行的线程数大于等于corePoolSize),几乎所有的execute()方法调用都是执行步骤2,而步骤2不需要获取全局锁。

创建

new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, milliseconds,runnableTaskQueue, handler);

创建一个线程池时需要输入几个参数,如下。

1)corePoolSize(线程池的基本大小):当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。

2)maximumPoolSize(线程池最大数量):线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是,如果使用了无界的任务队列这个参数就没什么效果。 —— 任务队列满了且总线程池池未满时创建临时线程,超过活跃时间后停止

3)keepAliveTime(线程活动保持时间):当线程池中的线程个数大于corePoolSize时额外空闲线程的存活时间。也就是说,一个非核心线程,在空闲等待新任务时,会有一个最长等待时间,即keepAliveTime,如果到了时间还是没有新任务,就会被终止。如果该值为0,则表示所有线程都不会超时终止。TimeUnit(线程活动保持时间的单位)

4)runnableTaskQueue(任务队列):用于保存等待执行的任务的阻塞队列。可以选择以下几个阻塞队列:
❑ ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按FIFO(先进先出)原则对元素进行排序。
❑ LinkedBlockingQueue:基于链表的阻塞队列,可以指定最大长度,但默认是无界的。此队列按FIFO排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
❑ SynchronousQueue:没有实际存储空间的同步阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于Linked-BlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。
❑ PriorityBlockingQueue:基于堆的无界阻塞优先级队列。

5)RejectedExecutionHandler(饱和策略):当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。在JDK 1.5中Java线程池框架提供了以下4种策略。
❑ AbortPolicy:直接抛出异常。
❑ CallerRunsPolicy:在任务提交者线程中执行任务,而不是交给线程池中的线程执行。
❑ DiscardOldestPolicy:将等待时间最长的任务扔掉,然后自己排队。
❑ DiscardPolicy:静默处理,忽略新任务,不抛出异常,也不执行。

拒绝策略只有在队列有界,且maximumPoolSize有限的情况下才会触发。如果队列无界,服务不了的任务总是会排队,但这不一定是期望的结果,因为请求处理队列可能会消耗非常大的内存,甚至引发内存不够的异常。如果队列有界但maxi-mumPoolSize无限,可能会创建过多的线程,占满CPU和内存,使得任何任务都难以完成。

6)线程池还可以接受一个参数:ThreadFactory。它是一个接口,这个接口根据Runnable创建一个Thread, ThreadPoolExecutor的默认实现是Executors类中的静态内部类DefaultThreadFactory,主要就是创建一个线程,给线程设置一个名称,设置daemon属性为false,设置线程优先级为标准默认优先级,线程名称的格式为:pool-<线程池编号>-thread-<线程编号>。如果需要自定义一些线程的属性,比如名称,可以实现自定义的ThreadFactory。

查看状态

1
2
3
4
5
6
7
8
//返回当前线程个数
public int getPoolSize()
//返回线程池曾经达到过的最大线程个数
public int getLargestPoolSize()
//返回线程池自创建以来所有已完成的任务数
public long getCompletedTaskCount()
//返回所有任务数,包括所有已完成的加上所有排队待执行的
public long getTaskCount()


用一个int记录线程池的状态

线程池与生产消费者模式

ThreadPoolExecutor实现了生产者/消费者模式,工作者线程就是消费者,任务提交者就是生产者,线程池自己维护任务队列。当我们碰到类似生产者/消费者问题时,应该优先考虑直接使用线程池,而非“重新发明轮子”,应自己管理和维护消费者线程及任务队列

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class ProductThread extends Thread {
private int taskNum;
private ArrayBlockingQueue queue;
public ProductThread(int taskNum,ArrayBlockingQueue queue) {
this.taskNum = taskNum;
this.queue = queue;
}
public void run() {
try {
//模拟生产
Thread.currentThread().sleep(5000);
System.out.println("开始生产");
queue.add(taskNum);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class ConsumerThread extends Thread {
private ArrayBlockingQueue queue;
public ConsumerThread(ArrayBlockingQueue queue) {
this.queue = queue;
}

public void run() {
System.out.println("准备消费");
int taskNum;
try {
taskNum = (int) queue.take();
System.out.println("消费了"+taskNum);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

测试主方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class ProductAndConsumer {

public static void main(String[] args) {
ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(20);
//为多生产者和多消费者分别开创的线程池
ThreadPoolExecutor productPool =
new ThreadPoolExecutor(10,20,60,TimeUnit.MILLISECONDS,new ArrayBlockingQueue(5),new ThreadPoolExecutor.CallerRunsPolicy());
ThreadPoolExecutor consumerPool =
new ThreadPoolExecutor(10,20,60,TimeUnit.MILLISECONDS,new ArrayBlockingQueue(5),new ThreadPoolExecutor.CallerRunsPolicy());

System.out.println("start");
for(int i = 0;i<100;i++) {

productPool.execute(new ProductThread(i,queue));
consumerPool.execute(new ConsumerThread(queue));
}
productPool.shutdown();
consumerPool.shutdown();
}
}