导读
本文介绍了 Apache Hudi 从零到一:解析读取流程和查询类型(二),翻译自原英文博客 https://blog.datumagic.com/p/apache-hudi-from-zero-to-one-210
。
分享嘉宾|许世彦 Onehouse 开源项目负责人
编辑整理|张阳
出品社区|
DataFun
在上一篇文章中,我们探讨了 Hudi 表中的数据布局,并详细介绍了两种表类型:Copy-on-Write
(CoW) 和 Merge-on-Read
(MoR),以及它们各自的优缺点。基于这些知识,我们现在将进一步讨论在 Hudi 中读取操作是如何实现的。
多种引擎(例如 Spark、Flink、Presto 和 Trino 等)已经与 Hudi 实现了集成,这使得用户能够对数据进行高效的分析查询。虽然各引擎与 Hudi 集成的 API 可能存在差异,但作为分布式查询引擎,它们在处理查询时的基本流程是相似的。具体来说,这些引擎首先会解析用户输入的 SQL 语句,然后在工作节点上生成并执行查询计划,最后将处理结果汇总返回给用户。
在这篇文章中,我选择了 Spark
作为示例引擎来说明读取操作的流程,并提供代码片段来展示各种 Hudi 查询类型的用法。首先,我会讲解 Spark 查询的相关内容,接着深入探讨 Hudi 与 Spark 集成的细节,最后对不同的查询类型进行阐释。
01
Spark 查询入门
Spark SQL 是一个分布式 SQL 引擎,它能够处理和分析大规模数据集。用户通常通过编写 SQL 查询来开始分析任务,这些查询旨在从存储系统上的表中提取所需的结果。Spark SQL 接收用户输入的 SQL 语句,并将其转换成一个执行计划,该计划在 Spark 集群上分多个阶段执行,如下图所示。
在分析阶段,Spark SQL 会将用户的 SQL 输入进行解析,并转换成树状结构,这种结构是对 SQL 语句的抽象表示。系统会查阅表目录来获取如表名称和列类型等相关信息。
在逻辑优化步骤中,优化器会在逻辑层面上对这树型结构进行评估和改进。常见的优化手段包括谓词下推、schema 剪裁和 null 值的传播等。这一步骤的目的是生成一个逻辑计划,它描述了执行查询所需的所有计算步骤。需要注意的是,逻辑计划只是一种逻辑上的表示,它并未包含在实际执行节点上运行所需的详细操作信息。
物理规划阶段则扮演了逻辑层和物理层之间的桥梁角色。在这个阶段,优化器会根据逻辑计划生成物理计划,物理计划详细指定了执行计算的具体方式。例如,在逻辑计划中,一个 join 操作可能只是简单地表示为 join 节点,而在物理计划中,这个连接操作会被具体指定为 sort-merge join 或 broadcast-hash join,具体的选择取决于相关表的大小估计。最终,系统会为代码生成和实际执行选择最优的物理计划。
这三个阶段——解析、逻辑优化和物理规划——构成了 Catalyst 优化器的核心功能,它们共同确保了 Spark SQL 能够高效地执行复杂的查询任务。对于这个主题的进一步研究,您可以参考一些精彩的讲座。(
Spark SQL 讲座一
、
Spark SQL 讲座二
)
在执行过程中,Spark 应用程序基于一种名为 RDD(弹性分布式数据集)的基础数据结构来运行。RDD 是由 JVM 对象组成的集合,这些对象具有以下特点:不可变性、能够在不同节点上分区,并且由于记录了数据血缘信息,因此具备容错能力。当应用程序运行时,会根据执行计划对 RDD 进行转换并执行操作以生成结果。这个过程通常也被称为 RDD 的“物化”过程。
02
Spark-Hudi 读取流程
1. 数据源 API
在 Catalyst 优化器生成查询计划时,与数据源的连接对于降低优化难度是有益的。Spark 的数据源 API 设计初衷就是为了提供与多种数据源集成的灵活性。一些数据源是内置的,比如 JDBC、Hive 表和 Parquet 文件。Hudi 表则代表了一种特殊类型的
自定义数据源,这得益于其特定的数据布局。
下图展示了 Spark-Hudi 读取流程中的一些核心接口和方法调用。
使用 Spark 读取 Hudi 数据的操作流程如下:
-
集成入口点:
DefaultSource
作为集成的入口,将数据源格式定义为
org.apache.hudi
或
hudi
。
它提供了一个
BaseRelation
,Hudi 通过它来实现数据提取过程。
-
构建扫描:
buildScan()
是一个核心 API,用于将过滤器传递给数据源以进行查询优化。
Hudi 通过
collectFileSplits()
方法来收集相关的文件。
-
收集文件分片:
collectFileSplits()
方法会将所有过滤器传递给 FileIndex 对象,该对象帮助确定需要读取的文件。
-
文件索引:
FileIndex 查找所有相关的 FileSlice 进行进一步处理。
-
构建 RDD:
在识别 FileSlice 后,调用 composeRDD()。
-
加载和读取:
FileSlices 被加载并读取为 RDD。
对于列式存储文件(如 Parquet 格式的基本文件),读取操作通过只读取必要的列来最小化数据传输。
-
返回 RDD:
RDD 从 API 返回,用于进一步的查询规划和代码生成。
请注意,上述步骤仅提供了一个读取流程概述,省略了一些细节,例如支持读取时架构变更和高级索引技术(例如使用元数据表跳过数据)。
这个流程适用于所有使用 Spark
的 Hudi 查询类型。在后续章节中,我将详细介绍各种查询类型的工作原理。除了读取优化模式之外,这些查询类型都适用于 Copy on Write (CoW) 和 Merge on Read (MoR) 表。
2. 快照查询
这是读取 Hudi 表时的默认查询类型。其目的是从表中获取最新的记录,实际上就是在查询时捕捉表的“快照”。当对 Merge on Read (MoR) 表执行此操作时,由于需要将日志文件与基文件合并,这可能会对性能产生一定影响。
启动包含 Hudi 依赖项
的 spark-sql shell 后,可以运行 SQL 语句来设置一个 MoR 表,并插入及更新一条记录。
create table hudi_mor_example (
id int,
name string,
price double,
ts bigint
) using hudi
tblproperties (
type = 'mor',
primaryKey = 'id',
preCombineField = 'ts'
) location '/tmp/hudi_mor_example';
set hoodie.spark.sql.insert.into.operation=UPSERT;
insert into hudi_mor_example select 1, 'foo', 10, 1000;
insert into hudi_mor_example select 1, 'foo', 20, 2000;
insert into hudi_mor_example select 1, 'foo', 30, 3000;
可以通过运行 SELECT 语句来执行快照查询,如下所示,它将检索记录的最新值。
spark-sql> select id, name, price, ts from hudi_mor_example;
1 foo 30.0 3000
Time taken: 0.161 seconds, Fetched 1 row(s)
3. 读取优化(RO)查询
RO 查询类型的目标是在更低的读取延迟和可能不是最新的结果之间做出权衡,因此它只适用于 Merge on Read (MoR) 表。在进行此类查询时,`collectFileSplits()`方法只会获取 `FileSlice` 的基文件部分。
上面提供的设置代码会自动创建一个名为 `hudi_mor_example_ro` 的目录表,并在其中指定属性 `hoodie.query.as.ro.table=true`。这个属性告诉查询引擎总是执行 RO 查询。执行下面的 SELECT 语句将返回记录的原始值,因为后续的更新还没有应用到基文件中。
spark-sql> select id, name, price, ts from hudi_mor_example_ro;
1 foo 10.0 1000
Time taken: 0.114 seconds, Fetched 1 row(s)
4. 时间旅行查询
通过指定时间戳,用户可以请求 Hudi 表在特定时间点的历史快照。正如前面所提到的,FileSlices 与特定的提交时间关联,因此可以进行筛选。在执行时间旅行查询时,如果没有精确匹配的时间点,FileIndex 将只查找与指定时间相等或早于指定时间的 FileSlices。
spark-sql> select id, name, price, ts from hudi_mor_example timestamp as of '20230905221619987';
1 foo 30.0 3000
Time taken: 0.274 seconds, Fetched 1 row(s)
spark-sql> select id, name, price, ts from hudi_mor_example timestamp as of '20230905221619986';
1 foo 20.0 2000
Time taken: 0.241 seconds, Fetched 1 row(s)
第一个 SELECT 语句在最新插入的增量提交时间点上精确执行时间旅行查询,从而提供了表的最新快照。第二个查询设置的时间戳比最新插入的时间戳早,因此生成了倒数第二个插入的快照。
在示例中,时间戳遵循 Hudi 时间线的格式“yyyyMMddHHmmssSSS”。也可以将其设置为“yyyy-MM-dd HH:mm:ss”的形式,或者仅使用“yyyy-MM-dd”。
5. 增量查询
用户可以设置起始时间戳(可以选择是否带有结束时间戳),以便在指定的时间范围内检索已发生变更的记录。如果没有设置结束时间戳,时间窗口将包含最近的记录。Hudi 还提供了完整的变更数据捕获(CDC)功能,这通过在写入端启用额外的日志记录,并为增量读取器激活 CDC 模式来实现。更多细节将在后续专门介绍增量处理的帖子中详细说明。
03
回顾
在本文中,我们介绍了 Spark 的 Catalyst 优化器,探讨了 Hudi 如何通过实现 Spark 数据源 API 来读取数据,并详细介绍了四种不同的 Hudi 查询类型。在下一篇文章中,我将展示写入流程,以帮助大家更深入地理解 Hudi。欢迎关注 Hudi 公众号 ApacheHudi 获取微信群信息,加入钉钉群:35087066,发送空邮件至 [email protected] 参与讨论。
Onehouse 创始团队成员,开源项目负责人。Apache Hudi PMC 成员
。