获取线程执行结果

实际项目中,一个接口可能需要同时获取多种不同的数据,然后再汇总返回,如果是串行(按顺序依次执行每个任务)执行的话,接口的响应速度会非常慢。考虑到这些任务之间有大部分都是 无前后顺序关联 的,可以 并行执行 ,就比如说调用获取商品详情的时候,可以同时调用获取物流信息。通过并行执行多个任务的方式,接口的响应速度会得到大幅优化。

Java 1.5 提供了 Callable、Future、FutureTask,它们可以在任务执行完后得到执行结果, 可以实现业务之间的并发执行与返回

在[[procthread]] 我们讲述了创建线程实现任务并发的 3 种方式, 直接继承 Thread 实现 Runnable 接口 实现 Callable<> 接口

前 2 种方式都有一个缺陷:在执行完任务之后无法获取执行结果。这对需要前置任务返回值的线程来说很重要

Java 1.5 提供了 Callable、Future、FutureTask,它们可以在任务执行完后得到执行结果

Callable<>

callable 源码如下

public interface Callable<V> {
    V call() throws Exception;
}

可以看到 call() 方法返回值为泛型 V

Callable<> 允许返回值的一个任务

Runnable 接口代表一个可以由线程执行的任务, 实现 Runnable 而不是 callable 主要是线程池的兼容性考虑

Future 异步计算结果接口

Future 表示异步计算的结果,提供了用于检查计算是否完成、等待计算完成、以及检索计算结果的方法。

Future 接口的设计目标是允许任务在一个线程中执行,并且可以返回执行结果或抛出异常

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}
  • 取消任务;
  • 判断任务是否被取消;
  • get():等待任务完成,获取执行结果,如果任务取消会抛出异常
  • get(long timeout, TimeUnit unit):指定等待任务完成的时间,等待超时会抛出异常

注意, get() 方法会阻塞主进程, 一直阻塞到定时结束或者线程返回

Future 也属于同步器的一种 因为类内部定义了线程的控制方法, 同步线程之间的状态 Future 的同步机制主要体现在它的 get() 方法:

  • 如果任务已完成,get() 立即返回结果。
  • 如果任务尚未完成,get() 会将调用线程挂起,直到任务完成并且结果可用,才会唤醒线程并返回结果。

isDone():判断任务是否完成 isCancelled():判断任务是否被取消 cancel(boolean mayInterruptIfRunning):尝试取消此任务的执行,如果任务已经完成、已经取消或由于其他原因无法取消,则此尝试将失败 参数 mayInterruptIfRunning 表示是否允许取消正在执行却没有执行完毕的任务

FutureTask 异步计算接口实现类

FutureTask<T>Future 的一个实现类,同时也是 Runnable,可以直接用线程启动。

public class FutureTask<V> implements RunnableFuture<V>
public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();
}

FutureTask 本身是对异步任务的封装,是 Future 的实现类, 实际的线程控制和结果计算是由其完成的

FutureTask 通过 run() 方法实现了 Callable 的任务执行逻辑,因此兼容了 Callable 的行为, 但其本身是 Runnable 的任务

使用例子

import java.util.concurrent.*;

public class FutureExample {
    public static void main(String[] args) throws Exception {
        // 创建一个固定大小为3的线程池
        ExecutorService executorService = Executors.newFixedThreadPool(3);

        // 创建一个 Callable 数组,用于存储 5 个任务
        Callable<Integer>[] tasks = new Callable[5];
        for (int i = 0; i < tasks.length; i++) {
            final int index = i;
            tasks[i] = () -> {
                // 模拟任务执行时间,每个任务休眠 index+1 秒
                TimeUnit.SECONDS.sleep(index + 1);
                // 任务返回 (index + 1) * 100
                return (index + 1) * 100;
            };
        }

        // 使用 Future 数组存储任务的执行结果
        Future<Integer>[] futures = new Future[tasks.length];
        for (int i = 0; i < tasks.length; i++) {
            // 提交 Callable 任务到线程池,并返回 Future 对象
            futures[i] = executorService.submit(tasks[i]);
        }

        // 获取任务的执行结果
        for (int i = 0; i < futures.length; i++) {
            // 调用 get() 方法,会阻塞直到任务完成
            System.out.println("Result of task " + (i + 1) + ": " + futures[i].get());
        }

        // 关闭线程池,释放资源
        executorService.shutdown();
    }
}

了解 executorService.submit()

上述代码定义了一系列 callable 任务, 包装为FutureTask便于线程池使用, 调用 执行器 创建了线程池服务并要求线程池服务执行提交的 FutureTask

Future<Integer>[] futures = new Future[tasks.length] 这里是泛型 实际绑定的是 FutureTask

CompleteableFuture 任务编排实现类

Future that may be explicitly completed (setting its value and status), and may be used as a CompletionStage, supporting dependent functions and actions that trigger upon its completion.

Future 在实际使用过程中存在一些局限性比如不支持异步任务的编排组合以及 使用Future获得异步执行结果时,要么调用阻塞方法get(),要么轮询看isDone()是否为true,这两种方法都不是很好,因为主线程也会被迫等待。

从Java 8开始引入了CompletableFuture,它针对Future做了改进,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法。

如何使用

创建一个CompletableFuture是通过CompletableFuture.supplyAsync()实现的,它需要一个实现了Supplier接口的对象:

public interface Supplier<T> {
    T get();
}

紧接着,CompletableFuture已经被提交给默认的线程池执行了,我们需要定义的是CompletableFuture完成时和异常时需要回调的实例。完成时,CompletableFuture会调用Consumer对象:

public interface Consumer<T> {
    void accept(T t);
}

异常时,CompletableFuture会调用Function对象:

public interface Function<T, R> {
    R apply(T t);
}

示例:

/**  
 * @program: alog  
 * @description: 实现类  
 * @author: hamhuo  
 **/public class Future {  
    public static void main(String[] args) throws InterruptedException {  
        //创建异步执行的耗时任务, 获取一段文字  
        CompletableFuture<String> task = CompletableFuture.supplyAsync(new fetchString());  
        //这时已经开始执行了  
        //如果执行成功执行回调 
        task.thenAccept((response) -> {  
            System.out.println(response + " human");  
        });  
  
        //如果失败执行回调
        task.exceptionally((e) -> {  
            e.printStackTrace();  
            return null;  
        });  
  
        //保证线程池正确关闭, 让主线程在任务完成后等一会  
        task.join();  
    }  
  
  
    static class fetchString implements Supplier<String> {  
        @Override  
        public  String get() {  
            String message = "hello from star";  
            try{  
                Thread.sleep(2000);  
                message = "re:: correct";  
            } catch (InterruptedException e) {  
                Thread.currentThread().interrupt();  
            }  
            if (message.equals("hello from star")) {  
                throw new RuntimeException("Earth");  
            }  
            return message;  
        }  
    }  
}

thenAcccept() 不会阻塞线程, 是非阻塞的回调方法, 需要主线程手动等待完成

引用: https://liaoxuefeng.com/books/java/threading/completable-future/index.html https://javaguide.cn/java/concurrent/completablefuture-intro.html#future-%E4%BB%8B%E7%BB%8D