获取线程执行结果
Categories:
Java 1.5 提供了 Callable、Future、FutureTask,它们可以在任务执行完后得到执行结果, 可以实现业务之间的并发执行与返回
在[[procthread]] 我们讲述了创建线程实现任务并发的 3 种方式,
直接继承 Thread
实现 Runnable
接口
实现 Callable<>
接口
前 2 种方式都有一个缺陷:在执行完任务之后无法获取执行结果。这对需要前置任务返回值的线程来说很重要
Java 1.5 提供了 Callable、Future、FutureTask,它们可以在任务执行完后得到执行结果
Callable<>
callable
源码如下
public interface Callable<V> {
V call() throws Exception;
}
可以看到 call()
方法返回值为泛型 V
而
Callable<>
允许返回值的一个任务
Runnable
接口代表一个可以由线程执行的任务, 实现Runnable
而不是callable
主要是线程池的兼容性考虑
Future
异步计算结果接口
Future
表示异步计算的结果,提供了用于检查计算是否完成、等待计算完成、以及检索计算结果的方法。
Future
接口的设计目标是允许任务在一个线程中执行,并且可以返回执行结果或抛出异常
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
- 取消任务;
- 判断任务是否被取消;
get()
:等待任务完成,获取执行结果,如果任务取消会抛出异常get(long timeout, TimeUnit unit)
:指定等待任务完成的时间,等待超时会抛出异常
注意,
get()
方法会阻塞主进程, 一直阻塞到定时结束或者线程返回
Future
也属于同步器的一种 因为类内部定义了线程的控制方法, 同步线程之间的状态Future
的同步机制主要体现在它的get()
方法:
- 如果任务已完成,
get()
立即返回结果。- 如果任务尚未完成,
get()
会将调用线程挂起,直到任务完成并且结果可用,才会唤醒线程并返回结果。
isDone()
:判断任务是否完成
isCancelled()
:判断任务是否被取消
cancel(boolean mayInterruptIfRunning)
:尝试取消此任务的执行,如果任务已经完成、已经取消或由于其他原因无法取消,则此尝试将失败
参数 mayInterruptIfRunning
表示是否允许取消正在执行却没有执行完毕的任务
FutureTask
异步计算接口实现类
FutureTask<T>
是 Future
的一个实现类,同时也是 Runnable
,可以直接用线程启动。
public class FutureTask<V> implements RunnableFuture<V>
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}
FutureTask
本身是对异步任务的封装,是Future
的实现类, 实际的线程控制和结果计算是由其完成的
FutureTask
通过run()
方法实现了Callable
的任务执行逻辑,因此兼容了Callable
的行为, 但其本身是Runnable
的任务
使用例子
import java.util.concurrent.*;
public class FutureExample {
public static void main(String[] args) throws Exception {
// 创建一个固定大小为3的线程池
ExecutorService executorService = Executors.newFixedThreadPool(3);
// 创建一个 Callable 数组,用于存储 5 个任务
Callable<Integer>[] tasks = new Callable[5];
for (int i = 0; i < tasks.length; i++) {
final int index = i;
tasks[i] = () -> {
// 模拟任务执行时间,每个任务休眠 index+1 秒
TimeUnit.SECONDS.sleep(index + 1);
// 任务返回 (index + 1) * 100
return (index + 1) * 100;
};
}
// 使用 Future 数组存储任务的执行结果
Future<Integer>[] futures = new Future[tasks.length];
for (int i = 0; i < tasks.length; i++) {
// 提交 Callable 任务到线程池,并返回 Future 对象
futures[i] = executorService.submit(tasks[i]);
}
// 获取任务的执行结果
for (int i = 0; i < futures.length; i++) {
// 调用 get() 方法,会阻塞直到任务完成
System.out.println("Result of task " + (i + 1) + ": " + futures[i].get());
}
// 关闭线程池,释放资源
executorService.shutdown();
}
}
上述代码定义了一系列 callable
任务, 包装为FutureTask
便于线程池使用, 调用 执行器 创建了线程池服务并要求线程池服务执行提交的 FutureTask
Future<Integer>[] futures = new Future[tasks.length]
这里是泛型 实际绑定的是FutureTask
CompleteableFuture
任务编排实现类
A
Future
that may be explicitly completed (setting its value and status), and may be used as aCompletionStage
, supporting dependent functions and actions that trigger upon its completion.
Future
在实际使用过程中存在一些局限性比如不支持异步任务的编排组合以及 使用Future
获得异步执行结果时,要么调用阻塞方法get()
,要么轮询看isDone()
是否为true
,这两种方法都不是很好,因为主线程也会被迫等待。
从Java 8开始引入了
CompletableFuture
,它针对Future
做了改进,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法。
如何使用
创建一个CompletableFuture
是通过CompletableFuture.supplyAsync()
实现的,它需要一个实现了Supplier
接口的对象:
public interface Supplier<T> {
T get();
}
紧接着,CompletableFuture
已经被提交给默认的线程池执行了,我们需要定义的是CompletableFuture
完成时和异常时需要回调的实例。完成时,CompletableFuture
会调用Consumer
对象:
public interface Consumer<T> {
void accept(T t);
}
异常时,CompletableFuture
会调用Function
对象:
public interface Function<T, R> {
R apply(T t);
}
示例:
/**
* @program: alog
* @description: 实现类
* @author: hamhuo
**/public class Future {
public static void main(String[] args) throws InterruptedException {
//创建异步执行的耗时任务, 获取一段文字
CompletableFuture<String> task = CompletableFuture.supplyAsync(new fetchString());
//这时已经开始执行了
//如果执行成功执行回调
task.thenAccept((response) -> {
System.out.println(response + " human");
});
//如果失败执行回调
task.exceptionally((e) -> {
e.printStackTrace();
return null;
});
//保证线程池正确关闭, 让主线程在任务完成后等一会
task.join();
}
static class fetchString implements Supplier<String> {
@Override
public String get() {
String message = "hello from star";
try{
Thread.sleep(2000);
message = "re:: correct";
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
if (message.equals("hello from star")) {
throw new RuntimeException("Earth");
}
return message;
}
}
}
thenAcccept()
不会阻塞线程, 是非阻塞的回调方法, 需要主线程手动等待完成
引用: https://liaoxuefeng.com/books/java/threading/completable-future/index.html https://javaguide.cn/java/concurrent/completablefuture-intro.html#future-%E4%BB%8B%E7%BB%8D