专栏名称: 芋道源码
纯 Java 源码分享公众号,目前有「Dubbo」「SpringCloud」「Java 并发」「RocketMQ」「Sharding-JDBC」「MyCAT」「Elastic-Job」「SkyWalking」「Spring」等等
目录
相关文章推荐
芋道源码  ·  强类型查询:Java 的全能 ORM 神器 ·  昨天  
芋道源码  ·  线程数突增!领导说再这么写就gc掉我.... ·  昨天  
芋道源码  ·  这五款牛逼的 IDEA ... ·  4 天前  
芋道源码  ·  K 神!国产神级搜索引擎~太强了? ·  5 天前  
Java编程精选  ·  面试官:String s = new ... ·  1 周前  
51好读  ›  专栏  ›  芋道源码

再见ExecutorService,你好StructuredTaskScope

芋道源码  · 公众号  · Java  · 2024-11-02 18:20

正文

👉 这是一个或许对你有用的社群

🐱 一对一交流/面试小册/简历优化/求职解惑,欢迎加入芋道快速开发平台知识星球。下面是星球提供的部分资料: 

👉这是一个或许对你有用的开源项目

国产 Star 破 10w+ 的开源项目,前端包括管理后台 + 微信小程序,后端支持单体和微服务架构。

功能涵盖 RBAC 权限、SaaS 多租户、数据权限、商城、支付、工作流、大屏报表、微信公众号、CRM 等等功能:

  • Boot 仓库:https://gitee.com/zhijiantianya/ruoyi-vue-pro
  • Cloud 仓库:https://gitee.com/zhijiantianya/yudao-cloud
  • 视频教程:https://doc.iocoder.cn
【国内首批】支持 JDK 21 + SpringBoot 3.2.2、JDK 8 + Spring Boot 2.7.18 双版本 

来源:juejin.cn/post/
7313499902452154383


JEP 453,结构化并发(预览)已经从 JDK 21 的 Targeted 状态变更为 Integrated 状态。这个最初的预览特性来源于一个孵化 API,它根据前两轮的孵化纳入了一些改进,这两轮孵化分别是 JDK 19 交付的 JEP 428,结构化并发(孵化)和 JDK 20 交付的 JEP 437,结构化并发(第二轮孵化)。在当前提案中,唯一的显著变化是StructuredTaskScope::fork(...)方法返回一个 Subtask,而不是 Future。这是一个预览特性。

JDK 21 中的结构化并发致力于引入结构化并发的 API 来简化并发编程。这种方法将在不同线程中运行的相关任务组视为一个工作单元,从而简化了错误处理和取消,提高了可靠性,并增强了可观测性。

自从 Java 中虚拟线程出现以来,官方就引入了一个新的、更好的并发模型,即 StructuredTaskScope。我们将在这个新的编程模型中看到一些常用的编程模式。

对于虚拟线程,由于它们非常轻量级,所以不需要将它们池化。此外,我们知道虚拟线程在阻塞任何操作时可以将其运行栈从底层平台线程 保存到堆 上,并且在完成后可以将其运行栈固定到任何可用的平台线程。这是 Java 中的新功能,而且非常棒。

考虑以下代码片段:

public static Weather readFromServerA() throws InterruptedException {
    Thread.sleep(RandomUtils.nextLong(1100));
    return new Weather("Partly Sunny""server-A");
}
public static Weather readFromServerB() throws InterruptedException {
    Thread.sleep(RandomUtils.nextLong(1100));
    return new Weather("Partly Sunny""server-B");
}
public static Weather readFromServerC() throws InterruptedException {
    Thread.sleep(RandomUtils.nextLong(1100));
    return new Weather("Partly Sunny""server-C");
}

上面的代码返回天气信息。它模拟服务器并在 1-100 毫秒内返回信息。我们的需求是查询所有服务器并得到结果。我们只考虑第一个返回结果的请求,并立即取消其他两个请求 (通过中断取消它们)。

让我们在 StructuredTaskScope 对象的帮助下实现这个逻辑。

public static Weather readWeather() throws ExecutionException, InterruptedException, TimeoutException {
        try (var scope = new StructuredTaskScope.ShutdownOnSuccess()){

       // need to call fork and pass runnables or callables
       scope.fork(() -> Weather.readWeatherFromServerA());
       scope.fork(() -> Weather.readWeatherFromServerB());
       scope.fork(() -> Weather.readWeatherFromServerC());

       // now we block, blocking is cheap in virtual threas
       // so no issue here
       scope.join();

       Weather weather = scope.result();
       return weather;
}

模式非常简单;我们使用了 StructuredTaskScopeShutdownOnSuccess 变体,它是由 JVM 开箱即用提供的。

我们只需一一提交三个任务,从不同的服务器获取天气即可。方法 fork() 接受可调用对象。

然后我们调用了scope. join(),为什么我们要用阻塞调用?为什么不是非阻塞调用?阻塞调用是廉价的,我们应该鼓励开发者使用阻塞调用。

最后,我们调用scope.result()方法显式获取返回结果。

现在让我们看看哪个请求得到了结果,另外两个请求中哪个被取消了。

public static Weather readWeather() throws InterruptedException, ExecutionException {

        try(var scope = new StructuredTaskScope.ShutdownOnSuccess();){

        scope.fork(() -> Weather.readWeatherFromServerA());
        scope.fork(() -> Weather.readWeatherFromServerB());
        scope.fork(() -> Weather.readWeatherFromServerC());


        Future futureA = scope.fork(Weather::readWeatherFromServerA);
        Future futureB = scope.fork(Weather::readWeatherFromServerB);
        Future futureC = scope.fork(Weather::readWeatherFromServerC);

        scope.join();

        System.out.println("futureA.state() = " + futureA.state());
        System.out.println("futureB.state() = " + futureB.state());
        System.out.println("futureC.state() = " + futureC.state());

        var weather = scope.result();

        return weather;
    }
}

 public static void main(String[] args) throws InterruptedException, ExecutionException {
       var weather = Weather.readWeather();
 }

让我们运行这个程序几次。

WARNING: Using incubator modules: jdk.incubator.concurrent
futureA.state() = FAILED
futureB.state() = FAILED
futureC.state() = SUCCESS

futureA.state() = FAILED
futureB.state() = SUCCESS
futureC.state() = FAILED

我们可以看到,每次运行不同的服务器返回了成功,而另外两个则失败了。这只是为了解释;它是技术代码;我们只需要查询不同的服务器,如下所示。

public static Weather readWeather() throws ExecutionException, InterruptedException, TimeoutException {
        try (var scope = new StructuredTaskScope.ShutdownOnSuccess()){

   // need to call fork and pass callables
   scope.fork(() -> Weather.readWeatherFromServerA());
   scope.fork(() -> Weather.readWeatherFromServerB());
   scope.fork(() -> Weather.readWeatherFromServerC());

   // now we block, blocking is cheap in virtual threas
   // so no issue here
   scope.join();

   Weather weather = scope.result();
   return weather;
}

顺便说一句,借助方法引用,上面的代码可以写得更加清晰。

public static Weather readWeather() throws InterruptedException, ExecutionException {
        try(var scope = new StructuredTaskScope.ShutdownOnSuccess();){
        scope.fork(Weather::readWeatherFromServerA);
        scope.fork(Weather::readWeatherFromServerB);
        scope.fork(Weather::readWeatherFromServerC);
        scope.join(); 
        var weather = scope.result();
        return weather;
    }
}

哪它与 ExecutorService 有什么不同?

ExecutorService 生命周期与应用程序的生命周期一起运行。一旦应用程序启动,executor service framework 就会启动,并在应用程序终止后关闭。

但在这里,对于每个请求调用,StructuredTaskScope立即创建,而且都会在我们退出该方法后立即销毁。我们能做到这一点是因为虚拟线程很轻量级——比平台线程便宜大约 1000 倍。

其次,上面的代码可以借助完整的stage API来编写,但是如果使用这些框架,我们需要回到回调地狱和代码的层层嵌套。😉 谁喜欢回调地狱呢?没有人!。

我们将研究更多这样的模式。这是一个全新的并发模型,我们需要确切地了解哪些模式将成为规范,但我认为这就是我们将要使用的模式。

提醒一点:它只能在启用预览功能的 Java 19 及更高版本上运行。

原文地址:https://medium.com/@anil.java.story/structured-task-scope-new-concurrency-model-c5fa38613f47

另外,虽说StructuredTaskScope更好用,更轻量级,但是很多底层框架依旧依赖了ExecutorService而未更上,进行快速升级。这个时候,我们的机会就来了,可以给一些知名框架提 PR 了,简历上又可以吹一番了🤣。


欢迎加入我的知识星球,全面提升技术能力。

👉 加入方式,长按”或“扫描”下方二维码噢

星球的内容包括:项目实战、面试招聘、源码解析、学习路线。

文章有帮助的话,在看,转发吧。

谢谢支持哟 (*^__^*)