从源码学习线程池的使用原理及核心思想解析[亲测有效]

从源码学习线程池的使用原理及核心思想解析[亲测有效]  我们都知道线程的作用,能够异步处理任务,并且能处理多个任务。  但是无限制的使用线程,线程之间的创建、销毁,切换,都会带来一定的消耗!  所以,为了控制线程的数量,复用已有线程,同时减少线程切换带来的开销,,线程池这种池化技术就出来了!!给同学们总结了应付面试的要点:线程池核心设计思想:固定的线程数,来消费我们不定量的task本文是对源码层面对线程池解析,有关线程池的使用,大家可以移步这篇文章:链接:Java并发编程——四种线程池的使用及分析大致给出几种常用线程池介绍:  其实,除了newWo

大家好,欢迎来到IT知识分享网。

文章内容引用自 咕泡科技
咕泡出品,必属精品

1为什么要使用线程池

  我们都知道线程的作用,能够异步处理任务,并且能处理多个任务。
  但是无限制的使用线程,线程之间的创建、销毁,切换,都会带来一定的消耗!
  所以,为了控制线程的数量,复用已有线程,同时减少线程切换带来的开销,,线程池这种池化技术就出来了!!

给同学们总结了应付面试的要点:

  1. 降低资源消耗:重复利用已创建的线程,降低线程创建和销毁造成的损耗。
  2. 提高响应速度:任务到达时,无需等待线程创建即可立即执行。
  3. 提高线程的可管理性:使用线程池可以进行统一的分配、调优和监控。
  4. 提供更多更强大的功能:线程池具备可拓展性,允许开发人员向其中增加更多的功能。

线程池核心设计思想:
固定的线程数,来消费我们不定量的task

本文是对源码层面对线程池解析,有关线程池的使用,大家可以移步这篇文章:
链接: Java并发编程——四种线程池的使用及分析

2几种常用线程池介绍

大致给出几种常用线程池介绍:

  • newFixedThreadPool:该方法返回一个固定数量的线程池,线程数不变,当有一个任务提交时,若线程池中空闲,则立即执行,若没有,则会被暂缓在一个任务队列中,等待有空闲的线程去执行。
  • newSingleThreadExecutor: 创建一个线程的线程池,若空闲则执行,若没有空闲线程则暂缓在任务队列中。
  • newCachedThreadPool:返回一个可根据实际情况调整线程个数的线程池,不限制最大线程数量,若用空闲的线程则执行任务,若无任务则不创建线程。并且每一个空闲线程会在60秒后自动回收
  • newScheduledThreadPool: 创建一个可以指定线程的数量的线程池,但是这个线程池还带有延迟和周期性执行任务的功能,类似定时器。
  • newWorkStealingPool:适合使用在很耗时的操作,但是newWorkStealingPool不是ThreadPoolExecutor的扩展,它是新的线程池类ForkJoinPool的扩展,但是都是在统一的一个Executors类中实现,由于能够合理的使用CPU进行对任务操作(并行操作),所以适合使用在很耗时的任务中。

  其实,除了newWorkStealingPool,线程池都是对ThreadPoolExecutor的一层封装,并且,建议大家不要用这些封装的,用底层的ThreadPoolExecutor,这样你就逼着自己去把线程池的一些参数去搞明白!!并且能提供比封装的更多功能!比如监控!
  这是阿里开发手册中的建议哦

  今天我们就去看下ThreadPoolExecutor中怎么去实现固定的线程数,来消费我们不定量的task。

闲话不多说,让我们从初始化进入看源码的正题:

3从初始化开始

我们先看下初始化(构造)5个参数:

public ThreadPoolExecutor(int corePoolSize,//主线程数
						int maximumPoolSize, //最大线程数
						long keepAliveTime, //线程存活时间(除主线程外,其他的线程在没有任务执行的时候需要回收,多久后回收)
						TimeUnit unit, //存活时间的时间单位
						BlockingQueue<Runnable>workQueue//阻塞队列,我们需要执行的task都在该队列) { 
   
	this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
	}

唉?里面的this有7个参数,于是点进this构造方法

    public ThreadPoolExecutor(int corePoolSize,//主线程数
                              int maximumPoolSize,//最大线程数
                              long keepAliveTime,//线程存活时间(除主线程外,其他的线程在没有任务执行的时候需要回收,多久后回收)
                              TimeUnit unit,//存活时间的时间单位
                              BlockingQueue<Runnable> workQueue,//阻塞队列,我们需要执行的task都在该队列
                              ThreadFactory threadFactory,//生成thread的工厂
                              RejectedExecutionHandler handler//拒绝饱和策略,当队列满了并且线程个数达到maximunPoolSize后采取的策略) { 
   
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            //对数值传递不合理及最大线程数小于主线程数的情况做异常处理
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
        //这里能看出来这三个参数不能传null或不传
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ? null :AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

System.getSecurityManager:这是一个安全管理器,当运行未知的Java程序的时候,该程序可能有恶意代码(删除系统文件、重启系统等),为了防止运行恶意代码对系统产生影响,需要对运行的代码的权限进行控制,这时候就要启用Java安全管理器。这里不必深入,往下需要看很深。。。

总结:初始化(构造函数)就是赋了一些初始值

初始化完成之后,就该执行了

4执行任务execute

    public void execute(Runnable command) { 
   
        if (command == null)//如果要执行的任务是空的,异常
            throw new NullPointerException();
        /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */
        int c = ctl.get();//这里是啥见下文解释
        //高三位代表线程池的状态,低29位代表线程池中的线程数量 
        //如果当前线程数小于主线程数,添加线程
        if (workerCountOf(c) < corePoolSize) { 
   //workerCountOf见下文
            if (addWorker(command, true))//addWorker才是添加线程的方法
                return;
            c = ctl.get();
        }
        //如果超过主线程数,将任务添加至workqueue 阻塞队列
        if (isRunning(c) && workQueue.offer(command)) { 
   
            int recheck = ctl.get();
            //如果线程池关闭,移除并拒绝
            if (! isRunning(recheck) && remove(command))
                reject(command);
                //否则如果当前线程池线程空,则添加一个线程
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //阻塞队列已满,添加失败,采用拒绝策略
        else if (!addWorker(command, false))
            reject(command);
    }

上边代码里的
ctl是个啥?:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

定义于threadPoolExecutor类中的一个原子性的32位二进制int数值
Java用这个二进制数的高三位代表线程池的状态,低29位代表线程池中的线程数量

workerCountOf是个啥?

private static int workerCountOf(int c)  { 
    
	return c & CAPACITY; //CAPACITY 与(&) 上面的ctl
}

作用是:获取ctl的低29位,获取当前线程池中的线程数

那CAPACITY又是个啥:
给出源码中的定义:

    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
	
	//Integer类的一个常量
    @Native public static final int SIZE = 32;

总结:这个方法就是做了一些线程的相关判断

5添加线程addWorker

让我们看看添加线程的方法:

    private boolean addWorker(Runnable firstTask, boolean core) { 
   
        retry:  //这是goto语句 下面我会写一个demo讲解这玩意
        for (;;) { 
   //大自旋检查线程池的状态。阻塞队列是否为空等判断
            int c = ctl.get();
            int rs = runStateOf(c);//线程池运行状态

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;//线程池被关或者任务为null(无任务可执行)
			
            for (;;) { 
   //小自旋
                int wc = workerCountOf(c);
                //如果现有线程数大于最大值,或者大于等于最大线程数(主线程数)返回false
                //意味着线程都是满的
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                    //cas添加线程 线程+1
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                //如果失败了,继续自旋外层循环判断
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try { 
   
        //开启一个线程,Worker实现了runnable接口
            w = new Worker(firstTask);//这里点进去看worker的run方法
            final Thread t = w.thread;
            if (t != null) { 
   
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try { 
   
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) { 
   
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        //添加至wokers
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally { 
   
                    mainLock.unlock();
                }
                //添加成功
                if (workerAdded) { 
   
                    t.start();//启动线程,会调用我们线程的run接口,也就是我们worker的run
                    workerStarted = true;
                }
            }
        } finally { 
   
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

goto语句写法:

	retry:
	for (int i = 0; i < 3; i++) { 
   
		for (int j = 3; j < 10; j++) { 
   
			//if (j == 4) { 
   
			// break retry; 
			//跳出外面循环 // 
		}
		if(j == 7){ 
   
		 	continue retry;
		  	//继续外面循环 
		}
		  System.out.println(i+":"+j); 
		} 
	}

总结:
方法开始就是一个两层的嵌套自旋,大自旋判断线程池状态,状态正常,不断的小自旋判断能不能加
能加,就开启一个新的线程,经过判断无异常加入到线程池,start开启

开启这个新的线程会执行这个线程的run方法,上面有写在哪里点进去,点进去后发现:

        public void run() { 
   
            runWorker(this);
        }

继续点进去:

6运行新的线程runWorker

    final void runWorker(Worker w) { 
   
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try { 
   
        	//只要一直能获取到task,就一直会执行,不会关闭,所以线程也不会销毁,线程销毁只有当task为null
            while (task != null || (task = getTask()) != null) { 
   
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted. This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try { 
   
                	//调用线程执行之前方法,这里可以重写,放一些我们自己的业务前置逻辑
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try { 
   
                    	//调用task的run方法,这里就是去执行我们的业务逻辑了
                        task.run();
                    } catch (RuntimeException x) { 
   
                        thrown = x; throw x;
                    } catch (Error x) { 
   
                        thrown = x; throw x;
                    } catch (Throwable x) { 
   
                        thrown = x; throw new Error(x);
                    } finally { 
   
                    	//调用线程执行之后方法,这里可以重写,放一些我们自己的业务后置逻辑
                        afterExecute(task, thrown);
                    }
                } finally { 
   
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally { 
   
            processWorkerExit(w, completedAbruptly);
        }
    }

总结:通过自旋判断任务是否为空,不为空就去执行,为空就去取任务

7线程回收复用的关键:getTask():

看一下取任务的逻辑:
getTask():

    private Runnable getTask() { 
   
        boolean timedOut = false; // Did the last poll() time out?
		//自旋获取
        for (;;) { 
   
            int c = ctl.get();
            int rs = runStateOf(c);//获取线程池状态

            // Check if queue empty only if necessary.必要时检 查空,状态是否停止或者shutdown
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { 
   
                decrementWorkerCount();
                return null;//线程池状态异常或无任务返回null
            }
			//获取线程数量
            int wc = workerCountOf(c);

            // Are workers subject to culling?
            //线程数大于主线程数时,或者allowCoreThreadTimeOut参数为 true, allowCoreThreadTimeOut默认为false
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
			//超过最大线程,或者timed为true && (wc大于1个,并且任务队列为空)的时候
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) { 
   
                //线程数-1,并且返回null,该线程结束
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try { 
   
            //线程复用的关键就在这里了↓↓↓↓↓↓↓↓↓↓↓↓
            	//如果timed是true,超过时间不阻塞,不然一直阻塞,不回收
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : //移除并返回队列头部的元素,如果为空,超过时间返回null
                    workQueue.take();//移除并返回队列头部的元素,如果为空,一直阻塞
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) { 
   
                timedOut = false;
            }
        }
    }

8线程的回收

这里有一个面试问题:
线程池里的线程被销毁有什么条件?或者说线程池的线程什么时候被销毁?
上面的代码不是写了嘛,runWorker里的自旋判断,当gettask()为null也就是当无任务时,线程会减少(销毁)
那核心线程呢?核心线程什么时候销毁?
线程池被关闭,或者调用allowCoreThreadTimeOut方法,等待一个时间单位后会销毁(创建时传的变量)

        threadPoolExecutor.allowCoreThreadTimeOut(true);

9线程的复用

另外线程是怎么实现复用的?
一直不关闭,阻塞,等任务

好,至此为止,创建一个线程池然后添加线程执行任务的代码逻辑就结束了,不知道同学们学习到了多少呢?
如果你的公司要自己写一个线程池,你能不能参照这个写出来呢?
极其简化的图
![在这里插入图片描述](https://img-blog.csdnimg.cn/0af816025ba346dfa22611f642356a81.png

10超过核心线程数小于最大线程数的那一撮所谓的临时线程

如果你只是背了八股文,各种所谓的面试秘籍会告诉你线程池有核心线程有临时线程,并发高时会创建临时线程帮忙,并发低时销毁这些临时线程,面试官问你?哪些是临时线程?你要是按八股文这样回答那就说明你没用过(起码没点进去看过代码),面试铁定是挂了。

其实没有什么临时线程,所谓的核心线程数是要保留几个线程

假如我们设置了核心数为3,最大数为10.

并发量刚经历一波高峰期,线程数量为我们的最大线程数10,然后这段时间并发很低,那超过核心线程数的线程会被回收,回收的是哪7个?

谁先执行完,谁先被回收被回收的就是所谓的临时的,最后剩下的3个就是核心的线程
核心线程只是几个一直被阻塞等待任务的线程而已
可能上一波并发高峰它还不是核心线程,但是它跑得慢,于是被留下来当核心了,下一波并发高峰,它先跑了,于是核心线程又换成了另外一批。

11拒绝策略

那线程已经达到最大线程数,如果满了,而且阻塞队列也满了,你的任务经过消峰限流等中间件的阻拦,还在源源不断的进来,已经处理不过来了,怎么办?
文章最开头,线程池创建时有个拒绝策略
阻塞队列已经满了,但是你还是源源不断地进来,会触发拒绝策略,报错
execute方法的末尾,调用了拒绝策略:

        //阻塞队列已满,添加失败,采用拒绝策略
        else if (!addWorker(command, false))
            reject(command);
    final void reject(Runnable command) { 
   
        handler.rejectedExecution(command, this);
    }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { 
   
            throw new RejectedExecutionException("Task " + r.toString() + //就是这个错误
                                                 " rejected from " +
                                                 e.toString());
        }
    }

栗子:核心3最大5队列5,我用100个任务同时执行:
在这里插入图片描述
但是一般我们会重写拒绝策略,创建线程池时当成变量传入,毕竟并发一高就报错这谁顶得住啊!!!

只要实现一下RejectedExecutionHandler这个接口就可以了

public class ExecJavaTemplate implements RejectedExecutionHandler { 
    
	@Override 
	public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { 
   
		//在这里根据自己的业务写一个暂存的逻辑,然后塞回去即可
		//比较懒的话暂存策略也不用写,不断地塞回去就行,慢就慢点,别让它报错即可
		System.out.println("进入拒绝策略"); 
		executor.execute(r); //再次调用execute
	} 
}

在这里插入图片描述
那有人说了阻塞队列我设置成无限大,不就不会有上面的问题了。
如果阻塞队列是无限的,会发生什么?
首先我想到的就是OOM了,你的队列无限大,内存够不够?
不够你的内存不久溢出了,项目挂掉然后紧急排查错误,发现你这个开发将队列设置为无限大
好嘛,公司严格的话给你一个线上一级错误警告处理,这找谁说理去

12线程设置多少合理

线程数 = CPU可用核心数/(1 – 阻塞系数),其中阻塞系数的取值在0和1之间。阻塞系数=阻塞时间/(阻塞时间+计算时间)。

  1. 线程的 CPU 耗时所占比例越高,就需要越少的线程
  2. 线程的 IO 耗时所占比例越高,就需要越多的线程
  3. 针对不同的程序,进行对应的实际测试就可以得到最合适的选择
  4. 线程数 >= CPU 核心数

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/16351.html

(0)

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注微信