第十四章. common-core 核心配置:ThreadPoolConfig 线程池详解

第十四章. common-core 核心配置:ThreadPoolConfig 线程池详解

摘要:本章我们将深入 RVP 的异步基石 ThreadPoolConfig。我们将从“为何需要线程池”的痛点出发,通过“饭店模型”彻底理解线程池的七大参数与执行流程。然后我们将深入 Java ThreadPoolExecutor 和 RVP ThreadPoolConfig 的源码,最后实战 executesubmitschedule 的正确用法。

在前面的章节中,我们已经深入分析了 common-coreutils 包下的所有核心工具类,从 ServletUtilsThreadLocal 原理,到 RegexValidator 的三层校验架构。我们掌握了 RVP 框架提供的“工具箱”。

common-core 的价值不止于工具类。它还提供了“配置类”,为整个框架的性能和稳定性奠定基础。

从本章开始,我们将进入 config 包,首先要面对的就是 最核心ThreadPoolConfig。线程池是后端高并发、异步化处理的基石。RVP 在 common-core 中为我们预配置了一套全局线程池,理解它、用好它,是“二开”中实现性能优化的关键。

本章学习路径

线程池深入剖析


14.1. 【二开痛点】为何需要线程池?

在我们的“二开”业务中,经常会遇到一些耗时操作,例如:

  • 发送通知:用户注册成功后,需要发送欢迎邮件或短信。
  • 记录日志:记录非核心的、详细的操作日志到数据库或日志文件。
  • 调用外部 API:调用多个(互不依赖的)第三方 API 来聚合数据。

如果我们让主线程(即处理用户 HTTP 请求的那个线程)同步 等待这些操作完成,将 极大延长 接口的响应时间,严重影响用户体验。

例如,一个“注册”接口,核心业务(写入用户表)可能只需要 50 毫秒,但发送邮件这个非核心业务需要 3 秒。如果同步执行,用户必须在浏览器前 白白等待 3 秒 才能收到“注册成功”的提示。

线程池的核心价值,就是将这些“耗时”且“非核心”的任务,从 主线程 剥离出去,交给 后台的子线程去执行,从而让主线程能够“立即返回”,实现接口“秒回”。

14.2. 【实战】同步执行的“慢”接口(pool1 案例)

我们来动手模拟这个痛点。

14.2.1. 测试准备:创建 ThreadPoolController

我们首先在 ruoyi-demo 模块中创建 ThreadPoolController,并定义一个模拟耗时 3 秒的方法。

文件路径ruoyi-modules/ruoyi-demo/src/main/java/org/dromara/demo/controller/ThreadPoolController.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package org.dromara.demo.controller;

import cn.hutool.core.thread.ThreadUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.dromara.common.core.domain.R;
import org.dromara.common.doc.annotation.SaIgnore;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.TimeUnit;

/**
* 线程池实战
*/
@SaIgnore // 忽略权限认证,方便测试
@Slf4j
@RequiredArgsConstructor
@RestController
@RequestMapping("/pool")
public class ThreadPoolController {

/**
* 模拟一个耗时 3 秒的方法
*/
private void method1() {
log.info("开始执行 method1 ...");
// ThreadUtil 是 Hutool 提供的工具,可以安全地休眠
ThreadUtil.sleep(3, TimeUnit.SECONDS);
}

}

14.2.2. pool1 接口:同步循环调用

我们添加 pool1 接口,它在 主线程 中循环调用 5 次 method1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 位于 ThreadPoolController.java

@GetMapping("/pool1")
public R<Void> pool1() {
long startTime = System.currentTimeMillis();

// 在主线程中循环 5 次
for (int i = 0; i < 5; i++) {
method1();
}

long endTime = System.currentTimeMillis();
log.info("pool1 总耗时: {} ms", (endTime - startTime));

return R.ok("操作成功");
}

验证:我们启动项目,并调用 GET http://localhost:8080/pool/pool1。我们会发现,浏览器会“转圈”长达 15 秒 后才返回“操作成功”。查看控制台日志,会打印 pool1 总耗时: 15021 ms(类似这样的值)。

这个 15 秒的等待是无法接受的。method1 之间互不依赖,完全可以“并发”执行。这就是线程池的用武之地。

14.3. 【实战】异步执行的“快”接口(pool2 案例)

pool1 的问题在于主线程做了 不必要 的等待。我们现在使用 RVP 配置的线程池来解决这个问题。

14.3.1. 注入 RVP 预配置的线程池

RVP 已经在 ThreadPoolConfigcommon-core 中)为我们配置好了全局线程池(我们将在 14.8 节深入源码),我们只需要 注入 它。

在 RVP 5.x 版本的 ThreadPoolConfig 中,默认注册的 Bean 是 ScheduledExecutorService,我们通过构造函数注入它。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 位于 ThreadPoolController.java

// ... import ...
import java.util.concurrent.ScheduledExecutorService;

@SaIgnore
@Slf4j
@RequiredArgsConstructor // Lombok: 自动生成包含 final 字段的构造函数
@RestController
@RequestMapping("/pool")
public class ThreadPoolController {

// 【核心】通过构造函数注入 RVP 5.x 默认配置的定时任务线程池
// 它同样可以当做普通线程池使用
private final ScheduledExecutorService scheduledExecutorService;

// ... method1() ...
// ... pool1() ...
}

14.3.2. pool2 接口:异步提交任务

现在,我们把 5 个任务“提交”给线程池,主线程则“立即返回”。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 位于 ThreadPoolController.java

@GetMapping("/pool2")
public R<Void> pool2() {
long startTime = System.currentTimeMillis();

// 循环 5 次
for (int i = 0; i < 5; i++) {
// 【核心】将 method1 作为一个“任务”
// 提交 (execute) 给线程池,主线程“不等待”
// this::method1 是 "方法引用",等同于 () -> this.method1()
scheduledExecutorService.execute(this::method1);
}

long endTime = System.currentTimeMillis();
log.info("pool2 总耗时: {} ms", (endTime - startTime));

return R.ok("操作成功");
}

验证:我们重启项目,并调用 GET http://localhost:8080/pool/pool2

结果:浏览器 瞬间 返回“操作成功”。查看控制台日志,会打印 pool2 总耗时: 3 ms(或某个极小的值)。

分析

  • 主线程:在 for 循环中,只做了 5 次“提交”动作,这个动作极快。主线程 没有 等待 15 秒,而是立即返回了 R.ok()
  • 后台线程:在接下来的 15 秒里,线程池中的 子线程(例如 schedule-pool-1, schedule-pool-2…)正在后台默默地执行 5 次 method1(我们可以在控制台看到 5 次“开始执行 method1”的日志)。

我们成功地将“耗时操作”与“主流程”解耦,实现了接口的“秒回”。但这背后是如何工作的?RVP 配置的 corePoolSize 是多少?maxPoolSize 又是多少?这些参数的含义,我们必须在理论阶段搞清楚。


14.4 深入理解:线程池的资源管理机制

在上一节,我们体验了线程池带来的性能提升。但你可能注意到,创建 ThreadPoolExecutor 时需要传入一堆参数。这不是设计者故意为难我们,而是因为线程池本质上是一个 精密的资源调度系统

让我们来看一个完整的线程池创建代码,然后我们逐步理解每个参数的作用:

1
2
3
4
5
6
7
8
9
ThreadPoolExecutor executor = new ThreadPoolExecutor(
3, // corePoolSize - 核心线程数
5, // maximumPoolSize - 最大线程数
60L, // keepAliveTime - 空闲线程存活时间
TimeUnit.SECONDS, // unit - 时间单位
new ArrayBlockingQueue<>(20), // workQueue - 任务队列
new CustomThreadFactory(), // threadFactory - 线程工厂
new ThreadPoolExecutor.CallerRunsPolicy() // handler - 拒绝策略
);

为什么需要这么多参数?想象一下,如果你是一家餐厅的经理,面对波动的客流,你需要考虑:平时要保留几个厨师?高峰期最多能招几个临时工?等位区要设置多少座位?当座位满了怎么办?这些决策直接影响餐厅的运营效率和成本。

线程池面临的是同样的问题:如何在 资源利用率响应速度 之间找到平衡。

14.4.1 核心参数的设计思路

我们从一个实际场景开始。假设你的系统平时每秒处理 10 个请求,高峰期可能达到 50 个。如果为每个请求创建一个线程,系统很快就会因为线程过多而崩溃。这时,线程池的参数设计就显得尤为重要。

corePoolSize(核心线程数) 决定了系统的 “基础处理能力”。设置为 3 意味着:即使没有任何任务,线程池也会保持 3 个线程随时待命。为什么要保持?因为线程的创建和销毁是有开销的,保持一定数量的常驻线程可以快速响应请求。

workQueue(工作队列) 是线程池的 “缓冲区”。当 3 个核心线程都在忙时,新任务会进入这个队列等待。选择 ArrayBlockingQueue(20) 意味着最多可以缓存 20 个任务。队列的选择非常重要:

  • ArrayBlockingQueue:有界队列,防止内存溢出
  • LinkedBlockingQueue:无界队列,可能导致 OOM(内存溢出)
  • SynchronousQueue:不存储任务,直接传递

maximumPoolSize(最大线程数) 是系统的 “极限处理能力”。当队列满了,线程池会创建新线程,直到达到这个上限。设置为 5 意味着最多同时运行 5 个线程。

14.4.2 动态扩缩容机制

线程池最精妙的设计在于它的动态扩缩容机制。让我用一段代码演示这个过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 创建一个可观察的线程池
AtomicInteger counter = new AtomicInteger(0);
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, 4, 30L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2),
r -> new Thread(r, "demo-" + counter.getAndIncrement())
);

// 监控线程池状态
ScheduledExecutorService monitor = Executors.newSingleThreadScheduledExecutor();
monitor.scheduleAtFixedRate(() -> {
System.out.printf("活跃线程: %d, 队列任务: %d, 总完成: %d%n",
executor.getActiveCount(),
executor.getQueue().size(),
executor.getCompletedTaskCount()
);
}, 0, 1, TimeUnit.SECONDS);

// 模拟任务提交
for (int i = 1; i <= 6; i++) {
final int taskId = i;
executor.execute(() -> {
System.out.println("Task-" + taskId + " 开始执行");
ThreadUtil.sleep(2, TimeUnit.SECONDS);
});
System.out.println("提交了 Task-" + taskId);
ThreadUtil.sleep(2, TimeUnit.SECONDS);
}

运行这段代码,你会观察到线程池的扩容过程:

  1. Task-1、Task-2 直接被核心线程执行
  2. Task-3、Task-4 进入队列等待
  3. Task-5 提交时,队列已满,创建第 3 个线程
  4. Task-6 提交时,继续创建第 4 个线程(达到最大值)

30 秒后,如果没有新任务,非核心线程(第 3、4 个)会被自动回收。

14.5 任务执行流程:从提交到完成

理解了参数的含义,我们来看线程池处理任务的完整决策流程。这个流程体现了线程池的核心设计哲学:优先复用,按需扩展,适时收缩

线程池任务执行流程 2

线程池的五种状态像量子态一样严格转换:

image-20251118095914572

14.5.1 拒绝策略的选择

当线程池饱和时,拒绝策略决定了如何处理新任务。JDK 提供了四种内置策略:

1
2
3
4
5
6
7
8
9
10
11
// 1. AbortPolicy(默认):直接抛出异常
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());

// 2. CallerRunsPolicy:调用者线程执行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

// 3. DiscardPolicy:静默丢弃
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());

// 4. DiscardOldestPolicy:丢弃最老的任务
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());

RVP 选择了 CallerRunsPolicy,这是一种 “自我牺牲” 的策略:当线程池忙不过来时,提交任务的线程(通常是主线程)会亲自执行这个任务。这样做的好处是不会丢失任务,同时通过让调用者线程参与执行,自然地实现了一种反压机制。

14.6 优雅关闭:确保任务不丢失

线程池的关闭看似简单,实则大有学问。不当的关闭方式可能导致任务丢失或程序挂起。让我们通过实际代码理解两种关闭方式的区别:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
public class ThreadPoolShutdownDemo {
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, 2, 0L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>()
);

// 提交一些任务
for (int i = 1; i <= 5; i++) {
final int taskId = i;
executor.execute(() -> {
System.out.println("开始执行 Task-" + taskId);
try {
Thread.sleep(2000); // 每个任务需要2秒
} catch (InterruptedException e) {
System.out.println("Task-" + taskId + " 被中断!");
return;
}
System.out.println("完成执行 Task-" + taskId);
});
}

System.out.println("所有任务已提交,队列中有 " +
executor.getQueue().size() + " 个任务等待");

// 演示 shutdown() - 优雅关闭
demonstrateShutdown(executor);

// 如果要演示 shutdownNow(),请注释上面一行,取消注释下面一行
// demonstrateShutdownNow(executor);
}

private static void demonstrateShutdown(ThreadPoolExecutor executor)
throws InterruptedException {
System.out.println("\n=== 执行 shutdown() ===");
executor.shutdown();

System.out.println("shutdown后,isShutdown=" + executor.isShutdown());
System.out.println("shutdown后,isTerminated=" + executor.isTerminated());

// 尝试提交新任务
try {
executor.execute(() -> System.out.println("新任务"));
} catch (RejectedExecutionException e) {
System.out.println("新任务被拒绝:" + e.getMessage());
}

// 等待所有任务完成
boolean terminated = executor.awaitTermination(30, TimeUnit.SECONDS);
System.out.println("所有任务完成:" + terminated);
}

private static void demonstrateShutdownNow(ThreadPoolExecutor executor) {
System.out.println("\n=== 执行 shutdownNow() ===");
List<Runnable> pendingTasks = executor.shutdownNow();

System.out.println("被取消的任务数:" + pendingTasks.size());
System.out.println("注意观察哪些任务被中断了");
}
}

运行这段代码,你会发现:

  • shutdown() 会等待所有已提交的任务执行完成,包括队列中的任务
  • shutdownNow() 会立即中断正在执行的任务,并返回队列中未执行的任务列表

14.6.1 RVP 的优雅停机策略

在实际项目中,我们通常会结合两种关闭方式,实现 “先礼后兵” 的优雅停机:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@PreDestroy
public void destroy() {
log.info("开始关闭线程池...");

// 第一步:停止接收新任务,等待现有任务完成
scheduledPool.shutdown();
asyncPool.shutdown();

try {
// 第二步:等待一段时间(给任务完成的机会)
if (!scheduledPool.awaitTermination(120, TimeUnit.SECONDS)) {
// 第三步:超时后强制关闭
log.warn("等待超时,强制关闭定时任务线程池");
scheduledPool.shutdownNow();
}

if (!asyncPool.awaitTermination(120, TimeUnit.SECONDS)) {
log.warn("等待超时,强制关闭异步任务线程池");
asyncPool.shutdownNow();
}
} catch (InterruptedException e) {
// 第四步:处理中断异常
log.error("关闭线程池被中断");
scheduledPool.shutdownNow();
asyncPool.shutdownNow();
Thread.currentThread().interrupt();
}

log.info("线程池关闭完成");
}

这种策略确保了:

  1. 正常情况下,所有任务都能执行完成
  2. 异常情况下(如等待超时),也能强制终止,避免程序无法退出
  3. 通过日志记录关闭过程,便于问题排查

理解了线程池的工作原理和关闭机制,下一节我们将分析 RVP 项目中 ThreadPoolConfig 的具体实现,看看这些理论知识是如何应用到实际项目中的。


14.7. RVP ThreadPoolConfig 源码解析

经过前面的理论学习,我们已经彻底理解了线程池的、七大参数、执行流程和关闭策略。

现在,让我们带着这些理论知识,深入 RVP 框架的 ThreadPoolConfig 源码,看看 RVP 5.x 版本的线程池是如何在实际项目中配置的。

文件路径ruoyi-common/ruoyi-common-core/src/main/java/org/dromara/common/core/config/ThreadPoolConfig.java

我们首先看类的声明和核心字段:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Slf4j
@AutoConfiguration
@EnableConfigurationProperties(ThreadPoolProperties.class)
public class ThreadPoolConfig {

/**
* 核心线程数 = cpu 核心数 + 1
*/
private final int core = Runtime.getRuntime().availableProcessors() + 1;

private ScheduledExecutorService scheduledExecutorService;

// ... (Bean 和 destroy 方法)
}

这里有几个关键信息:

  1. @EnableConfigurationProperties(ThreadPoolProperties.class):这个注解会激活 ThreadPoolProperties 类,使其从 application.yml 文件中读取以 thread.pool 为前缀的配置(例如 keepAliveSeconds, queueCapacity,尽管在 5.x 默认配置中未直接使用)。
  2. private final int core = ...:RVP 没有 将核心线程数硬编码,而是动态获取了 当前服务器 CPU 的核心数availableProcessors()),并 加 1

为什么是 CPU + 1
这是一种常见的优化策略。CPU 密集型任务(如纯计算)设置为 CPU 核心数即可最大化利用率。但我们的后端任务通常还包含 IO 等待(如读写数据库、调用 API)。+ 1 是为了让 CPU 在处理 IO 等待的间隙,有额外的线程可以切换,从而最大化 CPU 的利用率。

14.7.1. RVP 5.x 的核心 Bean:ScheduledExecutorService

在 RVP 4.x 和早期 5.x 版本中,ThreadPoolConfig 会注册一个 ThreadPoolTaskExecutor。但在 最新的 RVP 5.x 源码 中,该配置已被移除,默认 提供了一个 Bean:ScheduledExecutorService

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
* 执行周期性或定时任务
*/
@Bean(name = "scheduledExecutorService")
protected ScheduledExecutorService scheduledExecutorService() {
// 1. 构建线程工厂
BasicThreadFactory.Builder builder = new BasicThreadFactory.Builder()
.daemon(true); // 设置为守护线程

if (SpringUtils.isVirtual()) {
// 2. (JDK 21+) 支持虚拟线程
builder.namingPattern("virtual-schedule-pool-%d")
.wrappedFactory(new VirtualThreadTaskExecutor().getVirtualThreadFactory());
} else {
// 3. 传统平台线程
builder.namingPattern("schedule-pool-%d");
}

// 4. 创建 ScheduledThreadPoolExecutor 实例
ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(
core, // 核心线程数 (CPU+1)
builder.build(), // 线程工厂 (带命名)
new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
) {
@Override
protected void afterExecute(Runnable r, Throwable t) {
// 5. 异常处理钩子
super.afterExecute(r, t);
printException(r, t);
}
};
this.scheduledExecutorService = scheduledThreadPoolExecutor;
return scheduledThreadPoolExecutor;
}

这段代码是 RVP 线程池配置的精华所在,我们来逐一解析:

  1. 为何只配 ScheduledExecutorService

    • 这是一个 高复用 的选择。ScheduledExecutorService(定时任务线程池)完全兼容 普通线程池的功能(execute, submit),同时 额外提供schedule(延迟执行)和 scheduleAtFixedRate(周期执行)的能力。RVP 5.x 通过只提供这一个 Bean,满足了绝大多数异步和定时的场景。
  2. 线程工厂 (BasicThreadFactory.Builder)

    • daemon(true):将线程设置为“守护线程”。这意味着当 JVM 主线程(Spring Boot 应用)退出时,这些线程池线程会 自动关闭,不会阻止程序停止。
    • namingPattern("schedule-pool-%d"):为线程命名。这 极其重要,当我们在日志(Log)中看到 [schedule-pool-1] 时,就能立刻知道这个日志是由该线程池打印的,极大方便了调试。
    • 虚拟线程支持:RVP 5.x 还前瞻性地支持了 JDK 21 的虚拟线程。
  3. 拒绝策略 (CallerRunsPolicy)

    • 这是我们在 14.5 节中提到的 调用者执行策略
    • 在四种策略中,CallerRunsPolicy 是最稳妥的策略。当线程池和队列都饱和时,它会 把这个新任务交还给提交任务的线程(通常是主线程/接口线程)去自己执行
    • 这虽然会短暂地 阻塞 主线程(接口变慢),但它保证了 任务 100% 不会丢失,并且通过“反压”机制,自然地减缓了任务提交的速度。
  4. afterExecute 异常处理钩子

    • 这是最精妙的一点。
    • 痛点:如果我们使用 executor.execute(Runnable) 提交任务,Runnablerun() 方法是不允许 throws 异常的。如果任务内部发生 RuntimeException,这个异常会在子线程中“静默地 死掉,主线程 永远不会知道。
    • RVP 解决方案:RVP 重写了 afterExecute 方法,并在内部调用 printException(r, t)printException 会检查任务(Future<?>)是否在执行中抛出了异常(ExecutionException),如果抛了,它会把异常打印到日志中
    • 结论:这个钩子确保了即使是“即发即忘”的 execute 任务,其 异常也会被捕获并记录,防止了“静默失败”。

14.7.2. @PreDestroy 与 RVP 的优雅停机

ThreadPoolConfig 中还包含了 destroy() 方法,它利用 @PreDestroy 注解,实现了我们在 14.6 节中讲到的“优雅停机”逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* 销毁事件
*/
@PreDestroy
public void destroy() {
try {
log.info("====关闭后台任务任务线程池====");
ScheduledExecutorService pool = scheduledExecutorService;
if (pool != null && !pool.isShutdown()) {
// 1. (优雅关闭) 先停止接收新任务,等待已提交任务执行
pool.shutdown();
try {
// 2. 等待 120 秒
if (!pool.awaitTermination(120, TimeUnit.SECONDS)) {
// 3. (强制关闭) 超时,强制中断所有任务
pool.shutdownNow();
if (!pool.awaitTermination(120, TimeUnit.SECONDS)) {
log.info("Pool did not terminate");
}
}
} catch (InterruptedException ie) {
// 4. (处理中断)
pool.shutdownNow();
Thread.currentThread().interrupt();
}
}
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}

分析@PreDestroy 注解保证了在 Spring Boot 应用关闭时,destroy 方法会被自动调用,从而安全地关闭 scheduledExecutorService,确保数据尽可能不丢失。


14.8. 【二开实战】线程池的三种正确用法

理解了 RVP 的配置思路,我们就可以在 “二开” 中安全、高效地使用这个由 ThreadPoolConfig 注册的 scheduledExecutorService Bean 了。

14.8.1. 测试准备:注入 ScheduledExecutorService

我们确保 ThreadPoolController 已经注入了 RVP 提供的 Bean:

1
2
3
4
5
6
7
8
9
10
11
12
13
// 位于 ThreadPoolController.java
// ...
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.CompletableFuture;

@RequiredArgsConstructor
@RestController
@RequestMapping("/pool")
public class ThreadPoolController {

private final ScheduledExecutorService scheduledExecutorService;
// ...
}

14.8.2. 用法一:execute(Runnable) (无返回值)

这是最简单的用法,适用于 不关心结果 的 “即发即忘” 任务,例如记录日志、发送通知。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 位于 ThreadPoolController.java

@GetMapping("/pool3")
public R<Void> pool3() {
// 使用 Lambda 表达式定义一个 Runnable 任务
Runnable task = () -> {
log.info("我执行了 (execute),线程名: {}", Thread.currentThread().getName());
};

// 提交任务
scheduledExecutorService.execute(task);

return R.ok("操作成功");
}

验证:调用 GET .../pool3,接口立即返回 “操作成功”。控制台会(在子线程 schedule-pool-x 中)打印出 我执行了 (execute)...

14.8.3. 用法二:CompletableFuture (有返回值)

当我们需要从异步任务中 获取一个计算结果 时,应该使用 CompletableFuture,它是 Java 8+ 提供的更强大的异步编程工具。

为什么不使用 Future

传统的 Future 有以下局限性:

  • 只能阻塞等待get() 方法会阻塞当前线程,无法真正实现异步
  • 无法组合:多个 Future 之间难以组合和链式调用
  • 异常处理困难:需要在每个 get() 处捕获异常
  • 功能单一:只能获取结果,无法注册回调

CompletableFuture 解决了这些问题,提供了真正的异步编程体验。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 位于 ThreadPoolController.java

@GetMapping("/pool4")
public R<String> pool4() {

// 1. 创建一个异步任务
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
log.info("我执行了 (CompletableFuture),正在计算...");
ThreadUtil.sleep(3, TimeUnit.SECONDS);
return "计算结果:123";
}, scheduledExecutorService); // 使用我们的线程池

// 2. 【推荐】注册异步回调,不阻塞主线程
future.thenAccept(result -> {
log.info("异步获得结果: {}", result);
});

// 3. 接口立即返回,不等待计算完成
return R.ok("任务已提交,正在后台处理");
}

验证:调用 GET .../pool4,接口 立即 返回 “任务已提交,正在后台处理”。
3 秒后,控制台会打印 异步获得结果: 计算结果:123

14.8.4. 【正确实践】并发执行多个任务

使用 CompletableFuture 可以更优雅地处理并发任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// 位于 ThreadPoolController.java

// 模拟耗时 1 秒的任务,返回 int
private CompletableFuture<Integer> createAsyncTask(int i) {
return CompletableFuture.supplyAsync(() -> {
log.info("我执行了 (task {})", i);
ThreadUtil.sleep(1, TimeUnit.SECONDS);
return i;
}, scheduledExecutorService);
}

/**
* 【正确实践】:使用 CompletableFuture 并发执行
*/
@GetMapping("/pool6")
public R<List<Integer>> pool6() {
long startTime = System.currentTimeMillis();

// 创建 10 个异步任务
List<CompletableFuture<Integer>> futures = new ArrayList<>();
for (int i = 0; i < 10; i++) {
futures.add(createAsyncTask(i));
}

// 等待所有任务完成并收集结果
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
futures.toArray(new CompletableFuture[0])
);

// 当所有任务完成后,收集结果
CompletableFuture<List<Integer>> resultFuture = allFutures.thenApply(v ->
futures.stream()
.map(CompletableFuture::join) // 此时不会阻塞,因为任务都已完成
.collect(Collectors.toList())
);

// 获取最终结果(这里会阻塞,但仅等待最慢的任务)
List<Integer> results = resultFuture.join();

log.info("pool6 总耗时: {} ms", (System.currentTimeMillis() - startTime)); // 约 1 秒
return R.ok(results);
}

验证:调用 GET .../pool6,接口 约 1 秒 后返回所有结果 [0,1,2,3,4,5,6,7,8,9]。控制台几乎同时打印 10 条 “我执行了 (task x)”。

14.8.5. 【进阶】异步链式调用

CompletableFuture 的强大之处在于支持链式调用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@GetMapping("/pool7")
public R<String> pool7() {
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> {
log.info("第一步:获取用户ID");
return "user123";
}, scheduledExecutorService)
.thenApplyAsync(userId -> {
log.info("第二步:根据用户ID {} 查询订单", userId);
ThreadUtil.sleep(1, TimeUnit.SECONDS);
return "订单号:ORDER-" + userId;
}, scheduledExecutorService)
.thenApplyAsync(orderId -> {
log.info("第三步:根据 {} 查询物流信息", orderId);
return orderId + " -> 已发货";
}, scheduledExecutorService)
.exceptionally(ex -> {
log.error("处理出错", ex);
return "处理失败";
});

// 注册完成后的回调
future.thenAccept(result -> log.info("最终结果: {}", result));

return R.ok("异步链式调用已启动");
}

14.8.6. 用法三:schedule(...) (延迟执行)

对于延迟任务,我们仍然使用 ScheduledExecutorServiceschedule 方法,但可以结合 CompletableFuture

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@GetMapping("/pool8")
public R<Void> pool8() {
log.info("pool8 接口被调用 (主线程)");

// 延迟 3 秒后执行,并返回 CompletableFuture
ScheduledFuture<String> scheduledFuture = scheduledExecutorService.schedule(() -> {
log.info("延迟任务执行了");
return "延迟任务的结果";
}, 3, TimeUnit.SECONDS);

// 将 ScheduledFuture 转换为 CompletableFuture 以便链式调用
CompletableFuture<String> completableFuture = new CompletableFuture<>();
scheduledExecutorService.execute(() -> {
try {
completableFuture.complete(scheduledFuture.get());
} catch (Exception e) {
completableFuture.completeExceptionally(e);
}
});

// 注册回调
completableFuture.thenAccept(result ->
log.info("延迟任务完成,结果: {}", result)
);

return R.ok("操作成功 (立即返回)");
}

在现代 Java 开发中,CompletableFuture 已经成为异步编程的首选工具,它让我们能够写出既高效又优雅的异步代码。