Java(七):7.0 Java并发编程(下)

7.4 [实战] 线程池与异步编程

在前面的章节中,我们学习了如何创建和管理单个线程 (new Thread())。然而,在真实世界的服务端应用中,为每一个到来的请求或任务都创建一个新线程,是一种非常原始且危险的做法。因为线程的创建和销毁是重量级操作,会消耗大量的系统资源。如果请求量巨大,无限制地创建线程会迅速耗尽服务器资源,导致系统崩溃。

为了解决这个问题,线程池应运而生。它是现代并发编程的基石,也是管理线程的最佳实践。

7.4.1 [核心] 为什么要使用线程池?

简单来说,手动管理线程的方式既“昂贵”又“失控”。而线程池通过对线程的复用和统一管理,带来了三大核心优势。

1. 降低资源消耗
  • 核心思想: 复用已创建的线程,避免了频繁创建和销毁线程所带来的高昂开销。
  • 讲解:
    • 线程的创建和销毁涉及到与操作系统内核的交互、内存栈的分配与回收,这些都是成本很高的操作。
    • 线程池在启动时会预先创建一定数量的线程,当任务到来时,直接从池中取一个空闲线程来执行。任务结束后,线程并不会被销毁,而是归还给池子,等待下一个任务。
  • 比喻: 就像开一家餐厅,线程池相当于雇佣了一批固定的厨师团队(核心员工)。而不是每来一位客人就临时招聘一位厨师,客人吃完再将其解雇。
2. 提高响应速度
  • 核心思想: 消除了线程创建的延迟。
  • 讲解:
    • 当一个新任务到来时,如果需要临时创建一个新线程,这个过程是需要时间的。
    • 而使用线程池,任务可以直接交给一个处于等待状态的空闲线程执行,省去了创建线程的步骤,从而让任务能够更快地得到处理,提升了系统的响应能力。
  • 比喻: 当客人点餐后,待命的厨师可以立即开始炒菜,而无需等待HR完成招聘流程。
3. 提高线程的可管理性
  • 核心思想: 对线程进行统一的分配、调优和监控。
  • 讲解:
    • 如果任由代码随意创建线程,这些线程会散落在应用的各个角落,处于一种“失控”状态,难以管理。
    • 线程池作为一个中央管理器,为我们提供了强大的控制能力:
      • 控制并发数: 可以精确控制池中核心线程数和最大线程数,防止因线程过多而耗尽系统资源。
      • 统一监控: 可以方便地获取线程池的运行状态,如活动线程数、任务队列大小、已完成任务数等,便于监控和调优。
      • 统一管理: 可以统一设置线程的属性(如线程名、是否为守护线程),并能安全、平滑地关闭整个线程池。

结论: 在任何需要处理大量异步任务或并发请求的场景下,使用线程池都不仅仅是一种优化,而是一种必需。它是构建健壮、高性能并发系统的标准做法。接下来,我们将深入探索线程池的核心实现——ThreadPoolExecutor


7.4.2 [核心] ThreadPoolExecutor 详解

ThreadPoolExecutor是JUC线程池框架中最核心、最底层的实现类。我们日常使用的Executors工具类创建的各种线程池,其内部几乎都是ThreadPoolExecutor的实例。因此,要精通线程池,就必须从理解它的构造函数和核心参数开始。

[高频][面试题] 七大核心参数

ThreadPoolExecutor最常用的构造函数有七个参数,每一个都深刻影响着线程池的行为。理解它们的含义是面试的绝对高频考点。

参数 (Parameter)类型 (Type)核心作用
corePoolSizeint核心线程数。线程池中保持存活的线程数,即使它们是空闲的。
maximumPoolSizeint最大线程数。线程池能容纳的同时执行的线程最大数量。
keepAliveTimelong空闲线程存活时间。当线程数大于corePoolSize时,多余的空闲线程在被销毁前等待新任务的最长时间。
unitTimeUnitkeepAliveTime的时间单位(如秒、毫秒)。
workQueueBlockingQueue<Runnable>工作队列。用于存放等待执行的任务的阻塞队列。
threadFactoryThreadFactory线程工厂。用于创建新线程。可自定义线程名、是否为守护线程等。
handlerRejectedExecutionHandler拒绝策略。当队列和线程池都满了,无法处理新任务时所采取的策略。

线程池工作流程

当一个新任务通过execute()方法提交给ThreadPoolExecutor时,它会遵循以下决策路径:

image-20250713152622620

  1. 判断核心线程数:检查当前运行的线程数是否小于corePoolSize

    • :直接创建一个新的核心线程来执行该任务,即使池中有其他空闲线程。
    • :进入步骤2。
  2. 尝试加入工作队列:尝试将任务添加到workQueue中。

    • 成功:任务进入队列等待被空闲线程执行。
    • 失败(队列已满):进入步骤3。
  3. 判断最大线程数:检查当前运行的线程数是否小于maximumPoolSize

    • :创建一个新的非核心线程(也叫“救急线程”)来执行该任务。
    • :进入步骤4。
  4. 执行拒绝策略:当前线程数已达到最大值,且队列也已满。此时线程池已超负荷,必须通过指定的RejectedExecutionHandler来拒绝该任务。


工作队列 workQueue 的选择

workQueue的选择对线程池的行为有决定性影响:

  • ArrayBlockingQueue: 基于数组的有界阻塞队列。必须指定容量。当队列满了之后,会触发创建非核心线程,直到达到maximumPoolSize。有助于防止资源耗尽。
  • LinkedBlockingQueue: 基于链表的阻塞队列。如果构造时不指定容量,则默认为Integer.MAX_VALUE,相当于一个无界队列
    • 注意: 使用无界队列时,任务会一直被添加到队列中,导致maximumPoolSize参数失效,因为线程数永远不会超过corePoolSize。如果任务生产速度远超消费速度,可能导致内存溢出(OOM)。
  • SynchronousQueue: 一个不存储元素的阻塞队列。每个插入操作必须等待一个相应的移除操作。它会直接将任务“递交”给一个线程。如果没有空闲线程,就会触发创建新线程(直到maximumPoolsize),因此适合处理大量、耗时短的瞬时任务。Executors.newCachedThreadPool()就使用了它。
  • PriorityBlockingQueue: 一个支持优先级排序的无界队列。任务会根据其优先级被执行。
workQueue 类型特点说明适用场景
ArrayBlockingQueue基于数组的有界阻塞队列,必须指定容量,队列满后会创建非核心线程需要控制资源、防止内存溢出的场景
LinkedBlockingQueue基于链表的阻塞队列,默认无界(Integer.MAX_VALUE),可能导致 OOM任务量可控、不关心线程数上限的场景
SynchronousQueue不存储元素,每个插入操作必须等待移除,直接将任务 “递交” 给线程处理大量短时任务、需要快速响应的场景
PriorityBlockingQueue支持优先级排序的无界队列,任务按优先级执行需要按优先级处理任务的场景
拒绝策略 RejectedExecutionHandler
拒绝策略类型特点说明适用场景
ThreadPoolExecutor.AbortPolicy (默认)直接抛出RejectedExecutionException异常,阻止系统正常工作。适用于任务非常重要,不允许丢失,且出现异常时希望立即得到通知的场景。
ThreadPoolExecutor.CallerRunsPolicy“调用者运行”策略。该任务不会被丢弃,也不会被线程池执行,而是由提交该任务的线程(调用execute的线程)自己来执行。这是一种有效的“反压”机制,可以减慢任务提交者的速度。当希望限制任务提交者的速率,防止提交过多任务导致线程池过载时使用。
ThreadPoolExecutor.DiscardPolicy直接静默地丢弃任务,不抛出任何异常。适用于任务不重要,即使丢失也不会对系统产生重大影响,并且希望最大化吞吐量的场景。
ThreadPoolExecutor.DiscardOldestPolicy丢弃工作队列队首的(最旧的)一个任务,然后重新尝试提交当前任务。适用于希望优先执行最新提交的任务,并且可以容忍丢失部分旧任务的场景。

7.4.3 [避坑指南] 为什么不推荐使用 Executors 工具类?

Executors工具类提供了一系列静态工厂方法,如newFixedThreadPool(), newCachedThreadPool()等,它们能够让我们用一行代码就创建一个线程池,看起来非常方便。然而,这种便利背后隐藏着巨大的风险,尤其是在生产环境中。

核心论点: 阿里巴巴《Java开发手册》等业界权威规范中,都强制要求开发者通过ThreadPoolExecutor的构造函数来创建线程池,而不是使用Executors

两大主要隐患:OOM(内存溢出)风险

Executors工厂方法创建的线程池,其内部参数配置存在“陷阱”,可能导致在特定场景下耗尽系统资源。

1. newFixedThreadPoolnewSingleThreadExecutor 的风险

  • 内部实现:

    1
    2
    3
    4
    5
    public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>());
    }
  • 问题根源: 它们都使用了一个无界的LinkedBlockingQueue(默认容量为Integer.MAX_VALUE,约21亿)。

  • 潜在风险: 如果任务的生产速度持续快于线程池的处理速度,任务就会在队列中无限堆积。最终,这将耗尽应用的所有堆内存,导致**OutOfMemoryError**,使整个应用崩溃。

2. newCachedThreadPoolnewScheduledThreadPool 的风险

  • 内部实现:
    1
    2
    3
    4
    5
    public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
    60L, TimeUnit.SECONDS,
    new SynchronousQueue<Runnable>());
    }
  • 问题根源: 它们允许创建的线程数量上限为Integer.MAX_VALUE
  • 潜在风险: newCachedThreadPool的设计是来一个任务就创建一个新线程(如果没有空闲线程)。如果瞬间涌入大量请求,线程池就会尝试创建海量的线程。每个线程都需要消耗一定的栈内存(通常是1MB左右)。这会迅速耗尽JVM进程的可用内存,导致**OutOfMemoryError: unable to create new native thread**,同样会导致系统崩溃。
最佳实践:手动创建 ThreadPoolExecutor

规避上述风险的唯一可靠方法,就是放弃Executors的便利,回归到ThreadPoolExecutor的构造函数,手动指定每一个参数。

这样做的好处是**“所见即所得”**,强迫我们开发者对线程池的每一个行为细节进行深入思考和控制。

  • 代码示例:一个配置合理的线程池

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    package com.example;

    import java.util.concurrent.*;

    public class ThreadPoolBestPractice {

    public static ExecutorService createMyThreadPool() {
    // 获取CPU核心数作为参考
    int corePoolSize = Runtime.getRuntime().availableProcessors();
    int maximumPoolSize = corePoolSize * 2;
    long keepAliveTime = 60L;
    TimeUnit unit = TimeUnit.SECONDS;

    // 关键:使用有界队列来防止资源耗尽
    BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(200);

    // 建议:使用自定义的ThreadFactory,便于给线程命名,方便排查问题
    ThreadFactory threadFactory = r -> new Thread("my-pool-" + r.hashCode());

    // 关键:选择一个合适的拒绝策略来处理过载任务
    RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy();

    // 手动创建ThreadPoolExecutor
    ThreadPoolExecutor executor = new ThreadPoolExecutor(
    corePoolSize,
    maximumPoolSize,
    keepAliveTime,
    unit,
    workQueue,
    threadFactory,
    handler
    );

    System.out.println("自定义线程池创建成功!");
    return executor;
    }
    }

结论: Executors 工具类是学习和简单测试时的“好朋友”,但对于严肃的生产级应用,它却是“危险的敌人”。养成手动创建和配置ThreadPoolExecutor的习惯,是每一位专业Java程序员必备的素养。


7.4.4 [进阶] 带返回值的任务:Callable 与 Future

到目前为止,我们提交给线程池的任务都是Runnable,它的run()方法没有返回值。那么,如果我们需要异步执行一个任务,并获取其执行结果,应该怎么做呢?JUC为此提供了CallableFuture

Callable<V> 接口

Callable接口可以看作是Runnable的增强版,它弥补了Runnable的两个核心短板。

  • Runnable的局限:

    1. run()方法没有返回值,无法获取任务执行结果。
    2. run()方法不能抛出受检异常,异常处理很麻烦。
  • Callable的增强:
    Callable是一个泛型接口,其唯一的call()方法签名如下:

    1
    V call() throws Exception;
    1. 可以有返回值: 方法返回一个V类型的结果。
    2. 可以抛出异常: 允许在方法签名中声明抛出异常,使得异常处理更加直接。
Future<V> 接口

当你将一个Callable任务提交给线程池时,由于任务是异步执行的,你不可能立即拿到结果。线程池会立刻返回一个Future对象。

Future对象就像是一张“提货单”或者一个“承诺”,它代表了未来某个时刻将会完成的任务的结果。我们可以通过这张“提货单”在未来的任意时刻去查询任务状态或提取最终结果。

  • Future 的核心方法:
方法 (Method)核心作用行为说明
V get()获取异步任务的执行结果。阻塞式。如果任务尚未完成,调用此方法的线程会一直阻塞,直到拿到结果。
V get(long timeout, TimeUnit unit)带超时地获取结果。在指定时间内阻塞等待。如果超时任务仍未完成,会抛出TimeoutException
boolean isDone()判断任务是否已完成非阻塞。可以用来轮询任务状态,避免因调用get()而无限期阻塞。
boolean cancel(boolean mayInterrupt)尝试取消任务。如果任务已完成或已被取消,则失败。否则成功取消。mayInterrupt参数表示是否要中断正在执行该任务的线程。
boolean isCancelled()判断任务是否已被取消非阻塞
三者关系与代码示例

ExecutorServicesubmit()方法可以将一个Callable任务提交到线程池,并返回一个Future对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import java.util.concurrent.*;

public class CallableFutureExample {

public static void main(String[] args) throws ExecutionException, InterruptedException {
// 1. 创建一个线程池
ExecutorService executor = Executors.newSingleThreadExecutor();

// 2. 创建一个Callable任务,该任务会计算1到100的和
Callable<Integer> myTask = () -> {
System.out.println("子线程 " + Thread.currentThread().getName() + " 开始计算...");
TimeUnit.SECONDS.sleep(2); // 模拟耗时计算
int sum = 0;
for (int i = 1; i <= 100; i++) {
sum += i;
}
return sum;
};

// 3. 提交Callable任务,获取Future对象(“提货单”)
System.out.println("主线程:提交任务给线程池。");
Future<Integer> future = executor.submit(myTask);

// 4. 主线程可以继续做其他事情...
System.out.println("主线程:我先去忙别的了...");
TimeUnit.SECONDS.sleep(1);
System.out.println("主线程:忙完了,现在来看看子线程的结果。");

// 5. 通过Future.get()获取结果(如果任务没执行完,这里会阻塞)
// ExecutionException 会包装Callable中抛出的原始异常
Integer result = future.get();
System.out.println("主线程:拿到任务执行结果 -> " + result);

// 6. 关闭线程池
executor.shutdown();
}
}
  • 关于FutureTask: 这是一个很巧妙的类,它同时实现了RunnableFuture接口。因此,你可以将一个Callable包装成FutureTask,然后像Runnable一样提交给线程池,同时这个FutureTask对象本身就可以用来获取结果。

Future 的局限性

尽管FutureCallable解决了有返回值和异常处理的问题,但它自身的设计也存在明显的局限性,这也是后来CompletableFuture诞生的原因。

  1. 阻塞式获取结果: Future的主要缺点是它的get()方法是阻塞的。这使得异步编程的优势大打折扣。虽然我们的任务是异步执行的,但为了获取结果,主线程往往还是得停下来等待,整个流程又变成了“同步”模式。

  2. 缺乏完成回调: 我们无法为Future任务的完成注册一个回调函数。也就是说,我们不能方便地实现“当任务完成后,自动执行下一个动作”这样的逻辑。我们只能通过isDone()轮询或者get()阻塞的方式来被动地等待任务完成。

  3. 组合能力弱: 对于多个Future任务,我们很难实现复杂的组合。比如,“等待两个Future都完成后,将它们的结果合并处理”,或者“等待多个Future中任意一个完成后就继续”等场景,Future接口本身并未提供优雅的支持。

这些局限性,促使了Java 8中更强大的现代异步编程工具——CompletableFuture的诞生。


7.4.5 [Java 8+] 现代异步编程:CompletableFuture

CompletableFuture (CF) 是对Future的革命性增强。如果说Future仅仅是一个异步结果的被动容器,那么CompletableFuture则是一个功能完备、可主动编排的异步任务“装配线”。

核心优势:彻底摆脱 Future.get() 的阻塞

CompletableFuture的核心设计理念,就是用一种非阻塞、事件驱动的方式来处理异步结果。

  1. 非阻塞 (Non-blocking): 它通过注册回调函数的方式,让你可以在任务完成时自动执行后续操作,而无需主线程傻傻地阻塞等待。
  2. 链式调用 (Fluent API): 它的API设计得像Stream一样,可以进行优雅的链式调用(.thenApply().thenAccept()...),将复杂的异步处理流程清晰地串联起来。
  3. 组合能力 (Combinable): 提供了强大的方法来组合多个异步任务,轻松实现“AND”、“OR”以及串行依赖等复杂的业务逻辑。
  4. 完善的异常处理: 提供了exceptionallyhandle等方法,可以优雅地在异步链中捕获和处理异常。

核心用法

1. 创建异步任务

创建CompletableFuture通常使用两个静态方法,它们默认使用全局的ForkJoinPool.commonPool()来执行任务。为了避免业务任务与框架任务(如并行流)互相干扰,强烈建议为其指定自定义的业务线程池

  • supplyAsync(Supplier<U> supplier, Executor executor): 用于执行有返回值的任务。
  • runAsync(Runnable runnable, Executor executor): 用于执行无返回值的任务。
1
2
3
4
5
6
7
8
// 建议:为你的业务创建一个专用的线程池
ExecutorService myBizExecutor = Executors.newFixedThreadPool(10);

// 创建一个有返回值的异步任务
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// ... 执行耗时操作
return "Hello, CompletableFuture";
}, myBizExecutor);

2. 结果处理与转换(then...系列)

这是CompletableFuture最强大的地方,它提供了一系列方法来注册任务完成后的回调。

方法作用参数类型返回类型
thenApply(Function fn)转换结果。接收上一步结果,处理后返回新结果Function<T, U>CompletableFuture<U>
thenAccept(Consumer action)消费结果。接收上一步结果,执行操作,无返回值Consumer<T>CompletableFuture<Void>
thenRun(Runnable action)执行动作。不关心上一步结果,仅在上一步完成后执行一个RunnableRunnableCompletableFuture<Void>
  • 代码示例:

    1
    2
    3
    4
    5
    6
    CompletableFuture<Integer> cf = CompletableFuture.supplyAsync(() -> "123") // 1. 异步提供字符串"123"
    .thenApply(Integer::parseInt) // 2. 将字符串结果转换为Integer
    .thenApply(i -> i * 10); // 3. 将Integer结果乘以10

    // thenAccept消费最终结果
    cf.thenAccept(result -> System.out.println("最终结果: " + result)); // 输出: 最终结果: 1230
  • 关于*Async变体: 上述每个方法几乎都有一个对应的*Async版本(如thenApplyAsync)。它们的区别在于,*Async版本可以让你指定一个线程池来执行当前的回调任务,从而实现更精细的线程调度。如果不指定,则默认使用ForkJoinPool.commonPool()

3. 组合多个任务

  • AND 关系 - thenCombine: 当两个CompletableFuture完成时,将它们的结果合并处理。

    1
    2
    3
    4
    5
    6
    CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> "Hello");
    CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> "World");

    // 合并cf1和cf2的结果
    CompletableFuture<String> combined = cf1.thenCombine(cf2, (res1, res2) -> res1 + ", " + res2);
    System.out.println(combined.get()); // 输出: Hello, World
  • OR 关系 - applyToEither / acceptEither: 当两个CompletableFuture任意一个完成时,就使用它的结果进行下一步操作。

  • 等待全部 - allOf: 等待所有给定的CompletableFuture都执行完毕。返回CompletableFuture<Void>

  • 等待任一 - anyOf: 等待任意一个CompletableFuture执行完毕。返回CompletableFuture<Object>

实战场景:服务编排

CompletableFuture在微服务架构中的服务编排场景下大放异彩。假设为了展示一个商品详情页,你需要并行调用多个下游服务:

  1. 获取商品基本信息
  2. 获取商品价格信息
  3. 获取商品库存信息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package com.example;

import java.util.concurrent.*;

import static com.example.Main.ServiceOrchestration.*;

public class Main {

static class ServiceOrchestration {
// 模拟调用下游服务
static String getProductInfo(long id) { try { TimeUnit.SECONDS.sleep(1); } catch (Exception e) {} return "商品信息"; }
static String getPriceInfo(long id) { try { TimeUnit.SECONDS.sleep(2); } catch (Exception e) {} return "价格信息"; }
static String getStockInfo(long id) { try { TimeUnit.SECONDS.sleep(1); } catch (Exception e) {} return "库存信息"; }


}
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(3);
long productId = 1L;
System.out.println("开始并行获取商品详情...");
long startTime = System.currentTimeMillis();
// 1. 并行发起三个异步调用
CompletableFuture<String> productFuture = CompletableFuture.supplyAsync(() -> getProductInfo(productId), executor);
CompletableFuture<String> priceFuture = CompletableFuture.supplyAsync(() -> getPriceInfo(productId), executor);
CompletableFuture<String> stockFuture = CompletableFuture.supplyAsync(() -> getStockInfo(productId), executor);
CompletableFuture.allOf(productFuture, priceFuture, stockFuture).join(); // join()会阻塞等待,类似get()但抛出非受检异常
// 3. 组合结果
String result = String.format("【%s】\n【%s】\n【%s】",
productFuture.get(), priceFuture.get(), stockFuture.get());

System.out.println("成功获取所有信息:\n" + result);
System.out.println("总耗时: " + (System.currentTimeMillis() - startTime) + " ms"); // 耗时约等于最长的那个任务(2秒)

executor.shutdown();

}
}

结论: CompletableFuture 通过其强大的编排能力,将我们从Future.get()的阻塞泥潭中解放出来,使得编写高效、清晰、非阻塞的异步代码成为可能,是Java 8以后处理复杂异步场景的首选工具。


7.5 [Java 21+] 并发革命:虚拟线程 (Virtual Threads)

在深入学习虚拟线程的具体用法之前,我们必须首先回答一个根本性问题:长久以来,我们已经拥有了稳定可靠的线程池和功能强大的CompletableFuture,Java为何还要在21世纪20年代投入巨大精力,去创造一种全新的线程模型?

答案在于,我们过去所有的并发方案,都是在为一个“先天不足”的旧模型“打补丁”。而虚拟线程,则是要从根源上推翻这个旧模型。

7.5.1 为什么需要虚拟线程?平台线程的黄昏

[核心痛点] 平台线程的“重量级”本质

自Java诞生以来,我们所使用的java.lang.Thread实例,都被称为平台线程(Platform Threads)。它们的核心特点,也是其最大的局限,在于:

  1. 与操作系统线程的1:1映射

    • 每一个Java平台线程,都与一个操作系统(OS)内核线程一一对应。Java线程只是操作系统线程的一个薄层封装。这意味着,我们能创建的线程数量,直接受限于操作系统的能力。
  2. 高昂的资源成本

    • 内存成本: 操作系统为每个线程都预留了一大块独立的栈内存(在64位Linux上通常是1MB)。无论你的线程实际用不用得到,这1MB的空间都会被保留。因此,仅仅创建几千个线程,就可能消耗掉数GB的内存。
    • CPU成本: 线程的调度和上下文切换完全由操作系统内核负责。当线程数量增多时,CPU需要花费大量时间在不同线程之间切换状态,而不是执行真正的业务逻辑。这种开销是巨大的。

一个贴切的比喻:平台线程就像是重量级的洲际导弹发射井。它威力强大,但建造和维护成本极高,占地面积巨大,而且一个国家(操作系统)能拥有的数量也极其有限。

[历史包袱] I/O密集型应用在传统模型下的困境

现代大多数后端应用,本质上都是I/O密集型(I/O-intensive)的。这意味着程序的大部分时间不是在进行CPU计算,而是在等待:等待网络数据、等待数据库返回结果、等待文件读写完成。

在这样的背景下,最简单、最符合人类思维的编程模型是“一个请求一个线程(Thread-Per-Request)”:

1
2
3
4
5
6
7
8
9
10
11
// 理想中简单直观的代码
public void handleRequest(Request request) {
// 1. 调用用户服务(阻塞等待I/O)
UserInfo userInfo = userClient.getUser(request.getUserId());

// 2. 调用商品服务(阻塞等待I/O)
ProductInfo productInfo = productClient.getProduct(request.getProductId());

// 3. 渲染页面
renderPage(userInfo, productInfo);
}

这段代码清晰地反映了业务流程。但它有一个致命问题:在处理请求的过程中,线程会因为等待I/O而长时间阻塞

当这种简单模型遇上“昂贵”的平台线程,冲突便产生了。 如果有一万个用户同时请求,我们就需要一万个平台线程。按照我们之前的分析,这将瞬间耗尽服务器的所有资源,导致系统崩溃。

为了解决这个冲突,Java社区被迫走向了复杂的异步编程。我们使用CompletableFuture等工具,通过回调函数将业务逻辑拆得支离破碎,目的只有一个:决不能阻塞宝贵的平台线程

1
2
3
4
5
6
7
8
9
10
// 为了性能而变得复杂的代码
public CompletableFuture<Void> handleRequestAsync(Request request) {
CompletableFuture<UserInfo> userFuture = fetchUser(request.getUserId());
CompletableFuture<ProductInfo> productFuture = fetchProduct(request.getProductId());

return CompletableFuture.allOf(userFuture, productFuture)
.thenAccept(v -> {
renderPage(userFuture.join(), productFuture.join());
});
}

我们用性能换来了代码的复杂、调试的困难和逻辑的晦涩。整个生态都陷入了一种两难的境地:要么选择简单直观但无法扩展的代码,要么选择性能卓越但极其复杂的代码。

平台线程的时代,已经走到了它的黄昏。 我们需要一种新的模式,来终结这种痛苦的权衡。这就是虚拟线程诞生的历史使命:让我们能以同步的方式写代码,同时获得异步的性能。


7.5.2 虚拟线程的核心理念:M:N调度与协作

虚拟线程的解决方案,并非对平台线程进行修补,而是引入了一套全新的、从根本上不同的运作模式。

[革命性设计] 轻量级本质

虚拟线程的第一个颠覆性设计,就是它的“轻量”。

  • JVM管理的对象: 与平台线程不同,虚拟线程不再是操作系统线程的直接映射。它是一个纯粹由JVM在内部管理和调度的Java对象,存活于Java堆内存之上。
  • 极低的内存占用: 平台线程需要预先分配一个巨大的栈(Stack),而虚拟线程的栈则非常小(仅几百字节),并且可以根据需要动态增长和收缩。

这带来了什么?

因为创建和销毁一个虚拟线程的成本极低,几乎和创建一个普通Java对象相当,我们终于可以毫不吝啬地创建海量的虚拟线程——几万、几十万、甚至数百万个,而无需担心耗尽系统内存。

新的比喻:如果说平台线程是洲际导弹发射井,那么虚拟线程就是轻便的无人机。我们可以从一艘航空母舰(JVM)上,轻松地同时起飞成千上万架无人机(虚拟线程),让它们去执行各自的任务。

[底层揭秘] M:N调度模型

虚拟线程打破了与操作系统线程1:1的强绑定关系,采用了一种更为高效的M:N调度模型

  • M: 代表我们应用中创建的大量(M个)虚拟线程。
  • N: 代表JVM使用的一小部分(N个)平台线程,这些平台线程被称为载体线程(Carrier Threads)

JVM内置了一个调度器(默认是ForkJoinPool),它的工作就是将这M个虚拟线程,轮流“骑”到N个载体线程上去执行。通常,载体线程的数量N默认等于CPU的核心数。

[关键机制] 协作式调度:不阻塞载体线程

M:N调度只是基础,虚拟线程真正的魔法在于其协作式调度机制,它保证了宝贵的载体线程永不被无效阻塞。

让我们来看一下当一个虚拟线程遇到I/O阻塞时,发生了什么:

  1. 虚拟线程VT1正在载体线程PT1上运行。
  2. VT1的代码执行到一个阻塞I/O操作,比如socket.read()
  3. 魔法开始: JDK的网络库函数会通知JVM调度器:“VT1要开始等待数据了”。
  4. JVM调度器立即做出反应:
    • 卸载 (Unmount): 将VT1从载体线程PT1上“卸下”,并将其状态(主要是它那小小的栈)保存到堆内存中。
    • 换上 (Mount): 马上从等待执行的虚拟线程队列中,取出另一个就绪的虚拟线程VT2,“装载”到PT1上,让PT1继续执行VT2的任务。
  5. I/O完成: 当VT1等待的网络数据到达后,网络设备会通知操作系统,操作系统再通知JVM。
  6. VT1的状态从“等待”变回“就绪”,JVM调度器会在未来的某个时刻,当有空闲的载体线程时,再将它“装载”回去,从它上次中断的地方继续执行。

整个过程中,载体线程PT1几乎一刻也没有闲着。 它没有因为VT1的I/O等待而被阻塞,而是马上去服务其他虚拟线程了。

[核心优势总结] “同步的写法,异步的性能”

通过这套行云流水的“卸载-装回”机制,虚拟线程实现了惊人的效果:

开发者可以用最简单、最直观的同步阻塞方式来编写业务代码,而JVM则在底层自动将其转换为非阻塞的高性能执行模式。

我们不再需要在CompletableFuture的回调地狱中挣扎,也不再需要为线程池的配置而绞尽脑汁。我们可以重新回到那个清晰的“一个请求一个线程”模型,因为现在,我们拥有了几乎无限且廉价的线程供应。这就是虚拟线程为Java并发编程带来的解放。


7.5.3 API详解:创建与管理虚拟线程

尽管虚拟线程的内部机制非常复杂,但Java的设计者们为其配备了一套极其简洁和现代的API,旨在鼓励开发者采纳“一个任务一个线程”的新模式。

[基础用法] Thread.ofVirtual()

Java 21引入了一个新的Thread.Builder接口,用于以一种流畅的、链式调用的方式创建线程。Thread.ofVirtual()就是获取一个虚拟线程的构建器。

这是对传统new Thread(...)构造函数的现代替代方案。

  • 代码示例1:直接创建并启动

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    package com.example;

    public class VirtualThreadCreation {
    public static void main(String[] args) throws InterruptedException {
    // 直接调用 Thread.ofVirtual() 方法获取 Builder 实现,并链式调用 start
    Thread vt = Thread.ofVirtual().start(() -> {
    System.out.println("Hello from a virtual thread!");
    System.out.println("当前线程信息: " + Thread.currentThread());
    });

    vt.join(); // 同样可以 join
    }
    }

    输出会清晰地显示线程类型为VirtualThread

  • 代码示例2:使用构建器进行配置
    Thread.Builder允许我们在创建线程时设置名称等属性。

    1
    2
    package com.example;

public class VirtualThreadCreation {
public static void main(String[] args) throws InterruptedException {
// 使用构建器来创建具有特定名称模式的线程
Thread.Builder builder = Thread.ofVirtual().name(“my-worker-”, 0); // 名称会自动递增,如my-worker-0, my-worker-1…

    Runnable task = () -> System.out.println("执行任务的线程: " + Thread.currentThread().getName());

        Thread t1 = builder.start(task); // 创建并启动名为 my-worker-0 的虚拟线程
        Thread t2 = builder.start(task); // 创建并启动名为 my-worker-1 的虚拟线程

        t1.join();
        t2.join();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52

##### **[推荐实践] `Executors.newVirtualThreadPerTaskExecutor()`**

虽然我们可以手动创建单个虚拟线程,但在处理大量并发任务时,使用`ExecutorService`仍然是更好的选择,因为它提供了更完善的生命周期管理。

为此,Java 21提供了一个全新的工厂方法:`Executors.newVirtualThreadPerTaskExecutor()`。

* **核心行为**:

* 这个方法创建的`ExecutorService`**不会复用任何线程**。
* 它为提交给它的**每一个任务**,都创建一个**全新的虚拟线程**来执行。

* **为什么这是推荐的方式?**

* 因为虚拟线程的创建成本极低,所以“池化”它们已经没有意义。为每个任务创建一个新的、干净的线程,反而简化了上下文管理,避免了使用`ThreadLocal`时可能产生的复杂问题。
* 这个`ExecutorService`封装了虚拟线程的创建细节,并提供了统一的任务提交和关闭接口,非常方便。

* **代码示例:处理批量I/O任务**
使用`try-with-resources`语法可以确保`ExecutorService`被自动、安全地关闭,这是处理短生命周期任务时的最佳实践。

```java
package com.example;

import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class VirtualThreadExecutorExample {
public static void main(String[] args) {
// 使用try-with-resources确保executor被自动关闭
try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {

// 提交10个模拟的I/O密集型任务
for (int i = 0; i < 10; i++) {
final int taskId = i;
executor.submit(() -> {
System.out.println("开始执行任务 #" + taskId + " in " + Thread.currentThread());
try {
// 模拟网络请求或数据库查询等I/O阻塞
Thread.sleep(Duration.ofSeconds(1));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("任务 #" + taskId + " 执行完毕。");
});
}

} // 在这里,executor.close()会被自动调用,它会等待所有已提交的任务完成

System.out.println("所有任务已提交,主线程退出。");
}
}

总结: Executors.newVirtualThreadPerTaskExecutor()是我们在虚拟线程时代处理并发任务的“首选武器”。它让我们彻底告别了对传统线程池的复杂配置(核心数、最大数、队列容量、拒绝策略等),让并发编程回归到只关注业务逻辑本身的简单与纯粹。


7.5.4 最佳实践与适用场景分析

虚拟线程虽然强大,但它并非解决所有并发问题的“银弹”。理解其适用边界,并根据任务特性做出正确的选择,是衡量一个高级Java工程师能力的重要标准。

[场景辨析] I/O密集型 vs. CPU密集型

这是决定是否使用虚拟线程的黄金法则

  1. **I/O密集型: **虚拟线程的主场

    • 定义: 任务的大部分时间都在等待I/O操作完成,CPU处于空闲状态。
    • 示例:
      • Web服务器处理HTTP请求。
      • 调用RPC、微服务接口。
      • 访问数据库、Redis缓存。
      • 读写文件、操作消息队列。
    • 为何适用: 在这些场景下,虚拟线程的协作式调度机制能发挥最大威力。当一个虚拟线程等待I/O时,它会让出宝贵的平台载体线程给其他任务使用,从而让CPU得到充分利用,系统可以用极少的平台线程支撑起海量的并发连接。
  2. CPU密集型: 平台线程池的阵地

    • 定义: 任务需要持续占用CPU进行大量的数学或逻辑运算,几乎没有等待时间。
    • 示例:
      • 复杂的科学计算、数据分析。
      • 视频编码、图像渲染。
      • 大规模的数据排序、加密解密。
    • 为何不适用: 对于CPU密集型任务,程序的瓶颈在于CPU核心的数量,而不是线程数量。即使你创建一百万个虚拟线程去执行计算,它们最终还是要抢占仅有的那几个(比如8个或16个)CPU核心。在这种情况下,虚拟线程的调度优势无法体现,反而可能因为额外的调度开销而略微降低性能。
    • 最佳实践: 对于CPU密集型任务,仍然应该使用一个大小与CPU核心数相当的平台线程池 (Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()))。这能确保每个CPU核心都有一个线程在满负荷工作,同时避免了过多的线程上下文切换开销。
[模式回归] “一个任务一个线程”(Thread-Per-Task)模型

在虚拟线程时代,我们应该重新拥抱这个最古老、最直观的并发模型。

  • 旧时代的困境: 由于平台线程的昂贵,我们不得不放弃这个模型,转而使用复杂的线程池和异步回调。
  • 新时代的回归: 虚拟线程的廉价特性,使得为每一个到来的请求或任务都创建一个新的虚拟线程,成为了最高效、最简单的选择。

编码范式转变:

  • 过去: 我们需要精心管理一个线程池,任务来了先提交到队列,然后由池中有限的线程去处理。
  • 现在 (对于I/O密集型任务): 直接使用 Executors.newVirtualThreadPerTaskExecutor(),或者手动 Thread.ofVirtual().start()。不再需要池化,不再需要队列,代码逻辑回归到线性的、同步的、易于理解和调试的状态。
[性能考量] 虚拟线程并非银弹
  1. 它不能让单个操作变快: 虚拟线程解决的是吞吐量的问题,而不是延迟的问题。它能让你的服务器同时处理更多的请求,但不会让单个数据库查询变得更快。
  2. 它不等于无限的CPU: 虚拟线程的魔力在于“等待”时让出CPU。如果你的所有任务都在疯狂计算,那么系统的瓶颈依然是CPU本身。
  3. 它依赖于现代化的JDK库: 虚拟线程的协作式调度能力,依赖于JDK内部的I/O、网络和并发库都已为虚拟线程进行了适配。如果你使用了某些旧的、未适配的第三方库或者自己编写的JNI本地方法,它们在阻塞时可能仍然会“钉住”平台线程。

结论: 在开始一个新项目或重构旧项目时,首先要做的就是分析其核心任务的类型。如果是典型的Web应用或微服务,那么大胆地采用虚拟线程将极大地简化你的代码并提升系统吞吐能力。如果它是一个计算中心或数据处理引擎,那么传统的平台线程池依然是你的不二之选。


7.5.5 [高频][避坑指南] 虚拟线程的陷阱与注意事项

虚拟线程虽然极大地简化了并发编程,但它并非没有“脾气”。如果不了解其内在的一些限制,我们可能会在不经意间写出性能不佳甚至有问题的代码。

[头号陷阱] synchronized 的“钉住”问题

这是使用虚拟线程时最重要、最需要警惕的一个问题。

  • 什么是“钉住”?
    “钉住”是指,当一个虚拟线程在执行某些特定代码时,它会被强制性地“钉”在它的载体平台线程上。在此期间,JVM调度器无法将这个虚拟线程从其载体上卸载,即使它遇到了I/O阻塞。

  • synchronized 为何会导致钉住?
    synchronized关键字的实现,在底层与操作系统线程的某些原生数据结构(如监视器Monitor)紧密相关。当一个虚拟线程进入synchronized代码块时,为了维持锁的正确性,JVM无法安全地将其与底层的平台线程分离。

  • 灾难性后果
    如果在synchronized同步块内部执行了一个阻塞I/O操作,灾难就发生了:

    1. 虚拟线程被“钉住”在载体线程上。
    2. 虚拟线程开始等待I/O,进入阻塞状态。
    3. 由于无法被卸载,其身下的载体平台线程也随之被阻塞。这完全违背了虚拟线程设计的初衷!一个宝贵的平台线程就此被无效占用,整个系统的高吞吐能力会因此受到严重影响。如果多个虚拟线程都发生这种情况,会迅速耗尽载体线程池,使虚拟线程的优势荡然无存。
  • 代码示例(错误示范):

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    // 错误实践:在虚拟线程中,于synchronized块内执行阻塞I/O
    public class PinningProblem {
    private final Object lock = new Object();

    public void badBlockingOperation() throws IOException {
    synchronized (lock) {
    // 当前虚拟线程已被“钉住”到其载体平台线程上
    System.out.println(Thread.currentThread() + " 进入同步块,即将阻塞...");

    // 这个阻塞的I/O操作,将会阻塞宝贵的平台线程!
    System.in.read();

    System.out.println(Thread.currentThread() + " 阻塞结束,退出同步块。");
    }
    }
    }
  • [解决方案] 优先使用 JUC Lock
    在虚拟线程环境中,应始终优先使用 java.util.concurrent.locks.ReentrantLock 来替代 synchronized 关键字。
    ReentrantLock及其它JUC包下的锁,是纯Java实现的,它们被设计为“虚拟线程友好”的,在加锁和解锁时会与JVM调度器协作,不会导致“钉住”问题。

[其他要点] 其他注意事项
  1. 谨慎使用 ThreadLocal

    • 考量: 虚拟线程的生命周期通常很短(一个任务的长度)。虽然这天然地帮助我们避免了传统线程池中因线程复用而导致的ThreadLocal内存泄漏,但如果我们在极短时间内创建了数百万个虚拟线程,并且每个线程都关联了一个ThreadLocal变量,那么这些变量累积起来可能会对GC造成一定的压力。
    • 建议: 保持ThreadLocal中存放的对象尽可能小。在新代码中,优先考虑通过方法参数传递上下文,而不是依赖ThreadLocal
  2. 警惕本地方法调用

  • 如果虚拟线程调用了一个本地方法(JNI),而这个本地方法内部执行了阻塞操作,那么它同样会“钉住”载体线程。因为JVM无法看到C/C++等本地代码的内部,无法对其进行协作式调度。
    * 建议: 在与包含JNI的库交互时,需格外小心,并查阅其文档是否对虚拟线程兼容。
  1. 不要池化虚拟线程

    • 这是一个反模式。反复强调:虚拟线程被设计为用后即焚的,它们非常廉价,无需池化。
    • 试图创建一个“虚拟线程池”来复用虚拟线程,是完全没有必要的,反而会增加代码的复杂性。直接使用Executors.newVirtualThreadPerTaskExecutor(),享受“一个任务一个新线程”的简单与高效。

本章总结: 虚拟线程是Java并发迈向未来的关键一步,它极大地简化了I/O密集型应用的开发。要用好它,核心在于两点:

明确其适用场景(I/O密集型),并坚决避免“钉住”陷阱(优先使用ReentrantLock替代synchronized

掌握了这些,你就能在新的并发时代中游刃有余。


7.6 [工具] 使用 Hutool 简化并发编程

在掌握了JUC的底层原理和复杂API之后,我们常常会思考一个问题:在日常开发中,有没有更便捷、更不易出错的方式来使用这些强大的并发工具?答案是肯定的。

Hutool是一个“瑞士军刀”般的Java工具库,它极大地简化了Java的各种常用操作。其并发模块(包含在hutool-core中)正是为了解决原生JUC API“繁琐”和“易错”的痛点而设计的。

7.6.1 Hutool 并发工具概述与设计哲学

设计哲学

Hutool的并发工具,其设计初衷可以归结为以下几点:

  1. 极致简化: 将原生API(如ThreadPoolExecutor的长构造函数)进行高度封装,提供一目了然、一行调用的静态方法或链式构建器。
  2. 内建最佳实践: Hutool的工具在设计上就遵循了业界推荐的最佳实践。例如,它创建的线程池默认使用有界队列,从源头上帮助开发者避免了使用原生Executors时可能遇到的OOM风险。
  3. 提升代码可读性与开发效率: 大量使用链式调用(Fluent API),让代码逻辑像自然语言一样流畅;将繁琐的try-catch等样板代码内置,让开发者能更专注于业务本身。
核心组件概览

本章我们将重点介绍Hutool并发包中的几个核心利器:

  • ThreadUtil: 线程相关的静态工具类。它提供了大量便捷方法,如快速在线程池中执行任务、安全地休眠线程、创建线程池等。
  • ExecutorBuilder: 线程池构建器。这是对ThreadPoolExecutor构造函数最优雅的封装,通过链式调用的方式,让配置一个参数复杂的线程池变得清晰易读。
  • AsyncUtil: 异步工具类。提供了一些简化CompletableFuture等异步操作的静态方法。
  • ConcurrencyTester: 并发测试器。一个非常实用的小工具,可以用极简的代码模拟高并发场景,用于快速验证代码的线程安全性或进行简单的性能基准测试。
引入Hutool依赖

要在项目中使用这些功能,首先需要添加Hutool的Maven依赖。对于并发工具,我们通常只需要引入其核心包hutool-core即可。

1
2
3
4
5
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-core</artifactId>
<version>5.8.27</version>
</dependency>

接下来,我们将从ThreadUtilExecutorBuilder开始,逐一学习如何运用这些工具来“降维打击”日常的并发编程任务。


7.6.2 线程与线程池的快捷方式

1. ThreadUtil - 线程操作万能工具箱

ThreadUtil是Hutool中用于线程相关操作的静态工具类,它将大量原本需要繁琐API调用和异常处理的逻辑,封装成了一行行的便捷方法。

  • 常用方法速查表
方法名功能描述
execute(Runnable task)**(主力)**快速将任务提交到全局共享线程池,适合临时的、非核心的异步任务。
sleep(long millis)以非受检异常方式封装Thread.sleep(),代码更简洁,无需显式try-catch
newExecutor(...)快速创建线程池,提供了比原生Executors更安全的默认值。
newNamedThreadFactory(...)**(推荐)**创建带自定义名称前缀的线程工厂,极大方便问题排查。
getStackTrace(Thread t)获取指定线程的堆栈信息,可用于日志记录和问题分析。
ThreadUtil 实战场景详解

场景1:execute - 快速执行“用后即焚”的异步任务

背景:在一个用户注册成功的主流程中,我们需要异步发送一封欢迎邮件。这个操作不应阻塞注册流程,但重要性不是最高,也没必要为此专门创建一个完整的线程池。ThreadUtil.execute正是为此而生。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package com.example;

import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.core.util.RandomUtil;

public class ThreadUtilScene1 {
public static void main(String[] args) {
System.out.println("主线程:用户注册流程开始...");

// 模拟注册耗时
ThreadUtil.sleep(1000);
String userEmail = "user_" + RandomUtil.randomString(6) + "@example.com";
System.out.println("主线程:用户 " + userEmail + " 注册成功!");

// 使用ThreadUtil.execute()异步发送邮件
ThreadUtil.execute(() -> {
System.out.println("异步任务:开始为 " + userEmail + " 发送欢迎邮件...");
// 模拟邮件发送耗时
ThreadUtil.sleep(2000);
System.out.println("异步任务:欢迎邮件已发送至 " + userEmail);
});

System.out.println("主线程:注册成功响应已返回给用户,无需等待邮件发送完成。");
}
}
// 输出 (顺序可能略有不同):
// 主线程:用户注册流程开始...
// 主线程:用户 user_xxxxxx@example.com 注册成功!
// 主线程:注册成功响应已返回给用户,无需等待邮件发送完成。
// 异步任务:开始为 user_xxxxxx@example.com 发送欢迎邮件...
// 异步任务:欢迎邮件已发送至 user_xxxxxx@example.com

小结execute是处理简单、临时性异步逻辑的最佳选择。一行代码即可实现,但需注意其使用的是全局共享线程池,不适用于执行关键业务或可能长时间阻塞的任务。


场景2:sleep & newNamedThreadFactory - 编写更专业、更可维护的线程代码

背景:我们需要创建一个专门处理定时上报任务的线程池。为了在日志和监控中清晰地识别这些线程,我们需要为它们命名。同时,在任务循环中需要使用sleep来控制上报频率,我们希望代码尽可能简洁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package com.example;

import cn.hutool.core.thread.ThreadUtil;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

public class ThreadUtilScene2 {
public static void main(String[] args) {
// 1. 使用ThreadUtil创建带名字的线程工厂
// 线程名将是 "metric-reporter-0", "metric-reporter-1", ...
// 第二个参数是是否开启守护线程
ThreadFactory namedThreadFactory = ThreadUtil.newNamedThreadFactory("metric-reporter-", false);

// 2. 创建一个单线程的线程池,并使用我们自定义的工厂
ExecutorService executor = Executors.newSingleThreadExecutor(namedThreadFactory);

System.out.println("启动监控数据上报任务...");

executor.execute(() -> {
int reportCount = 0;
while (reportCount < 3) {
System.out.println(Thread.currentThread().getName() + ": 上报第 " + (++reportCount) + " 次监控数据...");
// 模拟数据上报...

// 3. 使用ThreadUtil.sleep,代码更整洁
// 对比原生: try { Thread.sleep(3000); } catch (InterruptedException e) { ... }
System.out.println("上报完成,休眠3秒...");
ThreadUtil.sleep(3000);
}
System.out.println(Thread.currentThread().getName() + ": 任务完成。");
});

executor.shutdown();
}
}
// 输出:
// 启动监控数据上报任务...
// metric-reporter-0: 上报第 1 次监控数据...
// 上报完成,休眠3秒...
// metric-reporter-0: 上报第 2 次监控数据...
// 上报完成,休眠3秒...
// metric-reporter-0: 上报第 3 次监控数据...
// 上报完成,休眠3秒...
// metric-reporter-0: 任务完成。

小结newNamedThreadFactory是创建专业线程池不可或缺的一步,它能极大提升系统的可维护性。而ThreadUtil.sleep则是在日常编码中提升代码整洁度的小技巧。


2. ExecutorBuilder - 优雅地构建专业线程池

我们已经知道,直接使用ThreadPoolExecutor的构造函数来创建线程池是最佳实践,但这带来了记忆大量参数和顺序的负担。ExecutorBuilder通过优雅的链式API,完美地解决了这一问题。

  • 常用方法速查表
方法名功能描述
create()**(入口)**静态工厂方法,获取一个ExecutorBuilder实例,开启构建之旅。
setCorePoolSize(int)设置核心线程数。
setMaxPoolSize(int)设置最大线程数。
setKeepAliveTime(...)设置非核心线程的空闲存活时间。
setWorkQueue(...)**(关键)**设置工作队列。这是决定线程池行为的核心配置。
setThreadFactory(...)设置线程工厂,通常与ThreadUtil.newNamedThreadFactory配合使用。
setHandler(...)设置拒绝策略,用于处理线程池超载时的任务。
build()**(出口)**完成所有配置,构建并返回最终的ExecutorService实例。
ExecutorBuilder 实战场景详解

场景1:构建一个标准的、高可用的业务线程池

背景:我们需要为系统中的核心业务(例如处理订单、执行交易)创建一个专用的、健壮的线程池。它必须具备:明确的线程数限制、有界的任务队列、合理的拒绝策略以及可识别的线程名称。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package com.example;

import cn.hutool.core.thread.ExecutorBuilder;
import cn.hutool.core.thread.ThreadUtil;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ExecutorBuilderScene1 {
public static void main(String[] args) {
System.out.println("准备构建一个生产级的业务线程池...");

// 使用ExecutorBuilder的链式API进行配置
ExecutorService bizExecutor = ExecutorBuilder.create()
// 设置核心线程数为CPU核心数
.setCorePoolSize(Runtime.getRuntime().availableProcessors())
// 设置最大线程数为核心线程数的2倍
.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 2)
// 设置非核心线程空闲60秒后被销毁
.setKeepAliveTime(60L, TimeUnit.SECONDS)
// 设置容量为500的有界队列,防止OOM
.setWorkQueue(new ArrayBlockingQueue<>(500))
// 使用Hutool工具创建带名字的线程工厂,方便定位问题
.setThreadFactory(ThreadUtil.newNamedThreadFactory("biz-order-handler-", false))
// 设置拒绝策略为“调用者运行”,这是一种有效的流量削峰/反压机制
.setHandler(new ThreadPoolExecutor.CallerRunsPolicy())
// 完成构建
.build();

System.out.println("生产级线程池构建完成!\n" + bizExecutor);

// 提交一个任务测试
bizExecutor.execute(() -> {
System.out.println(Thread.currentThread().getName() + " 开始执行核心业务...");
ThreadUtil.sleep(1000);
System.out.println(Thread.currentThread().getName() + " 核心业务执行完毕。");
});

// 在实际应用中,线程池会作为单例一直运行,这里为了演示而关闭
bizExecutor.shutdown();
}
}
// 输出 (线程名中的数字可能不同):
// 准备构建一个生产级的业务线程池...
// 生产级线程池构建完成!
// java.util.concurrent.ThreadPoolExecutor@...[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
// biz-order-handler-0 开始执行核心业务...
// biz-order-handler-0 核心业务执行完毕。

小结ExecutorBuilder通过链式调用将ThreadPoolExecutor的复杂配置过程变得类型安全、顺序无关且极易阅读,是创建生产级线程池无可争议的最佳方式。


场景2:利用ExecutorBuilder的智能、安全的默认值

背景:有时我们只需要一个简单的线程池,不想配置所有细节,但又担心原生Executors的OOM陷阱。ExecutorBuilder在这里再次展现了它的优势。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package com.example;

import cn.hutool.core.thread.ExecutorBuilder;
import cn.hutool.core.thread.ThreadUtil;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;

public class ExecutorBuilderScene2 {
public static void main(String[] args) {
System.out.println("使用ExecutorBuilder的默认配置创建一个相对安全的线程池...");

// 只配置核心和最大线程数,让其他参数使用Hutool的默认值
ExecutorService saferExecutor = ExecutorBuilder.create()
.setCorePoolSize(2)
.setMaxPoolSize(4)
.build();

// 打印出线程池的实际信息,观察其队列类型和容量
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) saferExecutor;
System.out.println("线程池类型: " + threadPoolExecutor.getClass().getName());
System.out.println("工作队列类型: " + threadPoolExecutor.getQueue().getClass().getName());
System.out.println("工作队列容量: " + threadPoolExecutor.getQueue().remainingCapacity());

saferExecutor.shutdown();
}
}
// 输出:
// 使用ExecutorBuilder的默认配置创建一个相对安全的线程池...
// 线程池类型: java.util.concurrent.ThreadPoolExecutor
// 工作队列类型: java.util.concurrent.LinkedBlockingQueue
// 工作队列容量: 1024

小结:从输出可见,当不指定工作队列时,ExecutorBuilder默认使用了一个容量为1024的有界队列,而不是Integer.MAX_VALUE。这个设计决策极大地提升了安全性,有效防止了因任务堆积导致的内存溢出,同时保持了创建过程的简洁。


7.6.3 异步任务与协调工具

本节我们将学习Hutool中用于简化FutureCompletableFuture操作的工具。虽然原生JUC的功能已经很强大,但Hutool在一些常见协作模式上,提供了更为简洁的“语法糖”。

1. AsyncUtil - 异步操作简化器

AsyncUtil (cn.hutool.core.thread.AsyncUtil) 提供了一系列静态方法,旨在用更少的代码处理异步任务的结果获取和等待。

  • 常用方法速查表
方法名功能描述
waitAll()**(推荐)**阻塞等待所有给定的CompletableFuture执行完毕。是对CompletableFuture.allOf(...).join()的封装,代码更简洁。
get(Future<T> future)以非受检异常的方式获取Future的结果,帮你省去编写try-catch块的麻烦。
waitAny()等待任意一个任务执行完毕
AsyncUtil 实战场景详解

场景1:waitAll - 更优雅地等待多个并行任务完成

背景:再次回到我们的服务编排场景。主线程需要并行发起对“商品信息”、“价格信息”、“库存信息”三个服务的调用,并且必须等待这三个调用全部成功返回后,才能进行最终的数据聚合。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package com.example;


import cn.hutool.core.thread.AsyncUtil;
import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.core.util.StrUtil;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncUtilScene1 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(3);

System.out.println("主线程:开始并行调用多个下游服务...");

// 1. 并行发起三个异步调用
CompletableFuture<String> productFuture = CompletableFuture.supplyAsync(() -> {
ThreadUtil.sleep(1000);
return "商品基本信息";
}, executor);

CompletableFuture<String> priceFuture = CompletableFuture.supplyAsync(() -> {
ThreadUtil.sleep(2000);
return "商品价格";
}, executor);

CompletableFuture<String> stockFuture = CompletableFuture.supplyAsync(() -> {
ThreadUtil.sleep(1500);
return "商品库存";
}, executor);

System.out.println("主线程:所有服务已发起,等待全部完成...");

// 2. 使用AsyncUtil.waitAll优雅地等待
// 对比原生JDK: CompletableFuture.allOf(productFuture, priceFuture, stockFuture).join();
AsyncUtil.waitAll(productFuture, priceFuture, stockFuture);

System.out.println("主线程:所有服务均已返回!");

// 3. 获取并聚合结果
String result = StrUtil.format("聚合结果:\n - {}\n - {}\n - {}",
productFuture.get(), priceFuture.get(), stockFuture.get());

System.out.println(result);

executor.shutdown();
}
}
// 输出:
// 主线程:开始并行调用多个下游服务...
// 主线程:所有服务已发起,等待全部完成...
// (等待约2秒后)
// 主线程:所有服务均已返回!
// 聚合结果:
// - 商品基本信息
// - 商品价格
// - 商品库存

小结AsyncUtil.waitAll为“等待所有异步任务完成”这一常见模式提供了一个语义更清晰、代码更简洁的静态方法。它虽然只是对原生API的简单封装,但在提升代码可读性上效果显著。


7.6.4 并发测试好帮手:ConcurrencyTester

在编写了一个声称是“线程安全”的类或方法后,我们如何快速、方便地验证它呢?传统的方式是手动编写测试代码,创建多个线程,并使用CountDownLatch等同步器来确保它们尽可能地同时开始执行,代码相对繁琐。

Hutool的ConcurrencyTester (cn.hutool.core.thread.ConcurrencyTester) 就是为了解决这个问题而生的。

由来与用途

ConcurrencyTester 的核心使命是用最简单的API,模拟出高并发的测试场景。它主要用于:

  1. 验证线程安全性: 直观地暴露代码在并发访问下可能出现的竞态条件(Race Condition)和数据不一致问题。
  2. 进行简单性能测试: 粗略地衡量一段代码在多线程并发执行下的性能表现。

它是一个纯粹的测试辅助工具,不应用于生产业务逻辑中。

ConcurrencyTester 核心API

它的API极其精简:

方法名功能描述
ConcurrencyTester(int threadCount)**(入口)**构造函数,创建一个测试器,并指定需要模拟的并发线程数。
test(Runnable runnable)**(核心)**执行并发测试。它会创建并启动threadCount个线程,并使用内部的CyclicBarrier确保所有线程准备就绪后,尽可能地同时开始执行runnable中的逻辑。
getInterval()获取从第一个线程开始到最后一个线程结束的总耗时**(单位:毫秒)**。
ConcurrencyTester 实战场景详解

场景1:直观验证 i++ 的线程不安全性

背景:在7.2.1节,我们从理论上分析了i++操作并非原子性,在多线程下会导致计数错误。现在,我们将用ConcurrencyTester来实际复现这个问题,并与使用AtomicInteger的线程安全版本进行对比。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
package com.example.hutool;

import cn.hutool.core.thread.ConcurrencyTester;

import java.util.concurrent.atomic.AtomicInteger;

public class ConcurrencyTesterScene1 {

// 1. 一个非线程安全的计数器
static class UnsafeCounter {
private int count = 0;
public void increment() {
count++;
}
public int getCount() {
return count;
}
}

// 2. 一个使用AtomicInteger实现的线程安全计数器
static class SafeCounter {
private final AtomicInteger count = new AtomicInteger(0);
public void increment() {
count.incrementAndGet();
}
public int getCount() {
return count.get();
}
}

public static void main(String[] args) {
int threadCount = 1000; // 模拟1000个线程并发
int loopCountPerThread = 1000; // 每个线程执行1000次
long expectedValue = (long) threadCount * loopCountPerThread;

// --- 测试非线程安全的计数器 ---
UnsafeCounter unsafeCounter = new UnsafeCounter();
ConcurrencyTester unsafeTester = new ConcurrencyTester(threadCount);
unsafeTester.test(() -> {
for (int i = 0; i < loopCountPerThread; i++) {
unsafeCounter.increment();
}
});

System.out.println("--- 非线程安全测试 ---");
System.out.println("期望结果: " + expectedValue);
System.out.println("实际结果: " + unsafeCounter.getCount());
System.out.println("总耗时: " + unsafeTester.getInterval() + " ms\n");

// --- 测试线程安全的计数器 ---
SafeCounter safeCounter = new SafeCounter();
ConcurrencyTester safeTester = new ConcurrencyTester(threadCount);
safeTester.test(() -> {
for (int i = 0; i < loopCountPerThread; i++) {
safeCounter.increment();
}
});

System.out.println("--- 线程安全测试 ---");
System.out.println("期望结果: " + expectedValue);
System.out.println("实际结果: " + safeCounter.getCount());
System.out.println("总耗时: " + safeTester.getInterval() + " ms");
}
}
// 一次典型的输出:
// --- 非线程安全测试 ---
// 期望结果: 1000000
// 实际结果: 999457
// 总耗时: 51 ms
//
// --- 线程安全测试 ---
// 期望结果: 1000000
// 实际结果: 1000000
// 总耗时: 102 ms

小结ConcurrencyTester以一种无可辩驳的方式,清晰地暴露了UnsafeCounter的线程安全问题。它的结果通常不等于期望值,而SafeCounter则总能得到正确的结果。这证明了ConcurrencyTester是验证代码线程安全性的一个极其有效的“试金石”。