CompletableFuture异步编程
在JDK8之前,若要进行异步编程,只能通过JDK5中提供的Future
和Callable
实现。 若要想知道什么时候异步任务执行完成只能去轮询future.isDone()或者future.get()阻塞等待, 而且Future存在局限性,它很难描述多个异步任务之间的依赖关系。
在JDK8中引入了CompletableFuture
,它本身实现了Future
接口,还结合了ExecutorService
及CompletionStage
可以完成一些Future不能做到的事情。
CompletionStage接口
CompletionStage
分为有返回值(CompletionStage<T>)与无返回值(CompletionStage<Void>)两种,类似Runnable和Callable。
接口方法中频繁出现了一些动词run
、accept
、apply
、handle
、combine
、 compose
,形容词async
、either
、both
,副词then
、when
,主要方法名基本由这些词构成。 通过这些方法实现了任务的串行
、并行
、或聚合
、与聚合
。
方法参数大量使用函数式接口Supplier
、 Consumer
、BiConsumer
、Function
、BiFunction
。
词语 | 描述 |
---|---|
run | 执行一个动作 |
accept | 对返回值执行一个动作 |
apply | 对任务返回值执行一次转换 |
handle | 对任务返回值或异常进行转换,使当前任务结果为转换后的值 |
combine | 将两个任务的返回值进行结合,使当前任务返回值为结合后的值 |
compose | 将当前任务返回值组合成一个新任务返回 |
async | 异步执行,一般带有async方法会有一个重载方法,多一个自定义线程池参数,默认使用ForkJoin common线程池 |
either | 多个任务,任意一个任务完成时继续执行 |
both | 两个任务完成时继续执行 |
then | 当前任务完成后继续执行其他任务 |
when | 当任务完成时,对返回值或异常执行一个动作 |
CompletableFuture API
创建CompletableFuture的方式
方法 描述 constructor 使用构造方法直接new static
runAsync无返回值任务 static
supplyAsync有返回值任务 static
allOf无返回值任务,当所有任务完成时返回 static
anyOf有返回值任务,当惹你任务完成时返回 static
completedFuture有返回值任务,以给定值为任务返回值 除CompletionStage和Future之外的方法
方法 描述 complete 若任务未完成,以给定值完成任务 completeExceptionally 若任务未完成,以给定异常完成任务 getNow 若任务未完成以给定值立即返回,否则返回任务值 join 类似 Future
的get方法,但是若有异常抛出,建议获取阻塞值使用这个方法obtrudeValue 强制以给定值完成任务 obtrudeException 强制以给定异常完成任务 getNumberOfDependents 返回当前任务待完成任务的数量,设计用于系统监控
CompletableFuture示例
异步执行无返回值
java
public void run() {
CompletableFuture<Void> future = CompletableFuture.runAsync(() ->
System.out.printf("%s says, hello world \n", Thread.currentThread().getName())
)
.thenRunAsync(() -> System.out.printf("%s says, hello world again\n", Thread.currentThread().getName()));
future.join();
}
1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
异步执行有返回值
java
public void supply() {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello")
.thenApplyAsync(s -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException ignore) {
}
System.out.printf("%s says, %s\n", Thread.currentThread().getName(), s);
return s + " World";
})
.thenApplyAsync(s -> {
System.out.printf("%s says, %s\n", Thread.currentThread().getName(), s);
return s + "!";
})
.whenComplete((s, e) -> {
if (Objects.nonNull(e)) {
System.out.println(e.getMessage());
}
System.out.printf("%s says, %s\n", Thread.currentThread().getName(), s);
});
System.out.println(future.join());
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
多个任务异步执行
java
public void allOf() {
// 异步执行所有任务
CompletableFuture.allOf(
CompletableFuture.runAsync(() ->
System.out.printf("%s says, hello world \n", Thread.currentThread().getName())
),
CompletableFuture.runAsync(() ->
System.out.printf("%s says, hello world again\n", Thread.currentThread().getName())
),
CompletableFuture.runAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException ignore) {
}
System.out.printf("%s says, hello world \n", Thread.currentThread().getName());
})
)
// 当所有任务完成时
.join();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
多个任务组合异步执行
java
public void combine() {
// 异步执行带返回值的任务
Integer join = CompletableFuture.completedFuture(1)
.thenCombineAsync(CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException ignore) {
}
System.out.printf("calc 2 is thread %s\n", Thread.currentThread().getName());
return 2;
}), Integer::sum)
.thenCombineAsync(CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException ignore) {
}
System.out.printf("calc 3 is thread %s\n", Thread.currentThread().getName());
return 3;
}), Integer::sum)
.join();
System.out.println(join);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22