Springboot自定义ThreadPoolTaskExecutor线程池多线程并发执行异步方法

1. 背景

当前因为工作需求,要发送大量Http请求,经过实践遍历发送需要6小时才能发送完毕,如果单线程发送请求会导致主线程阻塞。就会存在以下问题:

  1. 前端用户等待响应时间过长,无法进行下一步操作,不利于用户操作系统
  2. 响应时间过长超过Tomcat服务器会话时间,导致前后端的请求重新连接,这会出现抛出java.io.IOException: 你的主机中的软件中止了一个已建立的连接;重而终止了还未完成的Http发送任务
  3. 如果主线程其他任务如:定时Excel数据批量导入,文件上传等等;很容易因为文件格式问题,导致抛出异常,从而把Http的任务中断
  4. 夜长梦多,长时间发送请求,无法判断是否执行完毕;如果抛出异常,或是需要重启服务,无法判断执行到什么阶段,定位重发位置需要耗费很多时间精力,得不偿失,只能全部重发(一样面临以上问题)。

2. 解决思路

经过考虑可以使用多线程来解决问题,主线程负责开子线程处理请求。其中根据业务需要可以将多线程分为以下2类

  • 任务类型1. 开完子线程就返回响应用户的请求。
  • 任务类型2. 开完子线程后等待子线程响应结果后,再响应用户请求

3. 具体实现

Springboot中已经有集成的ThreadPoolTaskExecutor线程池可以供调用(注意区分Java自带的ThreadPoolExecutor),虽然2种线程池的提供的具体方法不一定一样,但是调度线程过程原理是通用,下面贴出ThreadPoolExecutor的调度线程过程作为创建ThreadPoolTaskExecutor的参考。

4. 开始编写代码

@EnableAsync 注解启用Spring 异步方法执行功能

当前的线程池 :

  • 核心线程池(CorePool) : 10 ;
  • 最大线程池(MaxPool) : 20 ;
  • 队列长度(QueueCapacity) : 200;
  • 拒绝策略(RejectedExecutionHandler) : CallerRunsPolicy

表示:

  • 如果 请求创建的线程数 <= 10 + 200, 只会使用核心线程池的10个线程执行任务;
  • 如果 10 + 200 < 请求创建的线程数 , 创建额外的10个线线程处理任务, 线程池中同时拥有20个线程处理任务

    使用CallerRunsPolicy策略,主线程还没将 (请求创建的线程数 – 10 + 200 ) 的子线程创建完毕, 主线程处于阻塞状态,需要将所有子线程创建完毕才可以返回响应,即使任务是任务类型1,它的响应时间也会无限接近于 任务类型2;根据业务需求可以通过设置更长的队列长度来解决

    线程处理完任务后,(最大线程池 – 核心线程池) > 0 && 经过60s空闲时间,就会回收除核心线程池外的空闲线程

@EnableAsync //注解启用Spring 异步方法执行功能@Configurationpublic class AsyncTaskConfig {    @Bean("AsyncTaskThreadPool")    public Executor threadPoolExecutor() {        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();        // 核心线程数:线程池创建时候初始化的线程数        executor.setCorePoolSize(10);        // 最大线程数:线程池最大的线程数,只有在缓冲队列满了之后才会申请超过核心线程数的线程        executor.setMaxPoolSize(20);        // 缓冲队列:用来缓冲执行任务的队列        executor.setQueueCapacity(200);        // 允许线程的空闲时间60秒:当超过了核心线程之外的线程在空闲时间到达之后会被销毁        executor.setKeepAliveSeconds(60);        // 线程池名的前缀:设置好了之后可以方便我们定位处理任务所在的线程池        executor.setThreadNamePrefix("AsyncTaskThreadPool-");        // 缓冲队列满了之后的拒绝策略:由调用线程处理(一般是主线程)        //AbortPolicy:丢弃任务并抛出 RejectedExecutionException 异常        //DiscardPolicy:丢弃任务,但是不抛出异常。可能导致无法发现系统的异常状态        //DiscardOldestPolicy:丢弃队列最前面的任务,然后重新提交被拒绝的任务        //CallerRunsPolicy:不丢弃任务 由调用线程处理该任务        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());        // 线程池中corePoolSize线程空闲时间达到keepAliveTime也将关闭        executor.setAllowCoreThreadTimeOut(true);        executor.initialize();        return executor;    }}

@Async 根据添加到自定义线程池中执行

@Component
public class AsyncRequestComponent {

    @Async("AsyncTaskThreadPool")
    public CompletableFuture<String> mockHttpRequest(String url) throws InterruptedException, IOException {
        String threadName = Thread.currentThread().getName();
        System.out.println("线程" + threadName + "开始调用,输出:" + url);
        String result = "";
        // HttpClient RestTemplate Feign or 其他Http请求框架
        // 或者其他需要 异步并发 的 逻辑代码 
        // Thread.sleep(50);
        result = HttpClientUtil.get(url);

        System.out.println("线程" + threadName + "调用结束,输出:" + url);
        return CompletableFuture.completedFuture(result);
    }
}

@Slf4j
@SpringBootTest
class DemoTestApplicationTests {
    @Autowired
    private AsyncRequestComponent asyncRequestComponent;

     @Test
    void testAsyncThreadPerformance() throws InterruptedException, ExecutionException {

        long start = System.currentTimeMillis();
        List<CompletableFuture<String>> list = new ArrayList<>();
        // 通过修改循环次数来查看 任务数<缓存队列+核心线程 和 任务数>缓存队列+核心线程 的主线程花费时间
        // 如果 任务数<缓存队列+核心线程 主线程不需要等待
        // 如果 任务数>缓存队列+核心线程 主线程需要等待 因为后面任务还没创建成功
        for(int i=0 ;i < 2000; i++){
            CompletableFuture<String> future= asyncRequestComponent.mockHttpRequest(i);
            list.add(future);
        }

        // 任务类型1 主线程不需要等待所有子线程执行完毕,将下面的for循环注释
        // 任务类型2 主线程需要等待所有子线程执行完毕:需要遍历future执行get方法等待子线程执行完毕
        for(Future<?> future : list){
            while (true) {//CPU高速轮询:每个future都并发轮循,判断完成状态然后获取结果,这一行,是本实现方案的精髓所在。即有10个future在高速轮询,完成一个future的获取结果,就关闭一个轮询
                if (future.isDone() && !future.isCancelled()) { //获取future成功完成状态,如果想要限制每个任务的超时时间,取消本行的状态判断+future.get(1000*1, TimeUnit.MILLISECONDS)+catch超时异常使用即可。
                    String result = (String) future.get();//获取结果
                    System.out.println("任务i=" + result + "获取完成!" + new Date());
                    break;//当前future获取结果完毕,跳出while
                } else {
                    Thread.sleep(1);//每次轮询休息1毫秒(CPU纳秒级),避免CPU高速轮循耗空CPU---》新手别忘记这个
                }
            }
        }
        long end = System.currentTimeMillis();
        System.out.println("主线程花费时间:"+(end-start));
    }

}

5. 结论

  • 如果希望主线程永远不会阻塞,不设置队列长度,因为默认长度设置为Integer.MAX_VALUE; 因为队列长度很长,不会额外再创建线程处理了,所以核心线程(CorePool) = 最大线程(MaxPool)
  • 如果本身业务用户可以容忍一定范围的响应延迟,这就需要根据实际出发,统计子线程任务的平均执行实际,判断核心线程数、最大线程数、缓冲队列的容量

题外话:如何创建线程池

CPU密集型:对于 CPU 密集型计算,多线程本质上是提升多核 CPU 的利用率,所以对于一个 8 核的 CPU,每个核一个线程,理论上创建 8 个线程就就可以;理论公式:最大线程数 = CPU核数 + 1

IO 密集型任务: IO 密集型任务最大线程数一般会大于 CPU 核心数很多倍,因为 IO 读写速度相比于 CPU 的速度而言是比较慢的,如果我们设置过少的线程数,就可能导致 CPU 资源的浪费。理论公式 : 最大线程数 = 最大线程数 = CPU核心数 / (1 – IO_TIME/REQUEST_TIME) ,比如在某个请求中,请求时长为10秒,调用IO时间为8秒,这时我们阻塞占百分比就是80%,有效利用CPU占比就是20%,假设是八核CPU,我们线程数就是8 / (1 – 80%) = 8 / 0.2 = 40个

最大线程数:

  • CPU密集型:CPU 核心数 + 1
  • IO密集型:CPU 核心数 / (1 – IO_TIME/REQUEST_TIME)

核心线程数: 核心线程数 = 最大线程数 * 20%

获取CPU核心数命令:

window – Java : Runtime.getRuntime().availableProcessors()//获取逻辑核心数,如6核心12线程,那么返回的是12

linux – 物理CPU: cat /proc/cpuinfo| grep “physical id”| sort| uniq| wc -l

linux – 物理CPU中core: cat /proc/cpuinfo| grep “cpu cores”| uniq

linux – 逻辑CPU: cat /proc/cpuinfo| grep “processor”| wc -l

偷懒大法:

  • 核心线程数:CPU核心数
  • 最大线程数:CPU逻辑CPU( CPU核心数 * 2)

参考资料:

https://blog.csdn.net/weixin_49258262/article/details/125463819?spm=1001.2014.3001.5506
https://blog.csdn.net/Wei_Naijia/article/details/127028398?spm=1001.2014.3001.5506
https://blog.csdn.net/wozyb1995/article/details/125044992?spm=1001.2014.3001.5506
https://blog.csdn.net/iampatrick_star/article/details/124490586?spm=1001.2014.3001.5506
https://www.cnblogs.com/651434092qq/p/14240406.html
https://juejin.cn/post/6948034657321484318
https://juejin.cn/post/7072281409053786120

版权声明:本文为博主作者:Mr丶LuoT4原创文章,版权归属原作者,如果侵权,请联系我们删除!

原文链接:https://blog.csdn.net/Process_ing/article/details/129764905

共计人评分,平均

到目前为止还没有投票!成为第一位评论此文章。

(0)
青葱年少的头像青葱年少普通用户
上一篇 2024年1月8日
下一篇 2024年1月8日

相关推荐