JEP 453,结构化并发(预览)已经从 JDK 21 的 Targeted 状态变更为 Integrated 状态。这个最初的预览特性来源于一个孵化 API,它根据前两轮的孵化纳入了一些改进,这两轮孵化分别是 JDK 19 交付的 JEP 428,结构化并发(孵化)和 JDK 20 交付的 JEP 437,结构化并发(第二轮孵化)。在当前提案中,唯一的显著变化是StructuredTaskScope::fork(...)
方法返回一个 Subtask
,而不是 Future。这是一个预览特性。
JDK 21 中的结构化并发致力于引入结构化并发的 API 来简化并发编程。这种方法将在不同线程中运行的相关任务组视为一个工作单元,从而简化了错误处理和取消,提高了可靠性,并增强了可观测性。
自从 Java 中虚拟线程出现以来,官方就引入了一个新的、更好的并发模型,即 StructuredTaskScope
。我们将在这个新的编程模型中看到一些常用的编程模式。
对于虚拟线程,由于它们非常轻量级,所以不需要将它们池化。此外,我们知道虚拟线程在阻塞任何操作时可以将其运行栈从底层平台线程 保存到堆 上,并且在完成后可以将其运行栈固定到任何可用的平台线程。这是 Java 中的新功能,而且非常棒。
考虑以下代码片段:
public static Weather readFromServerA () throws InterruptedException { Thread.sleep(RandomUtils.nextLong(1 , 100 )); return new Weather("Partly Sunny" , "server-A" ); }public static Weather readFromServerB () throws InterruptedException { Thread.sleep(RandomUtils.nextLong(1 , 100 )); return new Weather("Partly Sunny" , "server-B" ); }public static Weather readFromServerC () throws InterruptedException { Thread.sleep(RandomUtils.nextLong(1 , 100 )); return new Weather("Partly Sunny" , "server-C" ); }
上面的代码返回天气信息。它模拟服务器并在 1-100 毫秒内返回信息。我们的需求是查询所有服务器并得到结果。我们只考虑第一个返回结果的请求,并立即取消其他两个请求 (通过中断取消它们)。
让我们在 StructuredTaskScope
对象的帮助下实现这个逻辑。
public static Weather readWeather () throws ExecutionException, InterruptedException, TimeoutException { try (var scope = new StructuredTaskScope.ShutdownOnSuccess()){ // need to call fork and pass runnables or callables scope.fork(() -> Weather.readWeatherFromServerA()); scope.fork(() -> Weather.readWeatherFromServerB()); scope.fork(() -> Weather.readWeatherFromServerC()); // now we block, blocking is cheap in virtual threas // so no issue here scope.join(); Weather weather = scope.result(); return weather; }
模式非常简单;我们使用了 StructuredTaskScope
的 ShutdownOnSuccess
变体,它是由 JVM 开箱即用提供的。
我们只需一一提交三个任务,从不同的服务器获取天气即可。方法 fork() 接受可调用对象。
然后我们调用了scope. join()
,为什么我们要用阻塞调用?为什么不是非阻塞调用?阻塞调用是廉价的,我们应该鼓励开发者使用阻塞调用。
最后,我们调用scope.result()
方法显式获取返回结果。
现在让我们看看哪个请求得到了结果,另外两个请求中哪个被取消了。
public static Weather readWeather () throws InterruptedException, ExecutionException { try (var scope = new StructuredTaskScope.ShutdownOnSuccess();){ scope.fork(() -> Weather.readWeatherFromServerA()); scope.fork(() -> Weather.readWeatherFromServerB()); scope.fork(() -> Weather.readWeatherFromServerC()); Future futureA = scope.fork(Weather::readWeatherFromServerA); Future futureB = scope.fork(Weather::readWeatherFromServerB); Future futureC = scope.fork(Weather::readWeatherFromServerC); scope.join(); System.out.println("futureA.state() = " + futureA.state()); System.out.println("futureB.state() = " + futureB.state()); System.out.println("futureC.state() = " + futureC.state()); var weather = scope.result(); return weather; } } public static void main (String[] args) throws InterruptedException, ExecutionException { var weather = Weather.readWeather(); }
让我们运行这个程序几次。
WARNING: Using incubator modules: jdk.incubator.concurrent futureA.state() = FAILED futureB.state() = FAILED futureC.state() = SUCCESS futureA.state() = FAILED futureB.state() = SUCCESS futureC.state() = FAILED
我们可以看到,每次运行不同的服务器返回了成功,而另外两个则失败了。这只是为了解释;它是技术代码;我们只需要查询不同的服务器,如下所示。
public static Weather readWeather () throws ExecutionException, InterruptedException, TimeoutException { try (var scope = new StructuredTaskScope.ShutdownOnSuccess()){ // need to call fork and pass callables scope.fork(() -> Weather.readWeatherFromServerA()); scope.fork(() -> Weather.readWeatherFromServerB()); scope.fork(() -> Weather.readWeatherFromServerC()); // now we block, blocking is cheap in virtual threas // so no issue here scope.join(); Weather weather = scope.result(); return weather; }
顺便说一句,借助方法引用,上面的代码可以写得更加清晰。
public static Weather readWeather () throws InterruptedException, ExecutionException { try (var scope = new StructuredTaskScope.ShutdownOnSuccess();){ scope.fork(Weather::readWeatherFromServerA); scope.fork(Weather::readWeatherFromServerB); scope.fork(Weather::readWeatherFromServerC); scope.join(); var weather = scope.result(); return weather; } }
哪它与 ExecutorService
有什么不同?
ExecutorService
生命周期与应用程序的生命周期一起运行。一旦应用程序启动,executor service framework 就会启动,并在应用程序终止后关闭。
但在这里,对于每个请求调用,StructuredTaskScope
立即创建,而且都会在我们退出该方法后立即销毁。我们能做到这一点是因为虚拟线程很轻量级——比平台线程便宜大约 1000 倍。
其次,上面的代码可以借助完整的stage API来编写,但是如果使用这些框架,我们需要回到回调地狱和代码的层层嵌套。😉 谁喜欢回调地狱呢?没有人!。
我们将研究更多这样的模式。这是一个全新的并发模型,我们需要确切地了解哪些模式将成为规范,但我认为这就是我们将要使用的模式。
提醒一点:它只能在启用预览功能的 Java 19 及更高版本上运行。
原文地址:https://medium.com/@anil.java.story/structured-task-scope-new-concurrency-model-c5fa38613f47
。
另外,虽说StructuredTaskScope
更好用,更轻量级,但是很多底层框架依旧依赖了ExecutorService
而未更上,进行快速升级。这个时候,我们的机会就来了,可以给一些知名框架提 PR 了,简历上又可以吹一番了🤣。