Gunnar Morling 是一位软件工程师和开源爱好者,目前在 Decodable 从事基于 Apache Flink 的流处理工作。之前他在 Redhat 领导了 Debezium 项目。他是 Java Champion,创立了多个开源项目,如 JfrUnit、kcctl 和 MapStruct。Gunnar 曾在 QCon、Java One 和 Devoxx 等各种会议上发表过演讲。本文中,Gunnar Morling 讨论了应对一个包含十亿行数据的文件时,速度最快的解决方案所采用的一些技巧,这些技巧通过并行化和高效的内存访问在不到两秒的时间内就能处理 13 GB 的输入文件。
我想谈谈我之前参加的一个病毒式编程挑战,名为“十亿行挑战”(1 Billion Row Challenge,1BRC)。
我在一家名为 Decodable 的公司担任软件工程师。我们基于 Apache Flink 构建了一个用于流处理的托管平台。这件事对我来说完全是副业,但 Decodable 支持我这样做。
之所以举办这项挑战是因为我想学点新东西。每六个月就会有新的 Java 版本出现,带有新的 API 和新功能。要跟踪所有这些开发成果并非易事。我想知道这些新 Java 版本中有哪些新东西,我能用它们做什么?同时我也想为社区提供一个渠道让大家都能学到新东西。在这个挑战中,你可以从其他人的实现里汲取灵感。另外我还想要纠正一个偏见,那就是很多人认为 Java 很慢。但如果你看看现代版本及其特性,你会发现这完全是错误的。
我们先来深入研究一下挑战的细节。
我们的想法是,有一个包含温度测量值的文件,本质上就像一个 CSV,但它不是以逗号,而是以分号作为分隔符。内容有两列,还有一个车站名称,如汉堡或慕尼黑等。与之关联的温度测量值是随机值。
挑战任务是处理该文件,汇总文件中的值,并为每个站点确定最小值、最大值和平均温度值,很简单。唯一的警告是,正如挑战的名称所示,它有 10 亿行,生成文件的大小约为 13 GB,相当大。然后你必须打印出结果,如上图。
关于规则再多说一点。首先,这主要针对 Java。为什么?因为这是我最了解的平台,我想支持它,想传播关于它的知识。然后,你可以选择任何版本,新版本、预览版本、所有类型的发行版,如 GraalVM,或各种各样的 JDK 提供程序都可以。你可以使用一个名为 SDKMAN 的工具进行管理。这是一个非常好的工具,可以管理很多 Java 版本,并在不同版本之间来回切换。
仅限 Java,没有依赖项。引入一些库为你完成任务没什么意义,你应该自己编程。各次运行之间没有缓存。我对每个实现都运行了五次。然后我丢弃了最快和最慢的那个,并从剩下的三次运行结果中取平均值。可以缓存的话,你只需执行一次任务,将结果保存在文件中,然后将其读回,这样就会非常快。这没有多大意义。你也可以从他人那里获得灵感。
关于我的运行环境:
我的公司花了 100 欧元购买了这台机器,有 32 个内核,其中我主要只使用 8 个内核。有相当多的 RAM。实际上文件总是从 RAM 盘读取。我想确保磁盘 I/O 不至于影响性能,因为它的可预测性要差得多。这里只有一个纯粹的 CPU 绑定问题。
上面是我的基本实现。我使用这个 Java Streams API、这个 files.lines 方法,它为我提供了一个包含文件行的流。我从磁盘读取该文件,然后使用 split 方法将每行映射到那里。我想将站名与值分开。然后我将结果(行)收集到这个分组收集器中。我按站名对其分组。
然后,对于我的每个站,我需要聚合这些值,在我的聚合器实现中处理。每当新值添加到现有聚合器对象时,我都会跟踪最小值和最大值。为了计算平均值,我会跟踪值的总和和计数。非常简单。然后,如果我并行运行这个操作,我需要合并两个聚合器,同样非常简单。最后,如果我完成了,需要减少处理结果,然后通过对象发出这样的结果,其中包含最小值和最大值。对于平均值,我只需将总数除以计数,然后将其打印出来。在这台机器上这大约需要五分钟,不算超级快,但也不是很糟糕。编写这段代码花了我半个小时不到,还不错。如果你在工作中解决这个问题,到这里你可能会收工回家,喝杯咖啡,就完事了。当然,为了这次挑战的目的,我们希望速度更快,看看我们能在这里取得多大的进展。
这项挑战重要的是必须有人来参与。一位来自荷兰的 Java 冠军 Roy Van Rijn 立刻对此产生了兴趣,在我发布帖子后大约一小时,他创建了自己的第一个实现,不是很花哨或很复杂。只要第一个人出现,其他人也会来参与。
我们深入了解一下如何加快这个程序的速度。人们花了整个一月份的时间来研究这个问题,他们探索到了一个非常深的层次,基本上是计算 CPU 指令。
首先我们谈谈并行化,因为我们有很多 CPU 核心。在我用来评估它的服务器上有 32 个核心,64 个线程。我们想利用这一点,只使用一个核心会有点浪费。我们该怎么做呢?回到我简单的基线实现,我能做的第一件事就是添加这个并行调用,也就是 Java Streams API 的这一部分。
现在它将并行处理这个管道,或者说这个流管道的一部分。只需添加这个单一方法调用,就可以将时间缩短到 71 秒,非常轻松的胜利。
如果你仔细想想,是的,它让我们的速度加快了不少,但并没有达到八倍的水平。可我们有 8 个 CPU 核心,为什么它没有八倍的速度?因为这个并行运算符适用于处理逻辑。所有这些聚合和分组逻辑都是并行发生的,但从内存中读取文件仍然是按顺序发生的。读取部分是按顺序进行,其他 CPU 核心依旧处于空闲状态,所以我们也想将其并行化。
新的 Java 版本都带有新的 API、JEP、Java 增强提案。其中之一是最近添加的外部函数和内存 API。
本质上它是一个 Java API,允许你使用原生方法。它比旧的 JNI API 更易用,还允许你使用本机内存。你可以管理自己的内存部分(如堆外内存),而不是由 JVM 管理的堆,并且你将负责维护它、释放它,等等。我们想在这里使用它,因为我们可以内存映射这个文件,然后在那里并行处理它。
首先我们确定并行度。我们的例子中是八个核心,这就是我们的并行度。接下来我们要对这个文件进行内存映射。在早期的 Java 版本中,你也可以使用内存映射文件,但你有大小之类的限制,你无法一次对整个 13 GB 的文件进行内存映射。而现在有了新的外部内存 API,我们就可以做到这一点。你映射文件。我们有这个 Arena 对象。这本质上是我们对这个内存的表示。有不同类型的 Arena,这里我只是使用这个全局 Arena,它可以从我的应用程序中的任何位置访问。现在我可以使用多个线程并行访问整个内存部分。
为了做到这一点,我们需要分割文件和内存表示。首先,我们将其大致分成八个相等的块。我们将整个大小除以八。当然,很有可能我们会分割到某一行的中间,而理想情况下,我们希望我们的工作进程能够处理整行。这里发生的事情是,我们转到了文件的大约八分之一,然后继续转到下一个行结束符。那么这里就是这个块的结尾,也是下一个块的起点。然后我们处理这些块,我们启动线程,然后将它们连接起来。现在我们真正在整个周期内都利用了所有 8 个内核,进行 I/O 时也一样。
有一个警告。从本质上讲,其中一个 CPU 核心总是最慢的。在某个时候,其他七个核心都会等待最后一个核心完成,因为数据的分布有点不均匀。人们最终的做法是不再使用 8 个块,而是将这个文件分成更小的块。本质上,他们积压了这些块。每当其中一个工作线程处理完当前块时,它就会去处理下一个块。通过这种方式,你可以确保所有 8 个线程始终得到平等利用。事实证明,理想的块大小是 2 兆字节。为什么是 2 兆字节?我使用的这台机器上的这个 CPU 有 16 兆字节的二级缓存,8 个线程每次处理 2 兆字节。这个数值在预测 I/O 等方面是最好的。这也表明,我们确实深入到了具体的 CPU 和架构的层面来真正优化该问题。
我们更深入地分析一下。我们已经了解了如何利用多个 CPU 核心,但具体处理每一行时究竟发生了什么?我们仔细看看。
我们想摆脱最初的使用正则表达式等分割文件的做法,那样效率并不高。我能想到的办法是,只需逐个字符地处理这些输入行即可。
这里差不多是一个状态机。我们读取字符,继续读取行,直到没有字符。然后我们使用将站点名称与温度值分隔开的分号字符来切换这些状态。根据我们所处的状态,我们是读取组成站点名称的字节,还是读取组成测量值的字节?我们需要将它们添加到某个构建器或缓冲区中,以聚合这些值。
然后,如果我们在一行的末尾,也就是说我们找到了行结束符,那么我们需要使用建立的这两个缓冲区来记录站点和测量值。对于测量值,我们需要了解如何将其转换为整数值,这也是人们想出的办法。这个问题被描述为双精度或浮点运算,因此值是 21.7 度,但同样,我总是只遇到一个小数位。人们意识到,这个数据实际上总是只有一个小数位。我们可以利用这一点,只需将数字乘以 100 即可将其视为整数问题,作为计算方法。然后在最后,将其除以 100 或 10。这是人们经常做的事情,我低估了他们会在多大程度上利用该数据集的特定特性。
所以我们处理或使用这些值。如果我们看到减号就取反这个值。如果我们看到两位数字中的第一个,就把它乘以 100 或 10。这样做,我们可以把时间缩短到 20 秒,已经比最初的基线实现快了一个数量级。
到目前为止没有什么真正神奇的事情。你也应该得到一个启示,继续做这样的事情有多大意义?如果这是你在日常工作中面临的问题,也许就此打住吧。它可读性好,可维护性好。它比原生基线实现快了一个数量级,所以这相当不错。
当然,为了应对这一挑战,我们可能需要走得更远。我们还能做什么?我们可以再次回到并行的概念,尝试一次处理多个值,现在我们有了不同的并行方法。我们已经看到了如何充分利用所有 CPU 核心。这是并行度的一方面。我们还可以考虑扩展到多个计算节点,这通常是我们对大规模数据存储所做的事情。对于这个问题它并不那么重要,我们必须拆分该文件并将其分发到网络中。也许不是那么理想,但那将是另一个极端。而我们也可以朝另一个方向发展,在特定的 CPU 指令内作并行化。这就是 SIMD(单指令多数据)所做的事情。
基本上所有这些 CPU 都有扩展指令,允许你一次将相同类型的操作应用于多个值。例如,在这里我们想找到行尾字符。现在我们不再逐字节对比,而是可以使用这样的 SIMD 指令将其应用于 8 个或 16 个甚至更多字节,当然这会大大加快速度。问题是,在 Java 中,你没有很好的方法来利用这些 SIMD 指令,因为它是一种可移植的抽象语言,它不允许你降低到这种级别的 CPU 底层上。
好消息是我们可以用上面这个向量 API,它仍在孵化中,在第八个孵化版本左右。这个 API 现在允许你在扩展中使用这些向量化指令。你可以使用这个相等运算符进行类似这样的比较调用,然后它将被转换为底层架构的正确 SIMD 指令,转换为 Intel 或 AMD64 扩展。对于 Arm,它也会这样做。如果你的机器没有任何向量扩展,它将回退到标量执行。这是指令级别的并行化。我对此作了另一次演讲(
https://speakerdeck.com/gunnarmorling/to-the-moon-and-beyond-with-java-17-apis
),其中向你展示了如何使用 SIMD 解决 FizzBuzz。
这种模式可以一次将相同的操作应用于多个值,此外我们还可以执行所谓的 SWAR,即寄存器内的 SIMD。
这里的想法是,做同样的事情,比如一次处理多个值的相等操作,我们也可以在单个变量中执行此操作。如果你有 8 个字节,我们也可以看到一个 long,那是 64 位,那也是 8 个字节。我们可以将正确级别的位级魔法应用于 long 值,然后将此操作应用于所有 8 个字节。这里会有位级屏蔽和移位等等事情。Richard Startin 有一篇非常好的博客文章,一步一步地向你展示了如何做到这一点,或者如何使用它来查找字符串中的第一个零字节。
我把数学公式放在右边,你会看到,这实际上给了你一个长整型的第一个零字节。这就是寄存器内的 SIMD,SWAR。现在有趣的是,如果你看一下这段代码,会发现这里缺少了一些东西。有人意识到我们这里缺了什么吗?这段代码中没有 if、没有条件、没有分支。这实际上非常重要,因为我们需要记住 CPU 实际上是如何工作的。如果你看看 CPU 如何获取和执行我们的代码,会发现它总是采用这种流水线方法。每条指令都有这个阶段,它将从内存中获取,解码,执行,最后写回结果。现在这些事情中的多个操作是并行发生的。当我们解码一条指令时,CPU 已经去获取下一条指令了。这是一种流水线并行化方法。
当然,CPU 实际上需要知道下一条指令是什么,否则我们就不知道要获取什么。为了让它知道,我们实际上不能有任何 if,因为那样我们就不知道要往哪个方向走。如果你有一种用这种无分支方式表达这个问题的方法(就像我们之前看到的那样),那么这对 CPU 中分支预测器来说非常有益,这样它就总能知道下一条指令是什么。我们从来没有遇到过一种情况,就是实际上需要刷新这个管道,只因为我们在这个预测执行中走了一条错误的路径。
人们经常使用的资源之一是这本书《黑客的喜悦》。如果你对此感兴趣,我建议每个人都去买这本书。比如这个问题,比如在字符串中找到第一个零字节,所有这些算法、例程都在这本书中描述过了。如果这件事让你兴奋,一定要看看这本书买下来。