跳转至

13 Virtual Thread and Reactive

说明

本文档仅涉及部分内容,仅可用于复习重点知识

1 Virtual Thread

传统线程(平台线程):使用 new Thread() 创建的线程都是平台线程。它们是操作系统线程的直接包装。创建一个平台线程就意味着在操作系统内核中创建了一个对应的线程。但是平台线程是昂贵的资源,数量有限,上下文切换开销大

这个模型在处理 I/O 密集型任务(如网络请求、数据库访问)时效率低下。当一个平台线程发起一个网络请求并等待响应时,它会被阻塞 (block)。在这个等待期间,这个昂贵的平台线程什么也做不了,完全被浪费了

虚拟线程不是 OS 线程。它们是 JVM 内部管理的对象。JVM 会将大量的(M 个)虚拟线程,调度到一小部分(N 个)平台线程上运行。这些作为载体的平台线程被称为载体线程 (Carrier Threads)。通常,载体线程的数量等于 CPU 的核心数

当一个虚拟线程执行的代码遇到一个阻塞的 I/O 操作时,它不会阻塞底层的载体线程。相反,JVM 会自动执行以下操作:

  1. 卸载 (Unmount):JVM 将这个正在等待 I/O 的虚拟线程从其载体线程上卸载下来,并保存其状态
  2. 运行其他任务:载体线程立即被释放,可以去执行另一个准备就绪的虚拟线程
  3. 重新挂载 (Mount):当 I/O 操作完成后(例如,网络数据到达),JVM 会将原来的虚拟线程重新提交给一个可用的载体线程,让它从中断的地方继续执行

这个过程对开发者是完全透明的。写的代码看起来是同步阻塞的,但底层却实现了非阻塞的高效调度

创建虚拟线程的 3 种方法:

// 最简单的方式
// Thread.startVirtualThread(Runnable)
Thread.startVirtualThread(() -> {
    System.out.println("Hello from a virtual thread!");
});

// 使用构建器
Thread vt2 = Thread.ofVirtual()
    .name("my-virtual-thread")  // 设置线程名称
    .start(() -> {
        System.out.println("有名字的虚拟线程");
    });

// 推荐的最佳实践
// Executors.newVirtualThreadPerTaskExecutor()
try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
    for (int i = 0; i < 100_000; i++) {
        int taskNumber = i;
        executor.submit(() -> {
            // 模拟一个耗时 1 秒的网络请求
            Thread.sleep(Duration.ofSeconds(1));
            System.out.println("Task " + taskNumber + " completed.");
            return taskNumber;
        });
    }
}

何时不应该使用虚拟线程:

  1. 不适合 CPU 密集型任务:如果任务是长时间的纯计算(例如,视频编码、复杂的数学运算),它会一直占用着它的载体线程。由于载体线程的数量约等于 CPU 核心数,程序并不会因为使用了虚拟线程而获得比平台线程更多的并行计算能力。对于这类任务,仍然应该使用数量与 CPU 核心数相当的平台线程池
  2. 线程固定 Pinning:在某些情况下,虚拟线程会被固定到它的载体线程上,导致即使它阻塞了,也无法被卸载。这会使载体线程被阻塞,从而降低性能。常见的情况包括:在 synchronized 块内阻塞;调用了某些本地方法(JNI)并阻塞。解决方法是尽量使用 java.util.concurrent.locks.ReentrantLock 来代替 synchronized

2 Structured Concurrency

传统并发模型的问题:以 ExecutorService 为例,这种模型通常被称为发射后不管 (fire-and-forget)

1
2
3
4
5
6
7
// 传统方式
Future<String> userFuture = executor.submit(() -> findUser());
Future<Integer> orderFuture = executor.submit(() -> fetchOrder());

// ... 在代码的某个遥远的地方 ...
String user = userFuture.get();
Integer order = orderFuture.get();

这种方式存在几个严重问题:

  1. 错误处理困难:如果 fetchOrder() 任务抛出异常,而 findUser() 任务仍在运行,主线程只有在调用 orderFuture.get() 时才会发现异常。此时,findUser() 可能已经白白运行了很久,浪费了资源
  2. 任务取消混乱:如果主线程想取消整个操作,它必须手动追踪所有 Future 对象并逐个调用 cancel()。如果主线程本身被中断,这些后台任务很可能会泄漏,永远在后台运行下去
  3. 生命周期不明确:子任务的生命周期与父任务(或代码块)的生命周期是脱钩的。代码块可以执行完毕,但子任务可能仍在后台运行,这使得程序状态难以推理
  4. 可读性差:任务的启动(submit)和结果的获取(get)在代码中通常相隔很远,使得理解一段代码的完整并发流程变得困难

结构化并发是一种全新的并发编程模型,旨在通过将并发任务的生命周期与代码的语法结构绑定,来解决传统并发模型中常见的错误处理、任务取消和资源泄漏等问题

结构化并发的核心思想是:如果一个任务分解为多个并发的子任务,那么这些子任务必须在主任务继续执行之前全部完成(无论是成功还是失败)

它将并发任务的范围(scope)限制在一个明确的语法块内,通常是 try-with-resources 语句。这带来了与结构化编程(如 if, for 循环)类似的好处:单一入口,单一出口

当代码执行离开这个语法块时,可以百分之百地确定,所有在这个块内启动的并发任务都已经终止了

StructuredTaskScope 是实现结构化并发的主要工具

  1. try-with-resources 语句中创建一个 StructuredTaskScope 实例
  2. 使用 scope.fork(Callable<T> task) 方法启动一个或多个子任务。这些任务会在新的虚拟线程中并发执行
  3. 调用 scope.join() 方法,阻塞等待所有子任务完成
  4. 调用 scope.throwIfFailed() 来检查是否有任何子任务失败。如果有,它会将在子任务中发生的异常重新抛到当前线程中
  5. 如果所有任务都成功,通过 Future.resultNow() 获取各个子任务的结果
public class StructuredConcurrencyDemo {

    String findUser() throws InterruptedException {
        Thread.sleep(1000);
        System.out.println("找到了用户");
        return "User A";
    }

    Integer fetchOrder() throws InterruptedException {
        Thread.sleep(1500);
        System.out.println("获取了订单");
        // 模拟一个错误
        // if (true) throw new RuntimeException("获取订单失败");
        return 123;
    }

    String handle() throws Exception {
        // 1. 在 try-with-resources 中创建 Scope
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {

            // 2. fork 子任务,它们会并发执行
            Future<String> userFuture = scope.fork(() -> findUser());
            Future<Integer> orderFuture = scope.fork(() -> fetchOrder());

            // 3. 等待所有子任务完成
            scope.join();

            // 4. 如果有任何子任务失败,则抛出异常
            //    同时,另一个未完成的任务会被自动取消
            scope.throwIfFailed();

            // 5. 如果都成功,则处理结果
            return String.format("用户: %s, 订单号: %d", userFuture.resultNow(), orderFuture.resultNow());
        }
        // 当代码执行到这里时,可以保证 findUser 和 fetchOrder 两个任务都已经结束了
    }

    public static void main(String[] args) throws Exception {
        StructuredConcurrencyDemo demo = new StructuredConcurrencyDemo();
        System.out.println("开始处理...");
        String result = demo.handle();
        System.out.println("最终结果: " + result);
    }
}

3 Reactive

响应式编程是一种围绕异步数据流 (Asynchronous Data Streams) 进行构建的声明式编程范式

  1. 数据流 (Stream):在响应式编程中,任何东西都可以是一个流。一个流可以发出零个、一个或多个数据项,最后可能以一个完成信号或一个错误信号结束。例如:变量的变更、用户的输入、HTTP 请求、数据库查询结果等
  2. 异步 (Asynchronous):操作是非阻塞的。当你对一个流应用一个操作时,你不会等待它完成,而是定义了当数据到达时,该如何处理,然后继续做其他事
  3. 声明式 (Declarative):你只需要声明你想要做什么(What),而不需要关心具体如何实现(How)。你定义一个处理管道,数据就会自动流过这个管道

在普通的命令式编程中,如果写 C1 = A1 + B1C1 会被计算一次。之后即使 A1B1 变了,C1 也不会自动更新。在响应式编程中,C1 订阅了 A1B1 的变化。代码声明了 C1A1, B1 的关系。任何时候 A1B1 的值发生变化,C1 都会自动地、响应式地更新自己的值

响应式编程的四大原则:

  1. 响应性 (Responsive):系统能够及时地响应用户请求
  2. 弹性 (Resilient):系统在出现故障时仍能保持响应。例如,一个组件的失败不应该导致整个系统崩溃
  3. 伸缩性 (Elastic):系统在不同的工作负载下都能保持响应。它可以通过增减资源来自动适应负载的变化
  4. 消息驱动 (Message Driven):系统的各个组件之间通过异步消息进行通信,实现了松耦合、隔离和位置透明性

Reactive Streams Specification 标准定义了一套用于处理异步数据流的最小化接口。它规范了不同响应式库之间的互操作性。其核心接口包括:

  1. Publisher<T>: 生产者,负责发布(发出)类型为 T 的数据流
  2. Subscriber<T>: 消费者,负责订阅 Publisher 并处理它发出的数据
  3. Subscription: 代表一个 Subscriber 和一个 Publisher 之间的连接。Subscriber 通过它来请求数据(实现反压)或取消订阅
  4. Processor<T, R>: 一个同时实现了 PublisherSubscriber 的中间处理阶段

Back-pressure(反压,背压):如果生产者(Publisher)产生数据的速度远远快于消费者(Subscriber)处理数据的速度,消费者可能会因为内存溢出而崩溃。反压机制将控制权交给了消费者。消费者通过 subscription.request(n) 方法来告诉生产者请给我 n 个数据项。生产者在收到请求之前,不会发送超过消费者请求数量的数据。这是一种拉模式,确保了消费者不会被压垮

Java 中响应式编程的实现库有 Project Reactor,它提供了两种核心的 Publisher 实现:

  1. Flux<T>: 代表一个包含 0 到 N 个元素的异步序列
  2. Mono<T>: 代表一个包含 0 或 1 个元素的异步序列。非常适合表示那些只会返回一个结果的异步操作(如单个数据库查询或 HTTP 响应)
public class ReactiveDemo {
    public static void main(String[] args) {
        System.out.println("开始定义响应式流...");

        Flux<String> stream = Flux.just("apple", "banana", "cherry", "apricot", "blueberry")
            .map(fruit -> {
                System.out.println("转换: " + fruit);
                return fruit.toUpperCase();
            })
            .filter(fruit -> {
                System.out.println("过滤: " + fruit);
                return fruit.startsWith("A");
            });

        System.out.println("流已定义,但尚未执行。");

        // 关键:在调用 subscribe() 之前,什么都不会发生!
        // 这被称为“冷发布者”(Cold Publisher)
        System.out.println("准备订阅...");
        stream.subscribe(
            data -> System.out.println("消费者收到: " + data),      // onNext: 处理每个数据
            error -> System.err.println("发生错误: " + error),     // onError: 处理错误
            () -> System.out.println("流处理完成!")              // onComplete: 处理完成信号
        );
    }
}

根据输出分析:输出不是先全部转换,再全部过滤,最后全部消费。而是数据一个一个地流过整个管道

output
开始定义响应式流...
流已定义但尚未执行
准备订阅...
转换: apple
过滤: APPLE
消费者收到: APPLE
转换: banana
过滤: BANANA
转换: cherry
过滤: CHERRY
转换: apricot
过滤: APRICOT
消费者收到: APRICOT
转换: blueberry
过滤: BLUEBERRY
流处理完成!

评论区

欢迎在评论区指出文档错误,为文档提供宝贵意见,或写下你的疑问