本文主要介绍 Hulu 用户分析平台使用的OLAP引擎——Nesto(Nested Store),是一个提供近实时数据导入,嵌套结构、TB级数据量、秒级查询延迟的分布式OLAP解决方案,包括一个交互式查询引擎和数据处理基础设施。
1. 项目背景
Nesto起源于用户分析团队,业务上需要一个面向用户、分析型的产品,提供任意维度的Ad-Hoc交互式查询、导出数据,供运营、产品、第三方数据公司使用。 一个典型场景是:导出2018年1月看过《冰与火之歌》第7季第7集(S7E7)超过5次的新注册用户,包括用户名、email两个域,用于发送营销邮件。2. 数据平台pipeline
在正式介绍Nesto之前,有必要先介绍其产生的背景,所以先介绍数据平台pipeline。 如下图所示,用户分析平台的最核心的资产是一套pipeline,通过整合公司内多个团队的数据,使用HBase集中存储起来,提供一个UI Portal让用户简单的描述需求,把需求存储于metadata db中,然后定期运行一个Spark Job去scan HBase,进行批量的计算,最终把有价值的数据服务出去,例如作为用户标签服务的上游方等。
|
|
3. 数据规模
目前用户数据规模如下,
1、HBase 1500 Regions, 占HDFS 20T (一副本、LZO)
2、300+列,其中有50多是嵌套的结构,例如观看行为。
3、1亿+用户数据。
4、历史全量近1000亿观看行为,最近一年近300亿次。
4. Nesto的诞生
为了满足OLAP的需求,包括
1、支持filter, projection和aggregation和自定义UDF。
2、Ad-hoc查询, 普通列响应时间秒级,嵌套列小于百s。
3、数据从进入OLAP到能够被查询到,延迟要在小时级别。
我们对比了如下开源方案,
1、ROLAP
类似SparkSQL、Presto、Impala等,它们都必须把数据抽象为关系型的表,可以使用表达丰富的SQL,不存在数据冗余,在实际运行期间往往会经过SQL词法解析、语法解析、逻辑执行计划生成和优化,再到物理执行计划的生成和优化,会存在数据shuffle、join。这与现有的用户分析平台已存数据模型——大宽表,不一致,需要经过ETL做数据转换。另外我们的另一个目标支持近实时的数据导入,而这些方案的OLAP目前都不支持。
2、MOLAP
类似Druid、Kylin、百度Palo等,他们都支持多维查询,通过预聚合的方式来提升查询性能,但是需要抽象出维度列、指标列,甚至某个维度的分区等。同样数据模型和大宽表不一致。另外,用户分析平台往往涉及一个用户所有行为数据,查询请求往往就是要查询若干月,甚至若干年之前的,涉及大量fact数据的全表scan,这也不能很好的match这种物化视图或者上卷表的模式。
5. Nesto的基础
Nesto的实现依赖于一些已有的技术和理论。
1、存储模型。采用嵌套模型,非关系型。
2、存储格式。列式存储,对于OLAP,可以跳过不符合条件的数据,降低IO数据量,加大磁盘吞吐。通过压缩、编码可以降低磁盘存储空间。只读取需要的列,甚至可以支持向量运算,能够获取更好的扫描性能。Nesto采用Parquet作为存储格式,是 Google Dremel 的开源实现。Parquet对嵌套数据结构实现了打平和重构算法,实现了按行分割(形成row group),按列存储(由多个page组成),对列数据引入更具针对性的编码和压缩方案,来降低存储代价,提升IO和计算性能。
3、MPP架构。大规模并行处理架构,可以支持查询的横向扩展,为海量数据查询提供高性能解决方案,实际上Nesto借鉴了Presto。一个Parquet文件是splitable的,因此利用DAC分治的思想,把大问题划分为小问题,分布式并行解决。
4、RPC选型。分布式系统的RPC通信是基础,Presto大量采用RESTFul API解决,而Nesto选择使用Thrift进行封装解决,提供基于NIO全双工、非阻塞I/O的通信模型,通过Reactor模式实现线程池和串行无锁化来实现服务端API的暴露。
5、分布式配置。Nesto中的表结构存储在分布式配置系统中,可做到热部署更新。
6、高可用保证。使用YARN管理实例,保证高可用和资源的合理分配。使用Zookeeper做集群节点变更的通知与分发。
7、海量数据近实时查询支持。借鉴Google MESA的思想,关于MESA的模型,请参考这篇文章了解 《浅谈从Google Mesa到百度PALO》 。
8、其他技术点。使用MySQL存储已完成任务情况,使用web技术构建管理Portal。使用Hadoop基础设施,包括HDFS存储数据。
9、实现语言。Java。
6. Nesto的存储模型
逻辑上,可以看做一张嵌套的大平表(flat table),数据按照行存储,每一行的结构都是嵌套的。和第二章提到的HBase的模型逻辑上是一致的。
物理上,采用开源列式存储方案,Nesto选择 Parquet ,它独立于计算框架,按照 Google Dremel 提到的方案做按行切割,按列编码压缩。一张表对应1到N个Parquet文件。
下图是 Parquet官网 的物理存储图。每个Parquet文件都是包含若干row group,这是做MPP的基本分割单元,一个MPP的sub-task可以对应K个row group,一个row group包含了若干用户的全部信息,按照schema定义的列,进行列式存储,每列包含若干个page,每个page是最小的编码压缩单元。每个列都可以采用自由的编码方式,例如run length encoding、dict encoding、bit packed encoding,delta encoding等等,或者他们的组合。
用于MPP架构的存在,通常会多增加副本数,来支持读负载均衡和本地化locality查询。
表结构元数据不用DML描述,而是使用Parquet提供的proto schema方式,目前通过分布式配置中心管理,通过管理控制台新增表和修改表。例如,在配置中心表RegUsersAttributes的schema描述如下。
7. Nesto整体架构
Nesto分为查询引擎和数据处理基础设施两大部分。
查询引擎的架构如下。
Nesto-portal 。客户端,用于提供基于web的查询请求输入和下载结果。
Nesto-cli 。客户端,提供命令行交互式的查询。
State store 。使用zookeeper来做集群管理,进而实现高可用的分布式系统,任何节点都可以知道整个Nesto的拓扑结构。
Nesto server 。非中心化的设计思想,类似 Presto ,任意节点分为两种角色,包括coordinator和worker,一般来说都是少数的coordinator加上大量的worker的拓扑。每一个部署节点都是一个Nesto server,只不过角色有区分。目前nesto-server均部署在YARN上,做常驻进程,YARN做高可用保障和资源分配管理。
Coordinator 。是某一个查询的管理节点,负责接收客户端的请求,解析请求,由于使用大宽表的数据结构,加上复用predicate lib,因此做完词法分析、语法分析,生成AST后,查询计划的生成很简单,把filter的逻辑全部下推到底层的worker去执行即可,只需要做table的split就可以生成sub-tasks,这些sub-tasks就是物理执行计划的体现,所以不存在stage或者fragement等类似Presto、Impala的概念。Coordinator通过State store感知集群的拓扑,同时和每个worker都保持了一个心跳,worker通过心跳信息上报自己的状态,coordinator可以进一步了解worker的负载和健康状态。Coordinator通过一定的调度算法,把sub-tasks分发给worker去执行,等待worker的结果汇总过来,如果需要再做一些aggregation和merge的工作。最终,流式的传输结果给客户端展现或者下载。
Worker 。接收coordinator分发的若干sub-tasks,放到线程池中执行(线程池就是槽位slot),通过Parquet提供的API,逐一读取一行的数据,利用predicate lib进行filter,通过一个异步的生产者消费模型,批量处理数据,然后分批序列化后,按照data chunk的方式,源源不断的发送回coordinator。另外,worker会做一些优化工作,除了天然的filter pushdown,还包括pre-aggregation,limit pushdown等等。
数据处理基础设施请参考第9章节。
8. Nesto的查询引擎
下面针对一个典型的查询执行过程,进一步展开描述各个组件的工作流程。
8.1 State store
主要做高可用,节点发现、上下线通知使用。依托zookeeper,每个nesto-server在启动的时候都会注册自己到zookeeper的某个临时节点,任何想知道集群拓扑的地方,例如其他nesto-server、nesto-cli、nesto-portal都通过订阅zookeeper得到集群的拓扑情况。当集群发生变化的时候,可以通过zookeeper订阅变化。8.2 Nesto-server之Coordinator
API
Nesto server需要提供两套API,一个是客户端与之交互,另一个是coordinator和worker通信的API,都通过thrift进行开发和编写。使用TThreadedSelectorServer作为通信基础设施,Linux上使用I/O多路复用的epoll技术,同时使用两种线程池,一个做accept连接通信握手技术,一个做编解码和业务逻辑的处理。解析请求
通过thrift API方法提交上来的请求,针对JSON或者SQL++ DSL解析,同时得到抽象语法树AST,包含查询表,filter条件,表的schema信息,要查询的列,以及每一列的聚合函数。执行计划生成
由于Ad-hoc查询和MPP的架构,因此系统的并发查询能力要做一定的限制,Nesto可通过参数配置并发执行请求的数量,在解析请求完毕后,会把request放入带有超时机制的线程池中,如果查询没有在规定时间内完成,那么就会取消查询,并且revoke掉所有已经分发的sub-tasks。线程池中会进行执行计划的生成和后续的处理流程。 执行计划的生成分为两步,第一步把大的表,也就是Parquet文件进行split,每个split都是一个sub-task,这里复用了 parquet-mr 项目中的ParquetInputFormat,把一个大的Parquet文件split为若干个InputSplit,对于每一个split的大小可以通过参数控制,也就调整了每个sub-task扫描的数据大小,可以避免data skew问题,例如1G一个split,这1G可以包含多个Row Group,而每个Row Group可能是HDFS Block Size,例如256MB。第二步,根据filter条件列以及结果列构造新的schema,目的是用Parquet读取文件的时候需要传入这个schema,这样就可以只查询需要的列,发挥列式存储在IO上面的优势。调度分发任务
执行计划生成了本次查询的所有分片信息,如何调度分片给合适的worker去执行,也就是生成worker到多个Task的映射,是调度任务负责的。这里可以包含很多策略,例如可以轮训的assign task到每个node,也可以按照HDFS Locality来进行数据本地化的优化,还需要综合考虑每个worker的负载状况,把task分配给负载较为轻的worker,通常负载要考虑的维度,包括worker节点的CPU、IO、slot占用数、权重等。如下图所示。查询执行
当sub-tasks都分发出去后,worker会源源不断通过thrift RPC把结果批量的发送回coordinator,每一批可以看做是一个data chunk。 coordinator在内存中维护一个aggregator数据结构,来merge所有worker返回的data chunk,一个data chunk就是一个批次的结果,data chunk使用某种序列化协议(例如Java原生或者Kyro),data chunk可以支持去重。 一个data chunk包含了一部分结果列,例如select列为reg.raw_attributes.username, reg.raw_attributes.email,则在coordinator会追加累积这些数据,然后再源源不断的推送给客户端。 如果结果列包含聚合函数,例如GROUP BY cid,COUNT(userid),那么worker会做pre-aggregation,把聚合pushdown到worker执行,最终给coordinator的数据就是聚合后的结果,aggregator做combine即可。同样,常用的LIMIT pushdown也是支持的。失败处理
类似Presto对于失败的态度就是不太容忍,对于分发失败的任务,在超过重试次数后就failfast整个query。对于worker返回失败的data chunk,包括丢失响应,或者返回的data chunk非法等,超过一定的重试次数后也failfast整个query。8.3 Nesto-server之worker
worker和coordinator类似,都存在一个thrift API,用于接收coordinator发送的sub-tasks查询请求。 worker充分利用了线程池技术,池子里就是一些槽位(slot),每一个sub-task会占用一个slot,当slot占满后就不能执行新的请求了,从而限制worker的计算能力不超负荷。 worker和coordinator维持一个心跳,定期汇报自己的负载信息,包括CPU、IO、slot占用情况等,供coordinator调度算法使用。 和Presto类型,抽象出connector的概念,Nesto的worker抽象出了scanner的概念,这是一个可扩展的接口,目前只支持Parquet文件的查询,后续可以扩展到CarbonData等。 worker要做的工作就是根据Parquet提供的client API读取文件,解压缩、解码文件,在内存中构造出所有row group的行视图,这个过程是一个流水线式的,保证尽可能低的使用内存。文章最早提到了pipeline中已经存在的predicate lib,使用这个lib,apply到一行,就可以做filter,也就天然实现了filter pushdown的功能,对于符合filter条件的行,取其结果列,在内部维护一个队列,批量的构造data chunk,序列化后不需要进行shuffle落盘,直接在内存里面,网络直连的发送回coordinator即可。这种filter pushdown到最底层的方式,避免了落盘和JOIN operator,所以在性能上对于无法剪枝的scan型的场景会非常高效。具体流程如下图所示。8.4 Portal & Cli
Nesto Portal提供了UI,可以方便PM、运营人员提交查询,Portal截图如下。例子中使用JSON DSL发送请求。