CompletableFuture学习
# CompletableFuture 异步编程
# 前言
我们异步执行一个任务时,一般是用线程池 Executor 去创建。
- 如果不需要有返回值,任务实现 Runnable 接口;
- 如果需要有返回值,任务实现 Callable 接口,调用 Executor 的 submit 方法,再使用 Future 获取即可。
如果多个线程存在依赖组合的话,我们怎么处理呢?
- 可使用同步组件 CountDownLatch、CyclicBarrier 等,但是比较麻烦。
- 其实有简单的方法,就是用 CompeletableFuture。
在现代的软件开发中,处理并发和异步任务变得越来越重要。Java 中的 CompletableFuture
类为我们提供了一种强大的方式来处理异步编程,让我们能够更有效地利用多核处理器和并行执行。
# 源码解析
源码:
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
}
2
从源码可以看出 CompletableFuture
同时实现了 Future
和 CompletionStage
接口。
CompletableFuture
除了提供了更为好用和强大的 Future
特性之外,还提供了函数式编程的能力。
Future 接口有 5 个方法:
boolean cancel(boolean mayInterruptIfRunning)
:尝试取消执行任务。boolean isCancelled()
:判断任务是否被取消。boolean isDone()
:判断任务是否已经被执行完成。get()
:等待任务执行完成并获取运算结果。get(long timeout, TimeUnit unit)
:多了一个超时时间。
CompletionStage 接口
CompletionStage
接口描述了一个异步计算的阶段。很多计算可以分成多个阶段或步骤,此时可以通过它将所有步骤组合起来,形成异步计算的流水线。(大量使用了函数式编程)
# 什么是 CompletableFuture?
CompletableFuture
是 Java 8 引入的一个类,用于支持异步编程和操作多个异步任务。它是 Future
的扩展,提供了更多的功能和灵活性。通过 CompletableFuture
,我们可以将多个异步任务串行或并行执行,然后等待它们的完成结果。
# 使用步骤
# 创建 CompletableFuture
常见的有两种方法
- 通过 new 关键字
- 基于
CompletableFuture
自带的静态工厂方法:runAsync()
、supplyAsync()
。
# new 关键字
通过 new 关键字创建 CompletableFuture
对象这种使用方式可以看作是将 CompletableFuture
当做 Future
来使用。
举例:
- 创建异步运算的载体
CompletableFuture<RpcResponse<Object>> resultFuture = new CompletableFuture<>();
上面代码创建了一个结果值类型为 RpcResponse<Object>
的 CompletableFuture
,你可以把 resultFuture
看作是异步运算结果的载体。
- 传入运算结果
// complete() 方法只能调用一次,后续调用将被忽略。
resultFuture.complete(rpcResponse);
2
假设在未来的某个时刻,我们得到了最终的结果。这时,我们可以调用 complete()
方法为其传入结果,这表示 resultFuture
已经被完成了。
- 判断任务是否已经被完成
public boolean isDone() {
return result != null;
}
2
3
可以通过 isDone()
方法来检查是否已经完成。(Future 接口的方法)
- 等待任务执行完成并获取运算结果
rpcResponse = completableFuture.get();
可以通过调用 get()
方法来获取异步计算结果。调用 get()
方法的线程会阻塞直到 CompletableFuture
完成运算。(阻塞等待)
- 如果你已经知道计算的结果的话,可以使用静态方法
completedFuture()
来创建CompletableFuture
。
CompletableFuture<String> future = CompletableFuture.completedFuture("hello!");
assertEquals("hello!", future.get());
2
completedFuture()
方法底层调用的是带参数的 new 方法,只不过,这个方法不对外暴露。
public static <U> CompletableFuture<U> completedFuture(U value) {
return new CompletableFuture<U>((value == null) ? NIL : value);
}
2
3
# 静态工厂方法
- supplyAsync 执行 CompletableFuture 任务,支持返回值
- runAsync 执行 CompletableFuture 任务,没有返回值。因为
runAsync()
方法接受的参数是Runnable
,这是一个函数式接口,不允许返回值。
# supplyAsync 方法
// 使用默认内置线程池ForkJoinPool.commonPool(),根据supplier构建执行任务
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
// 使用自定义线程池,根据supplier构建执行任务(推荐)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
2
3
4
supplyAsync()
方法接受的参数是 Supplier<U>
,是一个函数式接口,U
是返回结果值的类型。
@FunctionalInterface
public interface Supplier<T> {
/**
* Gets a result.
*
* @return a result
*/
T get();
}
2
3
4
5
6
7
8
9
10
使用场景:当你需要异步操作且关心返回结果的时候,可以使用 supplyAsync()
方法。
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> System.out.println("hello!"));
future.get();// 输出 "hello!"
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "hello!");
assertEquals("hello!", future2.get()); // 进行断言,判断返回值是否为 "hello!",不通过就会抛出异常
2
3
4
# runAsync 方法
// 使用默认内置线程池ForkJoinPool.commonPool(),根据runnable构建执行任务
public static CompletableFuture<Void> runAsync(Runnable runnable)
// 使用自定义线程池,根据runnable构建执行任务(推荐)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
2
3
4
使用场景:当你需要异步操作且不关心返回结果的时候可以使用 runAsync()
方法。
@FunctionalInterface
public interface Runnable {
public abstract void run();
}
2
3
4
# 实例代码
public class FutureTest {
public static void main(String[] args) {
//可以自定义线程池
ExecutorService executor = Executors.newCachedThreadPool();
//runAsync的使用
CompletableFuture<Void> runFuture = CompletableFuture.runAsync(() -> System.out.println("run,关注公众号:捡田螺的小男孩"), executor);
//supplyAsync的使用
CompletableFuture<String> supplyFuture = CompletableFuture.supplyAsync(() -> {
System.out.print("supply,关注公众号:捡田螺的小男孩");
return "捡田螺的小男孩"; }, executor);
//runAsync的future没有返回值,输出null
System.out.println(runFuture.join());
//supplyAsync的future,有返回值
System.out.println(supplyFuture.join());
executor.shutdown(); // 线程池需要关闭
}
}
//输出
run,关注公众号:捡田螺的小男孩
null
supply,关注公众号:捡田螺的小男孩捡田螺的小男孩
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 简单任务异步回调
# 处理异步结算结果
当我们获取到异步计算的结果之后,还可以对其进行进一步的处理,比较常用的方法有下面几个:
thenApply()
thenAccept()
thenRun()
whenComplete()
# thenRun/thenRunAsync
public CompletableFuture<Void> thenRun(Runnable action);
public CompletableFuture<Void> thenRunAsync(Runnable action);
2
CompletableFuture 的 thenRun 方法,通俗点讲就是,做完第一个任务后,再做第二个任务。
某个任务执行完成后,执行回调方法;但是前后两个任务没有参数传递,第二个任务也没有返回值。
代码示例:
public class FutureThenRunTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(
()->{
System.out.println("先执行第一个CompletableFuture方法任务");
return "捡田螺的小男孩";
}
);
CompletableFuture thenRunFuture = orgFuture.thenRun(() -> {
System.out.println("接着执行第二个任务");
});
System.out.println(thenRunFuture.get());
}
}
//输出
先执行第一个CompletableFuture方法任务
接着执行第二个任务
null
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
thenRun 和 thenRunAsync 有什么区别?
源码:
private static final Executor asyncPool = useCommonPool ? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
public CompletableFuture<Void> thenRun(Runnable action) {
return uniRunStage(null, action);
}
public CompletableFuture<Void> thenRunAsync(Runnable action) {
return uniRunStage(asyncPool, action);
}
2
3
4
5
6
7
8
9
如果你执行第一个任务的时候,传入了一个自定义线程池:
- 调用 thenRun 方法执行第二个任务时,则第二个任务和第一个任务是共用同一个线程池。
- 调用 thenRunAsync 执行第二个任务时,则第一个任务使用的是你自己传入的线程池,第二个任务使用的是 ForkJoin 线程池
tips: thenAccept 和 thenAcceptAsync,thenApply 和 thenApplyAsync 等,它们之间的区别也是这个
# thenAccept/thenAcceptAsync
CompletableFuture 的 thenAccept 方法表示,第一个任务执行完成后,执行第二个回调方法任务,会将该任务的执行结果,作为入参,传递到回调方法中,但是回调方法是没有返回值的。
public class FutureThenAcceptTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(
()->{
System.out.println("原始CompletableFuture方法任务");
return "捡田螺的小男孩";
}
);
CompletableFuture thenAcceptFuture = orgFuture.thenAccept((a) -> {
if ("捡田螺的小男孩".equals(a)) {
System.out.println("关注了");
}
System.out.println("先考虑考虑");
});
System.out.println(thenAcceptFuture.get());
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# thenApply/thenApplyAsync
thenApply()
方法接收一个 Function
实例,用它来处理结果。
// 沿用上一个任务的线程池
public <U> CompletableFuture<U> thenApply(
Function<? super T,? extends U> fn) {
return uniApplyStage(null, fn);
}
//使用默认的 ForkJoinPool 线程池(不推荐)
public <U> CompletableFuture<U> thenApplyAsync(
Function<? super T,? extends U> fn) {
return uniApplyStage(defaultExecutor(), fn);
}
// 使用自定义线程池(推荐)
public <U> CompletableFuture<U> thenApplyAsync(
Function<? super T,? extends U> fn, Executor executor) {
return uniApplyStage(screenExecutor(executor), fn);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
使用示例:
CompletableFuture 的 thenApply 方法表示,第一个任务执行完成后,执行第二个回调方法任务,会将该任务的执行结果,作为入参,传递到回调方法中,并且回调方法是有返回值的。
public class FutureThenApplyTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(
()->{
System.out.println("原始CompletableFuture方法任务");
return "捡田螺的小男孩";
}
);
CompletableFuture<String> thenApplyFuture = orgFuture.thenApply((a) -> {
if ("捡田螺的小男孩".equals(a)) {
return "关注了";
}
return "先考虑考虑";
});
System.out.println(thenApplyFuture.get());
}
}
//输出
原始CompletableFuture方法任务
关注了
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# whenComplete()
CompletableFuture 的 whenComplete 方法表示,某个任务执行完成后,执行的回调方法,无返回值;并且 whenComplete 方法返回的 CompletableFuture 的 result 是上个任务的结果。
public class FutureWhenTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(
()->{
System.out.println("当前线程名称:" + Thread.currentThread().getName());
try {
Thread.sleep(2000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "捡田螺的小男孩";
}
);
CompletableFuture<String> rstFuture = orgFuture.whenComplete((a, throwable) -> {
System.out.println("当前线程名称:" + Thread.currentThread().getName());
System.out.println("上个任务执行完啦,还把" + a + "传过来");
if ("捡田螺的小男孩".equals(a)) {
System.out.println("666");
}
System.out.println("233333");
});
System.out.println(rstFuture.get());
}
}
//输出
当前线程名称:ForkJoinPool.commonPool-worker-1
当前线程名称:ForkJoinPool.commonPool-worker-1
上个任务执行完啦,还把捡田螺的小男孩传过来
666
233333
捡田螺的小男孩
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# 异常处理
异步操作可能会失败,CompletableFuture 允许我们使用 exceptionally()
或 handle()
方法来处理异步操作的异常。
handle()
public <U> CompletableFuture<U> handle(
BiFunction<? super T, Throwable, ? extends U> fn) {
return uniHandleStage(null, fn);
}
public <U> CompletableFuture<U> handleAsync(
BiFunction<? super T, Throwable, ? extends U> fn) {
return uniHandleStage(defaultExecutor(), fn);
}
public <U> CompletableFuture<U> handleAsync(
BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) {
return uniHandleStage(screenExecutor(executor), fn);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
示例代码:
CompletableFuture<String> future
= CompletableFuture.supplyAsync(() -> {
if (true) {
throw new RuntimeException("Computation error!");
}
return "hello!";
}).handle((res, ex) -> {
// res 代表返回的结果
// ex 的类型为 Throwable ,代表抛出的异常
return res != null ? res : "world!";
});
assertEquals("world!", future.get());
2
3
4
5
6
7
8
9
10
11
12
exceptionally()
CompletableFuture<String> future
= CompletableFuture.supplyAsync(() -> {
if (true) {
throw new RuntimeException("Computation error!");
}
return "hello!";
}).exceptionally(ex -> {
System.out.println(ex.toString());// CompletionException
return "world!";
});
assertEquals("world!", future.get());
2
3
4
5
6
7
8
9
10
11
设置 CompletableFuture 的结果就是异常
可以使用 completeExceptionally()
方法为其赋值。
CompletableFuture<String> completableFuture = new CompletableFuture<>();
// ...
completableFuture.completeExceptionally(
new RuntimeException("Calculation failed!"));
// ...
completableFuture.get(); // ExecutionException
2
3
4
5
6
# 多个任务组合处理
# AND 组合关系
thenCombine / thenAcceptBoth / runAfterBoth 都表示:将两个 CompletableFuture 组合起来,只有这两个都正常执行完了,才会执行某个任务。
区别在于:
- thenCombine:会将两个任务的执行结果作为方法入参,传递到指定方法中,且有返回值
- thenAcceptBoth: 会将两个任务的执行结果作为方法入参,传递到指定方法中,且无返回值
- runAfterBoth 不会把执行结果当做方法入参,且没有返回值。
代码示例:
public class ThenCombineTest {
public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
CompletableFuture<String> first = CompletableFuture.completedFuture("第一个异步任务");
ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture<String> future = CompletableFuture
//第二个异步任务
.supplyAsync(() -> "第二个异步任务", executor)
// (w, s) -> System.out.println(s) 是第三个任务
.thenCombineAsync(first, (s, w) -> {
System.out.println(w);
System.out.println(s);
return "两个异步任务的组合";
}, executor);
System.out.println(future.join());
executor.shutdown();
}
}
//输出
第一个异步任务
第二个异步任务
两个异步任务的组合
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# allOf 全部执行完
所有任务都执行完成后,才执行 allOf 返回的 CompletableFuture。如果任意一个任务异常,allOf 的 CompletableFuture,执行 get 方法,会抛出异常。
public class allOfFutureTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Void> a = CompletableFuture.runAsync(()->{
System.out.println("我执行完了");
});
CompletableFuture<Void> b = CompletableFuture.runAsync(() -> {
System.out.println("我也执行完了");
});
CompletableFuture<Void> allOfFuture = CompletableFuture.allOf(a, b).whenComplete((m,k)->{
System.out.println("finish");
});
}
}
//输出
我执行完了
我也执行完了
finish
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# anyOf 任一执行完
任意一个任务执行完,就执行 anyOf 返回的 CompletableFuture。如果执行的任务异常,anyOf 的 CompletableFuture,执行 get 方法,会抛出异常。
public class AnyOfFutureTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Void> a = CompletableFuture.runAsync(()->{
try {
Thread.sleep(3000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("我执行完了");
});
CompletableFuture<Void> b = CompletableFuture.runAsync(() -> {
System.out.println("我也执行完了");
});
CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(a, b).whenComplete((m,k)->{
System.out.println("finish");
});
anyOfFuture.join(); // 等待任意一个给定的 CompletableFuture 完成
}
}
// 输出
我也执行完了
finish
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
join() 的含义是:等待一个异步操作(也就是 CompletableFuture
)完成并获取其结果。
具体来说,join
方法会阻塞当前线程,直到相应的 CompletableFuture
完成,并返回其计算结果(或异常)。如果在调用 join
时异步操作还未完成,那么当前线程将一直阻塞等待,直到操作完成或者抛出异常。