大多数程序员在平时工作中,都是增删改查。这里我跟大家讲解如何利用CompletableFuture优化项目代码,使项目性能更佳!
基于 Spring Boot + MyBatis Plus + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能
-
项目地址:https://github.com/YunaiV/ruoyi-vue-pro
-
视频教程:https://doc.iocoder.cn/video/
举个例子:用户登录成功,需要返回前端用户角色,菜单权限,个人信息,用户余额,积分情况等。正常逻辑是依次查询不同表,得到对应的数据封装返回给前端,代码如下:
@Test
public void login(Long userId){
log.info("开始查询用户全部信息---串行!");
// 查询用户角色信息
getUserRole(userId);
// 查询用户菜单信息
getUserMenu(userId);
// 查询用户余额信息
getUserAmount(userId);
// 查询用户积分信息
getUserIntegral(userId);
log.info("封装用户信息返回给前端!");
}
假如查询用户角色,用户菜单,用户余额,用户积分分别耗时500,200,200,100毫秒,则登录接口耗时为1秒。如果采用异步(多线程并行)形式,则登录接口耗时以单个查询最慢的任务为主,为查询用户角色信息500毫秒。相当于登录接口性能提升一倍!查询任务越多,则其性能提升越大!
代码演示(串行):
@Test
public void login() throws InterruptedException {
long startTime = System.currentTimeMillis();
log.info("开始查询用户全部信息!");
log.info("开始查询用户角色信息!");
Thread.sleep(500);
String role = "管理员";
log.info("开始查询用户菜单信息!");
Thread.sleep(200);
String menu = "首页,账户管理,积分管理";
log.info("开始查询查询用户余额信息!");
Thread.sleep(200);
Integer amount = 1999;
log.info("开始查询查询查询用户积分信息!");
Thread.sleep(100);
Integer integral = 1015;
log.info("封装用户信息返回给前端!");
log.info("查询用户全部信息总耗时:" + (System.currentTimeMillis() - startTime) / 1000 + "秒");
}
结果:
代码演示(异步):
@Test
public void asyncLogin() {
long startTime = System.currentTimeMillis();
log.info("开始查询用户角色信息!");
CompletableFuture
结果:
直观的可以看出,异步执行的优势!
基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能
-
项目地址:https://github.com/YunaiV/yudao-cloud
-
视频教程:https://doc.iocoder.cn/video/
Future是什么?
-
Java 1.5中引入Callable解决多线程执行无返回值的问题。
-
Future是为了配合Callable/Runnable而产生的。简单来讲,我们可以通过future来对任务查询、取消、执行结果的获取,是调用方与异步执行方之间沟通的桥梁。
-
FutureTask实现了RunnableFuture接口,同时具有Runnable、Future的能力,即既可以作为Future得到Callable的返回值,又可以作为一个Runnable。
-
CompletableFuture实现了Futrue接口。
-
Future是Java5新加的一个接口,它提供了一种异步并行计算的功能。如果主线程需要执行一个很耗时的计算任务,我们可以将这个任务通过Future放到异步线程中去执行。主线程继续处理其他任务,处理完成后,再通过Future获取计算结果。
-
Future可以在连续流程中满足数据驱动的并发需求,既获得了并发执行的性能提升,又不失连续流程的简洁优雅。
代码演示(不使用自定义线程池):
@Test
public void callable() throws ExecutionException, InterruptedException {
long startTime = System.currentTimeMillis();
Callable amountCall = new Callable() {
@Override
public Object call() throws Exception {
long startTime = System.currentTimeMillis();
Thread.sleep(6000);
Map amountMap = new
HashMap();
amountMap.put("amount", 99);
long endTime = System.currentTimeMillis();
log.info("查询金额信息耗时:" + (endTime - startTime) / 1000 + "秒");
return amountMap;
}
};
FutureTask amountFuture = new FutureTask<>(amountCall);
new Thread(amountFuture).start();
Callable roleCall = new Callable() {
@Override
public Object call() throws Exception {
long startTime = System.currentTimeMillis();
Thread.sleep(5000);
Map roleMap = new HashMap();
roleMap.put("name", "管理员");
long endTime = System.currentTimeMillis();
log.info("查询角色信息耗时:" + (endTime - startTime) / 1000 + "秒");
return roleMap;
}
};
FutureTask roleFuture = new FutureTask<>(roleCall);
new Thread(roleFuture).start();
log.info("金额查询结果为:" + amountFuture.get());
log.info("角色查询结果为:" + roleFuture.get());
long endTime = System.currentTimeMillis();
log.info("总耗时:" + (endTime - startTime) / 1000 + "秒");
}
“
这里要注意:Future对于结果的获取,不是很友好,只能通过阻塞或者轮询的方式得到任务的结果。
Future.get()
就是阻塞调用,在线程获取结果之前get方法会一直阻塞;Future提供了一个isDone方法,可以在程序中轮询这个方法查询执行结果。
这里的
amountFuture.get()
如果放到如下图所示的位置,则amountFuture下面的线程将等
amountFuture.get()
完成后才能执行,没有执行完,则一直阻塞。
结果:
代码演示(使用自定义线程池):
@Test
public void executor() throws ExecutionException, InterruptedException {
long startTime = System.currentTimeMillis();
ExecutorService executor = Executors.newFixedThreadPool(2);
Callable amountCall = new Callable() {
@Override
public Object call() throws Exception {
long startTime = System.currentTimeMillis();
Thread.sleep(6000);
Map amountMap = new HashMap();
amountMap.put("amount", 99);
long endTime = System.currentTimeMillis();
log.info("查询金额信息耗时:" + (endTime - startTime) / 1000 + "秒");
return amountMap;
}
};
Callable roleCall = new Callable() {
@Override
public Object call() throws Exception {
long startTime = System.currentTimeMillis();
Thread.sleep(5000);
Map roleMap = new HashMap();
roleMap.put("name", "管理员");
long endTime = System.currentTimeMillis();
log.info("查询用户角色信息耗时:" + (endTime - startTime) / 1000 + "秒");
return roleMap;
}
};
Future amountFuture = executor.submit(amountCall);
Future roleFuture = executor.submit(roleCall);
log.info("金额查询结果为:" + amountFuture.get());
log.info("角色查询结果为:" + roleFuture.get());
long endTime = System.currentTimeMillis();
log.info("总耗时:" + (endTime - startTime) / 1000 + "秒");
}
结果:
CompletableFuture创建异步任务,一般有supplyAsync和runAsync两个方法:
-
supplyAsync执行CompletableFuture任务,支持返回值。
-
runAsync执行CompletableFuture任务,没有返回值。
supplyAsync方法
//使用默认内置线程池ForkJoinPool.commonPool(),根据supplier构建执行任务
public static CompletableFuture supplyAsync(Supplier supplier)
//自定义线程,根据supplier构建执行任务
public static CompletableFuture supplyAsync(Supplier supplier, Executor executor)
runAsync方法
//使用默认内置线程池ForkJoinPool.commonPool(),根据runnable构建执行任务
public static CompletableFuture runAsync(Runnable runnable)
//自定义线程,根据runnable构建执行任务
public static CompletableFuture runAsync(Runnable runnable, Executor executor)
代码演示:
@Test
// supplyAsync执行CompletableFuture任务,支持返回值
public void defaultSupplyAsync() throws ExecutionException, InterruptedException {
long startTime = System.currentTimeMillis();
// 构建执行任务
CompletableFuture> amountCompletableFuture = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(6000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Map amountMap = new HashMap();
amountMap.put("amount", 99);
long endTime = System.currentTimeMillis();
log.info("查询金额信息耗时:" + (endTime - startTime) / 1000 + "秒");
return amountMap;
});
// 这行代码在这里 则会进行6秒的阻塞 下面代码其他线程无法创建
// 只能等这个线程6秒过后结束才能创建其他线程
// Map userMap = userCompletableFuture.get();
CompletableFuture> roleCompletableFuture = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Map roleMap = new HashMap();
roleMap.put("name", "管理员");
return roleMap;
});
log.info("金额查询结果为:" + amountCompletableFuture.join());
log.info("角色查询结果为:" + roleCompletableFuture.join());
log.info("总耗时:" + (System.currentTimeMillis() - startTime) / 1000 + "秒");
}
@Test
// supplyAsync执行CompletableFuture任务,支持返回值
public void customSupplyAsync() throws ExecutionException, InterruptedException {
// 自定义线程池
ExecutorService executorService = Executors.newCachedThreadPool();
long startTime = System.currentTimeMillis();
CompletableFuture> amountCompletableFuture = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(6000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Map amountMap = new HashMap();
amountMap.put("amount", 99);
long endTime = System.currentTimeMillis();
log.info("查询金额信息耗时:" + (endTime - startTime) / 1000 + "秒");
return amountMap;
}, executorService);
// 这行代码在这里 则会进行6秒的阻塞 下面代码其他线程无法创建
// 只能等这个线程6秒过后结束才能创建其他线程
// Map userMap = userCompletableFuture.get();
CompletableFuture> roleCompletableFuture = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Map roleMap = new HashMap();
roleMap.put("name", "管理员");
return roleMap;
}, executorService);
log.info("金额查询结果为:" + amountCompletableFuture.join());
log.info("角色查询结果为:" + roleCompletableFuture.join());
// 线程池需要关闭
executorService.shutdown();
log.info("总耗时:" + (System.currentTimeMillis() - startTime) / 1000 + "秒");
}
@Test
// runAsync执行CompletableFuture任务,没有返回值
public void defaultRunAsync() {
long lordStartTime = System.currentTimeMillis();
CompletableFuture amountCompletableFuture = CompletableFuture.runAsync(() -> {
long startTime = System.currentTimeMillis();
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("执行金额增删改操作用时:" + (System.currentTimeMillis() - startTime) / 1000 + "秒");
});
CompletableFuture roleCompletableFuture = CompletableFuture.runAsync(() -> {
long startTime = System.currentTimeMillis();
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("执行角色增删改操作用时:" + (System.currentTimeMillis() - startTime) / 1000 + "秒");
});
log.info("金额查询结果为:" + amountCompletableFuture.join());
log.info("角色查询结果为:" + roleCompletableFuture.join());
log.info("总耗时:" + (System.currentTimeMillis() - lordStartTime) / 1000 + "秒");
}
@Test
// runAsync执行CompletableFuture任务,没有返回值
public void customRunAsync() {
long lordStartTime = System.currentTimeMillis();
ExecutorService executor = Executors.newCachedThreadPool();
CompletableFuture amountCompletableFuture = CompletableFuture.runAsync(() -> {
long startTime = System.currentTimeMillis();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("执行金额增删改操作用时:" + (System.currentTimeMillis() - startTime) / 1000 + "秒");
}, executor);
CompletableFuture roleCompletableFuture = CompletableFuture.runAsync(() -> {
long startTime = System.currentTimeMillis();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("执行角色增删改操作用时:" + (System.currentTimeMillis() - startTime) / 1000 + "秒");
}, executor);
log.info("金额查询结果为:" + amountCompletableFuture.join());
log.info("角色查询结果为:" + roleCompletableFuture.join());
// 关闭线程池
executor.shutdown();
log.info("总耗时:" + (System.currentTimeMillis() - lordStartTime) / 1000 + "秒");
}
注意:这里的get()与join()都是获取任务线程的返回值。join()方法抛出的是uncheck异常(即RuntimeException),不会强制开发者抛出, 会将异常包装成
CompletionException
异常 /
CancellationException
异常,但是本质原因还是代码内存在的真正的异常;
get()方法抛出的是经过检查的异常,
ExecutionException
,
InterruptedException
需要用户手动处理(抛出或者 try catch)。
thenRun / thenRunAsync
CompletableFuture的thenRun方法,通俗点讲就是,做完第一个任务后,再做第二个任务。某个任务执行完成后,执行回调方法;但是前后两个任务没有参数传递,第二个任务也没有返回值。
public CompletableFuture thenRun(Runnable action);
public CompletableFuture thenRunAsync(Runnable action);
thenRun / thenRunAsync的区别? 源码解释:
private static final Executor asyncPool = useCommonPool ?
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
public CompletableFuture thenRun(Runnable action) {
return uniRunStage(null, action);
}
public CompletableFuture thenRunAsync(Runnable action) {
return uniRunStage(asyncPool, action);
}
如果你执行第一个任务的时候,传入了一个自定义线程池:
-
调用thenRun方法执行第二个任务时,则第二个任务和第一个任务是共用同一个线程池。
-
调用thenRunAsync执行第二个任务时,则第一个任务使用的是你自己传入的线程池,第二个任务使用的是ForkJoin线程池。
后面介绍的thenAccept和thenAcceptAsync,thenApply和thenApplyAsync等,它们之间的区别也是这个哈!
代码演示:
@Test
// 执行第一个任务后 可以继续执行第二个任务 两个任务之间无传参 无返回值
public void defaultThenRun() throws ExecutionException, InterruptedException {
long lordStartTime = System.currentTimeMillis();
CompletableFuture amountCompletableFuture = CompletableFuture.runAsync(() -> {
long startTime = System.currentTimeMillis();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("执行金额增删改操作用时:" + (System.currentTimeMillis() - startTime) / 1000 + "秒");
});
CompletableFuture thenCompletableFuture = amountCompletableFuture.thenRun(() -> {
long startTime = System.currentTimeMillis();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("执行角色增删改操作用时:" + (System.currentTimeMillis() - startTime) / 1000 + "秒");
});
thenCompletableFuture.get();
log.info("总耗时:" + (System.currentTimeMillis() - lordStartTime) / 1000 + "秒");
}
结果:
thenAccept / thenAcceptAsync
CompletableFuture的thenAccept方法表示,第一个任务执行完成后,执行第二个回调方法任务,会将该任务的执行结果,作为入参,传递到回调方法中,但是回调方法是没有返回值的。
public CompletableFuture thenAccept(Consumer super T> action);
public CompletableFuture thenAcceptAsync(Consumer super T> action);
代码演示:
@Test
// 执行第一个任务后 可以继续执行第二个任务 并携带第一个任务的返回值 第二个任务执行完没有返回值
public void defaultThenAccept() throws ExecutionException, InterruptedException {
long lordStartTime = System.currentTimeMillis();
CompletableFuture amountCompletableFuture = CompletableFuture.supplyAsync(() -> {
long startTime = System.currentTimeMillis();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Map amountMap = new HashMap();
amountMap.put("amount", 90);
log.info("执行金额查询操作用时:" + (System.currentTimeMillis() - startTime) / 1000 + "秒");
return amountMap;
});
CompletableFuture thenCompletableFuture = amountCompletableFuture.thenAccept((map) -> {
long startTime = System.currentTimeMillis();
if (Integer.parseInt(map.get("amount").toString()) > 90) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("金额充足,可以购买!:" + (System.currentTimeMillis() - startTime) / 1000 + "秒");
} else {
log.info("金额不足,无法购买!:" + (System.currentTimeMillis() - startTime) / 1000 + "秒");
}
});
thenCompletableFuture.get();
log.info("总耗时:" + (System.currentTimeMillis() - lordStartTime) / 1000 + "秒");
}
结果:
thenApply / thenApplyAsync
CompletableFuture的thenApply方法表示,第一个任务执行完成后,执行第二个回调方法任务,会将该任务的执行结果,作为入参,传递到回调方法中,并且回调方法是有返回值的。
public CompletableFuture thenApplyAsync();
public CompletableFuture thenAccept(Consumer super T> action);
代码演示:
@Test
// 执行第一个任务后 可以继续执行第二个任务 并携带第一个任务的返回值 第二个任务执行完有返回值
public void defaultThenApply() throws ExecutionException, InterruptedException {
long lordStartTime = System.currentTimeMillis();
CompletableFuture amountCompletableFuture = CompletableFuture.supplyAsync(() -> {
long startTime = System.currentTimeMillis();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Map amountMap = new HashMap();
amountMap.put("amount", 90);
log.info("执行金额查询操作用时:" + (System.currentTimeMillis() - startTime) / 1000 + "秒");
return amountMap;
});
CompletableFuture thenCompletableFuture = amountCompletableFuture.thenApply((map) -> {
int number = 0;
if (Integer.parseInt(map.get("amount").toString()) > 3) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 可口可乐3元一瓶 看金额一共能购买多少瓶
number = Integer.parseInt(map.get("amount").toString()) / 3;
}
return number;
});
log.info("当前金额一共可以买" + thenCompletableFuture.get() + "瓶可口可乐!");
Integer integer = thenCompletableFuture.get();
log.info("总耗时:" + (System.currentTimeMillis() - lordStartTime) / 1000 + "秒");
}
结果:
exceptionally
CompletableFuture的exceptionally方法表示,某个任务执行异常时,执行的回调方法;并且有抛出异常作为参数,传递到回调方法。
public CompletableFuture exceptionally(
Function fn) {
return uniExceptionallyStage(fn);
}
代码演示:
@Test
// 某个任务执行异常时,执行的回调方法;并且有抛出异常作为参数,传递到回调方法。
public void exceptionally() throws ExecutionException, InterruptedException {
long lordStartTime = System.currentTimeMillis();
CompletableFuture amountCompletableFuture = CompletableFuture.supplyAsync(() -> {
long startTime = System.currentTimeMillis();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Map amountMap = new HashMap();
amountMap.put("amount", 90);
log.info("执行金额查询操作用时:" + (System.currentTimeMillis() - startTime) / 1000 + "秒");
return amountMap;
});
CompletableFuture thenCompletableFuture = amountCompletableFuture.thenApply((map) -> {
int number = 0;
if (Integer.parseInt(map.get("amount").toString()) > 3) {
try {
Thread.sleep(1000);
// 可口可乐3元一瓶 看金额一共能购买多少瓶
number = Integer.parseInt(map.get("amount").toString()) / 0;
} catch (ArithmeticException | InterruptedException e) {
e.printStackTrace();
throw new ArithmeticException(); // 这里一定要将异常抛除了,不然exceptionally无效
}
}
return number;
});
CompletableFuture exceptionFuture = thenCompletableFuture.exceptionally((e) -> {
log.error("除数为0,则默认商为0!");
return 0;
});
log.info("当前金额一共可以买" + thenCompletableFuture.get() + "瓶可口可乐!");
exceptionFuture.get();
log.info("总耗时:" + (System.currentTimeMillis() - lordStartTime) / 1000 + "秒");
}
“
注意:这里的异常一定要抛出来,不然exceptionally无效!
whenComplete
CompletableFuture的whenComplete方法表示,某个任务执行完成后,执行的回调方法,无返回值;并且whenComplete方法返回的CompletableFuture的result是上个任务的结果。
public CompletableFuture whenComplete(
BiConsumer super T, ? super Throwable> action) {
return uniWhenCompleteStage(null, action);
}
代码演示:
@Test
// 某个任务执行完成后,执行的回调方法,无返回值;并且whenComplete方法返回的CompletableFuture的result是上个任务的结果。
public void whenComplete() {
CompletableFuture stringCompletableFuture = CompletableFuture.supplyAsync(() -> {
return "周杰伦";
});
CompletableFuture stringCompletableFuture1 = stringCompletableFuture.whenComplete((a, throwable) -> {
log.info("周杰伦喜欢唱");
});
log.info("输出结果为第一个任务:" + stringCompletableFuture1.join());
}
结果:
handle
CompletableFuture的handle方法表示,某个QQ账号买号平台地图任务执行完成后,执行回调方法,并且是有返回值的;并且handle方法返回的CompletableFuture的result是回调方法执行的结果。
public CompletableFuture handle(
BiFunction super T, Throwable, ? extends U> fn) {
return uniHandleStage(null, fn);
}
代码演示:
@Test
// 某个任务执行完成后,执行的回调方法,有返回值;并且handle方法返回的CompletableFuture的result是第二个任务的结果。
public void handle() {
CompletableFuture stringCompletableFuture = CompletableFuture.supplyAsync(() -> {
return "周杰伦";
});
CompletableFuture stringCompletableFuture1 = stringCompletableFuture.handle((a, throwable) -> {
return "周杰伦喜欢唱歌!";
});
log.info("输出结果为第二个任务:" + stringCompletableFuture1.join());
}
结果:
AND组合关系
thenCombine / thenAcceptBoth / runAfterBoth都表示:将两个CompletableFuture组合起来,只有这两个都正常执行完了,才会执行某个任务。
-
thenCombine:会将两个任务的执行结果作为方法入参,传递到指定方法中,且有返回值。