本文共 7978 字,大约阅读时间需要 26 分钟。
new Thread(new Runnable() { @Override public void run() { // TODO Auto-generated method stub } }).start();
Thread的弊端如下:
我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题:
如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间。
那么有没有一种办法使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务?
在Java中可以通过线程池来达到这样的效果。
什么时候使用线程池?
使用线程池的好处:
Java通过Executors提供四种线程池,分别为:
其实看这几种方式创建的源码就会发现:
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue()); }
实际上还是利用 ThreadPoolExecutor
类实现的。
所以我们重点来看下 ThreadPoolExecutor
是怎么玩的。
首先是创建线程的 api:
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueueworkQueue, RejectedExecutionHandler handler)
这几个核心参数的作用:
corePoolSize
为线程池的基本大小。maximumPoolSize
为线程池最大线程大小。keepAliveTime
和 unit
则是线程空闲后的存活时间。workQueue
用于存放任务的阻塞队列。handler
当队列和最大线程池都满了之后的饱和策略。了解了这几个参数再来看看实际的运用。
通常我们都是使用:
threadPool.execute(new Job());
这样的方式来提交一个任务到线程池中,所以核心的逻辑就是 execute()
函数了。
在具体分析之前先了解下线程池中所定义的状态,这些状态都和线程的执行密切相关:
ThreadPoolExecutor继承自AbstractExecutorService,也是实现了ExecutorService接口。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));private static final int COUNT_BITS = Integer.SIZE - 3;private static final int CAPACITY = (1 << COUNT_BITS) - 1;// runState is stored in the high-order bitsprivate static final int RUNNING = -1 << COUNT_BITS;private static final int SHUTDOWN = 0 << COUNT_BITS;private static final int STOP = 1 << COUNT_BITS;private static final int TIDYING = 2 << COUNT_BITS;private static final int TERMINATED = 3 << COUNT_BITS;
ctl
是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段, 它包含两部分的信息: 线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount),这里可以看到,使用了Integer类型来保存,高3位保存runState,低29位保存workerCount。COUNT_BITS 就是29,CAPACITY就是1左移29位减1(29个1),这个常量表示workerCount的上限值,大约是5亿。
下面再介绍下线程池的运行状态. 线程池一共有五种状态, 分别是:
RUNNING :能接受新提交的任务,并且也能处理阻塞队列中的任务;
SHUTDOWN:关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务。在线程池处于 RUNNING 状态时,调用 shutdown()方法会使线程池进入到该状态。(finalize() 方法在执行过程中也会调用shutdown()方法进入该状态);
STOP:不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程。在线程池处于 RUNNING 或 SHUTDOWN 状态时,调用 shutdownNow() 方法会使线程池进入到该状态;
TIDYING:如果所有的任务都已终止了,workerCount (有效线程数) 为0,线程池进入该状态后会调用 terminated() 方法进入TERMINATED 状态。
TERMINATED:在terminated() 方法执行完后进入该状态,默认terminated()方法中什么也没有做。
进入TERMINATED的条件如下:
然后看看 execute()
方法是如何处理的:
《聊聊并发》的一张图来描述这个流程:
流程聊完了再来看看上文提到了几个核心参数应该如何配置呢?
有一点是肯定的,线程池肯定是不是越大越好。
通常我们是需要根据这批任务执行的性质来确定的。
当然这些都是经验值,最好的方式还是根据实际情况测试得出最佳配置。
有运行任务自然也有关闭任务,从上文提到的 5 个状态就能看出如何来关闭线程池。
其实无非就是两个方法 shutdown()/shutdownNow()
。
但他们有着重要的区别:
shutdown()
执行后停止接受新任务,会把队列的任务执行完毕。shutdownNow()
也是停止接受新任务,但会中断所有的任务,将线程池状态变为 stop。两个方法都会中断线程,用户可自行判断是否需要响应中断。
shutdownNow()
要更简单粗暴,可以根据实际场景选择不同的方法。
我通常是按照以下方式关闭线程池的:
long start = System.currentTimeMillis(); for (int i = 0; i <= 5; i++) { pool.execute(new Job()); } pool.shutdown(); while (!pool.awaitTermination(1, TimeUnit.SECONDS)) { LOGGER.info("线程还在执行。。。"); } long end = System.currentTimeMillis(); LOGGER.info("一共处理了【{}】", (end - start));
pool.awaitTermination(1, TimeUnit.SECONDS)
会每隔一秒钟检查一次是否执行完毕(状态为 TERMINATED
),当从 while 循环退出时就表明线程池已经完全终止了。
在阿里巴巴Java开发手册中明确指出,不允许使用Executors创建线程池。
public class ExecutorsDemo { private static ExecutorService executor = Executors.newFixedThreadPool(15); public static void main(String[] args) { for (int i = 0; i < Integer.MAX_VALUE; i++) { executor.execute(new SubThread()); } }}class SubThread implements Runnable { @Override public void run() { try { Thread.sleep(10000); } catch (InterruptedException e) { //do nothing } }}
通过指定JVM参数:-Xmx8m -Xms8m
运行以上代码,会抛出OOM:
Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded at java.util.concurrent.LinkedBlockingQueue.offer(LinkedBlockingQueue.java:416) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1371) at com.hollis.ExecutorsDemo.main(ExecutorsDemo.java:16)
以上代码指出,ExecutorsDemo.java
的第16行,就是代码中的executor.execute(new SubThread());
通过上面的例子,我们知道了Executors
创建的线程池存在OOM的风险,那么到底是什么原因导致的呢?我们需要深入Executors
的源码来分析一下。
其实,在上面的报错信息中,我们是可以看出蛛丝马迹的,在以上的代码中其实已经说了,真正的导致OOM的其实是LinkedBlockingQueue.offer
方法。
Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded at java.util.concurrent.LinkedBlockingQueue.offer(LinkedBlockingQueue.java:416) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1371) at com.hollis.ExecutorsDemo.main(ExecutorsDemo.java:16)
如果翻看代码的话,也可以发现,其实底层确实是通过LinkedBlockingQueue
实现的:
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
如果对Java中的阻塞队列有所了解的话,看到这里或许就能够明白原因了。
Java中的BlockingQueue
主要有两种实现,分别是ArrayBlockingQueue
和 LinkedBlockingQueue
。
ArrayBlockingQueue
是一个用数组实现的有界阻塞队列,必须设置容量。
LinkedBlockingQueue
是一个用链表实现的有界阻塞队列,容量可以选择进行设置,不设置的话,将是一个无边界的阻塞队列,最大长度为Integer.MAX_VALUE
。
这里的问题就出在:**不设置的话,将是一个无边界的阻塞队列,最大长度为Integer.MAX_VALUE。**也就是说,如果我们不设置LinkedBlockingQueue
的容量的话,其默认容量将会是Integer.MAX_VALUE
。
而newFixedThreadPool
中创建LinkedBlockingQueue
时,并未指定容量。此时,LinkedBlockingQueue
就是一个无边界队列,对于一个无边界队列来说,是可以不断的向队列中加入任务的,这种情况下就有可能因为任务过多而导致内存溢出问题。
上面提到的问题主要体现在newFixedThreadPool
和newSingleThreadExecutor
两个工厂方法上,并不是说newCachedThreadPool
和newScheduledThreadPool
这两个方法就安全了,这两种方式创建的最大线程数可能是Integer.MAX_VALUE
,而创建这么多线程,必然就有可能导致OOM。
避免使用Executors创建线程池,主要是避免使用其中的默认实现,那么我们可以自己直接调用ThreadPoolExecutor
的构造函数来自己创建线程池。在创建的同时,给BlockQueue
指定容量就可以了。
private static ExecutorService executor = new ThreadPoolExecutor(10, 10, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue(10));
这种情况下,一旦提交的线程数超过当前可用线程数时,就会抛出java.util.concurrent.RejectedExecutionException
,这是因为当前线程池使用的队列是有边界队列,队列已经满了便无法继续处理新的请求。但是异常(Exception)总比发生错误(Error)要好。
除了自己定义ThreadPoolExecutor
外。还有其他方法。这个时候第一时间就应该想到开源类库,如apache和guava等。
推荐使用guava提供的ThreadFactoryBuilder来创建线程池。
public class ExecutorsDemo { private static ThreadFactory namedThreadFactory = new ThreadFactoryBuilder() .setNameFormat("demo-pool-%d").build(); private static ExecutorService pool = new ThreadPoolExecutor(5, 200, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy()); public static void main(String[] args) { for (int i = 0; i < Integer.MAX_VALUE; i++) { pool.execute(new SubThread()); } }}
通过上述方式创建线程时,不仅可以避免OOM的问题,还可以自定义线程名称,更加方便的出错的时候溯源。
转载地址:http://fnfnb.baihongyu.com/