0003-spring 中线程池配置

0003-spring 中线程池配置0003-spring中线程池配置引用alibaba编码规范中的话【强制】线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。说明:Executors返回的线程池对象的

大家好,欢迎来到IT知识分享网。0003-spring

0003-spring 中线程池配置

引用alibaba编码规范中的话

【强制】线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样

的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。

说明:Executors 返回的线程池对象的弊端如下:

1)FixedThreadPool 和 SingleThreadPool:

允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。

2)CachedThreadPool 和 ScheduledThreadPool:

允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。

1. spring 中自己已经提供了线程池策略 ThreadPoolTaskExecutor

// 部分源码
public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport
		implements AsyncListenableTaskExecutor, SchedulingTaskExecutor {

	private final Object poolSizeMonitor = new Object();

	private int corePoolSize = 1;

    // 最大线程数 Integer.MAX_VALUE;
	private int maxPoolSize = Integer.MAX_VALUE;

	private int keepAliveSeconds = 60;

	private int queueCapacity = Integer.MAX_VALUE;

	private boolean allowCoreThreadTimeOut = false;

上面的代码可知,spring默认的线程池最大线程数跟等待队列都是 Integer.MAX_VALUE直接使用的话也会存在OOM问题。

2. 所以我们在使用时候会自定义我们自己的线程池策略

2.1 通过配置文件yml配置

spring:
  task:
    execution:
      pool:
        core-size: 8 # 核心线程数
        max-size: 20 # 最大线程数
        queue-capacity: 100 # 等待队列
        allow-core-thread-timeout: true # 是否允许超时回收
        keep-alive: 60s # 线程存活时间
      thread-name-prefix: my_task_

使用:

​ 只需要给方法头加上 @Async,这个方法在调用的时候就是个异步方法

    @Async
    @Override
    public void testThread2() {
        log.info("testThread2 正在执行,线程{}",Thread.currentThread().getName());
    }

2.2 如果上面不能满足我们的需求,我们还可以自己新增线程池

package com.zhoust.threadbatch.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.*;

/**
 * @author zhoust
 */
@EnableAsync
@Configuration
@Slf4j
public class ThreadPoolConfig {

    /**
     * 核心线程数(默认线程数)
     */
    @Value("${my-task.execution.pool.corePoolSize}")
    private int corePoolSize;
    /**
     * 最大线程数
     */
    @Value("${my-task.execution.pool.maxPoolSize}")
    private int maxPoolSize;
    /**
     * 允许线程空闲时间(单位:默认为秒)
     */
    @Value("${my-task.execution.pool.keepAliveTime}")
    private int keepAliveTime;
    /**
     * 缓冲队列大小
     */
    @Value("${my-task.execution.pool.queueCapacity}")
    private int queueCapacity;
    /**
     * 线程池名前缀
     */
    @Value("${my-task.execution.threadNamePrefix}")
    private String threadNamePrefix;
    /**
     * 拒绝策略 1. AbortPolicy 丢弃任务并抛异常,2. DiscardPolicy 丢弃任务不抛异常 3.DiscardOldestPolicy 丢弃队列最前面的线程(弃老)4.CallerRunsPolicy:由调用线程处理该任务
     */
    @Value("${my-task.execution.rejectedExecutionHandler}")
    private String rejectedExecutionHandler;
    /**
     * 是否回收核心线程
     */
    @Value("${my-task.execution.allowCoreThreadTimeOut}")
    private boolean allowCoreThreadTimeOut;
    /**
     * 执行完毕是否关闭线程池
     */
    @Value("${my-task.execution.waitForTasksToCompleteOnShutdown}")
    private boolean waitForTasksToCompleteOnShutdown;

    
    @Bean("asyncServiceThread")
    public Executor getThreadPool(){

        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // 核心线程数
        executor.setCorePoolSize(getCorePoolSize(corePoolSize));
        // 队列允许等待的线程数
        executor.setQueueCapacity(queueCapacity);
        // 最大线程数,包含核心线程数
        executor.setMaxPoolSize(maxPoolSize);
        // 线程回收时间
        executor.setKeepAliveSeconds(keepAliveTime);
        // 4种拒绝策略 1. AbortPolicy 丢弃任务并抛异常,2. DiscardPolicy 丢弃任务不抛异常 3.DiscardOldestPolicy 丢弃队列最前面的线程(弃老)4.CallerRunsPolicy:由调用线程处理该任务
        executor.setRejectedExecutionHandler(getRejectedExecutionHandler(rejectedExecutionHandler));
        // 是否允许超时回收线程
        executor.setAllowCoreThreadTimeOut(allowCoreThreadTimeOut);
        // 线程执行完毕后,关闭线程池
        executor.setWaitForTasksToCompleteOnShutdown(waitForTasksToCompleteOnShutdown);
        // 线程名前缀
        executor.setThreadNamePrefix(threadNamePrefix);
        // 初始化
        executor.initialize();
        return executor;
    }

    private int getCorePoolSize(int corePoolSize){
        // 如果是-1 去机器核心数的2倍
        if(-1 == corePoolSize){
            int processors = Runtime.getRuntime().availableProcessors();
            log.info("机器核心数:{},核心线程数:{}",processors,processors * 2);
            return processors * 2;
        }
        log.info("核心线程数:{}",corePoolSize);
        return corePoolSize;
    }

    private RejectedExecutionHandler getRejectedExecutionHandler(String rejectedExecutionHandler){
        RejectedExecutionHandler reh = null;
        try{
            log.info("配置的线程拒绝策略:{}",rejectedExecutionHandler);
            reh = (RejectedExecutionHandler) Class.forName("java.util.concurrent.ThreadPoolExecutor$" + rejectedExecutionHandler).newInstance();
        }catch (Exception e){
            log.warn("获取拒绝策略失败,默认使用 CallerRunsPolicy");
            reh = new ThreadPoolExecutor.CallerRunsPolicy();
        }
        return reh;
    }

}


yml:

my-task:
  execution:
    pool:
      corePoolSize: 8 # 核心线程数
      maxPoolSize: 20 # 最大线程数
      queueCapacity: 100 # 等待队列
      allow-core-thread-timeout: true # 是否允许超时回收
      keepAliveTime: 60 # 线程存活时间
    threadNamePrefix: spring_task_
    rejectedExecutionHandler: CallerRunsPolicy # 拒绝策略 1. AbortPolicy 丢弃任务并抛异常,2. DiscardPolicy 丢弃任务不抛异常 3.DiscardOldestPolicy 丢弃队列最前面的线程(弃老)4.CallerRunsPolicy:由调用线程处理该任务
    allowCoreThreadTimeOut: true # 是否回收核心线程
    waitForTasksToCompleteOnShutdown: true # 执行完毕是否关闭线程池

使用:

指定使用哪个线程池

    @Async(value = "asyncServiceThread")
    @Override
    public void testThread1() {
        log.info("testThread1 正在执行,线程{}",Thread.currentThread().getName());
    }

调用:

    @GetMapping("/testThread")
    public String testThread(){

        for (int i = 0; i < 100; i++) {
            taskServer.testThread1();
            taskServer.testThread2();
        }
        return "succeed";
    }

补充:拒绝策略

拒绝策略就是当线程池已经满了,还有新的线程过来,此时我们怎么处理这些线程。

1.AbortPolicy(默认线程策略):丢弃任务并抛出RejectedExecutionException异常

​ 如果是比较关键的业务,推荐使用此拒绝策略,这样子在系统不能承载更大的并发量的时候,能够及时的通过异常发现。

2.DiscardPolicy:丢弃任务,但是不抛出异常

​ 无关紧要的业务可以这样做。

3.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新提交被拒绝的任务。(弃老策略)

4.CallerRunsPolicy:由调用线程处理该任务

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/33989.html

(0)
上一篇 2023-12-24 18:00
下一篇 2023-12-31 21:33

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注微信