CompletableFuture异步多任务最佳实践 简介 在项目中需要执行异步任务时,通常可以考虑到可以用线程池Executor 创建。
如果不需要有返回值,任务实现Runnable 接口
如果需要有返回值,任务实现Callable 接口,调用Executor 的submit 方法,再使用Future 获取即可
如果多个线程存在依赖组合,可使用同步组件CountDownLatch 、CyclicBarrier 以及使用起来较为简单的CompeletableFuture 等
在Java 8中引入了CompletableFuture ,其对Java 5中的Future 类提供了非常强大的拓展,可以帮助我们简化异步编程的步骤,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,也提供了转换和组合CompletableFuture 的方法,适用于处理并发的IO密集型任务。
CompletableFuture类图
回顾Future 因为CompletableFuture 实现了Future 接口,因此先回顾Future 的用法。
Future 是Java 5新加的一个接口,它提供了一种异步并行计算的功能。如果主线程需要执行一个很耗时的计算任务,我们就可以通过future 把这个任务放到异步线程中执行。主线程继续处理其他任务,处理完成后,再通过Future 获取计算结果。
示例 假设我们有两个任务服务,一个查询用户基本信息,一个是查询用户等级信息。如下
1 2 3 4 5 6 7 8 9 10 public class UserInfoService { public UserInfo getUserInfo (Long userId) { try { Thread.sleep(300 ); } catch (InterruptedException e) { throw new RuntimeException (e); } return new UserInfo (userId, "zhangsan" , 18 ); } }
1 2 3 4 5 6 7 8 9 10 public class MemberService { public Member getMemberInfo (Long userId) { try { Thread.sleep(500 ); } catch (InterruptedException e) { throw new RuntimeException (e); } return new Member (1L , userId, "六级大会员" ); } }
接下来,在主线程中使用Future 来进行异步调用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 public class FutureTest { public static void main (String[] args) throws InterruptedException, ExecutionException { ExecutorService executorService = Executors.newFixedThreadPool(10 ); UserInfoService userInfoService = new UserInfoService (); MemberService memberService = new MemberService (); Long userId = 1L ; long startTime = System.currentTimeMillis(); FutureTask<UserInfo> userInfoFutureTask = new FutureTask <>(new Callable <UserInfo>() { @Override public UserInfo call () { return userInfoService.getUserInfo(userId); } }); executorService.submit(userInfoFutureTask); Thread.sleep(300 ); FutureTask<Member> memberFutureTask = new FutureTask <>(new Callable <Member>() { @Override public Member call () { return memberService.getMemberInfo(userId); } }); executorService.submit(memberFutureTask); UserInfo userInfo = userInfoFutureTask.get(); Member member = memberFutureTask.get(); System.out.println(userInfo); System.out.println(member); long endTime = System.currentTimeMillis(); System.out.println("总计用时:" + (endTime - startTime) + "ms" ); executorService.shutdown(); } }
结果:
1 2 3 UserInfo{userId=1, userName='zhangsan' , age=18} Member{memberId=1, userId=1, level='六级大会员' } 总计用时:851ms
如果不使用Future 进行并行异步调用,而是在主线程串行进行的话,耗时大约为$300+500+300 = 1100 ms$。可以发现,Future+线程池 异步配合,提高了程序的执行效率。
但是Future 对于结果的获取,不是很友好,只能通过阻塞 或者轮询的方式 得到任务的结果。
Future.get()
就是阻塞调用,在线程获取结果之前get方法会一直阻塞 。
Future 提供了一个isDone
方法,可以在程序中轮询这个方法查询 执行结果。
阻塞的方式和异步编程的设计理念相违背,而轮询的方式会耗费无谓的CPU资源 。因此,JDK 8设计出CompletableFuture 。CompletableFuture 提供了一种观察者模式类似的机制,可以让任务执行完成后通知监听的一方。
从Future走进CompletableFuture 基于以上的例子,改用CompletableFuture 实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public class CompletableFutureTest { public static void main (String[] args) throws InterruptedException, ExecutionException { UserInfoService userInfoService = new UserInfoService (); MemberService memberService = new MemberService (); Long userId = 1L ; long startTime = System.currentTimeMillis(); CompletableFuture<UserInfo> userInfoCompletableFuture = CompletableFuture.supplyAsync(() -> userInfoService.getUserInfo(userId)); Thread.sleep(300 ); CompletableFuture<Member> memberCompletableFuture = CompletableFuture.supplyAsync(() -> memberService.getMemberInfo(userId)); UserInfo userInfo = userInfoCompletableFuture.get(); Member member = memberCompletableFuture.get(); System.out.println(userInfo); System.out.println(member); long endTime = System.currentTimeMillis(); System.out.println("总计用时:" + (endTime - startTime) + "ms" ); } }
结果:
1 2 3 UserInfo{userId=1, userName='zhangsan' , age=18} Member{memberId=1, userId=1, level='六级大会员' } 总计用时:826ms
可以发现,使用CompletableFuture ,代码简洁了很多。CompletableFuture 的supplyAsync 方法,提供了异步执行的功能,线程池也不用单独创建了。实际上,CompletableFuture 使用了默认线程池ForkJoinPool.commonPool 。
CompletableFuture使用场景 CompletableFuture 提供了几十种方法,辅助我们的异步任务场景。常见的使用场景有
创建异步任务 CompletableFuture 创建异步任务,一般有supplyAsync
和runAsync
两个方法
supplyAsync
执行CompletableFuture 任务,支持返回值
runAsync
执行CompletableFuture 任务,没有返回值 。
supplyAsync方法 1 2 3 4 5 6 7 8 9 10 public static <U> CompletableFuture<U> supplyAsync (Supplier<U> supplier) { return asyncSupplyStage(ASYNC_POOL, supplier); } public static <U> CompletableFuture<U> supplyAsync (Supplier<U> supplier, Executor executor) { return asyncSupplyStage(screenExecutor(executor), supplier); }
runAsync方法 1 2 3 4 5 6 7 8 9 10 public static CompletableFuture<Void> runAsync (Runnable runnable) { return asyncRunStage(ASYNC_POOL, runnable); } public static CompletableFuture<Void> runAsync (Runnable runnable, Executor executor) { return asyncRunStage(screenExecutor(executor), runnable); }
示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public class Main { public static void main (String[] args) { ExecutorService executor = Executors.newCachedThreadPool(); CompletableFuture<Void> runFuture = CompletableFuture.runAsync(() -> System.out.println("runAsync方法调用" ), executor); CompletableFuture<String> supplyFuture = CompletableFuture.supplyAsync(() -> { System.out.println("supplyAsync方法调用" ); return "supplyAsync返回值" ; }, executor); System.out.println(runFuture.join()); System.out.println(supplyFuture.join()); executor.shutdown(); } }
结果:
1 2 3 4 runAsync方法调用 supplyAsync方法调用 null supplyAsync返回值
异步任务回调 CompletableFuture 的简单异步回调方法:
不关心上一个任务的执行返回结果,无传参,无返回值
依赖上一个任务的结果,有传参,无返回值
thenAccept
thenAcceptAsync
依赖上一个任务的结果,有传参,有返回值
某个任务执行异常时,执行的回调方法 exceptionally
某个任务执行完成后,执行的回调方法,无返回值 whenComplete
某个任务执行完成后,执行的回调方法,有返回值 handle
thenRun/thenRunAsync 1 2 3 4 5 6 7 public CompletableFuture<Void> thenRun (Runnable action) { return uniRunStage(null , action); } public CompletableFuture<Void> thenRunAsync (Runnable action) { return uniRunStage(defaultExecutor(), action); }
CompletableFuture 的thenRun
方法,通俗点讲就是,做完第一个任务后,再做第二个任务 。某个任务执行完成后,执行回调方法;但是前后两个任务没有参数传递,第二个任务也没有返回值
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public class FutureThenRunTest { public static void main (String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> firstFuture = CompletableFuture.supplyAsync(() -> { System.out.println("先执行第一个CompletableFuture方法任务" ); return "第一个任务" ; } ); CompletableFuture<Void> secondFuture = firstFuture.thenRun(() -> { System.out.println("接着执行第二个任务" ); }); System.out.println(firstFuture.get()); System.out.println(secondFuture.get()); } }
结果:
1 2 3 4 先执行第一个CompletableFuture方法任务 接着执行第二个任务 第一个任务 null
thenRun 和thenRunAsync的区别
部分源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 private static final Executor ASYNC_POOL = USE_COMMON_POOL ? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor (); public CompletableFuture<Void> thenRun (Runnable action) { return uniRunStage(null , action); } public CompletableFuture<Void> thenRunAsync (Runnable action) { return uniRunStage(defaultExecutor(), action); } public Executor defaultExecutor () { return ASYNC_POOL; }
如果在执行任务的时候,传入了一个自定义线程池:
调用thenRun
方法执行第二个任务时,则第二个任务和第一个任务是共用同一个线程池 。
调用thenRunAsync
执行第二个任务时,则第一个任务使用的是传入的线程池,第二个任务使用的是ForkJoin线程池
后面介绍的thenAccept
和thenAcceptAsync
,thenApply
和thenApplyAsync
等,它们之间的区别也是这个。
thenAccept/thenAcceptAsync CompletableFuture 的thenAccept
方法表示,第一个任务执行完成后,执行第二个回调方法任务,会将第一个任务的执行结果作为入参,传递到回调方法中,但是回调方法是没有返回值 的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public class FutureThenAcceptTest { public static void main (String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> firstFuture = CompletableFuture.supplyAsync( () -> { System.out.println("原始CompletableFuture方法任务" ); return "firstFuture的返回值" ; } ); CompletableFuture<Void> thenAcceptFuture = firstFuture.thenAccept((o) -> { if ("firstFuture的返回值" .equals(o)) { System.out.println("第一个任务的返回值作为第二个回调方法任务的入参" ); } System.out.println("thenAccept调用" ); }); System.out.println(firstFuture.get()); System.out.println(thenAcceptFuture.get()); } }
结果:
1 2 3 4 5 原始CompletableFuture方法任务 firstFuture的返回值 第一个任务的返回值作为第二个回调方法任务的入参 thenAccept调用 null
thenApply/thenApplyAsync CompletableFuture 的thenApply
方法表示,第一个任务执行完成后,执行第二个回调方法任务,会将第一个任务的执行结果作为入参,传递到回调方法中,并且回调方法是有返回值 的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public class FutureThenApplyTest { public static void main (String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> firstFuture = CompletableFuture.supplyAsync( () -> { System.out.println("原始CompletableFuture方法任务" ); return "firstFuture的返回值" ; } ); CompletableFuture<String> thenApplyFuture = firstFuture.thenApply((o) -> { if ("firstFuture的返回值" .equals(o)) { System.out.println("第一个任务的返回值作为第二个回调方法任务的入参" ); } return "thenApply的返回值" ; }); System.out.println(firstFuture.get()); System.out.println(thenApplyFuture.get()); } }
结果:
1 2 3 4 原始CompletableFuture方法任务 firstFuture的返回值 第一个任务的返回值作为第二个回调方法任务的入参 thenApply的返回值
exceptionally CompletableFuture 的exceptionally
方法表示,某个任务执行异常时,执行的回调方法,并且有抛出异常作为参数 ,传递到回调方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public class FutureExceptionTest { public static void main (String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> firstFuture = CompletableFuture.supplyAsync( () -> { System.out.println("当前线程名称:" + Thread.currentThread().getName()); int i = 1 / 0 ; return "异常" ; } ); CompletableFuture<String> exceptionFuture = firstFuture.exceptionally((e) -> { e.printStackTrace(); return "程序异常" ; }); System.out.println(exceptionFuture.get()); } }
结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 当前线程名称:ForkJoinPool.commonPool-worker-1 程序异常 java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315) at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320) at java.base/java.util.concurrent.CompletableFuture$AsyncSupply .run(CompletableFuture.java:1770) at java.base/java.util.concurrent.CompletableFuture$AsyncSupply .exec (CompletableFuture.java:1760) at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373) at java.base/java.util.concurrent.ForkJoinPool$WorkQueue .topLevelExec(ForkJoinPool.java:1182) at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655) at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622) at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165) Caused by: java.lang.ArithmeticException: / by zero at com.lpc.demo.completablefuture.FutureExceptionTest.lambda$main$0 (FutureExceptionTest.java:16) at java.base/java.util.concurrent.CompletableFuture$AsyncSupply .run(CompletableFuture.java:1768) ... 6 more
whenComplete CompletableFuture 的whenComplete
方法表示,某个任务执行完成后,执行的回调方法,无返回值 ;并且whenComplete
方法返回的CompletableFuture 的result是上个任务的结果 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public class FutureWhenTest { public static void main (String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> firstFuture = CompletableFuture.supplyAsync(() -> { System.out.println("当前线程名称:" + Thread.currentThread().getName()); try { Thread.sleep(2000L ); } catch (InterruptedException e) { e.printStackTrace(); } return "第一个任务的返回值" ; } ); CompletableFuture<String> secondFuture = firstFuture.whenComplete((o, throwable) -> { System.out.println("当前线程名称:" + Thread.currentThread().getName()); System.out.println("上个任务已执行完成,还把" + o + "传过来" ); if ("第一个任务的返回值" .equals(o)) { System.out.println("第一个任务的返回值作为第二个回调方法任务的入参" ); } System.out.println("第二个任务调用" ); }); System.out.println(secondFuture.get()); } }
结果:
1 2 3 4 5 6 当前线程名称:ForkJoinPool.commonPool-worker-1 当前线程名称:ForkJoinPool.commonPool-worker-1 上个任务已执行完成,还把第一个任务的返回值传过来 第一个任务的返回值作为第二个回调方法任务的入参 第二个任务调用 第一个任务的返回值
handle CompletableFuture 的handle
方法表示,某个任务执行完成后,执行回调方法, 并且是有返回值的 ,并且handle
方法返回的CompletableFuture 的result是回调方法 执行的结果。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public class FutureHandleTest { public static void main (String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> firstFuture = CompletableFuture.supplyAsync(() -> { System.out.println("当前线程名称:" + Thread.currentThread().getName()); try { Thread.sleep(2000L ); } catch (InterruptedException e) { e.printStackTrace(); } return "第一个任务的返回值" ; } ); CompletableFuture<String> secondFuture = firstFuture.handle((o, throwable) -> { System.out.println("当前线程名称:" + Thread.currentThread().getName()); System.out.println("上个任务已执行完成,还把" + o + "传过来" ); if ("第一个任务的返回值" .equals(o)) { System.out.println("第一个任务的返回值作为第二个回调方法任务的入参" ); return "返回值相同" ; } System.out.println("返回值不同" ); return null ; }); System.out.println(secondFuture.get()); } }
结果:
1 2 3 4 5 当前线程名称:ForkJoinPool.commonPool-worker-1 当前线程名称:ForkJoinPool.commonPool-worker-1 上个任务已执行完成,还把第一个任务的返回值传过来 第一个任务的返回值作为第二个回调方法任务的入参 返回值相同
多个任务组合处理 CompletableFuture 的多个任务的组合处理:
AND组合的关系
thenCombine/thenCombineAsync
thenAcceptBoth/thenAcceptBothAsync
runAfterBoth/runAfterBothAsync
OR组合的关系
applyToEither/applyToEitherAsync
applyEither/applyEitherAsync
runAfterEither/runAfterEitherAsync
anyOf 任意一个任务执行完,就执行anyOf返回的CompletableFuture
allOf 所有任务执行完成后,才执行allOf返回的CompletableFuture
thenCompose 两个CompletableFuture依赖
AND组合关系 thenCombine
/ thenAcceptBoth
/ runAfterBoth
都表示:将两个CompletableFuture组合起来,只有这两个都正常执行完了,才会执行某个任务 。
区别:
thenCombine
:会将两个任务的执行结果作为方法入参,传递到指定方法中,且有返回值
thenAcceptBoth
: 会将两个任务的执行结果作为方法入参,传递到指定方法中,且无返回值
runAfterBoth
不会把执行结果当做方法入参,且没有返回值。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public class ThenCombineTest { public static void main (String[] args) throws InterruptedException, ExecutionException, TimeoutException { CompletableFuture<String> first = CompletableFuture.completedFuture("第一个异步任务" ); ExecutorService executor = Executors.newFixedThreadPool(10 ); CompletableFuture<String> second = CompletableFuture .supplyAsync(() -> "第二个异步任务" , executor) .thenCombineAsync(first, (a, b) -> { System.out.println(a); System.out.println(b); return "两个异步任务的组合" ; }, executor); System.out.println(second.join()); executor.shutdown(); } }
结果:
1 2 3 第二个异步任务 第一个异步任务 两个异步任务的组合
OR组合关系 applyToEither
/ acceptEither
/ runAfterEither
都表示:将两个CompletableFuture 组合起来,只要其中一个执行完了,就会执行某个任务。
区别:
applyToEither
:会将已经执行完成的任务,作为方法入参,传递到指定方法中,且有返回值
acceptEither
: 会将已经执行完成的任务,作为方法入参,传递到指定方法中,且无返回值
runAfterEither
: 不会把执行结果当做方法入参,且没有返回值。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public class AcceptEitherTest { public static void main (String[] args) { CompletableFuture<String> first = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000L ); System.out.println("执行完第一个异步任务" ); } catch (Exception e) { return "第一个任务异常" ; } return "第一个异步任务" ; }); ExecutorService executor = Executors.newSingleThreadExecutor(); CompletableFuture<Void> second = CompletableFuture .supplyAsync(() -> { System.out.println("执行完第二个任务" ); return "第二个任务" ; }, executor) .acceptEitherAsync(first, System.out::println, executor); executor.shutdown(); } }
结果:
AllOf 所有任务都执行完成后,才执行allOf
返回的CompletableFuture 。如果任意一个任务异常,allOf
的CompletableFuture 执行get
方法会抛出异常
1 2 3 4 5 6 7 8 9 10 11 12 13 public class AllOfFutureTest { public static void main (String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Void> a = CompletableFuture.runAsync(() -> { System.out.println("我执行完了" ); }); CompletableFuture<Void> b = CompletableFuture.runAsync(() -> { System.out.println("我也执行完了" ); }); CompletableFuture<Void> allOfFuture = CompletableFuture.allOf(a, b).whenComplete((o, throwable) -> { System.out.println("finish" ); }); } }
结果:
AnyOf 任意一个任务执行完,就执行anyOf
返回的CompletableFuture 。如果执行的任务异常,anyOf
的CompletableFuture 执行get
方法会抛出异常
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public class AnyOfFutureTest { public static void main (String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Void> a = CompletableFuture.runAsync(() -> { try { Thread.sleep(3000L ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("我执行完了" ); }); CompletableFuture<Void> b = CompletableFuture.runAsync(() -> { System.out.println("我也执行完了" ); }); CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(a, b).whenComplete((o, throwable) -> { System.out.println("finish" ); }); anyOfFuture.join(); } }
结果:
thenCompose thenCompose
方法会在某个任务执行完成后,将该任务的执行结果作为方法入参,去执行指定的方法。该方法会返回一个新的CompletableFuture 实例
如果该CompletableFuture 实例的result不为null ,则返回一个基于该result新的CompletableFuture 实例;
如果该CompletableFuture 实例的result为null ,然后就执行这个新任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public class ThenComposeTest { public static void main (String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> first = CompletableFuture.completedFuture("第一个任务" ); ExecutorService executor = Executors.newSingleThreadExecutor(); CompletableFuture<String> future = CompletableFuture .supplyAsync(() -> "第二个任务" , executor) .thenComposeAsync(data -> { System.out.println(data); return first; }, executor); System.out.println(future.join()); executor.shutdown(); } }
结果:
使用CompletableFuture的注意点 Future需要获取返回值,才能获取异常信息 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public static void main (String[] args) { ExecutorService executorService = new ThreadPoolExecutor ( 5 , 10 , 5L , TimeUnit.SECONDS, new ArrayBlockingQueue <>(10 ) ); CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> { int a = 1 / 0 ; return true ; }, executorService).thenAccept(System.out::println); executorService.shutdown(); }
get()/join()方法是阻塞的 CompletableFuture 的get()
方法是阻塞的,如果使用它来获取异步调用的返回值,需要添加超时时间
1 2 3 4 CompletableFuture.get(); CompletableFuture.get(3 , TimeUnit.SECONDS);
默认线程池的注意点 CompletableFuture 代码中使用了默认的线程池,处理的线程个数是电脑CPU核数-1 。在大量请求过来的时候,处理逻辑复杂的话,响应会很慢 。一般建议使用自定义线程池,优化线程池配置参数。
自定义线程池时,注意饱和策略 CompletableFuture 的get()
方法是阻塞的,一般建议使用future.get(3, TimeUnit.SECONDS)
。并且一般建议使用自定义线程池。
但是如果线程池拒绝策略是DiscardPolicy 或者DiscardOldestPolicy ,当线程池饱和时,会直接丢弃任务,不会抛弃异常。因此建议,CompletableFuture 线程池策略最好使用AbortPolicy ,然后耗时的异步线程,做好线程池隔离 。
与传统方法的比较 需求:查询产品列表的名称和对应价格
Product代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 public class Product { private String name; private Double price; public Product () { } public Product (String name, Double price) { this .name = name; this .price = price; } public String getName () { try { Thread.sleep(1000 ); } catch (InterruptedException e) { throw new RuntimeException (e); } return name; } public void setName (String name) { this .name = name; } public Double getPrice () { try { Thread.sleep(3000 ); } catch (InterruptedException e) { throw new RuntimeException (e); } return price; } public void setPrice (Double price) { this .price = price; } }
给出6个产品列表
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 private static final List<Product> productList = Arrays.asList( new Product ("苹果" , 6999.0 ), new Product ("华为" , 5999.0 ), new Product ("小米" , 4999.0 ), new Product ("三星" , 6499.0 ), new Product ("OPPO" , 4599.0 ), new Product ("VIVO" , 4599.0 ), new Product ("华为" , 5999.0 ), new Product ("小米" , 4999.0 ), new Product ("三星" , 6499.0 ), new Product ("OPPO" , 4599.0 ), new Product ("华为" , 5999.0 ), new Product ("小米" , 4999.0 ), new Product ("三星" , 6499.0 ), new Product ("OPPO" , 4599.0 ) );
顺序流 1 2 3 4 5 6 7 8 9 public static void main (String[] args) { long startTime = System.currentTimeMillis(); List<String> list = productList.stream() .map(product -> product.getName() + " price is " + product.getPrice()) .collect(Collectors.toList()); long endTime = System.currentTimeMillis(); list.forEach(System.out::println); System.out.println("执行结束,共执行" + (endTime - startTime) / 1000 + "秒" ); }
结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 苹果 price is 6999.0 华为 price is 5999.0 小米 price is 4999.0 三星 price is 6499.0 OPPO price is 4599.0 VIVO price is 4599.0 华为 price is 5999.0 小米 price is 4999.0 三星 price is 6499.0 OPPO price is 4599.0 华为 price is 5999.0 小米 price is 4999.0 三星 price is 6499.0 OPPO price is 4599.0 执行结束,共执行56秒
并行流 对此我们采用并行流尝试以多线程的形式执行任务,将stream
改为parallelStream
1 2 3 4 5 6 7 8 9 public static void main (String[] args) { long startTime = System.currentTimeMillis(); List<String> list = productList.parallelStream() .map(product -> product.getName() + " price is " + product.getPrice()) .collect(Collectors.toList()); long endTime = System.currentTimeMillis(); list.forEach(System.out::println); System.out.println("执行结束,共执行" + (endTime - startTime) / 1000 + "秒" ); }
结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 苹果 price is 6999.0 华为 price is 5999.0 小米 price is 4999.0 三星 price is 6499.0 OPPO price is 4599.0 VIVO price is 4599.0 华为 price is 5999.0 小米 price is 4999.0 三星 price is 6499.0 OPPO price is 4599.0 华为 price is 5999.0 小米 price is 4999.0 三星 price is 6499.0 OPPO price is 4599.0 执行结束,共执行8 秒
执行时间大幅缩短
使用CompletableFuture执行异步多查询任务 我们给出了CompletableFuture 执行多IO查询任务的代码示例,可以看到代码的执行流程大致为:
遍历产品。
提交异步查询任务。
调用join()
,(join
方法get
方法作用是一样的,都是阻塞获取查询结果,唯一的区别就是join
方法签名没有抛异常,所以无需try-catch处理)。
1 2 3 4 5 6 7 8 9 10 public static void main (String[] args) { long startTime = System.currentTimeMillis(); List<String> list = productList.stream() .map(product -> CompletableFuture.supplyAsync(() -> product.getName() + " price is " + product.getPrice())) .map(CompletableFuture::join) .collect(Collectors.toList()); long endTime = System.currentTimeMillis(); list.forEach(System.out::println); System.out.println("执行结束,共执行" + (endTime - startTime) / 1000 + "秒" ); }
结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 苹果 price is 6999.0 华为 price is 5999.0 小米 price is 4999.0 三星 price is 6499.0 OPPO price is 4599.0 VIVO price is 4599.0 华为 price is 5999.0 小米 price is 4999.0 三星 price is 6499.0 OPPO price is 4599.0 华为 price is 5999.0 小米 price is 4999.0 三星 price is 6499.0 OPPO price is 4599.0 执行结束,共执行56秒
执行时间与顺序流并没什么区别。原因为在一次查询中,先在查询产品名称和产品价格时消耗时间,随后被join()
方法阻塞,以获取结果,最终CompletableFuture 用的和顺序流一样。
分解流优化使用CompletableFuture 上一方法之所以慢,是因为join()
阻塞了流的操作,所以提升效率的方式就是不要让join阻塞流的操作。将流拆成两个,第一个流负责提交任务,即遍历每一个产品的任务提交出去,期间不阻塞,最终会生成一个CompletableFuture 的List。
紧接着遍历上一个流生成的List<CompletableFuture>
,调用join
方法阻塞获取结果,因为上一个流操作提交任务时不阻塞,所以每个任务一提交时就可能已经在执行了,所以join
方法获取结果的耗时也会相对短一些。
1 2 3 4 5 6 7 8 9 10 11 12 public static void main (String[] args) { long startTime = System.currentTimeMillis(); List<CompletableFuture<String>> completableFutureList = productList.stream() .map(product -> CompletableFuture.supplyAsync(() -> product.getName() + " price is " + product.getPrice())) .collect(Collectors.toList()); List<String> list = completableFutureList.stream() .map(CompletableFuture::join) .collect(Collectors.toList()); long endTime = System.currentTimeMillis(); list.forEach(System.out::println); System.out.println("执行结束,共执行" + (endTime - startTime) / 1000 + "秒" ); }
结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 苹果 price is 6999.0 华为 price is 5999.0 小米 price is 4999.0 三星 price is 6499.0 OPPO price is 4599.0 VIVO price is 4599.0 华为 price is 5999.0 小米 price is 4999.0 三星 price is 6499.0 OPPO price is 4599.0 华为 price is 5999.0 小米 price is 4999.0 三星 price is 6499.0 OPPO price is 4599.0 执行结束,共执行8秒
可以看到执行时间与并行流差不多,原因是默认的线程数对于IO密集型任务来说显然是不够的
CompletableFuture使用自定义线程池
《Java并发编程实战》一书中,Brian Goetz和合著者们为线程池大小的优化提供了不少中肯的建议。这非常重要,如果线程池中线程的数量过多,最终它们会竞争稀缺的处理器和内存资源,浪费大量的时间在上下文切换上。反之,如果线程的数目过少,正如你的应用所面临的情况,处理器的一些核可能就无法充分利用。BrianGoetz建议,线程池大小与处理器的利用率 之比可以使用下面的公式进行估算: N(threads) = N(CPU)* U(CPU) * (1+ W/C) 其中: N(CPU)是处理器的核的数目,可以通过 Runtime.getRuntime().available Processors() 得到。U(CPU)是期望的 CPU利用率(该值应该介于 0和 1之间) W/C是等待时间与计算时间的比率。
注意此公式仅作参考,若与机器实际偏离过大,可以根据经验或实际情况设置线程数数量,本例将线程数的数量设置的与产品数量差不多,具体需要靠压测进行增减。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 private static final ThreadPoolExecutor threadPool = new ThreadPoolExecutor ( 16 , 20 , 1 , TimeUnit.MINUTES, new ArrayBlockingQueue <>(100 )); public static void main (String[] args) { long startTime = System.currentTimeMillis(); List<CompletableFuture<String>> completableFutureList = productList.stream() .map(product -> CompletableFuture.supplyAsync(() -> product.getName() + " price is " + product.getPrice(), threadPool)) .collect(Collectors.toList()); List<String> list = completableFutureList.stream() .map(CompletableFuture::join) .collect(Collectors.toList()); long endTime = System.currentTimeMillis(); list.forEach(System.out::println); System.out.println("执行结束,共执行" + (endTime - startTime) / 1000 + "秒" ); }
结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 苹果 price is 6999.0 华为 price is 5999.0 小米 price is 4999.0 三星 price is 6499.0 OPPO price is 4599.0 VIVO price is 4599.0 华为 price is 5999.0 小米 price is 4999.0 三星 price is 6499.0 OPPO price is 4599.0 华为 price is 5999.0 小米 price is 4999.0 三星 price is 6499.0 OPPO price is 4599.0 执行结束,共执行4秒
执行时间又有了质的飞跃
并行流一定比CompletableFuture差吗 如果是计算密集型的任务,使用stream 是最佳姿势,因为密集型需要一直计算,加多少个线程都无济于事,使用stream 简单使用了。 而对于IO密集型的任务,例如上文这种大量查询都需要干等的任务,使用CompletableFuture 是最佳实战了,通过自定义线程创建比CPU核心数更多的线程来提高工作效率才是较好的解决方案