通俗易懂讲 CompletableFuture
使用Future获得异步执行结果时,要么调用阻塞方法get(),要么轮询看isDone()是否为true,这两种方法都不是很好,因为主线程也会被迫等待。
从Java 8开始引入了CompletableFuture,它针对Future做了改进,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法。
构造任务
我们通常采用静态方法来构造任务,接下来演示几种常见的构造任务的方式。
supplyAsync
supplyAsync() 方法的参数是一个 Supplier 函数式接口
@FunctionalInterface
public interface Supplier<T> {
/**
* Gets a result.
*
* @return a result
*/
T get();
}
Supplier 接口不接收参数,但是返回一个泛型对象。因此,supplyAsync() 方法需要传入一个实现了 Supplier 接口的对象,我们这里采用函数式编程的方式构造一个 Supplier 实现类。
public static void demo1() {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("supply");
return "supply";
});
}
supplyAsync() 一共有两个方法,另一个方法是接收一个 Executor 线程池,如果没传入这个线程池,那么默认采用 ForkJoinPool 线程池
public static void demo1() {
Executor executor = Executors.newCachedThreadPool();
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("supply");
return "supply";
}, executor);
}
runAsync
runAsync() 方法的参数是一个 Runnable函数式接口
@FunctionalInterface
public interface Runnable {
public abstract void run();
}
Runnable 接口不接收参数,也没有返回值。因此,runAsync() 方法需要传入一个实现了 Runnable 接口的对象,我们这里采用函数式编程的方式构造一个 Runnable 实现类。
如下:runAsync() 方法也有两个重载方法,一个传入 Runnable 实现类,一个传入Runnable 实现类以及 Executor 线程池。
public static void demo2() {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
System.out.println("run");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
Executor executor = Executors.newCachedThreadPool();
CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
System.out.println("run");
} catch (InterruptedException e) {
e.printStackTrace();
}
}, executor);
}
completedFuture
completedFuture() 方法接收一个 Object 对象。
老实说,感觉这个方法没啥用。
public static void demo3() throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future = CompletableFuture.completedFuture(1);
System.out.println(future.get());
}
anyOf
anyOf() 方法接收 N 个 CompletableFuture 任务,他的参数是一个可变长的参数。
代表着接收任意一个对象的返回。
如下,first 和 second 谁先返回就取谁。这里常用的常见是从多方数据源取数据,不管谁先返回结果,我拿着结果就继续干下面的事
public static void demo4() throws ExecutionException, InterruptedException {
CompletableFuture<String> first = CompletableFuture.supplyAsync(() -> "first");
CompletableFuture<String> second = CompletableFuture.supplyAsync(() -> "second");
CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(first, second);
System.out.println(anyFuture.get());
}
allOf
allOf() 方法接收 N 个 CompletableFuture 任务,他的参数是一个可变长的参数。
该方法代表需要所有任务都执行完毕,我才能接着干下面的事情。这里可以看到, allOf 构造出来的任务 allFuture 是一个 void 返回值的,代表着可以拿这个对象做接下来的事情,这里我们在后面会再介绍到。
public static void demo5() {
CompletableFuture<String> first = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
System.out.println("first");
} catch (InterruptedException e) {
e.printStackTrace();
}
return "first";
});
CompletableFuture<String> second = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
System.out.println("second");
} catch (InterruptedException e) {
e.printStackTrace();
}
return "second";
});
CompletableFuture<Void> allFuture = CompletableFuture.allOf(first, second);
allFuture.join();
}
链式执行
构造完对象后就是到了执行的阶段了。其实上面的例子中,当我们构造一个任务的时候这个任务已经开始执行了,我们拿到的知识他的回调对象,通过这个回调对象我们可以接下来做一些操作。
这里先拿一个简单的列子做下演示:
先查找 id,然后根据 id 查询姓名,再根据姓名查询职位,最后打印,这边可以看到,一共构造了三个任务:查id、查姓名、查职位。通过很简便的 . 语法异步执行完了。
public static void demo6() throws ExecutionException, InterruptedException {
String position = CompletableFuture.supplyAsync(Main::findId)
.thenApplyAsync(Main::findNameById)
.thenApplyAsync(Main::findPositionByName).get();
System.out.println(position);
}
public static Integer findId() {
return 1;
}
public static String findNameById(Integer id) {
return "小王";
}
public static String findPositionByName(String name){
return "CEO";
}
输出:
CEO
可以看到,实例方法有很多,大多数通过名字就可以知道是干什么的,具体用法可以查阅 jdk api。这里调几个简单的来介绍介绍。
- xxx():表示该方法将继续在已有的线程中执行;
- xxxAsync():表示将异步在线程池中执行。
thenApplyAsync
这里可以看到 thenApplyAsync 有两个构造函数一个接收线程池,一个不接受。但是必须需要有一个实现了 Function 函数式接口的类才行,上面的例子采用了函数式编程,Main::findId 这种语法。
exceptionally
接收一个 Function 函数式接口,该接口的入参必须是 Throwable 对象,因此可以打印出堆栈信息。
public static void demo6() throws ExecutionException, InterruptedException {
CompletableFuture.supplyAsync(Main::findId).exceptionally(e -> {
e.printStackTrace();
return null;
});
}
其他的方法使用差不多,有兴趣的可以查阅 jdk api
总结
最近业务上有个详情页,一个 rpc 调完填充信息,再调另一个再填充信息,最后返回给前端。总感觉有优化空间,然后发现了响应式编程,java 8 的 CompletableFuture 也是响应式编程的一种,还在调研其他的如 RxJava 等,在这里先学习下最简单的 CompletableFuture 。
ps
文章为本人学习过程中的一些个人见解,漏洞是必不可少的,希望各位大佬多多指教,帮忙修复修复漏洞!!!