本文主要介绍基于 MaxCompute 的离线近实时一体化新架构如何来支持这些综合的业务场景,提供基于Delta Table的近实时增全量一体的数据存储和计算解决方案。
随着当前数据处理业务场景日趋复杂,对于大数据处理平台基础架构的能力要求也越来越高,既要求数据湖的大存储能力,也要求具备海量数据高效批处理能力,同时还可能对延时敏感的近实时链路有强需求,本文主要介绍基于 MaxCompute 的离线近实时一体化新架构如何来支持这些综合的业务场景,提供基于Delta Table的近实时增全量一体的数据存储和计算解决方案。
当前典型的数据处理业务场景中,对于时效性要求低的大规模数据全量批处理的单一场景,直接使用 MaxCompute 足以很好的满足业务需求。但随着 MaxCompute 承载的业务无论是规模,还是使用场景,都越来越丰富,在处理好大规模离线批处理链路的同时,用户对近实时和增量处理链路也有很多的需求,下图展示了部分业务场景。
比如近实时数据导入链路,依赖平台引擎具备事务隔离,小文件自动合并等能力,又比如增全量数据合并链路,还依赖增量数据存储和读写,主键等能力。MaxCompute以前不具备新架构能力之前,要支持这些复杂的综合业务场景,只能通过下图所示的三种解决方案,但无论使用单一引擎或者联邦多引擎都存在一些无法解决的痛点。
方案一,只使用单一的MaxCompute离线批处理解决方案,对于近实时链路或者增量处理链路通常需要转化成T+1的批处理链路,会一定程度上增加业务逻辑复杂度,且时效性也较差,存储成本也可能较高。方案二,只使用单一的实时引擎,那资源成本会较高,性价比较低,且对于大规模数据批处理链路的稳定性和灵活性也存在一些瓶颈。方案三,使用典型的Lambda架构,全量批处理使用MaxCompute链路,时效性要求比较高的增量处理使用实时引擎链路,但该架构也存在大家所熟知的一些固有缺陷,比如多套处理和存储引擎引发的数据不一致问题,多份数据冗余存储和计算引入的额外成本,架构复杂以及开发周期长等问题。这些解决方案在成本,易用性,低延时,高吞吐等方面互相制约,很难同时具备较好的效果,这也驱动着MaxCompute有必要开发新的架构既能满足这些业务场景需求,也能提供较低的成本和较好的用户体验。
近几年在大数据开源生态中,针对这些问题已经形成了一些典型的解决方案,最流行的就是Spark/Flink/Trino开源数据处理引擎,深度集成Hudi / Delta Lake / Iceberg / Paimon开源数据湖,践行开放统一的计算引擎和统一的数据存储思想来提供解决方案,解决Lamdba架构带来的一系列问题。同时MaxCompute近一年多在离线批处理计算引擎架构上,自研设计了离线&近实时数仓一体化架构,在保持经济高效的批处理优势下,同时具备分钟级的增量数据读写和处理的业务需求,另外,还可提供Upsert,Time travel等一系列实用功能来扩展业务场景,可有效地节省数据计算,存储和迁移成本,切实提高用户体验。
上图所示即为MaxCompute高效支持上述综合业务场景的全新业务架构。写入端会融合多种数据集成工具将丰富的数据源近实时增量或批量导入到统一的MaxCompute表存储中,存储引擎的表数据管理服务会自动优化编排数据存储结构来治理小文件等问题;使用统一的计算引擎支持近实时增量和大规模离线批量分析处理链路;由统一的元数据服务支持事务机制和海量文件元数据管理。统一的新架构带来的优势也是非常显著,可有效解决纯离线系统处理增量数据导致的冗余计算和存储、时效低等问题,也能避免实时系统高昂的资源消耗成本,同时可消除Lambda架构多套系统的不一致问题,减少冗余多份存储成本以及系统间的数据迁移成本。简言之,一体化新架构既可以满足增量处理链路的计算存储优化以及分钟级的时效性,又能保证批处理的整体高效性,还能有效节省资源使用成本。
目前新架构已支持了部分核心能力,包括主键表,Upsert实时写入,Time travel查询,增量查询,SQL DML操作,表数据自动治理优化等,更详细的架构原理和相关操作指导请参考官网
架构原理[1]
和
用户操作文档[2]
。
本章节重点介绍新架构如何支持一些典型的业务链路以及产生的优化效果。
本章节主要介绍建表操作和关键表属性的含义,以及根据业务场景如何设置表属性值以达到最佳效果,也会简单描述一下存储引擎后台如何自动优化表数据。
首先,一体化新架构需要设计统一的表格式来存储不同格式的数据以支撑不同业务场景的数据读写,这里称为Delta Table,可以同时支持既有的批处理链路,以及近实时增量等新链路的所有功能。
createtable dt (pk bigint notnullprimarykey, val string) tblproperties ("transactional"="true");
createtable par_dt (pk bigint notnullprimarykey, val string)
partitioned by (pt string) tblproperties ("transactional"="true");
只需要设置主键Primary Key(PK),以及表属性transactional为true,就可以创建一张dt。PK用来保障数据行的unique属性,transactional属性用来配置ACID事务机制,满足读写快照隔离。
createtable dt (pk bigint notnullprimarykey, val string)
tblproperties
("transactional"="true", "write.bucket.num" = "32", "acid.data.retain.hours"="48");
此属性非常重要,表示每个partition或者非分区表的分桶数量,默认值为16,所有写入的记录会根据PK值对数据进行分桶存储,相同PK值的记录会落在同一个桶中。非分区表不支持修改,分区表可修改,但只有新分区生效。
数据写入和查询的并发度可通过bucket数量来水平扩展,每个并发可至少处理一个桶数据。但桶数量并不是越多越好,对于每个数据文件只会归属一个桶,因此桶数量越多,越容易产生更多的小文件,进一步可能增加存储成本和压力,以及读取效率。因此需要结合数据写入的吞吐,延时,总数据的大小,分区数,以及读取延时来整体评估合理的桶数量。
此外,数据分桶存储也非常有助于提升点查场景性能,如果查询语句的过滤条件为具体的PK值,那查询时可进行高效的桶裁剪和数据文件裁剪,极大减少查询的数据量。
表属性: acid.data.retain.hours
此属性也很重要,代表time travel查询时可以读取的历史数据实践范围,默认值是1天,最大支持7天。
建议用户按真实的业务场景需求来设置合理的时间周期,设置的时间越长,保存的历史数据越多,产生的存储费用就越多,而且也会一定程度上影响查询效率,如果用户不需要time travel查询历史数据,建议此属性值设置为0,代表关掉time travel功能,这样可以有效节省数据历史状态的存储成本。
Delta Table 支持完整的Schema Evolution操作,包括增加和删除列。在time travel查询历史数据时,会根据历史数据的Schema来读取数据。另外PK列不支持修改。
altertable dt add columns (val2 string);
altertable dt drop columns val;
存在的问题
Delta Table 典型场景之一是支持分钟级近实时增量数据导入,因此可能导致增量小文件数量膨胀,尤其是桶数量较大的情况,从而引发存储访问压力大、成本高,数据读写IO效率低下,文件元数据分析慢等问题,如果Update/Delete格式的数据较多,也会造成数据中间状态的冗余记录较多,进一步增加存储和计算的成本,查询效率降低等问题。
为此,后台存储引擎配套支持了合理高效的表数据服务对存储数据进行自动治理和优化,降低存储和计算成本,提升分析处理性能。
表数据组织格式
如上图所示,展示了分区表的数据结构,先按照分区对数据文件进行物理隔离,不同分区的数据在不同的目录之下; 每个分区内的数据按照桶数量来切分数据,每个桶的数据文件单独存放; 每个桶内的数据文件类型主要分成三种:
-
Delta Data File:每次事务写入或者小文件合并后生成的增量数据文件,会保存每行记录的中间历史状态,用于满足近实时增量读写需求。
-
Compacted Data File:Delta File经过Compact执行生成的数据文件,会消除数据记录的中间历史状态,PK值相同的记录只会保留一行,按照列式压缩存储,用来支撑高效的数据查询需求。
-
Delta CDC Log: 按照时序存储的CDC格式增量日志 (目前还未对外推出)。
如上图所示,Delta Table 的表数据服务主要分成Auto Sort / Auto Merge / Auto Compact / Auto Clean四种,用户无需主动配置,存储引擎后台服务会智能的自动收集各个维度的数据信息,配置合理的策略自动执行。
-
Auto Sort:
自动将实时写入的行存avro文件转换成aliorc列存文件,节省存储成本和提升读取效率。
-
Auto Merge:
自动合并小文件,解决小文件数量膨胀引发的各种问题。主要策略是周期性地根据数据文件大小/文件数量/写入时序等多个维度进行综合分析,进行分层次的合并。但它并不会消除任何一条记录的中间历史状态,主要用于time travel查询历史数据。
-
Auto Partial Compact:
自动合并文件并消除记录的历史状态,降低update/delete记录过多带来的额外存储成本,以及提升读取效率。主要策略是周期性地根据增量的数据大小/写入时序/time travel时间等多个维度进行综合分析来执行compact操作。该操作只针对超过time travel可查询时间范围的历史记录进行compact。
-
Auto Clean:
自动清理无效文件,节省存储成本
。
Auto Sort / Auto Merge / Auto Partial Compact操作执行完成后,会生成新的数据文件,所以老的数据文件其实没什么作用了,会被即时自动删除,及时节省存储成本。
如果用户对于查询性能的要求非常高,也可尝试手动执行全量数据的major compact操作,每个桶的所有数据会消除所有的历史状态,并且额外生成一个新的Aliorc列存数据文件,用于高效查询,但也会产生额外的执行成本,以及新文件的存储成本,因此非必要尽量不执行。
set odps.merge.task.mode=service;
altertable dt compact major;
MaxCompute离线架构一般在小时或天级别批量导入增量数据到一张新表或者新分区中,然后配置对应的离线ETL处理链路,将增量数据和存量表数据执行Join Merge操作,生成最新的全量数据,此离线链路的延时较长,计算和存储也会消耗一定的成本。
使用新架构的upsert实时导入链路基本可以保持数据从写入到查询可见的延时在5-10分钟,满足分钟级近实时业务需求,并且不需要复杂的ETL链路来进行增全量的Merge操作,节省相应的计算和存储成本。
实际业务数据处理场景中,涉及的数据源丰富多样,可能存在数据库、日志系统或者其他消息队列等系统,为了方便用户数据写入Delta Table, MaxCompute深度定制开发了开源
Flink Connector工具[4]
,
针对高并发、容错、事务提交等场景做了定制化的设计及开发优化,以满足延时低、正确性高等要求,同时也能很好的对接融合Flink生态。具体使用细节可以参考官网
产品说明[5]。
上图简单展示了整体写入的流程,可总结如下主要关键点:
-
基本大部份可融合flink生态的引擎或者工具都可通过flink任务,结合MaxCompute flink connector实时写入数据进 dt 表。
-
写入并发可以横向扩展,满足低延时高吞吐需求。写入流量吞吐跟flink sink并发数,Delta Table 桶数量等参数配置相关,可根据各自的业务场景进行合理配置。特别说明,针对 Delta Table 桶数量配置为Flink sink并发数的整数倍的场景,系统进行了高效优化,写入性能最佳。
-
满足数据分钟级可见,支持读写快照隔离
-
结合Flink的Checkpoint机制处理容错场景,保障exactly_once语义。
-
支持上千分区同时写入,满足海量分区并发写入场景需求。
-
流量吞吐上限可参考单个桶1MB/s的处理能力进行评估,不同环境不同配置都可能影响吞吐。如果对写入延时比较敏感,需要相对稳定的吞吐量,可考虑申请
独享的数据传输资源[6]
,
但需要额外收费。如果默认使用共享的公共数据传输服务资源组的话,在资源竞抢严重的情况下,可能保障不了稳定的写入吞吐量,并且可使用的资源量也有上限。
该链路可用来优化将多张增量表的数据列拼接到一张大宽表的场景,比较类似多流join的业务场景。
如上图所示,左边展示了MaxCompute的离线ETL链路处理此类场景,将多张增量表按照比较固定的时间来对齐数据,通常小时/天级别,然后触发一个join任务,把所有表的数据列拼接起来生成大宽表,如果有存量数据,还需要执行类似upsert的ETL链路。因此整体ETL链路延时较长,流程复杂,也比较消耗计算和存储资源,数据也容易遇到无法对齐的场景。
右边展示了通过 dt 表支持部分列更新的能力,只需要将各个表的数据列实时增量更新到 dt 大宽表中即可,dt 表的后台Compact服务以及查询时,会自动把相同PK值的数据行拼接成一行数据。该链路基本完全解决了离线链路遇到的问题,延时从小时/天级别降低到分钟级,而且链路简单,几乎是ZeroETL,也能成倍节省计算和存储成本。
目前支持以下两种方式进行部分列更新,功能还在灰度上线中,还未发布到官网(预计两个月内在公共云发布)。
createtable dt (pk bigint notnullprimarykey, val1 string, val2 string, val3 string) tblproperties ("transactional"="true");
insertinto dt (pk, val1) select pk, val1 from table1;
insertinto dt (pk, val2) select pk, val2 from table2;
insertinto dt (pk, val3) select pk, val3 from table3;
为了方便用户操作 dt 表,MaxCompute计算引擎对SQL全套的数据查询DQL语法和数据操作DML语法进行了支持,保障离线链路的高可用和良好的用户体验。SQL引擎的内核模块包括Compiler、Optimizer、Runtime等都做了专门适配开发以支持相关功能和优化,包括特定语法的解析,特定算子的Plan优化,针对pk列的去重逻辑,以及runtime upsert并发写入等。
数据处理完成之后,会由Meta Service来执行事务冲突检测,原子更新数据文件元信息等,保障读写隔离和事务一致性。
SQL DML具体语法可参考
官网文档[7]
,
对于Insert / Update / Delete / Merge Into都有详细的介绍和示例。
对于Upsert批式写入能力,由于 dt 表后台服务或者查询时会自动根据PK值来合并记录,因此对于Insert + Update场景,不需要使用复杂的Update/Merge Into语法,可统一使用Insert into插入新数据即可,使用简单,并且能节省一些读取IO和计算资源。
基于 Delta Table,计算引擎可高效支持
Time travel查询[8]
的典型业务场景,即查询历史版本的数据,可用于回溯业务数据的历史状态,或数据出错时,用来恢复历史状态数据进行数据纠正。
select * from dt timestampasof'2024-04-01 01:00:00';
select * from dt timestampasofcurrent_timestamp() - 300;
select * from dt timestampasof get_latest_timestamp('dt', 2);
可查询的历史数据时间范围,可通过表属性
acid.data.retain.hours
来配置,
配置策略
上文已介绍,配置参数详解参考
官网[9]
。
SQL引擎接收到用户侧输入的time travel查询语法后,会先从Meta服务中解析出来要查询的历史数据版本,然后过滤出来要读取的Compacted file和Delta file,进行合并merge输出,Compacted file可极大提升读取效率。
dt 表支持增量写入和存储,最重要的一个考虑就是支持增量查询以及增量计算链路,为此,也专门设计开发了新的SQL增量查询语法来支持近实时增量处理链路。用户通过增量查询语句可灵活构建增量数仓业务链路,近期正在规划开发支持增量物化视图来进一步简化使用门槛,提升用户体验,降低用户成本。
-
用户指定时间戳或者版本查询增量数据,详细语法参考
官网[11]
,简单示例:
//查询2024-04-0101:00:00-01:10:00之间十分钟的增量数据
select * from dt timestampbetween'2024-04-01 01:00:00'and'2024-04-01 01:10:00';
//查询前10分钟到前5分钟之间的增量数据
select * from dt timestampbetweencurrent_timestamp() - 601andcurrent_timestamp() - 300;
//查询最近一次commit的增量数据
select * from dt timestampbetween get_latest_timestamp('dt', 2) and get_latest_timestamp('dt');
-
引擎自动管理数据版本查询增量数据,不需要用户手动指定查询版本, 非常适合周期性的增量计算链路 (功能灰度发布中,以官网发布为准)。简单示例:
//绑定一个stream对象到dt表上
create stream dt_stream ontable dt;
insertinto dt values (1, 'a'), (2, 'b');
//自动查询出来新增的两条记录(1, 'a'), (2, 'b'), 并把下一次的查询版本更新到最新的数据版本
insert overwrite dest select * from dt_stream;
insertinto dt values (3, 'c'), (4, 'd');
//自动查询出来新增的两条记录(3, 'c'), (4, 'd')
insert overwrite dest select * from dt_stream;