大家好,欢迎来到IT知识分享网。
0003-spring 中线程池配置
引用alibaba编码规范中的话
【强制】线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样
的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
说明:Executors 返回的线程池对象的弊端如下:
1)FixedThreadPool 和 SingleThreadPool:
允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。
2)CachedThreadPool 和 ScheduledThreadPool:
允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。
1. spring 中自己已经提供了线程池策略 ThreadPoolTaskExecutor
// 部分源码
public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport
implements AsyncListenableTaskExecutor, SchedulingTaskExecutor {
private final Object poolSizeMonitor = new Object();
private int corePoolSize = 1;
// 最大线程数 Integer.MAX_VALUE;
private int maxPoolSize = Integer.MAX_VALUE;
private int keepAliveSeconds = 60;
private int queueCapacity = Integer.MAX_VALUE;
private boolean allowCoreThreadTimeOut = false;
上面的代码可知,spring默认的线程池最大线程数跟等待队列都是 Integer.MAX_VALUE
直接使用的话也会存在OOM问题。
2. 所以我们在使用时候会自定义我们自己的线程池策略
2.1 通过配置文件yml配置
spring:
task:
execution:
pool:
core-size: 8 # 核心线程数
max-size: 20 # 最大线程数
queue-capacity: 100 # 等待队列
allow-core-thread-timeout: true # 是否允许超时回收
keep-alive: 60s # 线程存活时间
thread-name-prefix: my_task_
使用:
只需要给方法头加上 @Async,这个方法在调用的时候就是个异步方法
@Async
@Override
public void testThread2() {
log.info("testThread2 正在执行,线程{}",Thread.currentThread().getName());
}
2.2 如果上面不能满足我们的需求,我们还可以自己新增线程池
package com.zhoust.threadbatch.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.*;
/**
* @author zhoust
*/
@EnableAsync
@Configuration
@Slf4j
public class ThreadPoolConfig {
/**
* 核心线程数(默认线程数)
*/
@Value("${my-task.execution.pool.corePoolSize}")
private int corePoolSize;
/**
* 最大线程数
*/
@Value("${my-task.execution.pool.maxPoolSize}")
private int maxPoolSize;
/**
* 允许线程空闲时间(单位:默认为秒)
*/
@Value("${my-task.execution.pool.keepAliveTime}")
private int keepAliveTime;
/**
* 缓冲队列大小
*/
@Value("${my-task.execution.pool.queueCapacity}")
private int queueCapacity;
/**
* 线程池名前缀
*/
@Value("${my-task.execution.threadNamePrefix}")
private String threadNamePrefix;
/**
* 拒绝策略 1. AbortPolicy 丢弃任务并抛异常,2. DiscardPolicy 丢弃任务不抛异常 3.DiscardOldestPolicy 丢弃队列最前面的线程(弃老)4.CallerRunsPolicy:由调用线程处理该任务
*/
@Value("${my-task.execution.rejectedExecutionHandler}")
private String rejectedExecutionHandler;
/**
* 是否回收核心线程
*/
@Value("${my-task.execution.allowCoreThreadTimeOut}")
private boolean allowCoreThreadTimeOut;
/**
* 执行完毕是否关闭线程池
*/
@Value("${my-task.execution.waitForTasksToCompleteOnShutdown}")
private boolean waitForTasksToCompleteOnShutdown;
@Bean("asyncServiceThread")
public Executor getThreadPool(){
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 核心线程数
executor.setCorePoolSize(getCorePoolSize(corePoolSize));
// 队列允许等待的线程数
executor.setQueueCapacity(queueCapacity);
// 最大线程数,包含核心线程数
executor.setMaxPoolSize(maxPoolSize);
// 线程回收时间
executor.setKeepAliveSeconds(keepAliveTime);
// 4种拒绝策略 1. AbortPolicy 丢弃任务并抛异常,2. DiscardPolicy 丢弃任务不抛异常 3.DiscardOldestPolicy 丢弃队列最前面的线程(弃老)4.CallerRunsPolicy:由调用线程处理该任务
executor.setRejectedExecutionHandler(getRejectedExecutionHandler(rejectedExecutionHandler));
// 是否允许超时回收线程
executor.setAllowCoreThreadTimeOut(allowCoreThreadTimeOut);
// 线程执行完毕后,关闭线程池
executor.setWaitForTasksToCompleteOnShutdown(waitForTasksToCompleteOnShutdown);
// 线程名前缀
executor.setThreadNamePrefix(threadNamePrefix);
// 初始化
executor.initialize();
return executor;
}
private int getCorePoolSize(int corePoolSize){
// 如果是-1 去机器核心数的2倍
if(-1 == corePoolSize){
int processors = Runtime.getRuntime().availableProcessors();
log.info("机器核心数:{},核心线程数:{}",processors,processors * 2);
return processors * 2;
}
log.info("核心线程数:{}",corePoolSize);
return corePoolSize;
}
private RejectedExecutionHandler getRejectedExecutionHandler(String rejectedExecutionHandler){
RejectedExecutionHandler reh = null;
try{
log.info("配置的线程拒绝策略:{}",rejectedExecutionHandler);
reh = (RejectedExecutionHandler) Class.forName("java.util.concurrent.ThreadPoolExecutor$" + rejectedExecutionHandler).newInstance();
}catch (Exception e){
log.warn("获取拒绝策略失败,默认使用 CallerRunsPolicy");
reh = new ThreadPoolExecutor.CallerRunsPolicy();
}
return reh;
}
}
yml:
my-task:
execution:
pool:
corePoolSize: 8 # 核心线程数
maxPoolSize: 20 # 最大线程数
queueCapacity: 100 # 等待队列
allow-core-thread-timeout: true # 是否允许超时回收
keepAliveTime: 60 # 线程存活时间
threadNamePrefix: spring_task_
rejectedExecutionHandler: CallerRunsPolicy # 拒绝策略 1. AbortPolicy 丢弃任务并抛异常,2. DiscardPolicy 丢弃任务不抛异常 3.DiscardOldestPolicy 丢弃队列最前面的线程(弃老)4.CallerRunsPolicy:由调用线程处理该任务
allowCoreThreadTimeOut: true # 是否回收核心线程
waitForTasksToCompleteOnShutdown: true # 执行完毕是否关闭线程池
使用:
指定使用哪个线程池
@Async(value = "asyncServiceThread")
@Override
public void testThread1() {
log.info("testThread1 正在执行,线程{}",Thread.currentThread().getName());
}
调用:
@GetMapping("/testThread")
public String testThread(){
for (int i = 0; i < 100; i++) {
taskServer.testThread1();
taskServer.testThread2();
}
return "succeed";
}
补充:拒绝策略
拒绝策略就是当线程池已经满了,还有新的线程过来,此时我们怎么处理这些线程。
1.AbortPolicy(默认线程策略):丢弃任务并抛出RejectedExecutionException异常
如果是比较关键的业务,推荐使用此拒绝策略,这样子在系统不能承载更大的并发量的时候,能够及时的通过异常发现。
2.DiscardPolicy:丢弃任务,但是不抛出异常
无关紧要的业务可以这样做。
3.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新提交被拒绝的任务。(弃老策略)
4.CallerRunsPolicy:由调用线程处理该任务
免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/33989.html