第十四章. common-core 核心配置:ThreadPoolConfig 线程池详解
摘要:本章我们将深入 RVP 的异步基石 ThreadPoolConfig。我们将从“为何需要线程池”的痛点出发,通过“饭店模型”彻底理解线程池的七大参数与执行流程。然后我们将深入 Java ThreadPoolExecutor 和 RVP ThreadPoolConfig 的源码,最后实战 execute、submit 和 schedule 的正确用法。
在前面的章节中,我们已经深入分析了 common-core 中 utils 包下的所有核心工具类,从 ServletUtils 的 ThreadLocal 原理,到 RegexValidator 的三层校验架构。我们掌握了 RVP 框架提供的“工具箱”。
但 common-core 的价值不止于工具类。它还提供了“配置类”,为整个框架的性能和稳定性奠定基础。
从本章开始,我们将进入 config 包,首先要面对的就是 最核心 的 ThreadPoolConfig。线程池是后端高并发、异步化处理的基石。RVP 在 common-core 中为我们预配置了一套全局线程池,理解它、用好它,是“二开”中实现性能优化的关键。
本章学习路径

14.1. 【二开痛点】为何需要线程池?
在我们的“二开”业务中,经常会遇到一些耗时操作,例如:
- 发送通知:用户注册成功后,需要发送欢迎邮件或短信。
- 记录日志:记录非核心的、详细的操作日志到数据库或日志文件。
- 调用外部 API:调用多个(互不依赖的)第三方 API 来聚合数据。
如果我们让主线程(即处理用户 HTTP 请求的那个线程)同步 等待这些操作完成,将 极大延长 接口的响应时间,严重影响用户体验。
例如,一个“注册”接口,核心业务(写入用户表)可能只需要 50 毫秒,但发送邮件这个非核心业务需要 3 秒。如果同步执行,用户必须在浏览器前 白白等待 3 秒 才能收到“注册成功”的提示。
线程池的核心价值,就是将这些“耗时”且“非核心”的任务,从 主线程 剥离出去,交给 后台的子线程去执行,从而让主线程能够“立即返回”,实现接口“秒回”。
14.2. 【实战】同步执行的“慢”接口(pool1 案例)
我们来动手模拟这个痛点。
14.2.1. 测试准备:创建 ThreadPoolController
我们首先在 ruoyi-demo 模块中创建 ThreadPoolController,并定义一个模拟耗时 3 秒的方法。
文件路径:ruoyi-modules/ruoyi-demo/src/main/java/org/dromara/demo/controller/ThreadPoolController.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| package org.dromara.demo.controller;
import cn.hutool.core.thread.ThreadUtil; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.dromara.common.core.domain.R; import org.dromara.common.doc.annotation.SaIgnore; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.TimeUnit;
@SaIgnore @Slf4j @RequiredArgsConstructor @RestController @RequestMapping("/pool") public class ThreadPoolController {
private void method1() { log.info("开始执行 method1 ..."); ThreadUtil.sleep(3, TimeUnit.SECONDS); }
}
|
14.2.2. pool1 接口:同步循环调用
我们添加 pool1 接口,它在 主线程 中循环调用 5 次 method1:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
|
@GetMapping("/pool1") public R<Void> pool1() { long startTime = System.currentTimeMillis(); for (int i = 0; i < 5; i++) { method1(); } long endTime = System.currentTimeMillis(); log.info("pool1 总耗时: {} ms", (endTime - startTime)); return R.ok("操作成功"); }
|
验证:我们启动项目,并调用 GET http://localhost:8080/pool/pool1。我们会发现,浏览器会“转圈”长达 15 秒 后才返回“操作成功”。查看控制台日志,会打印 pool1 总耗时: 15021 ms(类似这样的值)。
这个 15 秒的等待是无法接受的。method1 之间互不依赖,完全可以“并发”执行。这就是线程池的用武之地。
14.3. 【实战】异步执行的“快”接口(pool2 案例)
pool1 的问题在于主线程做了 不必要 的等待。我们现在使用 RVP 配置的线程池来解决这个问题。
14.3.1. 注入 RVP 预配置的线程池
RVP 已经在 ThreadPoolConfig(common-core 中)为我们配置好了全局线程池(我们将在 14.8 节深入源码),我们只需要 注入 它。
在 RVP 5.x 版本的 ThreadPoolConfig 中,默认注册的 Bean 是 ScheduledExecutorService,我们通过构造函数注入它。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
|
import java.util.concurrent.ScheduledExecutorService;
@SaIgnore @Slf4j @RequiredArgsConstructor @RestController @RequestMapping("/pool") public class ThreadPoolController { private final ScheduledExecutorService scheduledExecutorService;
}
|
14.3.2. pool2 接口:异步提交任务
现在,我们把 5 个任务“提交”给线程池,主线程则“立即返回”。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
|
@GetMapping("/pool2") public R<Void> pool2() { long startTime = System.currentTimeMillis(); for (int i = 0; i < 5; i++) { scheduledExecutorService.execute(this::method1); } long endTime = System.currentTimeMillis(); log.info("pool2 总耗时: {} ms", (endTime - startTime)); return R.ok("操作成功"); }
|
验证:我们重启项目,并调用 GET http://localhost:8080/pool/pool2。
结果:浏览器 瞬间 返回“操作成功”。查看控制台日志,会打印 pool2 总耗时: 3 ms(或某个极小的值)。
分析:
- 主线程:在
for 循环中,只做了 5 次“提交”动作,这个动作极快。主线程 没有 等待 15 秒,而是立即返回了 R.ok()。 - 后台线程:在接下来的 15 秒里,线程池中的 子线程(例如
schedule-pool-1, schedule-pool-2…)正在后台默默地执行 5 次 method1(我们可以在控制台看到 5 次“开始执行 method1”的日志)。
我们成功地将“耗时操作”与“主流程”解耦,实现了接口的“秒回”。但这背后是如何工作的?RVP 配置的 corePoolSize 是多少?maxPoolSize 又是多少?这些参数的含义,我们必须在理论阶段搞清楚。
14.4 深入理解:线程池的资源管理机制
在上一节,我们体验了线程池带来的性能提升。但你可能注意到,创建 ThreadPoolExecutor 时需要传入一堆参数。这不是设计者故意为难我们,而是因为线程池本质上是一个 精密的资源调度系统。
让我们来看一个完整的线程池创建代码,然后我们逐步理解每个参数的作用:
1 2 3 4 5 6 7 8 9
| ThreadPoolExecutor executor = new ThreadPoolExecutor( 3, 5, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(20), new CustomThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy() );
|
为什么需要这么多参数?想象一下,如果你是一家餐厅的经理,面对波动的客流,你需要考虑:平时要保留几个厨师?高峰期最多能招几个临时工?等位区要设置多少座位?当座位满了怎么办?这些决策直接影响餐厅的运营效率和成本。
线程池面临的是同样的问题:如何在 资源利用率 和 响应速度 之间找到平衡。
14.4.1 核心参数的设计思路
我们从一个实际场景开始。假设你的系统平时每秒处理 10 个请求,高峰期可能达到 50 个。如果为每个请求创建一个线程,系统很快就会因为线程过多而崩溃。这时,线程池的参数设计就显得尤为重要。
corePoolSize(核心线程数) 决定了系统的 “基础处理能力”。设置为 3 意味着:即使没有任何任务,线程池也会保持 3 个线程随时待命。为什么要保持?因为线程的创建和销毁是有开销的,保持一定数量的常驻线程可以快速响应请求。
workQueue(工作队列) 是线程池的 “缓冲区”。当 3 个核心线程都在忙时,新任务会进入这个队列等待。选择 ArrayBlockingQueue(20) 意味着最多可以缓存 20 个任务。队列的选择非常重要:
ArrayBlockingQueue:有界队列,防止内存溢出LinkedBlockingQueue:无界队列,可能导致 OOM(内存溢出)SynchronousQueue:不存储任务,直接传递
maximumPoolSize(最大线程数) 是系统的 “极限处理能力”。当队列满了,线程池会创建新线程,直到达到这个上限。设置为 5 意味着最多同时运行 5 个线程。
14.4.2 动态扩缩容机制
线程池最精妙的设计在于它的动态扩缩容机制。让我用一段代码演示这个过程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| AtomicInteger counter = new AtomicInteger(0); ThreadPoolExecutor executor = new ThreadPoolExecutor( 2, 4, 30L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), r -> new Thread(r, "demo-" + counter.getAndIncrement()) );
ScheduledExecutorService monitor = Executors.newSingleThreadScheduledExecutor(); monitor.scheduleAtFixedRate(() -> { System.out.printf("活跃线程: %d, 队列任务: %d, 总完成: %d%n", executor.getActiveCount(), executor.getQueue().size(), executor.getCompletedTaskCount() ); }, 0, 1, TimeUnit.SECONDS);
for (int i = 1; i <= 6; i++) { final int taskId = i; executor.execute(() -> { System.out.println("Task-" + taskId + " 开始执行"); ThreadUtil.sleep(2, TimeUnit.SECONDS); }); System.out.println("提交了 Task-" + taskId); ThreadUtil.sleep(2, TimeUnit.SECONDS); }
|
运行这段代码,你会观察到线程池的扩容过程:
- Task-1、Task-2 直接被核心线程执行
- Task-3、Task-4 进入队列等待
- Task-5 提交时,队列已满,创建第 3 个线程
- Task-6 提交时,继续创建第 4 个线程(达到最大值)
30 秒后,如果没有新任务,非核心线程(第 3、4 个)会被自动回收。
14.5 任务执行流程:从提交到完成
理解了参数的含义,我们来看线程池处理任务的完整决策流程。这个流程体现了线程池的核心设计哲学:优先复用,按需扩展,适时收缩。

线程池的五种状态像量子态一样严格转换:

14.5.1 拒绝策略的选择
当线程池饱和时,拒绝策略决定了如何处理新任务。JDK 提供了四种内置策略:
1 2 3 4 5 6 7 8 9 10 11
| executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
|
RVP 选择了 CallerRunsPolicy,这是一种 “自我牺牲” 的策略:当线程池忙不过来时,提交任务的线程(通常是主线程)会亲自执行这个任务。这样做的好处是不会丢失任务,同时通过让调用者线程参与执行,自然地实现了一种反压机制。
14.6 优雅关闭:确保任务不丢失
线程池的关闭看似简单,实则大有学问。不当的关闭方式可能导致任务丢失或程序挂起。让我们通过实际代码理解两种关闭方式的区别:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
| public class ThreadPoolShutdownDemo { public static void main(String[] args) throws InterruptedException { ThreadPoolExecutor executor = new ThreadPoolExecutor( 2, 2, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>() ); for (int i = 1; i <= 5; i++) { final int taskId = i; executor.execute(() -> { System.out.println("开始执行 Task-" + taskId); try { Thread.sleep(2000); } catch (InterruptedException e) { System.out.println("Task-" + taskId + " 被中断!"); return; } System.out.println("完成执行 Task-" + taskId); }); } System.out.println("所有任务已提交,队列中有 " + executor.getQueue().size() + " 个任务等待"); demonstrateShutdown(executor); } private static void demonstrateShutdown(ThreadPoolExecutor executor) throws InterruptedException { System.out.println("\n=== 执行 shutdown() ==="); executor.shutdown(); System.out.println("shutdown后,isShutdown=" + executor.isShutdown()); System.out.println("shutdown后,isTerminated=" + executor.isTerminated()); try { executor.execute(() -> System.out.println("新任务")); } catch (RejectedExecutionException e) { System.out.println("新任务被拒绝:" + e.getMessage()); } boolean terminated = executor.awaitTermination(30, TimeUnit.SECONDS); System.out.println("所有任务完成:" + terminated); } private static void demonstrateShutdownNow(ThreadPoolExecutor executor) { System.out.println("\n=== 执行 shutdownNow() ==="); List<Runnable> pendingTasks = executor.shutdownNow(); System.out.println("被取消的任务数:" + pendingTasks.size()); System.out.println("注意观察哪些任务被中断了"); } }
|
运行这段代码,你会发现:
shutdown() 会等待所有已提交的任务执行完成,包括队列中的任务shutdownNow() 会立即中断正在执行的任务,并返回队列中未执行的任务列表
14.6.1 RVP 的优雅停机策略
在实际项目中,我们通常会结合两种关闭方式,实现 “先礼后兵” 的优雅停机:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| @PreDestroy public void destroy() { log.info("开始关闭线程池..."); scheduledPool.shutdown(); asyncPool.shutdown(); try { if (!scheduledPool.awaitTermination(120, TimeUnit.SECONDS)) { log.warn("等待超时,强制关闭定时任务线程池"); scheduledPool.shutdownNow(); } if (!asyncPool.awaitTermination(120, TimeUnit.SECONDS)) { log.warn("等待超时,强制关闭异步任务线程池"); asyncPool.shutdownNow(); } } catch (InterruptedException e) { log.error("关闭线程池被中断"); scheduledPool.shutdownNow(); asyncPool.shutdownNow(); Thread.currentThread().interrupt(); } log.info("线程池关闭完成"); }
|
这种策略确保了:
- 正常情况下,所有任务都能执行完成
- 异常情况下(如等待超时),也能强制终止,避免程序无法退出
- 通过日志记录关闭过程,便于问题排查
理解了线程池的工作原理和关闭机制,下一节我们将分析 RVP 项目中 ThreadPoolConfig 的具体实现,看看这些理论知识是如何应用到实际项目中的。
14.7. RVP ThreadPoolConfig 源码解析
经过前面的理论学习,我们已经彻底理解了线程池的、七大参数、执行流程和关闭策略。
现在,让我们带着这些理论知识,深入 RVP 框架的 ThreadPoolConfig 源码,看看 RVP 5.x 版本的线程池是如何在实际项目中配置的。
文件路径:ruoyi-common/ruoyi-common-core/src/main/java/org/dromara/common/core/config/ThreadPoolConfig.java
我们首先看类的声明和核心字段:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| @Slf4j @AutoConfiguration @EnableConfigurationProperties(ThreadPoolProperties.class) public class ThreadPoolConfig {
private final int core = Runtime.getRuntime().availableProcessors() + 1;
private ScheduledExecutorService scheduledExecutorService;
}
|
这里有几个关键信息:
@EnableConfigurationProperties(ThreadPoolProperties.class):这个注解会激活 ThreadPoolProperties 类,使其从 application.yml 文件中读取以 thread.pool 为前缀的配置(例如 keepAliveSeconds, queueCapacity,尽管在 5.x 默认配置中未直接使用)。private final int core = ...:RVP 没有 将核心线程数硬编码,而是动态获取了 当前服务器 CPU 的核心数(availableProcessors()),并 加 1。
为什么是 CPU + 1?
这是一种常见的优化策略。CPU 密集型任务(如纯计算)设置为 CPU 核心数即可最大化利用率。但我们的后端任务通常还包含 IO 等待(如读写数据库、调用 API)。+ 1 是为了让 CPU 在处理 IO 等待的间隙,有额外的线程可以切换,从而最大化 CPU 的利用率。
14.7.1. RVP 5.x 的核心 Bean:ScheduledExecutorService
在 RVP 4.x 和早期 5.x 版本中,ThreadPoolConfig 会注册一个 ThreadPoolTaskExecutor。但在 最新的 RVP 5.x 源码 中,该配置已被移除,默认 只 提供了一个 Bean:ScheduledExecutorService。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
|
@Bean(name = "scheduledExecutorService") protected ScheduledExecutorService scheduledExecutorService() { BasicThreadFactory.Builder builder = new BasicThreadFactory.Builder() .daemon(true); if (SpringUtils.isVirtual()) { builder.namingPattern("virtual-schedule-pool-%d") .wrappedFactory(new VirtualThreadTaskExecutor().getVirtualThreadFactory()); } else { builder.namingPattern("schedule-pool-%d"); } ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor( core, builder.build(), new ThreadPoolExecutor.CallerRunsPolicy() ) { @Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); printException(r, t); } }; this.scheduledExecutorService = scheduledThreadPoolExecutor; return scheduledThreadPoolExecutor; }
|
这段代码是 RVP 线程池配置的精华所在,我们来逐一解析:
为何只配 ScheduledExecutorService?
- 这是一个 高复用 的选择。
ScheduledExecutorService(定时任务线程池)完全兼容 普通线程池的功能(execute, submit),同时 额外提供 了 schedule(延迟执行)和 scheduleAtFixedRate(周期执行)的能力。RVP 5.x 通过只提供这一个 Bean,满足了绝大多数异步和定时的场景。
线程工厂 (BasicThreadFactory.Builder)
daemon(true):将线程设置为“守护线程”。这意味着当 JVM 主线程(Spring Boot 应用)退出时,这些线程池线程会 自动关闭,不会阻止程序停止。namingPattern("schedule-pool-%d"):为线程命名。这 极其重要,当我们在日志(Log)中看到 [schedule-pool-1] 时,就能立刻知道这个日志是由该线程池打印的,极大方便了调试。- 虚拟线程支持:RVP 5.x 还前瞻性地支持了 JDK 21 的虚拟线程。
拒绝策略 (CallerRunsPolicy)
- 这是我们在 14.5 节中提到的 调用者执行策略
- 在四种策略中,
CallerRunsPolicy 是最稳妥的策略。当线程池和队列都饱和时,它会 把这个新任务交还给提交任务的线程(通常是主线程/接口线程)去自己执行。 - 这虽然会短暂地 阻塞 主线程(接口变慢),但它保证了 任务 100% 不会丢失,并且通过“反压”机制,自然地减缓了任务提交的速度。
afterExecute 异常处理钩子
- 这是最精妙的一点。
- 痛点:如果我们使用
executor.execute(Runnable) 提交任务,Runnable 的 run() 方法是不允许 throws 异常的。如果任务内部发生 RuntimeException,这个异常会在子线程中“静默地 死掉,主线程 永远不会知道。 - RVP 解决方案:RVP 重写了
afterExecute 方法,并在内部调用 printException(r, t)。printException 会检查任务(Future<?>)是否在执行中抛出了异常(ExecutionException),如果抛了,它会把异常打印到日志中。 - 结论:这个钩子确保了即使是“即发即忘”的
execute 任务,其 异常也会被捕获并记录,防止了“静默失败”。
14.7.2. @PreDestroy 与 RVP 的优雅停机
ThreadPoolConfig 中还包含了 destroy() 方法,它利用 @PreDestroy 注解,实现了我们在 14.6 节中讲到的“优雅停机”逻辑。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
|
@PreDestroy public void destroy() { try { log.info("====关闭后台任务任务线程池===="); ScheduledExecutorService pool = scheduledExecutorService; if (pool != null && !pool.isShutdown()) { pool.shutdown(); try { if (!pool.awaitTermination(120, TimeUnit.SECONDS)) { pool.shutdownNow(); if (!pool.awaitTermination(120, TimeUnit.SECONDS)) { log.info("Pool did not terminate"); } } } catch (InterruptedException ie) { pool.shutdownNow(); Thread.currentThread().interrupt(); } } } catch (Exception e) { log.error(e.getMessage(), e); } }
|
分析:@PreDestroy 注解保证了在 Spring Boot 应用关闭时,destroy 方法会被自动调用,从而安全地关闭 scheduledExecutorService,确保数据尽可能不丢失。
14.8. 【二开实战】线程池的三种正确用法
理解了 RVP 的配置思路,我们就可以在 “二开” 中安全、高效地使用这个由 ThreadPoolConfig 注册的 scheduledExecutorService Bean 了。
14.8.1. 测试准备:注入 ScheduledExecutorService
我们确保 ThreadPoolController 已经注入了 RVP 提供的 Bean:
1 2 3 4 5 6 7 8 9 10 11 12 13
|
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.CompletableFuture;
@RequiredArgsConstructor @RestController @RequestMapping("/pool") public class ThreadPoolController {
private final ScheduledExecutorService scheduledExecutorService; }
|
14.8.2. 用法一:execute(Runnable) (无返回值)
这是最简单的用法,适用于 不关心结果 的 “即发即忘” 任务,例如记录日志、发送通知。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| @GetMapping("/pool3") public R<Void> pool3() { Runnable task = () -> { log.info("我执行了 (execute),线程名: {}", Thread.currentThread().getName()); }; scheduledExecutorService.execute(task); return R.ok("操作成功"); }
|
验证:调用 GET .../pool3,接口立即返回 “操作成功”。控制台会(在子线程 schedule-pool-x 中)打印出 我执行了 (execute)...。
14.8.3. 用法二:CompletableFuture (有返回值)
当我们需要从异步任务中 获取一个计算结果 时,应该使用 CompletableFuture,它是 Java 8+ 提供的更强大的异步编程工具。
为什么不使用 Future?
传统的 Future 有以下局限性:
- 只能阻塞等待:
get() 方法会阻塞当前线程,无法真正实现异步 - 无法组合:多个
Future 之间难以组合和链式调用 - 异常处理困难:需要在每个
get() 处捕获异常 - 功能单一:只能获取结果,无法注册回调
而 CompletableFuture 解决了这些问题,提供了真正的异步编程体验。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| @GetMapping("/pool4") public R<String> pool4() { CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { log.info("我执行了 (CompletableFuture),正在计算..."); ThreadUtil.sleep(3, TimeUnit.SECONDS); return "计算结果:123"; }, scheduledExecutorService);
future.thenAccept(result -> { log.info("异步获得结果: {}", result); }); return R.ok("任务已提交,正在后台处理"); }
|
验证:调用 GET .../pool4,接口 立即 返回 “任务已提交,正在后台处理”。
3 秒后,控制台会打印 异步获得结果: 计算结果:123。
14.8.4. 【正确实践】并发执行多个任务
使用 CompletableFuture 可以更优雅地处理并发任务:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| private CompletableFuture<Integer> createAsyncTask(int i) { return CompletableFuture.supplyAsync(() -> { log.info("我执行了 (task {})", i); ThreadUtil.sleep(1, TimeUnit.SECONDS); return i; }, scheduledExecutorService); }
@GetMapping("/pool6") public R<List<Integer>> pool6() { long startTime = System.currentTimeMillis(); List<CompletableFuture<Integer>> futures = new ArrayList<>(); for (int i = 0; i < 10; i++) { futures.add(createAsyncTask(i)); } CompletableFuture<Void> allFutures = CompletableFuture.allOf( futures.toArray(new CompletableFuture[0]) ); CompletableFuture<List<Integer>> resultFuture = allFutures.thenApply(v -> futures.stream() .map(CompletableFuture::join) .collect(Collectors.toList()) ); List<Integer> results = resultFuture.join(); log.info("pool6 总耗时: {} ms", (System.currentTimeMillis() - startTime)); return R.ok(results); }
|
验证:调用 GET .../pool6,接口 约 1 秒 后返回所有结果 [0,1,2,3,4,5,6,7,8,9]。控制台几乎同时打印 10 条 “我执行了 (task x)”。
14.8.5. 【进阶】异步链式调用
CompletableFuture 的强大之处在于支持链式调用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| @GetMapping("/pool7") public R<String> pool7() { CompletableFuture<String> future = CompletableFuture .supplyAsync(() -> { log.info("第一步:获取用户ID"); return "user123"; }, scheduledExecutorService) .thenApplyAsync(userId -> { log.info("第二步:根据用户ID {} 查询订单", userId); ThreadUtil.sleep(1, TimeUnit.SECONDS); return "订单号:ORDER-" + userId; }, scheduledExecutorService) .thenApplyAsync(orderId -> { log.info("第三步:根据 {} 查询物流信息", orderId); return orderId + " -> 已发货"; }, scheduledExecutorService) .exceptionally(ex -> { log.error("处理出错", ex); return "处理失败"; }); future.thenAccept(result -> log.info("最终结果: {}", result)); return R.ok("异步链式调用已启动"); }
|
14.8.6. 用法三:schedule(...) (延迟执行)
对于延迟任务,我们仍然使用 ScheduledExecutorService 的 schedule 方法,但可以结合 CompletableFuture:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| @GetMapping("/pool8") public R<Void> pool8() { log.info("pool8 接口被调用 (主线程)");
ScheduledFuture<String> scheduledFuture = scheduledExecutorService.schedule(() -> { log.info("延迟任务执行了"); return "延迟任务的结果"; }, 3, TimeUnit.SECONDS); CompletableFuture<String> completableFuture = new CompletableFuture<>(); scheduledExecutorService.execute(() -> { try { completableFuture.complete(scheduledFuture.get()); } catch (Exception e) { completableFuture.completeExceptionally(e); } }); completableFuture.thenAccept(result -> log.info("延迟任务完成,结果: {}", result) ); return R.ok("操作成功 (立即返回)"); }
|
在现代 Java 开发中,CompletableFuture 已经成为异步编程的首选工具,它让我们能够写出既高效又优雅的异步代码。