Java(七):7.0 Java并发编程(下)

7.4 [实战] 线程池与异步编程

在前面的章节中,我们学习了如何创建和管理单个线程 (new Thread())。然而,在真实世界的服务端应用中,为每一个到来的请求或任务都创建一个新线程,是一种非常原始且危险的做法。因为线程的创建和销毁是重量级操作,会消耗大量的系统资源。如果请求量巨大,无限制地创建线程会迅速耗尽服务器资源,导致系统崩溃。

为了解决这个问题,线程池 应运而生。它是现代并发编程的基石,也是管理线程的最佳实践。

7.4.1 [核心] 为什么要使用线程池?

简单来说,手动管理线程的方式既“昂贵”又“失控”。而线程池通过对线程的复用和统一管理,带来了三大核心优势。

1. 降低资源消耗

  • 核心思想: 复用 已创建的线程,避免了频繁创建和销毁线程所带来的高昂开销。
  • 讲解:
    • 线程的创建和销毁涉及到与操作系统内核的交互、内存栈的分配与回收,这些都是成本很高的操作。
    • 线程池在启动时会预先创建一定数量的线程,当任务到来时,直接从池中取一个空闲线程来执行。任务结束后,线程并不会被销毁,而是归还给池子,等待下一个任务。
  • 比喻: 就像开一家餐厅,线程池相当于雇佣了一批固定的厨师团队(核心员工)。而不是每来一位客人就临时招聘一位厨师,客人吃完再将其解雇。

2. 提高响应速度

  • 核心思想: 消除了线程创建的延迟。
  • 讲解:
    • 当一个新任务到来时,如果需要临时创建一个新线程,这个过程是需要时间的。
    • 而使用线程池,任务可以直接交给一个处于等待状态的空闲线程执行,省去了创建线程的步骤,从而让任务能够更快地得到处理,提升了系统的响应能力。
  • 比喻: 当客人点餐后,待命的厨师可以立即开始炒菜,而无需等待 HR 完成招聘流程。

3. 提高线程的可管理性

  • 核心思想: 对线程进行统一的分配、调优和监控。
  • 讲解:
    • 如果任由代码随意创建线程,这些线程会散落在应用的各个角落,处于一种“失控”状态,难以管理。
    • 线程池作为一个中央管理器,为我们提供了强大的控制能力:
      • 控制并发数: 可以精确控制池中核心线程数和最大线程数,防止因线程过多而耗尽系统资源。
      • 统一监控: 可以方便地获取线程池的运行状态,如活动线程数、任务队列大小、已完成任务数等,便于监控和调优。
      • 统一管理: 可以统一设置线程的属性(如线程名、是否为守护线程),并能安全、平滑地关闭整个线程池。

结论: 在任何需要处理大量异步任务或并发请求的场景下,使用线程池都不仅仅是一种优化,而是一种必需。它是构建健壮、高性能并发系统的标准做法。接下来,我们将深入探索线程池的核心实现——ThreadPoolExecutor


7.4.2 [核心] ThreadPoolExecutor 详解

ThreadPoolExecutor 是 JUC 线程池框架中最核心、最底层的实现类。我们日常使用的 Executors 工具类创建的各种线程池,其内部几乎都是 ThreadPoolExecutor 的实例。因此,要精通线程池,就必须从理解它的构造函数和核心参数开始。

[高频][面试题] 七大核心参数

ThreadPoolExecutor 最常用的构造函数有七个参数,每一个都深刻影响着线程池的行为。理解它们的含义是面试的绝对高频考点。

1
2
3
4
5
6
7
8
new ThreadPoolExecutor(
4, // 核心线程数
8, // 最大线程数
60L, // 线程空闲时间
TimeUnit.SECONDS, // 线程空闲时间单位
new LinkedBlockingQueue<>(1024), // 线程队列
new ThreadPoolExecutor.DiscardOldestPolicy() // 拒绝策略
);
参数 (Parameter)类型 (Type)核心作用
corePoolSizeint核心线程数。线程池中保持存活的线程数,即使它们是空闲的。
maximumPoolSizeint最大线程数。线程池能容纳的同时执行的线程最大数量。
keepAliveTimelong空闲线程存活时间。当线程数大于 corePoolSize 时,多余的空闲线程在被销毁前等待新任务的最长时间。
unitTimeUnitkeepAliveTime 的时间单位(如秒、毫秒)。
workQueueBlockingQueue<Runnable>工作队列。用于存放等待执行的任务的阻塞队列。
threadFactoryThreadFactory线程工厂。用于创建新线程。可自定义线程名、是否为守护线程等。
handlerRejectedExecutionHandler拒绝策略。当队列和线程池都满了,无法处理新任务时所采取的策略。

线程池工作流程

image-20251118133743304

当一个新任务通过 execute() 方法提交给 ThreadPoolExecutor 时,它会遵循以下决策路径:

  1. 判断核心线程数:检查当前运行的线程数是否小于 corePoolSize
    • :直接创建一个新的 核心线程 来执行该任务,即使池中有其他空闲线程。
    • :进入步骤 2。
  2. 尝试加入工作队列:尝试将任务添加到 workQueue 中。
    • 成功:任务进入队列等待被空闲线程执行。
    • 失败(队列已满):进入步骤 3。
  3. 判断最大线程数:检查当前运行的线程数是否小于 maximumPoolSize
    • :创建一个新的 非核心线程(也叫“救急线程”)来执行该任务。
    • :进入步骤 4。
  4. 执行拒绝策略:当前线程数已达到最大值,且队列也已满。此时线程池已超负荷,必须通过指定的 RejectedExecutionHandler 来拒绝该任务。

工作队列 workQueue 的选择

workQueue 的选择对线程池的行为有决定性影响:

  • ArrayBlockingQueue: 基于数组的有界阻塞队列。必须指定容量。当队列满了之后,会触发创建非核心线程,直到达到 maximumPoolSize。有助于防止资源耗尽。
  • LinkedBlockingQueue: 基于链表的阻塞队列。如果构造时不指定容量,则默认为 Integer.MAX_VALUE,相当于一个 无界队列
    • 注意: 使用无界队列时,任务会一直被添加到队列中,导致 maximumPoolSize 参数 失效,因为线程数永远不会超过 corePoolSize。如果任务生产速度远超消费速度,可能导致内存溢出(OOM)。
  • SynchronousQueue: 一个不存储元素的阻塞队列。每个插入操作必须等待一个相应的移除操作。它会直接将任务“递交”给一个线程。如果没有空闲线程,就会触发创建新线程(直到 maximumPoolsize),因此适合处理大量、耗时短的瞬时任务。Executors.newCachedThreadPool() 就使用了它。
  • PriorityBlockingQueue: 一个支持优先级排序的无界队列。任务会根据其优先级被执行。
workQueue 类型特点说明适用场景
ArrayBlockingQueue基于数组的有界阻塞队列,必须指定容量,队列满后会创建非核心线程需要控制资源、防止内存溢出的场景
LinkedBlockingQueue基于链表的阻塞队列,默认无界(Integer.MAX_VALUE),可能导致 OOM任务量可控、不关心线程数上限的场景
SynchronousQueue不存储元素,每个插入操作必须等待移除,直接将任务 “递交” 给线程处理大量短时任务、需要快速响应的场景
PriorityBlockingQueue支持优先级排序的无界队列,任务按优先级执行需要按优先级处理任务的场景

拒绝策略 RejectedExecutionHandler

拒绝策略类型特点说明适用场景
ThreadPoolExecutor.AbortPolicy (默认)直接抛出 RejectedExecutionException 异常,阻止系统正常工作。适用于任务非常重要,不允许丢失,且出现异常时希望立即得到通知的场景。
ThreadPoolExecutor.CallerRunsPolicy“调用者运行”策略。该任务不会被丢弃,也不会被线程池执行,而是由提交该任务的线程(调用 execute 的线程)自己来执行。这是一种有效的“反压”机制,可以减慢任务提交者的速度。当希望限制任务提交者的速率,防止提交过多任务导致线程池过载时使用。
ThreadPoolExecutor.DiscardPolicy直接静默地丢弃任务,不抛出任何异常。适用于任务不重要,即使丢失也不会对系统产生重大影响,并且希望最大化吞吐量的场景。
ThreadPoolExecutor.DiscardOldestPolicy丢弃工作队列队首的(最旧的)一个任务,然后重新尝试提交当前任务。适用于希望优先执行最新提交的任务,并且可以容忍丢失部分旧任务的场景。

7.4.3 [避坑指南] 为什么不推荐使用 Executors 工具类?

Executors 工具类提供了一系列静态工厂方法,如 newFixedThreadPool(), newCachedThreadPool() 等,它们能够让我们用一行代码就创建一个线程池,看起来非常方便。然而,这种便利背后隐藏着巨大的风险,尤其是在生产环境中。

核心论点: 阿里巴巴《Java 开发手册》等业界权威规范中,都 强制要求 开发者通过 ThreadPoolExecutor 的构造函数来创建线程池,而不是使用 Executors

两大主要隐患:OOM(内存溢出)风险

Executors 工厂方法创建的线程池,其内部参数配置存在“陷阱”,可能导致在特定场景下耗尽系统资源。

1. newFixedThreadPoolnewSingleThreadExecutor 的风险

  • 内部实现:

    1
    2
    3
    4
    5
    public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>());
    }
  • 问题根源: 它们都使用了一个 无界的 LinkedBlockingQueue(默认容量为 Integer.MAX_VALUE,约 21 亿)。

  • 潜在风险: 如果任务的生产速度持续快于线程池的处理速度,任务就会在队列中无限堆积。最终,这将耗尽应用的所有堆内存,导致 OutOfMemoryError,使整个应用崩溃。

2. newCachedThreadPoolnewScheduledThreadPool 的风险

  • 内部实现:

    1
    2
    3
    4
    5
    public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
    60L, TimeUnit.SECONDS,
    new SynchronousQueue<Runnable>());
    }
  • 问题根源: 它们允许创建的线程数量上限为 Integer.MAX_VALUE

  • 潜在风险: newCachedThreadPool 的设计是来一个任务就创建一个新线程(如果没有空闲线程)。如果瞬间涌入大量请求,线程池就会尝试创建海量的线程。每个线程都需要消耗一定的栈内存(通常是 1MB 左右)。这会迅速耗尽 JVM 进程的可用内存,导致 OutOfMemoryError: unable to create new native thread,同样会导致系统崩溃。

最佳实践:手动创建 ThreadPoolExecutor

规避上述风险的唯一可靠方法,就是放弃 Executors 的便利,回归到 ThreadPoolExecutor 的构造函数,手动指定每一个参数。

这样做的好处是 “所见即所得”,强迫我们开发者对线程池的每一个行为细节进行深入思考和控制。

  • 代码示例:一个配置合理的线程池

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    package com.example;

    import java.util.concurrent.*;

    public class ThreadPoolBestPractice {

    public static ExecutorService createMyThreadPool() {
    // 获取CPU核心数作为参考
    int corePoolSize = Runtime.getRuntime().availableProcessors();
    int maximumPoolSize = corePoolSize * 2;
    long keepAliveTime = 60L;
    TimeUnit unit = TimeUnit.SECONDS;

    // 关键:使用有界队列来防止资源耗尽
    BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(200);

    // 建议:使用自定义的ThreadFactory,便于给线程命名,方便排查问题
    ThreadFactory threadFactory = r -> new Thread("my-pool-" + r.hashCode());

    // 关键:选择一个合适的拒绝策略来处理过载任务
    RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy();

    // 手动创建ThreadPoolExecutor
    ThreadPoolExecutor executor = new ThreadPoolExecutor(
    corePoolSize,
    maximumPoolSize,
    keepAliveTime,
    unit,
    workQueue,
    threadFactory,
    handler
    );

    System.out.println("自定义线程池创建成功!");
    return executor;
    }
    }

结论: Executors 工具类是学习和简单测试时的“好朋友”,但对于严肃的生产级应用,它却是“危险的敌人”。养成手动创建和配置 ThreadPoolExecutor 的习惯,是每一位专业 Java 程序员必备的素养。


7.4.4 [进阶] 带返回值的任务:Callable 与 Future

到目前为止,我们提交给线程池的任务都是 Runnable,它的 run() 方法没有返回值。那么,如果我们需要异步执行一个任务,并获取其执行结果,应该怎么做呢?JUC 为此提供了 CallableFuture

Callable<V> 接口

Callable 接口可以看作是 Runnable 的增强版,它弥补了 Runnable 的两个核心短板。

  • Runnable 的局限:

    1. run() 方法没有返回值,无法获取任务执行结果。
    2. run() 方法不能抛出受检异常,异常处理很麻烦。
  • Callable 的增强:
    Callable 是一个泛型接口,其唯一的 call() 方法签名如下:

    1
    V call() throws Exception;
    1. 可以有返回值: 方法返回一个 V 类型的结果。
    2. 可以抛出异常: 允许在方法签名中声明抛出异常,使得异常处理更加直接。
Future<V> 接口

当你将一个 Callable 任务提交给线程池时,由于任务是异步执行的,你不可能立即拿到结果。线程池会立刻返回一个 Future 对象。

Future 对象就像是一张“提货单”或者一个“承诺”,它代表了未来某个时刻将会完成的任务的结果。我们可以通过这张“提货单”在未来的任意时刻去查询任务状态或提取最终结果。

  • Future 的核心方法:
方法 (Method)核心作用行为说明
V get()获取 异步任务的执行结果。阻塞式。如果任务尚未完成,调用此方法的线程会一直阻塞,直到拿到结果。
V get(long timeout, TimeUnit unit)带超时 地获取结果。在指定时间内阻塞等待。如果超时任务仍未完成,会抛出 TimeoutException
boolean isDone()判断任务是否已 完成非阻塞。可以用来轮询任务状态,避免因调用 get() 而无限期阻塞。
boolean cancel(boolean mayInterrupt)尝试 取消 任务。如果任务已完成或已被取消,则失败。否则成功取消。mayInterrupt 参数表示是否要中断正在执行该任务的线程。
boolean isCancelled()判断任务是否已被 取消非阻塞
三者关系与代码示例

ExecutorServicesubmit() 方法可以将一个 Callable 任务提交到线程池,并返回一个 Future 对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import java.util.concurrent.*;

public class CallableFutureExample {

public static void main(String[] args) throws ExecutionException, InterruptedException {
// 1. 创建一个线程池
ExecutorService executor = Executors.newSingleThreadExecutor();

// 2. 创建一个Callable任务,该任务会计算1到100的和
Callable<Integer> myTask = () -> {
System.out.println("子线程 " + Thread.currentThread().getName() + " 开始计算...");
TimeUnit.SECONDS.sleep(2); // 模拟耗时计算
int sum = 0;
for (int i = 1; i <= 100; i++) {
sum += i;
}
return sum;
};

// 3. 提交Callable任务,获取Future对象(“提货单”)
System.out.println("主线程:提交任务给线程池。");
Future<Integer> future = executor.submit(myTask);

// 4. 主线程可以继续做其他事情...
System.out.println("主线程:我先去忙别的了...");
TimeUnit.SECONDS.sleep(1);
System.out.println("主线程:忙完了,现在来看看子线程的结果。");

// 5. 通过Future.get()获取结果(如果任务没执行完,这里会阻塞)
// ExecutionException 会包装Callable中抛出的原始异常
Integer result = future.get();
System.out.println("主线程:拿到任务执行结果 -> " + result);

// 6. 关闭线程池
executor.shutdown();
}
}
  • 关于 FutureTask: 这是一个很巧妙的类,它同时实现了 RunnableFuture 接口。因此,你可以将一个 Callable 包装成 FutureTask,然后像 Runnable 一样提交给线程池,同时这个 FutureTask 对象本身就可以用来获取结果。

Future 的局限性

尽管 FutureCallable 解决了有返回值和异常处理的问题,但它自身的设计也存在明显的局限性,这也是后来 CompletableFuture 诞生的原因。

  1. 阻塞式获取结果: Future 的主要缺点是它的 get() 方法是 阻塞的。这使得异步编程的优势大打折扣。虽然我们的任务是异步执行的,但为了获取结果,主线程往往还是得停下来等待,整个流程又变成了“同步”模式。

  2. 缺乏完成回调: 我们无法为 Future 任务的完成注册一个回调函数。也就是说,我们不能方便地实现“当任务完成后,自动执行下一个动作”这样的逻辑。我们只能通过 isDone() 轮询或者 get() 阻塞的方式来被动地等待任务完成。

  3. 组合能力弱: 对于多个 Future 任务,我们很难实现复杂的组合。比如,“等待两个 Future 都完成后,将它们的结果合并处理”,或者“等待多个 Future 中任意一个完成后就继续”等场景,Future 接口本身并未提供优雅的支持。

这些局限性,促使了 Java 8 中更强大的现代异步编程工具——CompletableFuture 的诞生。


7.4.5 [Java 8+] 现代异步编程:CompletableFuture

前情回顾与引出

在上一节我们看到,Future 只能通过 get() 阻塞等待结果,这就像发了快递后只能傻站在门口等快递员。而本节要介绍的 CompletableFuture(后文简称 CF),则让我们可以 “登记个电话号码,快递到了自动通知”,彻底摆脱阻塞等待的困境。

本节学习路径

我们会按照这样的顺序展开:首先理解 CF 相比 Future 的核心改进,然后掌握它的三大能力(创建任务、处理结果、组合任务),最后通过一个服务编排的实战案例将知识串联起来。


一、CF 是什么:从被动容器到主动装配线

如果说 Future 仅仅是一个异步结果的被动容器,那么 CompletableFuture 则是一个功能完备、可主动编排的异步任务 “装配线”。我们先用一个对比表格直观感受两者的差异:

对比维度FutureCompletableFuture
获取结果方式只能调用 get() 阻塞等待注册回调函数,任务完成后自动执行
任务编排能力无,只能拿到单个任务的结果可以链式调用、串联、组合多个任务
异常处理需要手动 try-catch提供 exceptionallyhandle 等方法
适用场景简单的 “提交-等待” 场景复杂的异步流程编排

CF 的核心设计理念,就是用一种非阻塞、事件驱动的方式来处理异步结果。这里的 “回调函数” 指的是我们预先注册的一段代码,当异步任务完成时,线程池会自动调用这段代码,而主线程在此期间可以继续处理其他事情。我们通过代码对比来感受这种差异:

1
2
3
4
5
6
7
8
9
// 传统 Future 的方式:主线程被阻塞
Future<String> future = executor.submit(() -> fetchDataFromRemote());
String data = future.get(); // 主线程卡在这里,啥也干不了
processData(data);

// CompletableFuture 的方式:注册回调,主线程不阻塞
CompletableFuture.supplyAsync(() -> fetchDataFromRemote(), executor)
.thenAccept(data -> processData(data)); // 注册回调函数
// 主线程继续执行,不会被卡住

在第二种方式中,thenAccept 注册的回调函数会在数据获取完成后自动被调用,整个过程主线程不会被阻塞。


二、创建异步任务:选对方法,配好线程池

创建 CompletableFuture 通常使用两个静态工厂方法:

方法选择:有无返回值

1
2
3
4
5
6
7
8
9
// 1. supplyAsync:用于有返回值的任务
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
return "返回一个字符串";
});

// 2. runAsync:用于无返回值的任务(仅执行动作)
CompletableFuture<Void> cf2 = CompletableFuture.runAsync(() -> {
System.out.println("执行一个动作,不返回结果");
});

这两个方法的选择很简单:如果你的任务需要返回结果供后续处理,用 supplyAsync;如果只是执行某个动作(比如发送通知、记录日志),用 runAsync

线程池配置:为什么必须自定义 上述代码中,我们没有指定线程池参数。在这种情况下,CF 会使用 JVM 的全局线程池 ForkJoinPool.commonPool()。但这在生产环境中是一个隐患:

1
2
3
4
5
6
7
8
9
// 问题场景演示
// 假设你的业务代码这样写
CompletableFuture.supplyAsync(() -> callSlowAPI()); // 使用默认线程池

// 同时,你的代码中还有并行流
list.parallelStream().map(...).collect(...); // 也使用 commonPool

// 问题:两者共用同一个线程池,会互相干扰!
// 如果 callSlowAPI 耗时很长,会导致并行流也变慢

ForkJoinPool.commonPool() 是 JVM 内置的一个通用工作窃取线程池,它同时被并行流(parallel stream)、CF 等多个框架共享。如果你的业务任务(比如调用远程服务)耗时较长,会占用这个池中的线程,导致并行流等其他功能性能下降。因此强烈建议为业务任务创建专用的线程池:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 推荐做法:为业务创建专用线程池
ExecutorService bizExecutor = Executors.newFixedThreadPool(10, new ThreadFactory() {
private AtomicInteger counter = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("biz-async-" + counter.incrementAndGet());
return t;
}
});

// 明确指定线程池
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 模拟调用远程服务
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "远程数据";
}, bizExecutor); // 使用自定义线程池

三、结果处理与转换:then 系列方法的三兄弟

这是 CF 最强大的地方。它提供了一系列 then* 方法,让我们可以像搭积木一样串联异步操作。这些方法的核心区别在于它们对上一步结果的处理方式。

方法对比与选择

方法作用参数类型返回类型典型使用场景
thenApply转换结果Function<T, U>CompletableFuture<U>需要对结果进行转换,如字符串转整数
thenAccept消费结果Consumer<T>CompletableFuture<Void>拿到结果后执行操作,不需要返回新值
thenRun执行动作RunnableCompletableFuture<Void>不关心上一步结果,只要上一步完成就执行

我们通过一个完整的例子来演示这三个方法的使用:

场景:从远程获取一个商品 ID 字符串,将其转换为整数,然后乘以价格系数,最后打印结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
ExecutorService executor = Executors.newFixedThreadPool(3);

CompletableFuture.supplyAsync(() -> {
// 步骤1:模拟从远程服务获取商品ID
System.out.println("[步骤1] 线程 " + Thread.currentThread().getName() + " 获取商品ID");
return "12345";
}, executor)
.thenApply(idStr -> {
// 步骤2:将字符串ID转换为整数
// 这里使用 thenApply 是因为需要返回转换后的结果
System.out.println("[步骤2] 线程 " + Thread.currentThread().getName() + " 转换ID为整数");
return Integer.parseInt(idStr);
})
.thenApply(id -> {
// 步骤3:将ID乘以价格系数10
// 继续使用 thenApply 进行计算
System.out.println("[步骤3] 线程 " + Thread.currentThread().getName() + " 计算价格");
return id * 10;
})
.thenAccept(price -> {
// 步骤4:打印最终价格
// 使用 thenAccept 因为这一步只是输出,不需要返回值
System.out.println("[步骤4] 线程 " + Thread.currentThread().getName() + " 最终价格: " + price);
})
.thenRun(() -> {
// 步骤5:记录处理完成日志
// 使用 thenRun 因为这一步不需要上一步的结果
System.out.println("[步骤5] 线程 " + Thread.currentThread().getName() + " 处理流程结束");
});

executor.shutdown();

上面的代码中,我们将一个完整的业务流程拆解为 5 个步骤,每个步骤都通过链式调用串联起来。注意我们没有调用任何 get() 方法,整个流程是非阻塞的。

关于 Async 变体:什么时候需要切换线程池

细心的你可能注意到,每个 then* 方法都有对应的 *Async 版本(如 thenApplyAsyncthenAcceptAsync)。它们的区别在于任务执行的线程:

1
2
3
CompletableFuture.supplyAsync(() -> "数据", executor)
.thenApply(s -> s.toUpperCase()) // 在完成 supplyAsync 的那个线程中执行
.thenApplyAsync(s -> s + "!", executor); // 重新提交到 executor 线程池执行

什么时候需要使用 *Async 版本?

  1. 当前回调任务很重:如果 thenApply 中的操作很耗时,使用 thenApplyAsync 可以避免阻塞完成通知的线程。
  2. 需要切换线程上下文:比如前面的任务在 IO 线程池执行,而后续任务需要在计算线程池执行。

大多数情况下,如果回调操作很轻量(比如简单的类型转换),直接用 thenApply 即可,可以减少线程切换开销。


四、组合多个任务:实现复杂的业务编排

在实际业务中,我们经常需要同时发起多个异步调用,然后将结果汇总。CF 提供了一系列组合方法来满足不同的编排需求。

场景 1:AND 关系 - 两个任务都完成后合并结果

假设我们需要同时查询用户的基本信息和账户余额,然后将两者拼接起来:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
ExecutorService executor = Executors.newFixedThreadPool(5);

CompletableFuture<String> userInfoFuture = CompletableFuture.supplyAsync(() -> {
// 模拟查询用户信息
sleep(1000);
return "用户:张三";
}, executor);

CompletableFuture<String> balanceFuture = CompletableFuture.supplyAsync(() -> {
// 模拟查询账户余额
sleep(1500);
return "余额:1000元";
}, executor);

// 使用 thenCombine 合并两个结果
CompletableFuture<String> combinedFuture = userInfoFuture.thenCombine(
balanceFuture,
(userInfo, balance) -> userInfo + ", " + balance // 合并函数
);

// 这里为了演示我们调用 get(),实际应该继续用 thenAccept
System.out.println(combinedFuture.get()); // 输出: 用户:张三, 余额:1000元
// 总耗时约 1.5 秒(取决于最慢的那个任务)

executor.shutdown();

这里的 thenCombine 接收两个参数:另一个 CompletableFuture 和一个合并函数。只有当两个 future 都完成时,合并函数才会被调用。

场景 2:OR 关系 - 任意一个任务完成即可

假设我们同时向两个镜像服务器请求数据,谁先返回就用谁的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
CompletableFuture<String> server1 = CompletableFuture.supplyAsync(() -> {
sleep(2000);
return "服务器1的数据";
}, executor);

CompletableFuture<String> server2 = CompletableFuture.supplyAsync(() -> {
sleep(1000);
return "服务器2的数据";
}, executor);

// 使用 applyToEither,哪个先完成就用哪个的结果
CompletableFuture<String> fastest = server1.applyToEither(
server2,
result -> "获取到:" + result
);

System.out.println(fastest.get()); // 输出: 获取到:服务器2的数据
// 总耗时约 1 秒

场景 3:等待全部完成 - allOf

当我们需要等待多个任务全部完成,但不需要合并它们的结果时,可以使用 allOf

1
2
3
CompletableFuture<Void> all = CompletableFuture.allOf(future1, future2, future3);
all.join(); // 等待所有任务完成
// join() 方法类似 get(),但它抛出的是非受检异常(RuntimeException),不需要显式捕获

这里的 join() 方法和 get() 方法的作用相同,都是阻塞等待任务完成。区别在于 get() 抛出受检异常(必须 try-catch 或声明抛出),而 join() 抛出非受检异常(可以不处理)。在链式调用中,join() 使用起来更方便。


五、实战案例:微服务场景下的服务编排

现在我们将所学知识应用到一个真实场景中。假设我们正在开发一个电商系统的商品详情页,为了展示完整信息,需要并行调用三个下游服务:

  1. 商品服务:获取商品基本信息(耗时 1 秒)
  2. 价格服务:获取价格信息(耗时 2 秒)
  3. 库存服务:获取库存信息(耗时 1 秒)

如果串行调用,总耗时是 1 + 2 + 1 = 4 秒。我们用 CF 来实现并行调用,将耗时缩短到 2 秒(最长任务的耗时)。

步骤 1:准备线程池和模拟服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import java.util.concurrent.*;

public class ProductDetailService {
// 为服务调用创建专用线程池
private static final ExecutorService serviceExecutor = Executors.newFixedThreadPool(10);

// 模拟调用商品服务
static String fetchProductInfo(long productId) {
sleep(1000);
return "商品ID:" + productId + ", 名称:iPhone 15";
}

// 模拟调用价格服务
static String fetchPriceInfo(long productId) {
sleep(2000);
return "价格:¥5999";
}

// 模拟调用库存服务
static String fetchStockInfo(long productId) {
sleep(1000);
return "库存:100件";
}

static void sleep(long millis) {
try { TimeUnit.MILLISECONDS.sleep(millis); }
catch (InterruptedException e) { Thread.currentThread().interrupt(); }
}
}

步骤 2:并行发起调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public static void main(String[] args) {
long productId = 12345L;
System.out.println("开始获取商品详情...");
long startTime = System.currentTimeMillis();

// 并行发起三个异步调用
CompletableFuture<String> productFuture = CompletableFuture.supplyAsync(
() -> fetchProductInfo(productId),
serviceExecutor
);

CompletableFuture<String> priceFuture = CompletableFuture.supplyAsync(
() -> fetchPriceInfo(productId),
serviceExecutor
);

CompletableFuture<String> stockFuture = CompletableFuture.supplyAsync(
() -> fetchStockInfo(productId),
serviceExecutor
);

步骤 3:等待所有任务完成

1
2
3
4
5
6
7
8
9
10
// 使用 allOf 等待所有任务完成
CompletableFuture<Void> allTasks = CompletableFuture.allOf(
productFuture,
priceFuture,
stockFuture
);

// join() 会阻塞当前线程,直到所有任务完成
// 这里阻塞是必要的,因为我们需要等所有数据都拿到才能组装结果
allTasks.join();

步骤 4:组合结果并输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
    // 此时所有 future 都已完成,调用 get() 不会阻塞
try {
String finalResult = String.format(
"===== 商品详情页 =====\n%s\n%s\n%s",
productFuture.get(),
priceFuture.get(),
stockFuture.get()
);

System.out.println(finalResult);
System.out.println("总耗时: " + (System.currentTimeMillis() - startTime) + " ms");
// 预期输出约 2000 ms,而不是 4000 ms

} catch (Exception e) {
System.err.println("获取商品详情失败: " + e.getMessage());
} finally {
serviceExecutor.shutdown();
}
}

运行结果

1
2
3
4
5
6
开始获取商品详情...
===== 商品详情页 =====
商品ID:12345, 名称:iPhone 15
价格:¥5999
库存:100件
总耗时: 2010 ms

通过并行化,我们成功将原本 4 秒的串行调用优化到了 2 秒。在实际的微服务架构中,这种优化对于提升用户体验至关重要。


7.5 [Java 21+] 并发革命:虚拟线程 (Virtual Threads)

前情回顾与本章定位

在上一节,我们见识了 CompletableFuture 如何通过回调机制将异步任务编排得行云流水。但你可能已经察觉到一个矛盾:为了避免阻塞宝贵的线程,我们不得不把简单直观的同步代码拆解成复杂的异步链。这就像为了省油而把汽车改装成电动车,虽然达到了目的,但整个驾驶体验都变了。

Java 21 带来的虚拟线程,则要从根本上终结这种 “为性能而牺牲简洁” 的痛苦权衡。它让我们重新拥有了 “同步的写法,异步的性能” 这个看似矛盾的完美组合。


7.5.1 为什么需要虚拟线程?平台线程的三重困境

[困境一] 平台线程的 “重量级” 枷锁

自 Java 1.0 诞生以来,我们使用的 java.lang.Thread,本质上都是对操作系统线程的一层薄薄的封装,这种线程被称为 平台线程(Platform Threads)。它们的核心特点,也是其最大的局限,可以用三个关键词概括:

维度平台线程特性带来的限制
映射关系与 OS 线程 1:1 绑定能创建的线程数受限于操作系统
内存成本每个线程预留独立栈空间(64 位 Linux 约 1MB)创建几千个线程就消耗数 GB 内存
调度成本由操作系统内核负责上下文切换线程数量增多时 CPU 大量时间花在调度上

一个残酷的数字对比

1
2
3
// 在一台 8GB 内存的服务器上
// 平台线程模式:创建 8000 个线程就接近内存极限
// 虚拟线程模式:可以轻松创建 1,000,000 个线程

形象比喻:平台线程就像 重型卡车——马力强劲,但造价高昂、油耗惊人,而且停车场(内存)能容纳的数量非常有限。你不可能让一万辆重型卡车同时在城市里穿行。

[困境二] I/O 密集型应用的死胡同

现代后端应用的典型画像是:80% 的时间在等待 I/O,只有 20% 的时间在真正计算。这种场景下,最符合人类思维的编程方式是 “一个请求一个线程(Thread-Per-Request)”:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 理想中清晰易懂的代码
public void handleUserRequest(long userId) {
// 步骤1:查询用户信息(等待数据库返回,线程阻塞)
UserInfo user = userDao.findById(userId);

// 步骤2:查询订单列表(等待数据库返回,线程阻塞)
List<Order> orders = orderDao.findByUserId(userId);

// 步骤3:调用推荐服务(等待HTTP响应,线程阻塞)
Recommendations recs = recommendClient.getRecommendations(userId);

// 步骤4:渲染页面
return renderPage(user, orders, recs);
}

这段代码清晰地映射了业务流程,调试时也能看到完整的调用栈。但它有一个致命缺陷:线程在等待 I/O 的过程中完全被浪费了

当 “简单直观的代码” 遇上 “昂贵稀缺的线程”,冲突便无可避免

  • 如果采用 Thread-Per-Request 模式,一万个并发请求就需要一万个平台线程,服务器瞬间崩溃
  • 如果限制线程数(使用线程池),那么多余的请求只能在队列中等待,系统吞吐量严重受限

[困境三] 被迫走向复杂的异步世界

为了打破上述死胡同,Java 社区发明了一系列异步编程工具(CompletableFuture、响应式编程),其核心思想只有一个:绝不能让宝贵的平台线程因为等待 I/O 而阻塞

于是,清晰的同步代码被迫变成了这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
// 为了性能而被迫复杂化的代码
public CompletableFuture<PageData> handleUserRequestAsync(long userId) {
return userDao.findByIdAsync(userId)
.thenCompose(user -> {
CompletableFuture<List<Order>> ordersFuture = orderDao.findByUserIdAsync(userId);
CompletableFuture<Recommendations> recsFuture = recommendClient.getRecommendationsAsync(userId);

return ordersFuture.thenCombine(recsFuture, (orders, recs) -> {
return new PageData(user, orders, recs);
});
})
.thenApply(data -> renderPage(data));
}

我们用性能换来了:

  • 代码的支离破碎:业务逻辑被回调函数切割成碎片
  • 调试的噩梦:异常堆栈变得难以理解
  • 认知负担的倍增:每个开发者都要深入理解异步编程模型

整个生态陷入了一个两难困境:要么选择简单但无法扩展,要么选择高性能但极度复杂。

平台线程的时代,已经走到了它的黄昏。 我们需要一种新的模式,来终结这种痛苦的权衡。这就是虚拟线程诞生的历史使命。


7.5.2 虚拟线程的核心理念:M: N 调度与协作式让出

虚拟线程的解决方案,并非对平台线程的修修补补,而是引入了一套全新的、从底层架构上就完全不同的运作模式。

[革命性设计一] 轻量级本质

虚拟线程的第一个颠覆性设计,就是彻底改变了 “线程” 的存在形式:

对比维度平台线程虚拟线程
本质操作系统内核线程的封装JVM 管理的纯 Java 对象
存储位置操作系统内核空间Java 堆内存
栈空间预分配固定大小(约 1MB)初始仅几百字节,按需增长
创建成本需要系统调用,耗时约 1 毫秒仅分配对象,耗时约 1 微秒
数量上限通常几千到几万几十万到数百万

这带来了什么?虚拟线程的创建和销毁成本,几乎和创建一个普通 Java 对象相当。我们终于可以毫不吝啬地创建海量线程——几万、几十万、甚至上百万个,而无需担心耗尽系统资源。

新的比喻:如果说平台线程是 重型卡车,那么虚拟线程就是 共享单车。我们可以在城市的各个角落投放百万辆共享单车,让每个有需求的人都能随时骑走一辆,用完就还,成本低到可以忽略不计。

[革命性设计二] M: N 调度模型

虚拟线程打破了与操作系统线程 1:1 的强绑定关系,采用了一种更加高效的 M: N 调度模型

  • M:代表应用中创建的大量(M 个)虚拟线程
  • N:代表 JVM 使用的少量(N 个)平台线程,这些平台线程被称为 载体线程
  • N 的默认值:通常等于 CPU 核心数

JVM 内置了一个调度器(默认使用 ForkJoinPool),它的工作就是将这 M 个虚拟线程,轮流 “骑” 在 N 个载体线程上执行

1
2
3
4
5
6
7
虚拟线程层:  [VT-1] [VT-2] [VT-3] [VT-4] [VT-5] ... [VT-100万]
↓ ↓ ↓ ↓
↓ └──────┼──────┘
↓ ↓
载体线程层: [PT-1] [PT-2] ... [PT-8] (假设8核CPU)
↓ ↓
操作系统: [OS线程1] [OS线程2] ... [OS线程8]

协作式调度:不阻塞载体线程

M: N 调度只是基础,虚拟线程真正的强大之处在于其 协作式调度 机制。让我们详细拆解当一个虚拟线程遇到 I/O 阻塞时的完整流程:

场景:虚拟线程 VT-1 正在执行一个网络请求

1
2
3
// 在虚拟线程 VT-1 中执行
Socket socket = new Socket("api.example.com", 80);
byte[] data = socket.getInputStream().read(); // 阻塞点

底层发生了什么(这个过程对开发者完全透明):

  1. 执行阶段:VT-1 被装载(mount)在载体线程 PT-1 上运行
  2. 遇到阻塞:执行到 read() 时,需要等待网络数据到达
  3. 魔法开始
    • JDK 的 I/O 库检测到这是一个阻塞操作
    • 通知 JVM 调度器:“VT-1 需要等待,请释放 PT-1”
  4. 卸载(Unmount)
    • VT-1 立即从 PT-1 上 “卸载”
    • VT-1 的栈帧(几百字节)被保存到堆内存
    • VT-1 的状态标记为 “等待 I/O”
  5. 继续利用
    • 调度器从就绪队列中取出另一个虚拟线程 VT-2
    • 将 VT-2 “装载” 到刚释放的 PT-1 上
    • PT-1 继续运行 VT-2 的任务,一刻也没有闲着
  6. I/O 完成
    • 网络数据到达,操作系统通知 JVM
    • VT-1 的状态从 “等待” 变为 “就绪”
  7. 重新装载(Mount)
    • 调度器在某个载体线程空闲时,将 VT-1 重新装载
    • 从之前保存的栈帧恢复执行,继续执行 read() 之后的代码

关键点:整个过程中,载体线程 PT-1 几乎一刻也没有被浪费。它没有因为 VT-1 的 I/O 等待而阻塞,而是立即切换去服务其他虚拟线程。

[核心优势] “同步的写法,异步的性能” 通过这套 “卸载-装回” 的协作机制,虚拟线程实现了一个看似矛盾的效果:

开发者用最简单、最直观的同步阻塞方式编写业务代码,而 JVM 在底层自动将其转换为非阻塞的高性能执行模式。

对比两种编程范式:

1
2
3
4
5
6
7
8
9
10
11
12
// 传统异步方式:为了性能而复杂
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> callServiceA(), executor)
.thenCompose(resultA -> callServiceB(resultA))
.thenApply(resultB -> processData(resultB));

// 虚拟线程方式:简单的同步写法,自动获得异步性能
Thread.ofVirtual().start(() -> {
String resultA = callServiceA(); // 看似阻塞,实际会自动让出载体线程
String resultB = callServiceB(resultA);
String finalResult = processData(resultB);
});

我们不再需要在 CompletableFuture 的回调链中挣扎,也不再需要为线程池的配置绞尽脑汁。我们可以重新回到那个清晰的 “一个请求一个线程” 模型,因为现在,线程变得像对象一样廉价且丰富。这就是虚拟线程为 Java 并发编程带来的革命性解放。


7.5.3 API 详解:创建与管理虚拟线程

尽管虚拟线程的内部机制复杂精妙,但 Java 设计者为其配备了一套极其简洁优雅的 API,目标只有一个:鼓励开发者采纳 “一个任务一个线程” 的新范式

[方式一] Thread.ofVirtual() 直接创建

Java 21 引入了全新的 Thread.Builder 接口,用于以流畅的链式调用方式创建线程。Thread.ofVirtual() 是获取虚拟线程构建器的入口方法。

基础用法

1
2
3
4
5
6
7
8
// 创建并启动一个虚拟线程
Thread vt = Thread.ofVirtual().start(() -> {
System.out.println("Hello from virtual thread!");
System.out.println("线程信息: " + Thread.currentThread());
// 输出示例: VirtualThread[#21]/runnable@ForkJoinPool-1-worker-1
});

vt.join(); // 等待虚拟线程完成

配置线程名称

1
2
3
4
5
6
7
8
9
10
11
12
// 使用构建器设置线程名称(支持自动递增)
Thread.Builder builder = Thread.ofVirtual().name("worker-", 0);

Runnable task = () -> {
System.out.println("执行线程: " + Thread.currentThread().getName());
};

Thread t1 = builder.start(task); // 创建 worker-0
Thread t2 = builder.start(task); // 创建 worker-1
Thread t3 = builder.start(task); // 创建 worker-2

t1.join(); t2.join(); t3.join();

与平台线程的对比

1
2
3
4
5
6
7
8
9
10
// 旧方式:创建平台线程
Thread platformThread = new Thread(() -> {
System.out.println("Platform thread");
});
platformThread.start();

// 新方式:创建虚拟线程
Thread virtualThread = Thread.ofVirtual().start(() -> {
System.out.println("Virtual thread");
});

[方式二] Executors.newVirtualThreadPerTaskExecutor() 推荐实践

虽然可以手动创建单个虚拟线程,但在处理大量并发任务时,使用 ExecutorService 仍然是更好的选择,因为它提供了统一的生命周期管理和任务提交接口。

核心特性

1
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();

这个方法创建的 ExecutorService 有以下特点:

  1. 不复用线程:为每个提交的任务创建一个全新的虚拟线程
  2. 无界容量:不会因为任务过多而拒绝(因为虚拟线程廉价)
  3. 自动管理:虚拟线程在任务完成后自动销毁

为什么这是推荐方式?

传统线程池虚拟线程执行器
需要精心配置核心线程数、最大线程数无需配置,自动适应任务数量
线程会被复用,需要考虑 ThreadLocal 污染每个任务独立线程,天然避免状态污染
任务过多会被拒绝或排队可以轻松处理百万级并发任务

实战示例:并发处理批量 I/O 任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class VirtualThreadBatchProcessing {
public static void main(String[] args) {
long startTime = System.currentTimeMillis();

// 使用 try-with-resources 确保 executor 自动关闭
try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {

// 提交 10000 个模拟的 I/O 任务
for (int i = 0; i < 10000; i++) {
final int taskId = i;
executor.submit(() -> {
try {
// 模拟调用远程服务(阻塞 100ms)
Thread.sleep(Duration.ofMillis(100));

if (taskId % 1000 == 0) {
System.out.println("任务 #" + taskId + " 完成,线程: " +
Thread.currentThread());
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}

} // executor.close() 会自动调用,等待所有任务完成

long elapsedTime = System.currentTimeMillis() - startTime;
System.out.println("\n10000 个任务总耗时: " + elapsedTime + " ms");
// 预期输出约 100-200ms(取决于CPU核心数)
// 如果用传统线程池,可能需要数秒甚至数十秒
}
}

对比:传统线程池的困境

1
2
3
4
5
6
7
// 如果用传统固定大小线程池处理上述任务
ExecutorService traditionalPool = Executors.newFixedThreadPool(100);

// 问题:
// 1. 10000个任务需要排队,只能由100个线程串行处理
// 2. 总耗时 = 10000 / 100 * 100ms = 10秒
// 3. 如果增加线程数到10000,会直接导致内存溢出

[对比总结] 何时用哪种方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 场景1:单次简单任务,使用 Thread.ofVirtual()
Thread.ofVirtual().start(() -> sendNotification(user));

// 场景2:批量任务处理,使用 ExecutorService
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
for (User user : users) {
executor.submit(() -> sendNotification(user));
}
}

// 场景3:需要获取任务结果
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
List<Future<String>> futures = new ArrayList<>();
for (int i = 0; i < 100; i++) {
futures.add(executor.submit(() -> fetchData()));
}

for (Future<String> future : futures) {
String result = future.get();
processResult(result);
}
}

7.5.4 适用场景与性能考量

虚拟线程虽然强大,但它并非解决所有并发问题的 “银弹”。理解其适用边界,并根据任务特性做出正确选择,是高级 Java 工程师的必备能力。

[黄金法则] I/O 密集型 vs. CPU 密集型

这是决定是否使用虚拟线程的 最核心判断标准

I/O 密集型:虚拟线程的主场

特征详细说明典型场景
定义任务大部分时间在等待 I/O,CPU 处于空闲-
网络 I/O等待 HTTP 响应、RPC 调用返回微服务调用、REST API
磁盘 I/O等待文件读写完成日志写入、文件处理
数据库 I/O等待 SQL 查询结果JDBC 查询、ORM 操作
消息队列等待消息到达或发送确认Kafka、RabbitMQ

为何适用:在这些场景下,虚拟线程的协作式调度能发挥最大威力。当一个虚拟线程等待 I/O 时,它会主动让出载体线程,让系统可以用极少的平台线程支撑起百万级并发连接。

性能提升示例

1
2
3
4
5
6
7
8
// 场景:需要并发调用 1000 个微服务接口
// 每个接口响应时间 100ms

// 方案1:传统固定线程池(100个线程)
// 总耗时 = 1000 / 100 * 100ms = 1000ms

// 方案2:虚拟线程
// 总耗时 = 约 100ms(所有请求并发发出)

CPU 密集型:平台线程池的阵地

特征详细说明典型场景
定义任务持续占用 CPU 进行计算,几乎没有等待-
数学计算复杂的算法、统计分析数据挖掘、机器学习
编解码视频编码、图像处理多媒体应用
加密运算大量加密解密操作安全相关服务
数据处理大规模排序、聚合计算ETL 任务

为何不适用:对于 CPU 密集型任务,系统瓶颈在于 CPU 核心数,而非线程数。即使创建一百万个虚拟线程去执行计算,它们最终还是要排队抢占有限的 CPU 核心。此时虚拟线程的调度优势无法体现,反而可能因额外的调度开销而略微降低性能。

最佳实践

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// CPU 密集型任务:使用平台线程池
int cpuCores = Runtime.getRuntime().availableProcessors();
ExecutorService cpuBoundExecutor = Executors.newFixedThreadPool(cpuCores);

cpuBoundExecutor.submit(() -> {
// 执行复杂计算
performHeavyComputation();
});

// I/O 密集型任务:使用虚拟线程
ExecutorService ioBoundExecutor = Executors.newVirtualThreadPerTaskExecutor();

ioBoundExecutor.submit(() -> {
// 执行网络请求
String data = httpClient.get("https://api.example.com");
processData(data);
});

[编程范式] 重回 “一个任务一个线程” 模型

在虚拟线程时代,我们应该重新拥抱这个最古老、最直观的并发模型。

过去的困境

1
2
3
4
5
6
7
8
9
10
// 传统做法:因为线程昂贵,必须使用线程池
ExecutorService threadPool = Executors.newFixedThreadPool(200);

public void handleRequest(Request req) {
threadPool.submit(() -> {
// 任务1
// 任务2
// ...
});
}

现在的解放

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 虚拟线程做法:每个请求创建一个新线程
public void handleRequest(Request req) {
Thread.ofVirtual().start(() -> {
// 任务1:查询用户信息(阻塞等待)
User user = userService.getUser(req.getUserId());

// 任务2:查询订单(阻塞等待)
List<Order> orders = orderService.getOrders(user.getId());

// 任务3:调用推荐服务(阻塞等待)
Recommendations recs = recService.getRecommendations(user);

// 任务4:渲染结果
renderResponse(user, orders, recs);
});
}

编码范式的三大转变

维度传统平台线程虚拟线程时代
线程创建精心管理线程池,复用线程随需创建,用后即弃
代码风格异步回调、Future 链式调用简单的同步顺序代码
心智模型时刻担心线程是否够用完全不用考虑线程数量

理解虚拟线程的边界至关重要:

1. 它不能让单个操作变快

1
2
3
4
5
// 误解:虚拟线程能加速数据库查询
String result = database.query("SELECT..."); // 耗时 100ms

// 真相:无论用平台线程还是虚拟线程,这个查询仍然需要 100ms
// 虚拟线程解决的是"吞吐量",而非"延迟"

2. 它不等于无限的 CPU

1
2
3
4
5
6
7
// 误解:创建 100 万个虚拟线程进行计算就能提速
for (int i = 0; i < 1_000_000; i++) {
Thread.ofVirtual().start(() -> computeHash(data));
}

// 真相:所有虚拟线程最终还是要在有限的 CPU 核心上执行
// 如果只有 8 核,同时只能真正并行 8 个计算任务

3. 它依赖于适配的 I/O 库

虚拟线程的协作式调度能力,依赖于 JDK 内部的 I/O、网络和并发库已为其做了适配:

已适配未适配
java.net.Socket❌ 某些旧的第三方网络库
java.nio❌ 未优化的 JNI 本地方法
java.io.InputStream❌ 直接的系统调用
java.util.concurrent.* (除 synchronized)❌ 某些遗留的同步代码

性能基准参考(在 16 核机器上):

1
2
3
4
5
6
7
8
9
// 测试场景:10000 个并发 HTTP 请求,每个请求耗时 100ms

// 平台线程池(200个线程):
// 总耗时约 5 秒
// 原因:10000 / 200 * 100ms = 5000ms

// 虚拟线程:
// 总耗时约 100ms
// 原因:所有请求几乎同时发出,只受网络带宽限制