大家好,欢迎来到IT知识分享网。
文章内容引用自 咕泡科技
咕泡出品,必属精品
文章目录
1为什么要使用线程池
我们都知道线程的作用,能够异步处理任务,并且能处理多个任务。
但是无限制的使用线程,线程之间的创建、销毁,切换,都会带来一定的消耗!
所以,为了控制线程的数量,复用已有线程,同时减少线程切换带来的开销,,线程池这种池化技术就出来了!!
给同学们总结了应付面试的要点:
- 降低资源消耗:重复利用已创建的线程,降低线程创建和销毁造成的损耗。
- 提高响应速度:任务到达时,无需等待线程创建即可立即执行。
- 提高线程的可管理性:使用线程池可以进行统一的分配、调优和监控。
- 提供更多更强大的功能:线程池具备可拓展性,允许开发人员向其中增加更多的功能。
线程池核心设计思想:
固定的线程数,来消费我们不定量的task
本文是对源码层面对线程池解析,有关线程池的使用,大家可以移步这篇文章:
链接: Java并发编程——四种线程池的使用及分析
2几种常用线程池介绍
大致给出几种常用线程池介绍:
newFixedThreadPool
:该方法返回一个固定数量的线程池,线程数不变,当有一个任务提交时,若线程池中空闲,则立即执行,若没有,则会被暂缓在一个任务队列中,等待有空闲的线程去执行。newSingleThreadExecutor
: 创建一个线程的线程池,若空闲则执行,若没有空闲线程则暂缓在任务队列中。newCachedThreadPool
:返回一个可根据实际情况调整线程个数的线程池,不限制最大线程数量,若用空闲的线程则执行任务,若无任务则不创建线程。并且每一个空闲线程会在60秒后自动回收newScheduledThreadPool
: 创建一个可以指定线程的数量的线程池,但是这个线程池还带有延迟和周期性执行任务的功能,类似定时器。newWorkStealingPool
:适合使用在很耗时的操作,但是newWorkStealingPool不是ThreadPoolExecutor的扩展,它是新的线程池类ForkJoinPool的扩展,但是都是在统一的一个Executors类中实现,由于能够合理的使用CPU进行对任务操作(并行操作),所以适合使用在很耗时的任务中。
其实,除了newWorkStealingPool,线程池都是对ThreadPoolExecutor
的一层封装,并且,建议大家不要用这些封装的,用底层的ThreadPoolExecutor
,这样你就逼着自己去把线程池的一些参数去搞明白!!并且能提供比封装的更多功能!比如监控!
这是阿里开发手册中的建议哦
今天我们就去看下ThreadPoolExecutor
中怎么去实现固定的线程数,来消费我们不定量的task。
闲话不多说,让我们从初始化进入看源码的正题:
3从初始化开始
我们先看下初始化(构造)5个参数:
public ThreadPoolExecutor(int corePoolSize,//主线程数
int maximumPoolSize, //最大线程数
long keepAliveTime, //线程存活时间(除主线程外,其他的线程在没有任务执行的时候需要回收,多久后回收)
TimeUnit unit, //存活时间的时间单位
BlockingQueue<Runnable>workQueue//阻塞队列,我们需要执行的task都在该队列) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
唉?里面的this有7个参数,于是点进this构造方法
public ThreadPoolExecutor(int corePoolSize,//主线程数
int maximumPoolSize,//最大线程数
long keepAliveTime,//线程存活时间(除主线程外,其他的线程在没有任务执行的时候需要回收,多久后回收)
TimeUnit unit,//存活时间的时间单位
BlockingQueue<Runnable> workQueue,//阻塞队列,我们需要执行的task都在该队列
ThreadFactory threadFactory,//生成thread的工厂
RejectedExecutionHandler handler//拒绝饱和策略,当队列满了并且线程个数达到maximunPoolSize后采取的策略) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
//对数值传递不合理及最大线程数小于主线程数的情况做异常处理
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
//这里能看出来这三个参数不能传null或不传
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ? null :AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
System.getSecurityManager
:这是一个安全管理器,当运行未知的Java程序的时候,该程序可能有恶意代码(删除系统文件、重启系统等),为了防止运行恶意代码对系统产生影响,需要对运行的代码的权限进行控制,这时候就要启用Java安全管理器。这里不必深入,往下需要看很深。。。
总结:初始化(构造函数)就是赋了一些初始值
初始化完成之后,就该执行了
4执行任务execute
public void execute(Runnable command) {
if (command == null)//如果要执行的任务是空的,异常
throw new NullPointerException();
/* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */
int c = ctl.get();//这里是啥见下文解释
//高三位代表线程池的状态,低29位代表线程池中的线程数量
//如果当前线程数小于主线程数,添加线程
if (workerCountOf(c) < corePoolSize) {
//workerCountOf见下文
if (addWorker(command, true))//addWorker才是添加线程的方法
return;
c = ctl.get();
}
//如果超过主线程数,将任务添加至workqueue 阻塞队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//如果线程池关闭,移除并拒绝
if (! isRunning(recheck) && remove(command))
reject(command);
//否则如果当前线程池线程空,则添加一个线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//阻塞队列已满,添加失败,采用拒绝策略
else if (!addWorker(command, false))
reject(command);
}
上边代码里的
ctl是个啥?:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
定义于threadPoolExecutor
类中的一个原子性的32位二进制int数值
Java用这个二进制数的高三位代表线程池的状态,低29位代表线程池中的线程数量
workerCountOf是个啥?
private static int workerCountOf(int c) {
return c & CAPACITY; //CAPACITY 与(&) 上面的ctl
}
作用是:获取ctl的低29位,获取当前线程池中的线程数
那CAPACITY又是个啥:
给出源码中的定义:
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
//Integer类的一个常量
@Native public static final int SIZE = 32;
总结:这个方法就是做了一些线程的相关判断
5添加线程addWorker
让我们看看添加线程的方法:
private boolean addWorker(Runnable firstTask, boolean core) {
retry: //这是goto语句 下面我会写一个demo讲解这玩意
for (;;) {
//大自旋检查线程池的状态。阻塞队列是否为空等判断
int c = ctl.get();
int rs = runStateOf(c);//线程池运行状态
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;//线程池被关或者任务为null(无任务可执行)
for (;;) {
//小自旋
int wc = workerCountOf(c);
//如果现有线程数大于最大值,或者大于等于最大线程数(主线程数)返回false
//意味着线程都是满的
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//cas添加线程 线程+1
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
//如果失败了,继续自旋外层循环判断
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//开启一个线程,Worker实现了runnable接口
w = new Worker(firstTask);//这里点进去看worker的run方法
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//添加至wokers
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//添加成功
if (workerAdded) {
t.start();//启动线程,会调用我们线程的run接口,也就是我们worker的run
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
goto语句写法:
retry:
for (int i = 0; i < 3; i++) {
for (int j = 3; j < 10; j++) {
//if (j == 4) {
// break retry;
//跳出外面循环 //
}
if(j == 7){
continue retry;
//继续外面循环
}
System.out.println(i+":"+j);
}
}
总结:
方法开始就是一个两层的嵌套自旋,大自旋判断线程池状态,状态正常,不断的小自旋判断能不能加
能加,就开启一个新的线程,经过判断无异常加入到线程池,start开启
开启这个新的线程会执行这个线程的run方法,上面有写在哪里点进去,点进去后发现:
public void run() {
runWorker(this);
}
继续点进去:
6运行新的线程runWorker
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//只要一直能获取到task,就一直会执行,不会关闭,所以线程也不会销毁,线程销毁只有当task为null
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//调用线程执行之前方法,这里可以重写,放一些我们自己的业务前置逻辑
beforeExecute(wt, task);
Throwable thrown = null;
try {
//调用task的run方法,这里就是去执行我们的业务逻辑了
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//调用线程执行之后方法,这里可以重写,放一些我们自己的业务后置逻辑
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
总结:通过自旋判断任务是否为空,不为空就去执行,为空就去取任务
7线程回收复用的关键:getTask():
看一下取任务的逻辑:
getTask():
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
//自旋获取
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);//获取线程池状态
// Check if queue empty only if necessary.必要时检 查空,状态是否停止或者shutdown
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;//线程池状态异常或无任务返回null
}
//获取线程数量
int wc = workerCountOf(c);
// Are workers subject to culling?
//线程数大于主线程数时,或者allowCoreThreadTimeOut参数为 true, allowCoreThreadTimeOut默认为false
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//超过最大线程,或者timed为true && (wc大于1个,并且任务队列为空)的时候
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
//线程数-1,并且返回null,该线程结束
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//线程复用的关键就在这里了↓↓↓↓↓↓↓↓↓↓↓↓
//如果timed是true,超过时间不阻塞,不然一直阻塞,不回收
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : //移除并返回队列头部的元素,如果为空,超过时间返回null
workQueue.take();//移除并返回队列头部的元素,如果为空,一直阻塞
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
8线程的回收
这里有一个面试问题:
线程池里的线程被销毁有什么条件?或者说线程池的线程什么时候被销毁?
上面的代码不是写了嘛,runWorker里的自旋判断,当gettask()为null也就是当无任务时,线程会减少(销毁)
那核心线程呢?核心线程什么时候销毁?
线程池被关闭,或者调用allowCoreThreadTimeOut
方法,等待一个时间单位后会销毁(创建时传的变量)
threadPoolExecutor.allowCoreThreadTimeOut(true);
9线程的复用
另外线程是怎么实现复用的?
一直不关闭,阻塞,等任务
好,至此为止,创建一个线程池然后添加线程执行任务的代码逻辑就结束了,不知道同学们学习到了多少呢?
如果你的公司要自己写一个线程池,你能不能参照这个写出来呢?
极其简化的图
10超过核心线程数小于最大线程数的那一撮所谓的临时线程
如果你只是背了八股文,各种所谓的面试秘籍会告诉你线程池有核心线程有临时线程,并发高时会创建临时线程帮忙,并发低时销毁这些临时线程,面试官问你?哪些是临时线程?你要是按八股文这样回答那就说明你没用过(起码没点进去看过代码),面试铁定是挂了。
其实没有什么临时线程,所谓的核心线程数是要保留几个线程
假如我们设置了核心数为3,最大数为10.
并发量刚经历一波高峰期,线程数量为我们的最大线程数10,然后这段时间并发很低,那超过核心线程数的线程会被回收,回收的是哪7个?
谁先执行完,谁先被回收被回收的就是所谓的临时的,最后剩下的3个就是核心的线程
核心线程只是几个一直被阻塞等待任务的线程而已
可能上一波并发高峰它还不是核心线程,但是它跑得慢,于是被留下来当核心了,下一波并发高峰,它先跑了,于是核心线程又换成了另外一批。
11拒绝策略
那线程已经达到最大线程数,如果满了,而且阻塞队列也满了,你的任务经过消峰限流等中间件的阻拦,还在源源不断的进来,已经处理不过来了,怎么办?
文章最开头,线程池创建时有个拒绝策略
阻塞队列已经满了,但是你还是源源不断地进来,会触发拒绝策略,报错
execute方法的末尾,调用了拒绝策略:
//阻塞队列已满,添加失败,采用拒绝策略
else if (!addWorker(command, false))
reject(command);
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() + //就是这个错误
" rejected from " +
e.toString());
}
}
栗子:核心3最大5队列5,我用100个任务同时执行:
但是一般我们会重写拒绝策略,创建线程池时当成变量传入,毕竟并发一高就报错这谁顶得住啊!!!
只要实现一下RejectedExecutionHandler这个接口就可以了
public class ExecJavaTemplate implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
//在这里根据自己的业务写一个暂存的逻辑,然后塞回去即可
//比较懒的话暂存策略也不用写,不断地塞回去就行,慢就慢点,别让它报错即可
System.out.println("进入拒绝策略");
executor.execute(r); //再次调用execute
}
}
那有人说了阻塞队列我设置成无限大,不就不会有上面的问题了。
如果阻塞队列是无限的,会发生什么?
首先我想到的就是OOM了,你的队列无限大,内存够不够?
不够你的内存不久溢出了,项目挂掉然后紧急排查错误,发现你这个开发将队列设置为无限大
好嘛,公司严格的话给你一个线上一级错误警告处理,这找谁说理去
12线程设置多少合理
线程数 = CPU可用核心数/(1 – 阻塞系数),其中阻塞系数的取值在0和1之间。阻塞系数=阻塞时间/(阻塞时间+计算时间)。
- 线程的 CPU 耗时所占比例越高,就需要越少的线程
- 线程的 IO 耗时所占比例越高,就需要越多的线程
- 针对不同的程序,进行对应的实际测试就可以得到最合适的选择
- 线程数 >= CPU 核心数
免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/16351.html