java线程池的正确使用方式,completableFuture「建议收藏」

java线程池的正确使用方式,completableFuture「建议收藏」下面是最常见的线程池的使用和声明方式:publicclassThreadTest{ExecutorServicefixedThreadPool=Executors.newFixedThreadPool(50);publicvoiddothing(){for(inti=0;i<50;i++){…

大家好,欢迎来到IT知识分享网。

下面是最常见的线程池的使用和声明方式:
   

public class ThreadTest {

    ExecutorService fixedThreadPool = Executors.newFixedThreadPool(50);
    
    public void dothing(){
        for(int i=0;i<50;i++){
            fixedThreadPool.execute(new Runnable() {
                @Override
                public void run() {
                    //do something
                }
            });
        }
        fixedThreadPool.shutdown();//关闭线程池

        //此处不可以删除或注释,需要线程执行结束后再执行别的内容,即只有线程结束后才会继续向下执行
        while (!fixedThreadPool.isTerminated()) {

        }
    }
}

IT知识分享网

上面的声明方式存在下面几个问题:

1)fixedThreadPool为对象变量,每个对象都会产生新的线程池,这就意味着虽然声明的是fixedThreadPool,但实际上线程的数量是不可控的,因为可能同时产生多个对象,每个对象分别有一个线程池,所以就有可能出现多个线程池,虽然每个线程自身占用的内存有限,但是每个线程执行过程中产生的对象所占用的内存却是不可控的,可能多达几十甚至上百兆。此时则会消耗大量内存,报内存溢出错误

2)另外,建立线程池的目的之一就是因为线程的重建和销毁会消耗时间和空间,线程池可以实现线程的重复利用,节约线程的销毁和重建成本,但是上面的最后两行代码,会在任务执行完后就关闭线程池,并销毁线程,并没能充分利用线程池的优势。

3)使用executors创建线程池有四个,single、fixed、catched、secheduled,这四种线程池都存在可能导致OOM问题,single和fixed虽然限制了线程个数,但是允许线程等待的队列最大个数却是Integer.MAX_VALUE,可能会堆积大量的请求,导致OOM;catched和secheduled方式,直接线程个数就是Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。

4)上面run方法里面的内容每有进行try catch捕获异常,那么如果run方法中出现了异常,就会直接导致线程终止,每次都需要重建线程,等于线程池没有发挥作用,高并发下,可能会导致CPU和内存占用飙升,甚至jvm挂掉。解决方案就是run方法内部try catch

如果确定你的并发量有限,并且每个线程占用的内存大小有限,你可以使用Executors来建立线程池,

所以使用Executors建立线程池的正确使用方式,针对问题1和2,应该将线程池声明为静态变量,如下:

IT知识分享网    public static ExecutorService fixedThreadPool = Executors.newFixedThreadPool(50);

此时,fixedThreadPool是类变量,由所有的ThreadTest类的对象共享,注意此时不可以再有如下代码:

        fixedThreadPool.shutdown();//关闭线程池
        //此处不可以删除或注释,需要线程执行结束后再执行别的内容,即只有线程结束后才会继续向下执行        
        while(!fixedThreadPool.isTerminated()) { 
     
        }

即线程池不能关闭,如果关闭了会报如下异常:

 java.util.concurrent.RejectedExecutionException。

不能shutdown,笔者在实际场景遇到了一个问题:

上面的线程池实际是向数据库中插入数据,执行完后需要在页面展示数据坤插入的数据,因为无法使用shutdown和isTerminated方法,导致查的时候数据还没有插入。解决的方案很简单,让主线程睡眠两秒,即

IT知识分享网Thread.currentThread().sleep(2000);

。两秒足够线程池中的任务执行完毕。

如果你的并发量没有办法控制,并且每个线程占用的内存大小无法确定较小,那么你需要使用ThreadPoolExecutor的方式来创建线程。

其构造函数签名如下:

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler);  

参数介绍:

corePoolSize 核心线程数,指保留的线程池大小(不超过maximumPoolSize值时,线程池中最多有corePoolSize 个线程工作)。

maximumPoolSize 指的是线程池的最大大小(线程池中最大有corePoolSize 个线程可运行)

keepAliveTime 指的是空闲线程结束的超时时间(当一个线程不工作时,过keepAliveTime 长时间将停止该线程)。

unit 是一个枚举,表示 keepAliveTime 的单位(有NANOSECONDS, MICROSECONDS,

MILLISECONDS, SECONDS, MINUTES, HOURS, DAYS,7个可选值)。

workQueue 表示存放任务的队列(存放需要被线程池执行的线程队列)。

threadFactory – 执行程序创建新线程时使用的工厂。

handler – 由于超出线程范围和队列容量而使执行被阻塞时所使用的处理程序。

参数之间关系如下:

第一类:控制被处理线程:corePoolSize,maximumPoolSize,workQueue会决定你的线程处理顺序,首先进来线程会将该线程池的实际线程数变为corePoolSize,达到corePoolsize后,再进来的线程就会进入workQueue来排队;如果workQueue满了,再进来的线程就会继续创建线程,直到实际个数达到maximumPoolSize;

无论是对列里面的排队线程,还是实际正在处理的线程,都会占用内存,所以需要根据实际每个执行线程占用内存的大小合理corePoolSIze,maximumPoolSize,和workQueue。

池中线程数大小的选择策略:
如果纯计算的任务,多线程并不能带来性能提升,因为CPU处理能力是稀缺的资源,相反导致较多的线程切换的花销,此时建议线程数为CPU数量或加一;
如果任务包含大量IO/网络等待等,线程数 = CPU 核数 × 目标 CPU 利用率 ×(1 + 平均等待时间 / 平均工作时间)

而关于workQueue有下面三种方式:

1、直接提交。工作队列的默认选项是 SynchronousQueue,它将任务直接提交给线程而不保持它们。在此,如果不存在可用于立即运行任务的线程,则试图把任务加入队列将失败,因此会构造一个新的线程。此策略可以避免在处理可能具有内部依赖性的请求集时出现锁。直接提交通常要求无界 maximumPoolSizes, 以避免拒绝新提交的任务。当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。

2、无界队列。使用无界队列(例如,不具有预定义容量的 LinkedBlockingQueue)将导致在所有 corePoolSize 线程都忙时新任务在队列中等待。这样,创建的线程就不会超过 corePoolSize。(因此,maximumPoolSize的值也就无效了。)当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列;例如,在 Web页服务器中。这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。

3、有界队列。当使用有限的 maximumPoolSizes时,有界队列(如 ArrayBlockingQueue)有助于防止资源耗尽,但是可能较难调整和控制。队列大小和最大池大小可能需要相互折衷:使用大型队列和小型池可以最大限度地降低 CPU 使用率、操作系统资源和上下文切换开 销,但是可能导致人工降低吞吐量。如果任务频繁阻塞(例如,如果它们是 I/O边界),则系统可能为超过您许可的更多线程安排时间。使用小型队列通常要求较大的池大小,CPU使用率较高,但是可能遇到不可接受的调度开销,这样也会降低吞吐量。

第二类:RejectedExecutionHandler:无法处理线程的handler

RejectedExecutionHandler接口提供了对于拒绝任务的处理的自定方法的机会。在ThreadPoolExecutor中已经默认包含了4中策略,因为源码非常简单,这里直接贴出来。

CallerRunsPolicy(直接运行run方法):线程调用运行该任务的 execute 本身。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。

1.     public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {  

2.                 if (!e.isShutdown()) {  

3.                     r.run();  

4.                 }  

5.             } 

这个策略显然不想放弃执行任务。但是由于池中已经没有任何资源了,那么就直接使用调用线程本身run方法来执行。

AbortPolicy(抛异常):处理程序遭到拒绝将抛出运行时RejectedExecutionException

1.     public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {  

2.                 throw new RejectedExecutionException();  

3.             } 

 这种策略直接抛出异常,丢弃任务。

DiscardPolicy(直接丢弃):不能执行的任务将被删除

1.     public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {  

2.             } 

 这种策略和AbortPolicy几乎一样,也是丢弃任务,只不过他不抛出异常。

DiscardOldestPolicy(抛弃最老的线程):如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后

重试执行程序(如果再次失败,则重复此过程)

1.     public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {  

2.                 if (!e.isShutdown()) {  

3.                     e.getQueue().poll();  

4.                     e.execute(r);  

5.                 }  

        } 

该策略就稍微复杂一些,在pool没有关闭的前提下首先丢掉缓存在队列中的最早的任务,然后重新尝试运行该任务。这个策略需要适当小心。

设想:如果其他线程都还在运行,那么新来任务踢掉旧任务,缓存在queue中,再来一个任务又会踢掉queue中最老任务

当然,还有别的使用ExecutorService+FutureTask+Callable方式应该也可以,可参考这里

ThreadFactory的使用

当一个线程由于未捕获异常而退出时,JVM会把这个事件报告给应用程序提供的UncaughtExceptionHandler异常处理器(这是Thread类中的接口)。所以我们可以在创建线程的时候声明异常处理器。实例如下:
public class MyUnchecckedExceptionhandler implements UncaughtExceptionHandler {

    @Override
    public void uncaughtException(Thread t, Throwable e) {

        System.out.println(“捕获异常处理方法:” + e);
    }
}
ExecutorService exec = Executors.newCachedThreadPool(new ThreadFactory(){

            @Override
            public Thread newThread(Runnable r) {

                Thread thread = new Thread(r);
                thread.setUncaughtExceptionHandler(new MyUnchecckedExceptionhandler());
                return thread;
            }
});

线程池的源码

execute在哪里启动额线程?
怎么调度的 ?线程将任务执行完毕怎么加另一个任务?
第一次创建线程(Worker) 的时候,调用start方法开启线程,然后worker的run方法会启动传入的runnable的fun方法;
然后不断判断当前runnable是否空,空说明执行完,获取新runnable的直接执行run方法

线程池中线程如何复用的?

worker,大的while循环,不断到队列里面获取,如果有任务执行任务,否则sleep。sleep好了在执行任务或sleep

https://blog.csdn.net/anhenzhufeng/article/details/88870374

———————————————————————————————————————–

更新与2021年11月,最佳实践如下:

1 根据机器资源创建线程池

public class StuClassInfoThreadExecutor {
    /**
     * 线程池名称
     */
    private static final String STU_CLASS_INFO_THREAD_FACTORY_NAME = "stu-class-info-thread-pool";

    /**
     * cpu可用核数
     */
    private static final int DEFAULT_CPU_PROCESSORS = Runtime.getRuntime().availableProcessors();

    /**
     * spring支持的线程池任务包装类
     */
    private static final ThreadPoolTaskExecutor STU_CLASS_INFO_THREAD_EXECUTOR = new ThreadPoolTaskExecutor();

    /**
     * 默认线程池
     */
    private static final ThreadFactory DEFAULT_THREAD_FACTORY = new DefaultThreadFactory(STU_CLASS_INFO_THREAD_FACTORY_NAME);

    static {
        STU_CLASS_INFO_THREAD_EXECUTOR.setThreadFactory(DEFAULT_THREAD_FACTORY);
        //此线程池负责业务属于IO密集型,设置核心线程数为cpu核数*2
        STU_CLASS_INFO_THREAD_EXECUTOR.setCorePoolSize(DEFAULT_CPU_PROCESSORS * 5);
        STU_CLASS_INFO_THREAD_EXECUTOR.setMaxPoolSize(DEFAULT_CPU_PROCESSORS * 25);
        STU_CLASS_INFO_THREAD_EXECUTOR.setQueueCapacity(20);
        //默认值就是60seconds,显示声明的目的是直观看到线程池中线程的存活时间
        STU_CLASS_INFO_THREAD_EXECUTOR.setKeepAliveSeconds(60);
        //CallerRunsPolicy这个拒绝策略代表的是,如果工作列队满了(超过QueueCapacity)同时MaxPoolSize也达到了阀值,那么当前任务使用主线程调用
        STU_CLASS_INFO_THREAD_EXECUTOR.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //核心线程如果超过了keepTimeOut时间也会进行回收,设置这个参数的原因是因为其他maven-module可能不需要依赖这个线程池,从而造成线程资源的浪费
//        STU_CLASS_INFO_THREAD_EXECUTOR.setAllowCoreThreadTimeOut(true);
        STU_CLASS_INFO_THREAD_EXECUTOR.initialize();
    }

    /**
     * 获取线程池任务实例
     *
     * @return ThreadPoolTaskExecutor
     */
    public static ThreadPoolTaskExecutor getInstance() {
        return STU_CLASS_INFO_THREAD_EXECUTOR;
    }

}

使用

private List<ClassByStudentDto> getStudentClassesEnhanced(List<RegistCoreDto> registerDtoList, String cityCode){
        CompletableFuture<ListMultimap<String, ClassByStudentDto>> classIdClassDtoMultimap =
                CompletableFuture.supplyAsync(() -> this.fetchClassDetails(registerDtoList, cityCode), StuClassInfoThreadExecutor.getInstance());

        // 因为需要线程中的异常做降级(CourseBiz.getStudentClasses),所以这里取消原本的异步,使用主线程执行
//        CompletableFuture<Map<String, List<StudentLessonDto>>> registerIdToLessons =
//                CompletableFuture.supplyAsync(() -> this.getAllRegisterLessons(registerDtoList, cityCode), StuClassInfoThreadExecutor.getInstance());
        Map<String, List<StudentLessonDto>> registerIdToLessons = this.getAllRegisterLessons(registerDtoList, cityCode);

//        CompletableFuture<List<ClassByStudentDto>> results = classIdClassDtoMultimap
//                .thenCombineAsync(registerIdToLessons, (classDtoMultimap, registerLessons) ->
//                        this.getClassByStudentDtos(cityCode, classDtoMultimap, registerLessons), StuClassInfoThreadExecutor.getInstance());
        CompletableFuture<List<ClassByStudentDto>> results = classIdClassDtoMultimap
                .thenApplyAsync( (classDtoMultimap) ->
                        this.getClassByStudentDtos(cityCode, classDtoMultimap, registerIdToLessons), StuClassInfoThreadExecutor.getInstance());

        return results.exceptionally(e -> {
            alarmService.reportException(e);
            return List.of();
        }).join();
    }

CompletableFuture执行任务

在使用FutureTask来完成异步任务,通过get()方法获取结果时,会让获取结果的线程进入阻塞等待,这种方式并不是最理想的状态。

JDK8中引入了CompletableFuture,对Future进行了改进,可以在定义CompletableFuture传入回调对象,任务在完成或者异常时,自动回调。

public class CompletableFutureDemo {
    public static void main(String[] args) throws InterruptedException {
        // 创建CompletableFuture时传入Supplier对象
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(new MySupplier());
        //执行成功时
        future.thenAccept(new MyConsumer());
        // 执行异常时
        future.exceptionally(new MyFunction());
        // 主任务可以继续处理,不用等任务执行完毕
        System.out.println("主线程继续执行");
        Thread.sleep(5000);
        System.out.println("主线程执行结束");
    }
}

class MySupplier implements Supplier<Integer> {
    @Override
    public Integer get() {
        try {
            // 任务睡眠3s
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return 3 + 2;
    }
}
// 任务执行完成时回调Consumer对象
class MyConsumer implements Consumer<Integer> {
    @Override
    public void accept(Integer integer) {
        System.out.println("执行结果" + integer);
    }
}
// 任务执行异常时回调Function对象
class MyFunction implements Function<Throwable, Integer> {
    @Override
    public Integer apply(Throwable type) {
        System.out.println("执行异常" + type);
        return 0;
    }
}
复制代码

以上代码可以通过lambda表达式进行简化。

public class CompletableFutureDemo {
    public static void main(String[] args) throws InterruptedException {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            try {
                // 任务睡眠3s
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 3 + 2;
        });
        //执行成功时
        future.thenAccept((x) -> {
            System.out.println("执行结果" + x);
        });
        future.exceptionally((type) -> {
            System.out.println("执行异常" + type);
            return 0;
        });
        System.out.println("主线程继续执行");
        Thread.sleep(5000);
        System.out.println("主线程执行结束");
    }
}
复制代码

通过示例我们发现CompletableFuture的优点:

  • 异步任务结束时,会自动回调某个对象的方法;
  • 异步任务出错时,会自动回调某个对象的方法;
  • 主线程设置好回调后,不再关心异步任务的执行。

当然这些优点还不足以体现CompletableFuture的强大,还有更厉害的功能。

串行执行

多个CompletableFuture可以串行执行,如第一个任务先进行查询,第二个任务再进行更新

public class CompletableFutureDemo {
    public static void main(String[] args) throws InterruptedException {
        // 第一个任务
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 1234);
        // 第二个任务
        CompletableFuture<Integer> secondFuture = future.thenApplyAsync((num) -> {
            System.out.println("num:" + num);
            return num + 100;
        });
        secondFuture.thenAccept(System.out::println);
        System.out.println("主线程继续执行");
        Thread.sleep(5000);
        System.out.println("主线程执行结束");
    }
}
复制代码

并行任务

CompletableFuture除了可以串行,还支持并行处理。

public class CompletableFutureDemo {
    public static void main(String[] args) throws InterruptedException {
        // 第一个任务
        CompletableFuture<Integer> oneFuture = CompletableFuture.supplyAsync(() -> 1234);
        // 第二个任务
        CompletableFuture<Integer> twoFuture = CompletableFuture.supplyAsync(() -> 5678);
		// 通过anyOf将两个任务合并为一个并行任务
        CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(oneFuture, twoFuture);

        anyFuture.thenAccept(System.out::println);
        System.out.println("主线程继续执行");
        Thread.sleep(5000);
        System.out.println("主线程执行结束");
    }
}
复制代码

通过anyOf()可以实现多个任务只有一个成功,CompletableFuture还有一个allOf()方法实现了多个任务必须都成功之后的合并任务。
作者:小黑说Java
链接:https://juejin.cn/post/7008318996827078693
来源:稀土掘金
 

更多关于completableFuture的介绍

如何编写优雅的异步代码 — CompletableFuture – 掘金

CompletableFuture 使用详解 – 简书

    

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/7509.html

(0)
上一篇 2023-01-05 23:30
下一篇 2023-01-06 02:40

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注微信