CompletableFuture异步多任务最佳实践

简介

在项目中需要执行异步任务时,通常可以考虑到可以用线程池Executor创建。

  • 如果不需要有返回值,任务实现Runnable接口
  • 如果需要有返回值,任务实现Callable接口,调用Executorsubmit方法,再使用Future获取即可
  • 如果多个线程存在依赖组合,可使用同步组件CountDownLatchCyclicBarrier以及使用起来较为简单的CompeletableFuture

在Java 8中引入了CompletableFuture,其对Java 5中的Future类提供了非常强大的拓展,可以帮助我们简化异步编程的步骤,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,也提供了转换和组合CompletableFuture的方法,适用于处理并发的IO密集型任务。

微信图片_20240202103610

CompletableFuture类图

回顾Future

因为CompletableFuture实现了Future接口,因此先回顾Future的用法。

Future是Java 5新加的一个接口,它提供了一种异步并行计算的功能。如果主线程需要执行一个很耗时的计算任务,我们就可以通过future把这个任务放到异步线程中执行。主线程继续处理其他任务,处理完成后,再通过Future获取计算结果。

示例

假设我们有两个任务服务,一个查询用户基本信息,一个是查询用户等级信息。如下

1
2
3
4
5
6
7
8
9
10
public class UserInfoService {
public UserInfo getUserInfo(Long userId) {
try {
Thread.sleep(300); // 模拟查询数据库调用耗时
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return new UserInfo(userId, "zhangsan", 18);
}
}
1
2
3
4
5
6
7
8
9
10
public class MemberService {
public Member getMemberInfo(Long userId) {
try {
Thread.sleep(500); // 模拟查询数据库调用耗时
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return new Member(1L, userId, "六级大会员");
}
}

接下来,在主线程中使用Future来进行异步调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class FutureTest {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
UserInfoService userInfoService = new UserInfoService();
MemberService memberService = new MemberService();
Long userId = 1L;
long startTime = System.currentTimeMillis();

// 调用用户服务获取用户基本信息
FutureTask<UserInfo> userInfoFutureTask = new FutureTask<>(new Callable<UserInfo>() {
@Override
public UserInfo call() {
return userInfoService.getUserInfo(userId);
}
});
executorService.submit(userInfoFutureTask);

Thread.sleep(300); //模拟主线程其它操作耗时

// 调用会员服务获取会员信息
FutureTask<Member> memberFutureTask = new FutureTask<>(new Callable<Member>() {
@Override
public Member call() {
return memberService.getMemberInfo(userId);
}
});
executorService.submit(memberFutureTask);

UserInfo userInfo = userInfoFutureTask.get();
Member member = memberFutureTask.get();
System.out.println(userInfo);
System.out.println(member);

long endTime = System.currentTimeMillis();
System.out.println("总计用时:" + (endTime - startTime) + "ms");

executorService.shutdown(); // 关闭线程池
}
}

结果:

1
2
3
UserInfo{userId=1, userName='zhangsan', age=18}
Member{memberId=1, userId=1, level='六级大会员'}
总计用时:851ms

如果不使用Future进行并行异步调用,而是在主线程串行进行的话,耗时大约为$300+500+300 = 1100 ms$。可以发现,Future+线程池异步配合,提高了程序的执行效率。

但是Future对于结果的获取,不是很友好,只能通过阻塞或者轮询的方式得到任务的结果。

  • Future.get() 就是阻塞调用,在线程获取结果之前get方法会一直阻塞
  • Future提供了一个isDone方法,可以在程序中轮询这个方法查询执行结果。

阻塞的方式和异步编程的设计理念相违背,而轮询的方式会耗费无谓的CPU资源。因此,JDK 8设计出CompletableFutureCompletableFuture提供了一种观察者模式类似的机制,可以让任务执行完成后通知监听的一方。

从Future走进CompletableFuture

基于以上的例子,改用CompletableFuture实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class CompletableFutureTest {
public static void main(String[] args) throws InterruptedException, ExecutionException {
UserInfoService userInfoService = new UserInfoService();
MemberService memberService = new MemberService();
Long userId = 1L;
long startTime = System.currentTimeMillis();

// 调用用户服务获取用户基本信息
CompletableFuture<UserInfo> userInfoCompletableFuture = CompletableFuture.supplyAsync(() -> userInfoService.getUserInfo(userId));

Thread.sleep(300); // 模拟主线程其它操作耗时

// 调用会员服务获取会员信息
CompletableFuture<Member> memberCompletableFuture = CompletableFuture.supplyAsync(() -> memberService.getMemberInfo(userId));

UserInfo userInfo = userInfoCompletableFuture.get();
Member member = memberCompletableFuture.get();
System.out.println(userInfo);
System.out.println(member);

long endTime = System.currentTimeMillis();
System.out.println("总计用时:" + (endTime - startTime) + "ms");
}
}

结果:

1
2
3
UserInfo{userId=1, userName='zhangsan', age=18}
Member{memberId=1, userId=1, level='六级大会员'}
总计用时:826ms

可以发现,使用CompletableFuture,代码简洁了很多。CompletableFuturesupplyAsync方法,提供了异步执行的功能,线程池也不用单独创建了。实际上,CompletableFuture使用了默认线程池ForkJoinPool.commonPool

微信截图_20240202140336

CompletableFuture使用场景

CompletableFuture提供了几十种方法,辅助我们的异步任务场景。常见的使用场景有

  • 创建异步任务
  • 简单异步任务回调
  • 多个任务组合处理

创建异步任务

CompletableFuture创建异步任务,一般有supplyAsyncrunAsync两个方法

  • supplyAsync执行CompletableFuture任务,支持返回值
  • runAsync执行CompletableFuture任务,没有返回值

supplyAsync方法

1
2
3
4
5
6
7
8
9
10
// 使用默认内置线程池ForkJoinPool.commonPool(),根据supplier构建执行任务
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return asyncSupplyStage(ASYNC_POOL, supplier);
}

// 使用自定义线程池,根据supplier构建执行任务
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
Executor executor) {
return asyncSupplyStage(screenExecutor(executor), supplier);
}

runAsync方法

1
2
3
4
5
6
7
8
9
10
// 使用默认内置线程池ForkJoinPool.commonPool(),根据runnable构建执行任务
public static CompletableFuture<Void> runAsync(Runnable runnable) {
return asyncRunStage(ASYNC_POOL, runnable);
}

//自定义线程池,根据runnable构建执行任务
public static CompletableFuture<Void> runAsync(Runnable runnable,
Executor executor) {
return asyncRunStage(screenExecutor(executor), runnable);
}

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class Main {
public static void main(String[] args) {
ExecutorService executor = Executors.newCachedThreadPool();
// runAsync的使用
CompletableFuture<Void> runFuture = CompletableFuture.runAsync(() -> System.out.println("runAsync方法调用"), executor);
// supplyAsync的使用
CompletableFuture<String> supplyFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("supplyAsync方法调用");
return "supplyAsync返回值";
}, executor);

// runAsync的future没有返回值
System.out.println(runFuture.join());
// supplyAsync的future有返回值
System.out.println(supplyFuture.join());

executor.shutdown(); // 关闭线程池
}
}

结果:

1
2
3
4
runAsync方法调用
supplyAsync方法调用
null
supplyAsync返回值

异步任务回调

CompletableFuture的简单异步回调方法:

  • 不关心上一个任务的执行返回结果,无传参,无返回值
    • thenRun
    • thenRunAsync
  • 依赖上一个任务的结果,有传参,无返回值
    • thenAccept
    • thenAcceptAsync
  • 依赖上一个任务的结果,有传参,有返回值
    • thenApply
    • thenApplyAsync
  • 某个任务执行异常时,执行的回调方法 exceptionally
  • 某个任务执行完成后,执行的回调方法,无返回值 whenComplete
  • 某个任务执行完成后,执行的回调方法,有返回值 handle

thenRun/thenRunAsync

1
2
3
4
5
6
7
public CompletableFuture<Void> thenRun(Runnable action) {
return uniRunStage(null, action);
}

public CompletableFuture<Void> thenRunAsync(Runnable action) {
return uniRunStage(defaultExecutor(), action);
}

CompletableFuturethenRun方法,通俗点讲就是,做完第一个任务后,再做第二个任务。某个任务执行完成后,执行回调方法;但是前后两个任务没有参数传递,第二个任务也没有返回值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class FutureThenRunTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {

CompletableFuture<String> firstFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("先执行第一个CompletableFuture方法任务");
return "第一个任务";
}
);

CompletableFuture<Void> secondFuture = firstFuture.thenRun(() -> {
System.out.println("接着执行第二个任务");
});

System.out.println(firstFuture.get());
System.out.println(secondFuture.get());
}
}

结果:

1
2
3
4
先执行第一个CompletableFuture方法任务
接着执行第二个任务
第一个任务
null

thenRun 和thenRunAsync的区别

部分源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private static final Executor ASYNC_POOL = USE_COMMON_POOL ?
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

public CompletableFuture<Void> thenRun(Runnable action) {
return uniRunStage(null, action);
}

public CompletableFuture<Void> thenRunAsync(Runnable action) {
return uniRunStage(defaultExecutor(), action);
}

public Executor defaultExecutor() {
return ASYNC_POOL;
}

如果在执行任务的时候,传入了一个自定义线程池:

  • 调用thenRun方法执行第二个任务时,则第二个任务和第一个任务是共用同一个线程池
  • 调用thenRunAsync执行第二个任务时,则第一个任务使用的是传入的线程池,第二个任务使用的是ForkJoin线程池

后面介绍的thenAcceptthenAcceptAsyncthenApplythenApplyAsync等,它们之间的区别也是这个。

thenAccept/thenAcceptAsync

CompletableFuturethenAccept方法表示,第一个任务执行完成后,执行第二个回调方法任务,会将第一个任务的执行结果作为入参,传递到回调方法中,但是回调方法是没有返回值的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class FutureThenAcceptTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {

CompletableFuture<String> firstFuture = CompletableFuture.supplyAsync(
() -> {
System.out.println("原始CompletableFuture方法任务");
return "firstFuture的返回值";
}
);

CompletableFuture<Void> thenAcceptFuture = firstFuture.thenAccept((o) -> {
if ("firstFuture的返回值".equals(o)) {
System.out.println("第一个任务的返回值作为第二个回调方法任务的入参");
}
System.out.println("thenAccept调用");
});

System.out.println(firstFuture.get());
System.out.println(thenAcceptFuture.get());
}
}

结果:

1
2
3
4
5
原始CompletableFuture方法任务
firstFuture的返回值
第一个任务的返回值作为第二个回调方法任务的入参
thenAccept调用
null

thenApply/thenApplyAsync

CompletableFuturethenApply方法表示,第一个任务执行完成后,执行第二个回调方法任务,会将第一个任务的执行结果作为入参,传递到回调方法中,并且回调方法是有返回值的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class FutureThenApplyTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> firstFuture = CompletableFuture.supplyAsync(
() -> {
System.out.println("原始CompletableFuture方法任务");
return "firstFuture的返回值";
}
);

CompletableFuture<String> thenApplyFuture = firstFuture.thenApply((o) -> {
if ("firstFuture的返回值".equals(o)) {
System.out.println("第一个任务的返回值作为第二个回调方法任务的入参");
}
return "thenApply的返回值";
});

System.out.println(firstFuture.get());
System.out.println(thenApplyFuture.get());
}
}

结果:

1
2
3
4
原始CompletableFuture方法任务
firstFuture的返回值
第一个任务的返回值作为第二个回调方法任务的入参
thenApply的返回值

exceptionally

CompletableFutureexceptionally方法表示,某个任务执行异常时,执行的回调方法,并且有抛出异常作为参数,传递到回调方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class FutureExceptionTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {

CompletableFuture<String> firstFuture = CompletableFuture.supplyAsync(
() -> {
System.out.println("当前线程名称:" + Thread.currentThread().getName());
int i = 1 / 0;
return "异常";
}
);

CompletableFuture<String> exceptionFuture = firstFuture.exceptionally((e) -> {
e.printStackTrace();
return "程序异常";
});

System.out.println(exceptionFuture.get());
}
}

结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
当前线程名称:ForkJoinPool.commonPool-worker-1
程序异常
java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315)
at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1770)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1760)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
Caused by: java.lang.ArithmeticException: / by zero
at com.lpc.demo.completablefuture.FutureExceptionTest.lambda$main$0(FutureExceptionTest.java:16)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768)
... 6 more

whenComplete

CompletableFuturewhenComplete方法表示,某个任务执行完成后,执行的回调方法,无返回值;并且whenComplete方法返回的CompletableFutureresult是上个任务的结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class FutureWhenTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> firstFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程名称:" + Thread.currentThread().getName());
try {
Thread.sleep(2000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "第一个任务的返回值";
}
);

CompletableFuture<String> secondFuture = firstFuture.whenComplete((o, throwable) -> {
System.out.println("当前线程名称:" + Thread.currentThread().getName());
System.out.println("上个任务已执行完成,还把" + o + "传过来");
if ("第一个任务的返回值".equals(o)) {
System.out.println("第一个任务的返回值作为第二个回调方法任务的入参");
}
System.out.println("第二个任务调用");
});

System.out.println(secondFuture.get());
}
}

结果:

1
2
3
4
5
6
当前线程名称:ForkJoinPool.commonPool-worker-1
当前线程名称:ForkJoinPool.commonPool-worker-1
上个任务已执行完成,还把第一个任务的返回值传过来
第一个任务的返回值作为第二个回调方法任务的入参
第二个任务调用
第一个任务的返回值

handle

CompletableFuturehandle方法表示,某个任务执行完成后,执行回调方法,并且是有返回值的,并且handle方法返回的CompletableFuture的result是回调方法执行的结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class FutureHandleTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> firstFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程名称:" + Thread.currentThread().getName());
try {
Thread.sleep(2000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "第一个任务的返回值";
}
);

CompletableFuture<String> secondFuture = firstFuture.handle((o, throwable) -> {
System.out.println("当前线程名称:" + Thread.currentThread().getName());
System.out.println("上个任务已执行完成,还把" + o + "传过来");
if ("第一个任务的返回值".equals(o)) {
System.out.println("第一个任务的返回值作为第二个回调方法任务的入参");
return "返回值相同";
}
System.out.println("返回值不同");
return null;
});

System.out.println(secondFuture.get());
}
}

结果:

1
2
3
4
5
当前线程名称:ForkJoinPool.commonPool-worker-1
当前线程名称:ForkJoinPool.commonPool-worker-1
上个任务已执行完成,还把第一个任务的返回值传过来
第一个任务的返回值作为第二个回调方法任务的入参
返回值相同

多个任务组合处理

CompletableFuture的多个任务的组合处理:

  • AND组合的关系
    • thenCombine/thenCombineAsync
    • thenAcceptBoth/thenAcceptBothAsync
    • runAfterBoth/runAfterBothAsync
  • OR组合的关系
    • applyToEither/applyToEitherAsync
    • applyEither/applyEitherAsync
    • runAfterEither/runAfterEitherAsync
  • anyOf 任意一个任务执行完,就执行anyOf返回的CompletableFuture
  • allOf 所有任务执行完成后,才执行allOf返回的CompletableFuture
  • thenCompose 两个CompletableFuture依赖

AND组合关系

thenCombine/ thenAcceptBoth/ runAfterBoth都表示:将两个CompletableFuture组合起来,只有这两个都正常执行完了,才会执行某个任务

区别:

  • thenCombine:会将两个任务的执行结果作为方法入参,传递到指定方法中,且有返回值
  • thenAcceptBoth: 会将两个任务的执行结果作为方法入参,传递到指定方法中,且无返回值
  • runAfterBoth 不会把执行结果当做方法入参,且没有返回值。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class ThenCombineTest {
public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
CompletableFuture<String> first = CompletableFuture.completedFuture("第一个异步任务");
ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture<String> second = CompletableFuture
// 第二个异步任务
.supplyAsync(() -> "第二个异步任务", executor)
.thenCombineAsync(first, (a, b) -> {
System.out.println(a);
System.out.println(b);
return "两个异步任务的组合";
}, executor);
System.out.println(second.join());
executor.shutdown();
}
}

结果:

1
2
3
第二个异步任务
第一个异步任务
两个异步任务的组合

OR组合关系

applyToEither / acceptEither / runAfterEither 都表示:将两个CompletableFuture组合起来,只要其中一个执行完了,就会执行某个任务。

区别:

  • applyToEither:会将已经执行完成的任务,作为方法入参,传递到指定方法中,且有返回值
  • acceptEither: 会将已经执行完成的任务,作为方法入参,传递到指定方法中,且无返回值
  • runAfterEither: 不会把执行结果当做方法入参,且没有返回值。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class AcceptEitherTest {
public static void main(String[] args) {
// 第一个异步任务,休眠2秒,保证它执行晚点
CompletableFuture<String> first = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000L);
System.out.println("执行完第一个异步任务");
} catch (Exception e) {
return "第一个任务异常";
}
return "第一个异步任务";
});
ExecutorService executor = Executors.newSingleThreadExecutor();
CompletableFuture<Void> second = CompletableFuture
// 第二个异步任务
.supplyAsync(() -> {
System.out.println("执行完第二个任务");
return "第二个任务";
}, executor)
// 第三个任务
.acceptEitherAsync(first, System.out::println, executor);
executor.shutdown();
}
}

结果:

1
2
执行完第二个任务
第二个任务

AllOf

所有任务都执行完成后,才执行allOf返回的CompletableFuture。如果任意一个任务异常,allOfCompletableFuture执行get方法会抛出异常

1
2
3
4
5
6
7
8
9
10
11
12
13
public class AllOfFutureTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Void> a = CompletableFuture.runAsync(() -> {
System.out.println("我执行完了");
});
CompletableFuture<Void> b = CompletableFuture.runAsync(() -> {
System.out.println("我也执行完了");
});
CompletableFuture<Void> allOfFuture = CompletableFuture.allOf(a, b).whenComplete((o, throwable) -> {
System.out.println("finish");
});
}
}

结果:

1
2
3
我执行完了
我也执行完了
finish

AnyOf

任意一个任务执行完,就执行anyOf返回的CompletableFuture。如果执行的任务异常,anyOfCompletableFuture执行get方法会抛出异常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class AnyOfFutureTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Void> a = CompletableFuture.runAsync(() -> {
try {
Thread.sleep(3000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("我执行完了");
});
CompletableFuture<Void> b = CompletableFuture.runAsync(() -> {
System.out.println("我也执行完了");
});
CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(a, b).whenComplete((o, throwable) -> {
System.out.println("finish");
});
anyOfFuture.join();
}
}

结果:

1
2
3
我也执行完了
finish

thenCompose

thenCompose方法会在某个任务执行完成后,将该任务的执行结果作为方法入参,去执行指定的方法。该方法会返回一个新的CompletableFuture实例

  • 如果该CompletableFuture实例的result不为null,则返回一个基于该result新的CompletableFuture实例;
  • 如果该CompletableFuture实例的result为null,然后就执行这个新任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class ThenComposeTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> first = CompletableFuture.completedFuture("第一个任务");
ExecutorService executor = Executors.newSingleThreadExecutor();
// 第二个异步任务
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> "第二个任务", executor)
.thenComposeAsync(data -> {
System.out.println(data);
return first; // 使用第一个任务作为返回
}, executor);
System.out.println(future.join());
executor.shutdown();
}
}

结果:

1
2
第二个任务
第一个任务

使用CompletableFuture的注意点

Future需要获取返回值,才能获取异常信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
  public static void main(String[] args) {
ExecutorService executorService = new ThreadPoolExecutor(
5,
10,
5L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(10)
);
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
int a = 1 / 0;
return true;
}, executorService).thenAccept(System.out::println);

//如果不加 get()方法这一行,看不到异常信息
//future.get();
executorService.shutdown();
}

get()/join()方法是阻塞的

CompletableFutureget()方法是阻塞的,如果使用它来获取异步调用的返回值,需要添加超时时间

1
2
3
4
//反例
CompletableFuture.get();
//正例
CompletableFuture.get(3, TimeUnit.SECONDS);

默认线程池的注意点

CompletableFuture代码中使用了默认的线程池,处理的线程个数是电脑CPU核数-1。在大量请求过来的时候,处理逻辑复杂的话,响应会很慢。一般建议使用自定义线程池,优化线程池配置参数。

自定义线程池时,注意饱和策略

CompletableFutureget()方法是阻塞的,一般建议使用future.get(3, TimeUnit.SECONDS)。并且一般建议使用自定义线程池。

但是如果线程池拒绝策略是DiscardPolicy或者DiscardOldestPolicy,当线程池饱和时,会直接丢弃任务,不会抛弃异常。因此建议,CompletableFuture线程池策略最好使用AbortPolicy,然后耗时的异步线程,做好线程池隔离

与传统方法的比较

需求:查询产品列表的名称和对应价格

Product代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class Product {
private String name;
private Double price;

public Product() {
}

public Product(String name, Double price) {
this.name = name;
this.price = price;
}

public String getName() {
try {
Thread.sleep(1000); // 模拟查询数据库耗时
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return name;
}

public void setName(String name) {
this.name = name;
}

public Double getPrice() {
try {
Thread.sleep(3000); // 模拟查询数据库耗时
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return price;
}

public void setPrice(Double price) {
this.price = price;
}
}

给出6个产品列表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private static final List<Product> productList = Arrays.asList(
new Product("苹果", 6999.0),
new Product("华为", 5999.0),
new Product("小米", 4999.0),
new Product("三星", 6499.0),
new Product("OPPO", 4599.0),
new Product("VIVO", 4599.0),
new Product("华为", 5999.0),
new Product("小米", 4999.0),
new Product("三星", 6499.0),
new Product("OPPO", 4599.0),
new Product("华为", 5999.0),
new Product("小米", 4999.0),
new Product("三星", 6499.0),
new Product("OPPO", 4599.0)
);

顺序流

1
2
3
4
5
6
7
8
9
public static void main(String[] args) {
long startTime = System.currentTimeMillis();
List<String> list = productList.stream()
.map(product -> product.getName() + " price is " + product.getPrice())
.collect(Collectors.toList());
long endTime = System.currentTimeMillis();
list.forEach(System.out::println);
System.out.println("执行结束,共执行" + (endTime - startTime) / 1000 + "秒");
}

结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
苹果 price is 6999.0
华为 price is 5999.0
小米 price is 4999.0
三星 price is 6499.0
OPPO price is 4599.0
VIVO price is 4599.0
华为 price is 5999.0
小米 price is 4999.0
三星 price is 6499.0
OPPO price is 4599.0
华为 price is 5999.0
小米 price is 4999.0
三星 price is 6499.0
OPPO price is 4599.0
执行结束,共执行56秒

并行流

对此我们采用并行流尝试以多线程的形式执行任务,将stream改为parallelStream

1
2
3
4
5
6
7
8
9
public static void main(String[] args) {
long startTime = System.currentTimeMillis();
List<String> list = productList.parallelStream()
.map(product -> product.getName() + " price is " + product.getPrice())
.collect(Collectors.toList());
long endTime = System.currentTimeMillis();
list.forEach(System.out::println);
System.out.println("执行结束,共执行" + (endTime - startTime) / 1000 + "秒");
}

结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
苹果 price is 6999.0
华为 price is 5999.0
小米 price is 4999.0
三星 price is 6499.0
OPPO price is 4599.0
VIVO price is 4599.0
华为 price is 5999.0
小米 price is 4999.0
三星 price is 6499.0
OPPO price is 4599.0
华为 price is 5999.0
小米 price is 4999.0
三星 price is 6499.0
OPPO price is 4599.0
执行结束,共执行8

执行时间大幅缩短

使用CompletableFuture执行异步多查询任务

我们给出了CompletableFuture执行多IO查询任务的代码示例,可以看到代码的执行流程大致为:

  1. 遍历产品。
  2. 提交异步查询任务。
  3. 调用join(),(join方法get方法作用是一样的,都是阻塞获取查询结果,唯一的区别就是join方法签名没有抛异常,所以无需try-catch处理)。
1
2
3
4
5
6
7
8
9
10
public static void main(String[] args) {
long startTime = System.currentTimeMillis();
List<String> list = productList.stream()
.map(product -> CompletableFuture.supplyAsync(() -> product.getName() + " price is " + product.getPrice()))
.map(CompletableFuture::join)
.collect(Collectors.toList());
long endTime = System.currentTimeMillis();
list.forEach(System.out::println);
System.out.println("执行结束,共执行" + (endTime - startTime) / 1000 + "秒");
}

结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
苹果 price is 6999.0
华为 price is 5999.0
小米 price is 4999.0
三星 price is 6499.0
OPPO price is 4599.0
VIVO price is 4599.0
华为 price is 5999.0
小米 price is 4999.0
三星 price is 6499.0
OPPO price is 4599.0
华为 price is 5999.0
小米 price is 4999.0
三星 price is 6499.0
OPPO price is 4599.0
执行结束,共执行56秒

执行时间与顺序流并没什么区别。原因为在一次查询中,先在查询产品名称和产品价格时消耗时间,随后被join()方法阻塞,以获取结果,最终CompletableFuture用的和顺序流一样。

分解流优化使用CompletableFuture

上一方法之所以慢,是因为join()阻塞了流的操作,所以提升效率的方式就是不要让join阻塞流的操作。将流拆成两个,第一个流负责提交任务,即遍历每一个产品的任务提交出去,期间不阻塞,最终会生成一个CompletableFuture的List。

紧接着遍历上一个流生成的List<CompletableFuture>,调用join方法阻塞获取结果,因为上一个流操作提交任务时不阻塞,所以每个任务一提交时就可能已经在执行了,所以join方法获取结果的耗时也会相对短一些。

1
2
3
4
5
6
7
8
9
10
11
12
public static void main(String[] args) {
long startTime = System.currentTimeMillis();
List<CompletableFuture<String>> completableFutureList = productList.stream()
.map(product -> CompletableFuture.supplyAsync(() -> product.getName() + " price is " + product.getPrice()))
.collect(Collectors.toList()); //CompletableFuture提交价格查询任务
List<String> list = completableFutureList.stream()
.map(CompletableFuture::join) //用join阻塞获取结果
.collect(Collectors.toList());
long endTime = System.currentTimeMillis();
list.forEach(System.out::println);
System.out.println("执行结束,共执行" + (endTime - startTime) / 1000 + "秒");
}

结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
苹果 price is 6999.0
华为 price is 5999.0
小米 price is 4999.0
三星 price is 6499.0
OPPO price is 4599.0
VIVO price is 4599.0
华为 price is 5999.0
小米 price is 4999.0
三星 price is 6499.0
OPPO price is 4599.0
华为 price is 5999.0
小米 price is 4999.0
三星 price is 6499.0
OPPO price is 4599.0
执行结束,共执行8秒

可以看到执行时间与并行流差不多,原因是默认的线程数对于IO密集型任务来说显然是不够的

CompletableFuture使用自定义线程池

《Java并发编程实战》一书中,Brian Goetz和合著者们为线程池大小的优化提供了不少中肯的建议。这非常重要,如果线程池中线程的数量过多,最终它们会竞争稀缺的处理器和内存资源,浪费大量的时间在上下文切换上。反之,如果线程的数目过少,正如你的应用所面临的情况,处理器的一些核可能就无法充分利用。BrianGoetz建议,线程池大小与处理器的利用率 之比可以使用下面的公式进行估算: N(threads) = N(CPU)* U(CPU) * (1+ W/C) 其中: N(CPU)是处理器的核的数目,可以通过 Runtime.getRuntime().available Processors() 得到。U(CPU)是期望的 CPU利用率(该值应该介于 0和 1之间) W/C是等待时间与计算时间的比率。

注意此公式仅作参考,若与机器实际偏离过大,可以根据经验或实际情况设置线程数数量,本例将线程数的数量设置的与产品数量差不多,具体需要靠压测进行增减。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private static final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
16,
20,
1,
TimeUnit.MINUTES,
new ArrayBlockingQueue<>(100));

public static void main(String[] args) {
long startTime = System.currentTimeMillis();
List<CompletableFuture<String>> completableFutureList = productList.stream()
.map(product -> CompletableFuture.supplyAsync(() -> product.getName() + " price is " + product.getPrice(), threadPool))
.collect(Collectors.toList()); // CompletableFuture提交价格查询任务
List<String> list = completableFutureList.stream()
.map(CompletableFuture::join) // 用join阻塞获取结果
.collect(Collectors.toList());
long endTime = System.currentTimeMillis();
list.forEach(System.out::println);
System.out.println("执行结束,共执行" + (endTime - startTime) / 1000 + "秒");
}

结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
苹果 price is 6999.0
华为 price is 5999.0
小米 price is 4999.0
三星 price is 6499.0
OPPO price is 4599.0
VIVO price is 4599.0
华为 price is 5999.0
小米 price is 4999.0
三星 price is 6499.0
OPPO price is 4599.0
华为 price is 5999.0
小米 price is 4999.0
三星 price is 6499.0
OPPO price is 4599.0
执行结束,共执行4秒

执行时间又有了质的飞跃

并行流一定比CompletableFuture差吗

如果是计算密集型的任务,使用stream是最佳姿势,因为密集型需要一直计算,加多少个线程都无济于事,使用stream简单使用了。 而对于IO密集型的任务,例如上文这种大量查询都需要干等的任务,使用CompletableFuture是最佳实战了,通过自定义线程创建比CPU核心数更多的线程来提高工作效率才是较好的解决方案