导读
大数据时代,数据量激增与业务场景复杂化,使传统大数据处理框架面临性能与资源管理挑战,Spark 在进一步优化及适配云环境上有需求,Cloud Native 理念兴起带来新契机。在此背景下,阿里公司经过多年不断努力沉淀,推出了 EMR Serverless Spark 实践产品。
1.
背景介绍
2.
Spark Native: Native 语言重写 SQL 引擎
3.
Cloud Native: 存算分离下的极限弹性
4.
EMR Serverless Spark 的实践
分享嘉宾|
周克勇
阿里云
引擎研发
编辑整理|
苗文亚
内容校对|李瑶
出品社区|
DataFun
背景介绍
首先介绍一下
Spark 和 Cloud Native。
Databricks 最早提出 Lakehouse[1]架构,结合了数据湖架构的开放性和数据新鲜度,以及数仓架构的数据管理和高性能引擎,目前已成为行业共识。Lakehouse 架构分四层,最底层是分布式存储,如云厂商的对象存储、传统的 hadoop 集群,用来存储结构化、半(非)结构化数据。第二层是湖格式层,目的是管理湖上数据,提高数据新鲜度,并提供一定的事务能力。(半)结构化数据有成熟的湖格式方案,如 Paimon,Iceberg,Delta,Hudi,非结构化尚无成熟方案。此外 Databricks 在这层提供了缓存、索引等加速能力。第三层是元数据管理层,向下管理多种湖格式及多模态数据,向上为引擎提供统一的元数据接口,同时提供权限管理、数据共享等能力,如 Databricks 开源的 UnityCatalog[2],Snowflake 开源的 Apache Polaris[3], 以及 Apache Gravitino[4]。最上是引擎层,Lakehouse 架构的开放和公平性决定了对引擎无偏好和限制,各个引擎都在积极对接 Lakehouse 架构,作为 Databricks 的底层引擎,Spark 是最为重要的一款。
为了让 Spark 在湖上具备数仓的处理性能,Databricks 用 C++和向量化技术重写了 Spark SQL 引擎:Photon[5],相比 Apache Spark 有 2-5 倍性能提升。由于 Photon 未开源,社区近几年也在积极探索替代方案。
Snowflake 作为第一个大型云原生数仓系统,给业界最大的启发是云上存算分离架构的先进性[6],Dremel 也很早就在使用存算分离架构[7]。存算分离解藕了存储和计算,一方面让计算节点和存储节点可以根据不同的工作负载特性选择合适的机型,另一方面让计算节点能更好的做弹性。
Spark Native: Native
语言重写 SQL 引擎
1.
趋势:从 JVM 到 Native 语言
用 Native 语言(C++, Rust 等)来(重新)实现 SQL 引擎已经是个趋势。Photon 之后,Apache Spark 社区生态进行了多年的探索,目前形成了 3 种主流方案: Apache Gluten, Apache Comet, Blaze,都是用向量化 SQL 库替换 Spark 基于 JVM 的 SQL Engine。Presto 社区当前最重要的事情也是全面 C++化,一方面通过 Velox 替换现有的 SQL 引擎,另一方面用 C++重写 BE。流计算也在积极 Native 化,例如阿里云全托管 Flink 产品推出了 Flash 加速引擎,近几年新出现的 RisingWave 流计算引擎则是用 Rust 开发。此外,当前火热的各种 OLAP 引擎,如 Clickhouse, Starrocks, Doris, Duckdb, Databend, Polars, Datafusion 等都是用 C++或 Rust 开发。
仅仅改变语言的收益有限,Native 语言本身不是目的,而是为了通过 Native 语言对硬件(CPU, 内存,intrics 指令等)更精细的控制,来应用更先进的 SQL 引擎设计范式。向量化和 Codegen 是过去十年最先进的两大技术方向,代表分别是 MonetDB/X100[8]和Hyper[9]。在了解这两个技术之前先介绍下经典的 SQL 执行模型:迭代器模型。在这个模型中,SQL 执行树的每个算子都是个迭代器,实现了 next 方法,算子的每次 next 调用会基于自身逻辑及当前状态,决定是否递归调用子节点的 next 方法获取输入数据,完成必要的计算之后返回一条数据。
迭代器模型存在如下几个问题。一是解释执行 overhead 极大,90% 以上的指令浪费在解释执行中,真正有效的比例只有 10%。二是每条 record 需要经过完整而复杂的表达式、算子流水线计算,CPU 难以作出正确的分支预测。三是难以运用 SIMD(Single Instruction, Multiple Data)的指令加速。
而向量化和 Codegen 能很好的解决这些问题,虽然采取的是截然不同的方法。
向量化模型依然沿用了迭代器模型的解释执行框架,但每次 next 返回的是一个 Columnar Batch,即由多个列式存储 Columnar Vector 组成的 Row Vector,通常包含数千条逻辑 Row。所有的表达式和算子都基于 Row Vector 进行计算,这也是“向量化”名字的由来。由于每次 next 处理和返回的是数千条数据,因此解释执行的开销被大大稀释。此外,基于 Vector 的计算 kernel 拥有更好的缓存命中率,更高的分支预测准确度,并更容易利用 SIMD 来加速。
Codegen 技术的核心是用编译执行替换解释执行,从而完全消除解释执行的开销。在 SQL 执行之前,根据计划树和 Schema,通过代码生成框架实时生成当前 SQL 的特定执行代码,接下来对生成的代码进行编译,最后直接执行编译后代码。由于生成的代码完全针对当前的 SQL,因此不存在多余的类型判断、虚函数调用等,即没有解释开销。这个技术的代表包括 Hyper 和其后继 Umbra,生成 LLVM 或者字节码;Apache Spark,生成 Java;Impala,生成 LLVM;Redshift,生成 C++,配合 Code Cache 降低编译开销。
向量化和 Codegen 虽然是截然不同的两个技术,但他们的性能表现比较接近[10][11]。Codegen 的上限更高,但由于技术门槛太高,当前主流系统都选择了向量化技术,部分系统辅以 Codegen 做复杂表达式计算[12]。
其中 Gluten, Blaze, Comet 都是桥接层,作用是把 Velox/Clickhouse/DataFusion 的 SQL 执行模块集成到 Spark 中。
以 Gluten + Velox 为例,主要模块包括算子转换、数据转换、统一内存管理等,分别负责 Spark 物理执行计划到 Velox 执行计划的转换,Spark 的 UnsafeRow 和 Velox 的 RowVector 之间的互相转换,以及 Spark 和 Velox 的堆内外统一内存管理。
当前社区的主流方案在 Benchmark 上通常有 2 倍以上的提升,在生产作业通常由 30% 左右的资源节省。
以上介绍了 Spark Native 的内容,接下来介绍 Spark + Cloud Native。
Cloud Native:
存算分离下的极限弹性
1.
物化 Shuffle:BSP 引擎的基础
物化 Shuffle 是 BSP(Bulk Synchronized Processing)引擎的基础,能带来两大重要优化 :
-
Task 重跑的代价极低。只需要读取上游物化的 Shuffle 数据来重算失败的 Task,无需重算整个作业,DAG 越复杂的作业收益越大。
-
物化 Shuffle 是自适应执行的基石。Databricks 关于自适应执行的论文[13]获得了 VLDB 2024 Industrial Best Paper 第二名,其中描述了 Databricks 基于 Shuffle 的自适应执行框架针对作业的性能和稳定性的多处优化。
因此,大多 BSP 引擎都支持物化 Shuffle,如 BigQuery,Spark,MaxCompute,Flink Batch,Hive,Trino(Project Tardigrade)等。尽管物化 Shuffle 有诸多好处,由于其把 Shuffle 数据写在计算节点本地,导致计算节点有状态,大大限制了作业内部的弹性伸缩。此外,本地 Shuffle的M * R的特性导致低效的磁盘和网络效率,进一步影响作业稳定性和性能。为了解决这个问题,Remote Shuffle Service技术应运而生,例如 BigQuery 采用了 Remote In-Memory Shuffle[14],而开源社区的标准是 Apache Celeborn[15]。
2.
Apache Celeborn:
解除本地存储的桎梏
本地 Shuffle 主要存在三个缺陷:
以 Apache Celeborn 为代表的 Remote Shuffle Service 系统,一方面通过接管 Shuffle 数据解除了计算节点的本地盘依赖,另一方面通过聚合 Shuffle Partition 数据使得 Shuffle Read 变成 One-to-One,从而大大提升作业的稳定性、性能和弹性。Celeborn 让包括 Spark 在内的 BSP 引擎能更好的应用云原生架构。
3. S
park Native
遇上 Cloud Native
将上述两个技术结合,就实现了 Spark Native + Cloud Native。目前 Celeborn 已经支持了 Gluten 和 Blaze,向量化引擎先对 Shuffle 数据做 Partition、序列化和压缩,然后通过 Celeborn SDK 推给 Celeborn 集群做聚合存储。Shuffle Reader 阶段通过 Celeborn SDK 读取数据,解压缩和反序列化之后拿到向量化数据,进而执行向量化运算。
04
EMR Serverless Spark
的实践
阿里云 EMR Serverless Spark[16]是一款 Lakehouse 架构的产品,如前文介绍,自底向上分别是 OSS,湖格式(Paimon/Delta/Iceberg/Hudi),元数据管理 DLF,以及深度优化的 Spark 引擎。其中 Spark 集成了向量化技术和企业级 Celeborn,提供给客户极速性能,Executor 粒度的极致弹性,以及完全的生态兼容。
Executor 粒度的弹性意味着不存在资源浪费,结合向量化和 Celeborn 的引擎能力,我们的生产客户观察到高达 60% 的计算资源节省,从而获得可观的降本增效。
EMR Serverless Spark 已于 2024 年 9 月份正式商业化,来自互联网、游戏、新能源、金融、制造业等多个行业标杆客户都在重点使用,欢迎尝试: )。
[1] Lakehouse: A New Generation of Open Platforms that Unify Data Warehousing and Advanced Analytics
[2] https://github.com/unitycatalog/unitycatalog
[3] https://github.com/apache/polaris
[4] https://github.com/apache/gravitino
[5] Photon: A Fast Query Engine for Lakehouse Systems
[6] The Snowflake Elastic Data Warehouse
[7] Dremel: Interactive Analysis of Web-Scale Datasets
[8] MonetDB/X100: Hyper-Pipelining Query Execution
[9] Efficiently Compiling Efficient Query Plans for Modern Hardware
[10] Everything You Always Wanted to Know About Compiled and Vectorized Queries But Were Afraid to Ask
[11] https://benchmark.clickhouse.com/
[12] https://clickhouse.com/blog/clickhouse-just-in-time-compiler-jit
[13] Adaptive and Robust ‑ery Execution for Lakehouses at Scale
[[14] https://medium.com/@marko.zarkovic/big-query-best-practices-47f4aa86de10
[15] https://celeborn.apache.org/
[16] https://help.aliyun.com/zh/emr/emr-serverless-spark/