跳转至

11 Future and Asynchronous Programming

说明

本文档仅涉及部分内容,仅可用于复习重点知识

1 Future

当执行一个耗时的任务时,我们不想让主线程一直傻等。于是,我们可以把这个任务交给一个后台线程(通常通过 ExecutorService)去执行。这个提交操作不会立即返回结果,而是立即返回一个 Future 对象。这个 Future 对象就是那个后台任务最终会产出结果的凭证。主线程拿到这张凭证后,就可以继续做其他事情了。当主线程需要那个结果时,它就可以拿着这张凭证去取货

使用 Future 最常见的模式是结合 ExecutorServiceCallable

  1. Callable<V>: 一个类似于 Runnable 的接口,但它的 call() 方法可以返回一个结果并抛出异常
  2. ExecutorService: 线程池,用于在后台执行任务

步骤:

  1. 创建一个 ExecutorService(线程池)
  2. 创建一个实现了 Callable 接口的任务
  3. 使用 executor.submit(callableTask) 提交任务,这个方法会立即返回一个 Future 对象
  4. 主线程继续执行其他操作
  5. 当需要结果时,调用 future.get() 方法
方法 作用
V get() 阻塞当前线程,直到后台任务执行完成并返回结果。如果任务已经完成,它会立即返回结果
V get(long timeout, TimeUnit unit) 阻塞等待,但如果在指定的时间内任务仍未完成,它会抛出 TimeoutException。这是一个非常重要的方法,可以防止线程无限期地等待
boolean isDone() 检查任务是否已经完成(无论是正常完成、被取消还是发生异常)。这是一个非阻塞方法,可以用来轮询任务状态
boolean cancel(boolean mayInterruptIfRunning) 尝试取消任务的执行。如果任务还没开始,它将永远不会运行;如果任务已经完成或已被取消,此方法将返回 false;如果任务正在运行,mayInterruptIfRunning = true 会尝试中断正在执行任务的线程,mayInterruptIfRunning = false 不会中断线程,只是让任务在完成后不再设置其结果
boolean isCancelled() 检查任务是否在正常完成前被取消
public class FutureDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 1. 创建一个线程池
        ExecutorService executor = Executors.newSingleThreadExecutor();

        // 2. 创建一个 Callable 任务,模拟一个耗时 2 秒的计算
        Callable<String> task = () -> {
            System.out.println("后台任务开始执行...");
            Thread.sleep(2000);
            System.out.println("后台任务执行完毕!");
            return "Hello from Callable";
        };

        // 3. 提交任务,立即获取 Future 对象
        System.out.println("主线程:提交任务到线程池...");
        Future<String> future = executor.submit(task);

        // 4. 主线程继续做其他事情
        System.out.println("主线程:继续执行其他任务...");
        Thread.sleep(500); // 模拟其他工作
        System.out.println("主线程:任务 isDone? " + future.isDone());

        // 5. 当需要结果时,调用 get() 方法
        System.out.println("主线程:准备获取后台任务结果...");
        String result = future.get(); // 这里会阻塞,直到后台任务完成
        System.out.println("主线程:获取到结果: " + result);
        System.out.println("主线程:任务 isDone? " + future.isDone());

        // 关闭线程池
        executor.shutdown();
    }
}

Future 的局限性:

  1. 阻塞式获取结果:future.get() 是阻塞的。我们无法以一种非阻塞的方式,在 Future 完成时注册一个回调函数来处理结果。我们只能通过阻塞或轮询 isDone() 的方式来等待结果,这很不优雅
  2. 无法链式调用/组合:Future 之间很难组合。无法轻松地实现当任务 A 完成后,将其结果作为任务 B 的输入,然后执行任务 B 这样的链式异步操作
  3. 没有统一的异常处理机制:异常只能在调用 get() 时以 ExecutionException 的形式被捕获,无法以回调的方式优雅地处理
  4. 无法手动完成:不能从外部手动设置一个 Future 的结果或异常

2 CompletableFuture

CompletableFuture 的核心优势:

  1. 非阻塞的回调机制:可以通过 thenApply, thenAccept, thenRun 等方法注册回调函数。当异步任务完成时,这些回调函数会被自动触发,无需手动阻塞等待
  2. 强大的组合与编排能力:可以轻松地将多个异步任务串联或并联起来,构建复杂的异步处理流水线

    1. 串行执行:当任务 A 完成后,将其结果作为任务 B 的输入(thenCompose
    2. 并行组合:当任务 A 和任务 B 都完成后,将它们的结果合并处理(thenCombine
    3. 竞速执行:当任务 A 和任务 B 中任意一个完成后,就执行下一步(applyToEither
  3. 优雅的异常处理:提供了 exceptionallyhandle 方法,可以在异步管道中优雅地捕获和处理异常,而不是在调用 get() 时被迫处理 ExecutionException

  4. 可手动完成:可以创建一个 CompletableFuture 实例,并在未来的某个时刻手动调用 complete(value)completeExceptionally(ex) 来完成它。这对于将传统的基于回调的 API 适配到 CompletableFuture 模型中非常有用

通常使用工厂方法来创建并执行一个异步任务:

  1. CompletableFuture.runAsync(Runnable runnable): 运行一个没有返回值的异步任务
  2. CompletableFuture.supplyAsync(Supplier<U> supplier): 运行一个有返回值的异步任务

这两个方法都有一个重载版本,可以接收一个自定义的 Executor(线程池)。可以避免将耗时任务提交到默认的公共 ForkJoinPool,从而实现任务隔离

方法 作用
thenApply(Function<T, U> fn) 接收上一步的结果,对其进行转换,并返回一个包含新结果的 CompletableFuture
thenAccept(Consumer<T> action) 接收上一步的结果,对其进行消费,不返回任何结果(返回 CompletableFuture<Void>
thenRun(Runnable action) 不关心上一步的结果,只要上一步完成了,就执行指定的 Runnable
thenCompose(Function<T, CompletionStage<U>> fn) 接收上一步的结果,并用这个结果去创建并返回一个新的 CompletableFuture。它会将两层 CompletableFuture<CompletableFuture<U>> 扁平化为一层 CompletableFuture<U>
thenCombine(CompletionStage<U> other, BiFunction<T, U, V> fn) 将当前 CompletableFuture 与另一个 other 并行执行,当两者都完成时,将它们的结果传递给 BiFunction 进行合并
allOf(CompletableFuture<?>... cfs) 等待所有给定的 CompletableFuture 都执行完毕。返回 CompletableFuture<Void>
anyOf(CompletableFuture<?>... cfs) 等待任意一个给定的 CompletableFuture 执行完毕
exceptionally(Function<Throwable, T> fn) 如果在异步管道的任何上游步骤中发生异常,这个方法会被触发。它提供了一个机会来处理异常并返回一个默认值,从而让管道可以继续正常执行
handle(BiFunction<T, Throwable, U> fn) 无论上一步是成功还是失败,这个方法总是会被执行。它接收两个参数:结果(成功时)和异常(失败时),两者中只有一个不为 null。这提供了一个统一处理成功和失败场景的地方
public class CompletableFutureDemo {
    public static void main(String[] args) {
        ExecutorService customPool = Executors.newFixedThreadPool(4);

        CompletableFuture<String> finalResult = CompletableFuture.supplyAsync(() -> {
            // 1. 异步获取用户信息
            System.out.println("获取用户信息...");
            return "user-123";
        }, customPool)
        .thenCompose(userId -> {
            // 2. 根据用户信息,异步获取订单列表 (依赖上一步)
            System.out.println("根据 " + userId + " 获取订单列表...");
            return CompletableFuture.supplyAsync(() -> List.of("订单1", "订单2"), customPool);
        })
        .thenCombine(
            // 3. 同时,异步获取推荐商品 (与上一步并行)
            CompletableFuture.supplyAsync(() -> {
                System.out.println("获取推荐商品...");
                return List.of("商品A", "商品B");
            }, customPool),
            // 4. 当订单和推荐都获取到后,合并它们
            (orders, recommendations) -> {
                System.out.println("合并订单和推荐...");
                return "用户订单: " + orders + ", 推荐商品: " + recommendations;
            }
        )
        .exceptionally(ex -> {
            // 统一的异常处理
            System.err.println("发生异常: " + ex.getMessage());
            return "无法加载数据";
        });

        // 主线程可以做其他事,最后获取结果
        System.out.println("主线程不阻塞,继续执行...");
        System.out.println("最终结果: " + finalResult.join()); // join() 类似于 get() 但不抛出受检异常

        customPool.shutdown();
    }
}

评论区

欢迎在评论区指出文档错误,为文档提供宝贵意见,或写下你的疑问