51好读  ›  专栏  ›  狗厂

时序数据库HiTSDB:分布式流式聚合引擎

狗厂  · 掘金  ·  · 2018-04-23 03:45

正文

阿里妹导读:高性能时间序列数据库 (High-Performance Time Series Database , 简称 HiTSDB) 是一种高性能,低成本,稳定可靠的在线时序数据库服务, 提供高效读写,高压缩比存储、时序数据插值及聚合计算,时间线多维分析,主要服务于监控系统和IoT领域。 目前已在阿里巴巴集团多项内部业务中获得广泛运用,稳定服务于2016年双11、2017年双11,

背景

HiTSDB时序数据库引擎在服务于阿里巴巴集团内的客户时,根据集团业务特性做了很多针对性的优化。 然而在HiTSDB云产品的打磨过程中逐渐发现,很多针对性的优化很难在公有云上针对特定用户去实施。

于此同时, 在公有云客户使用HiTSDB的过程中,发现了越来越多由于聚合查询导致的问题,比如: 返回数据点过多会出现栈溢出等错误,聚合点过多导致OOM, 或者无法完成聚合,实例完全卡死等等问题。这些问题主要由于原始的聚合引擎架构上的缺陷导致。

因此HiTSDB开发团队评估后决定围绕新的聚合引擎架构对HiTSDB引擎进行升级,包含: 存储模型的改造,索引方式的升级,实现全新的流式聚合,数据迁移,性能评测。 本文主要围绕这5个方面进行梳理,重点在“全新的流式聚合部分”。

1. 时序数据存储模型:

1.1 时序的数据存储格式。

一个典型的时序数据由两个维度来表示,一个维度表示时间轴,随着时间的不断流入,数据会不断地追加。 另外一个维度是时间线,由指标和数据源组成,数据源就是由一系列的标签标示的唯一数据采集点。例如指标cpu.usage的数据来自于机房,应用,实例等维度组合成的采集点。 这样大家逻辑上就可以抽象出来一个id+{timestamp, value}的时序数据模型。这种数据模型的存储是如何呢。一般有两种典型的数据存储思路:

  • 一种按照时间窗口维度划分数据块,同一段自然时间窗口内的连续数据放到相邻的位置,比如{1:00, 2:00}->(id1, id2, id3, ... ... ,idN)。 采用这种方式的典型时序数据库包含InfluxDB, Promethues等等TSMT结构的数据库。OpenTSDB有些特殊,因为OpenTSDB是单值模型,指标这个维度在查询的时候是必带的。 所以可以先按照指标做了一级划分,再根据时间窗口做二级的划分,本质上还是同一时间窗口内的连续数据。 按照时间窗口切分的方式,优势是写入的时候可以很天然的按照窗口去落盘,对于高纬度的标签查询基本上是一些连续Scan. 这种方式有个比较难解的问题就是"out of order"乱序问题,对于时间窗口过期后再来的时间点,Promethues直接采用丢弃的方式,InfluxDB在这种情况下性能会有损耗。

  • 另外一种按照时间线维度划分数据块,同一时间线的数据放到相邻的位置,比如(id1)->(1:00, 2:00, 3:00, ... ... , 23:00)。 HiTSDB采用时间线维度划分的方式:目前落盘数据存储于HBASE,底层Rowkey由指标+标签+自然窗口的方式组合而成. Rowkey按照大小顺序合并某个时间线的数据点是连续相邻的。 因此对于一些低维的查询效率是非常高效的。根据目前接触的一些物联网服务,更多的是一些低维的访问。对于中等维度的查询采用流式scan。对于极高纬度标签的查询HiTSDB采用预聚合的服务(不在本文讨论范围内)。

1.2 时序模型的热点问题处理

生产环境中业务方采集的指标类型多种多样,对指标的采集周期各不相同。比如cpu.usage这个指标的变化频率比较快,业务方关注度高,采集周期通常很短,1秒,5秒,10秒等等。 然而指标disk.usage这个指标变化趋势相对平滑,采集周期通常为1分钟,5分钟, 10分钟等。这种情况下,数据的存储如果针对同一个指标不做特殊处理,容易形成热点问题。 假设按照指标类型进行存储资源的分片,想象一下如果有20个业务,每个业务10个集群,每个集群500台主机,采集周期是1秒的话,每秒就会有10万个cpu.usage的指标数据点落到同一个存储资源实例中, 而disk.usage采集周期为1分钟,所以大约只有1666个指标数据点落到另外一个存储资源上,这样数据倾斜的现象非常严重。

1.2.1 分桶

这类问题的经典解法就是分桶。比如除了指标类型外,同时将业务名和主机名作为维度标识tags,把指标cpu.usage划分到不同的桶里面。 写入时根据时间线哈希值分散写入到不同的桶里面。 OpenTSDB在处理热点问题也是采用了分桶模式,但是需要广播读取,根本原因在于查询方式需要在某个时间窗口内的全局扫描。 所以设置OpenTSDB的分桶数量需要一个平衡策略,如果数量太少,热点还是有局部性的问题,如果太多,查询时广播读带来的开销会非常大。

与其相比较,HiTSDB避免了广播读,提高了查询效率。由于HiTSDB在查询时,下发到底层存储扫描数据之前,首先会根据查询语句得到精确命中的时间线。 有了具体的时间线就可以确定桶的位置,然后到相应的块区域取数据,不存在广播读。 关于HiTSDB如何在查询数据的时候获取命中的时间线,相信读者这个疑问会在读取完倒排这一节的时候消释。

1.2.2 Region Pre-Split

当一个表刚被创建的时候,HBase默认分配一个Region给新表。所有的读写请求都会访问到同一个regionServer的同一个region中。 此时集群中的其他regionServer会处于比较空闲的状态,这个时候就达不到负载均衡的效果了。 解决这个问题使用pre-split,在创建新表的时候根据分桶个数采用自定义的pre-split的算法,生成多个region。 byte[][] splitKeys =new byte[bucketNumber-1][]; splitKeys[bucketIndex-1] = (bucketIndex&0xFF);

2. 倒排索引:

2.1 时序数据中的多维时间线

多维支持对于任何新一代时序数据库都是极其重要的。 时序数据的类型多种多样,来源更是非常复杂,不止有单一维度上基于时间的有序数值,还有多维时间线相关的大量组合。 举个简单例子,cpu的load可以有三个维度描述cpu core, host, app应用,每个维度可以有百级别甚至万级别的标签值。 sys.cpu.load cpu=1 host=ipA app=hitsdb,各个维度组合后时间线可以轻松达到百万级别。 如何管理这些时间线,建立索引并且提供高效的查询是时序数据库里面需要解决的重要问题。 目前时序领域比较主流的做法是采用倒排索引的方式。

2.2 倒排索引基本组合

基本的时间线在倒排中的组合思路如下:

时间线的原始输入值:

id time series
1 sys.cpu.load cpu=1 host=ipA app=hitsdb
2 sys.cpu.load cpu=2 host=ipA app=hitsdb
3 sys.cpu.load cpu=3 host=ipA app=hitsdb
4 sys.cpu.load cpu=4 host=ipA app=hitsdb
5 sys.cpu.load cpu=1 host=ipB app=hitsdb
6 sys.cpu.load cpu=2 host=ipB app=hitsdb
7 sys.cpu.load cpu=3 host=ipB app=hitsdb
8 sys.cpu.load cpu=4 host=ipB app=hitsdb

倒排构建后:

term posting list
cpu=1 1,5
cpu=2 2,6
cpu=3 3,7
cpu=4 4,8
host=ipA 1,2,3,4
host=ipB 5,6,7,8
app=histdb 1,2,3,4,5,6,7,8

查询时间线 cpu=3 and host=ipB:

term posting list
cpu=3 3,7
host=ipB 5,6,7,8

取交集后查询结果为7:

id time series
7 sys.cpu.load cpu=3 host=ipB app=hitsdb

2.3 倒排面临的问题以及优化思路

倒排主要面临的是内存膨胀的问题:

  • posting list过长, 对于高纬度的tag,比如“机房=杭州”,杭州可能会有千级别甚至万级别的机器,这就意味着posting list需要存储成千上万个64-bit的id。 解决这个问题的思路是采用压缩posting list的方式, 在构建posting list的时候对数组里面的id进行排序,然后采用delta编码的方式压缩。

  • 如果Tag键值对直接作为term使用,内存占用取决于字符串的大小,采用字符串字典化,也可大大减少内存开销。

3. 流式聚合引擎

3.1 HiTSDB聚合引擎的技术痛点

HiTSDB现有聚合引擎公有云公测以及集体内部业务运行中,暴露发现了以下问题:

3.1.1 Materialization执行模式造成Heap内存易打爆

下图显示了原查询引擎的架构图。HiTSDB以HBase作为存储,原引擎通过Async HBase client 从HBase获取时序数据。由于HBase的数据读取是一个耗时的过程,通常的解法是采用异步HBase client的API,从而有效提高系统的并行性。但原聚合引擎采用了一种典型的materialization的执行方式:1)启动多个异步HBase API启HBase读,2)只有当查询所涉及的全部时序数据读入到内存中后,聚合运算才开始启动。这种把HBase Scan结果先在内存中materialized再聚合的方式使得HiTSDB容易发生Heap内存打爆的现象。尤其当用户进行大时间范围查询,或者查询的时间线的数据非常多的时候,因为涉及的时序数据多,HiTSDB会发生Heap OOM而导致查询失败。

3.1.2 大查询打爆HBase的问题

两个原因造成HiTSDB处理聚合查询的时候,容易发生将底层HBase打爆。

  • HBase 可能读取多余时间线数据。HiTSDB的时间线采用指标+时间窗口+标签的编码方式存储在HBase。典型的查询是用户指定一个指标,时间范围,以及空间维度上标签要寻找的匹配值。空间维度的标签查询条件并不都是在标签编码前缀。当这种情况发生时,HiTSDB倒排索引不能根据空间维度的查询条件,精确定位到具体的HBase的查询条件,而是采用先读取再过滤的方式。这意味着HBase有可能读取很多冗余数据,从而加重HBase的负载。

  • HiTSDB有可能在短时间内下发太多HBase读请求。一方面,HiTSDB在HBase采用分片存储方式,对每一个分片,都至少启动一个读请求,另一方面,因为上面提到的materialization的执行方式,一个查询涉及到的HBase读请求同时异步提交,有可能在很短时间内向HBase下发大量的读请求。这样,一个大查询就有可能把底层的HBase打爆。

当这种情况发生时,更糟糕的场景是HiTSDB无法处理时序数据的写入请求,造成后续新数据的丢失。

3.1.3 执行架构高度耦合,修改或增加功能困难

聚合引擎主要针对应用场景是性能监控,查询模式固定,所以引擎架构采用单一模式,把查询,过滤,填值/插值,和聚合运算的逻辑高度耦合在一起。这种引擎架构对于监控应用的固定查询没有太多问题,但HiTSDB目标不仅仅是监控场景下的简单查询,而是着眼于更多应用场景下的复杂查询。

我们发现采用原有引擎的架构,很难在原有基础上进行增加功能,或修改原来的实现。本质上的原因在于原有聚合引擎没有采用传统数据库所通常采用的执行架构,执行层由可定制的多个执行算子组成,查询语义可以由不同的执行算子组合而完成。这个问题在产品开发开始阶段并不感受很深,但确是严重影响HiTSDB拓宽应用场景,增加新功能的一个重要因素。

3.1.4 聚合运算效率有待提高

原有引擎在执行聚合运算的时候,也和传统数据库所通常采用的iterative执行模式一样,迭代执行聚合运算。问题在于每次iteration执行,返回的是一个时间点。Iterative 执行每次返回一条时间点,或者一条记录,常见于OLTP这样的场景,因为OLTP的查询所需要访问的记录数很小。但对HiTSDB查询有可能需要访问大量时间线数据,这样的执行方式效率上并不可取。

原因1)每次处理一个时间点,都需要一系列的函数调用,性能上有影响,2)iterative循环迭代所涉及到的函数调用,无法利用新硬件所支持的SIMD并行执行优化,也无法将函数代码通过inline等JVM常用的hotspot的优化方式。在大数据量的场景下,目前流行的通用做法是引入Vectorization processing, 也就是每次iteration返回的不再是一条记录,而是一个记录集(batch of rows),比如Google Spanner 用batch-at-a-time 代替了row-at-a-time, Spark SQL同样也在其执行层采用了Vectorization的执行模式。

3.2 流式聚合引擎设计思路

针对HiTSDB原有聚合运算引擎上的问题,为了优化HiTSDB,支持HiTSDB商业化运营,我们决定改造HiTSDB聚合运算引擎。下图给出了新聚合查询引擎的基本架构。

3.2.1 pipeline执行模式

借鉴传统数据库执行模式,引入pipeline的执行模式(aka Volcano / Iterator 执行模式)。Pipeline包含不同的执行计算算子(operator), 一个查询被物理计划生成器解析分解成一个DAG或者operator tree, 由不同的执行算子组成,DAG上的root operator负责驱动查询的执行,并将查询结果返回调用者。在执行层面,采用的是top-down需求驱动 (demand-driven)的方式,从root operator驱动下面operator的执行。这样的执行引擎架构具有优点:

  • 这种架构方式被很多数据库系统采用并证明是有效;

  • 接口定义清晰,不同的执行计算算子可以独立优化,而不影响其他算子;

  • 易于扩展:通过增加新的计算算子,很容易实现扩展功能。比如目前查询协议里只定义了tag上的查询条件。如果要支持指标值上的查询条件(cpu.usage >= 70% and cpu.usage <=90%),可以通过增加一个新的FieldFilterOp来实现。

每个operator,实现如下接口:

  • Open : 初始化并设置资源

  • Next : 调用输入operator的next()获得一个batch of time series, 处理输入,输出batch of time series

  • Close : 关闭并释放资源

我们在HiTSDB中实现了以下算子:

  • ScanOp: 用于从HBase异步读取时间线数据

  • DsAggOp: 用于进行降采样计算,并处理填值

  • AggOp:用于进行分组聚合运算,分成PipeAggOp, MTAggOp

  • RateOp: 用于计算时间线值的变化率

3.2.2 执行计算算子一个batch的时间线数据为运算单位

在计算算子之间以一个batch的时间线数据为单位,提高计算引擎的执行性能。其思想借鉴于OLAP系统所采用的Vectorization的处理模式。这样Operator在处理一个batch的多条时间线,以及每条时间线的多个时间点,能够减少函数调用的代价,提高loop的执行效率。

每个Operator以流式线的方式,从输入获得时间线batch, 经过处理再输出时间线batch, 不用存储输入的时间线batch,从而降低对内存的要求。只有当Operator的语义要求必须将输入materialize,才进行这样的操作(参见下面提到的聚合算子的不同实现)。

3.2.3. 区分不同查询场景,采用不同聚合算子分别优化

HiTSDB原来的聚合引擎采用materialization的执行模式,很重要的一个原因在于处理时序数据的插值运算,这主要是因为时序数据的一个典型特点是时间线上不对齐:不同的时间线在不同的时间戳上有数据。HiTSDB兼容OpenTSDB的协议,引入了插值(interpolation)的概念,目的在于聚合运算时通过指定的插值方式,在不对齐的时间戳上插入计算出来的值,从而将不对齐的时间线数据转换成对齐的时间线。插值是在同一个group的所有时间线之间比较,来决定在哪个时间戳上需要进行插值 (参见OpenTSDB 文档)。

为了优化聚合查询的性能,我们引入了不同的聚合运算算子。目的在于针对不同的查询的语义,进行不同的优化。有些聚合查询需要插值,而有些查询并不要求插值;即使需要插值,只需要把同一聚合组的时间线数据读入内存,就可以进行插值运算。

  • PipeAggOp: 当聚合查询满足以下条件时,

  • 1)不需要插值: 查询使用了降采样(downsample),并且降采样的填值采用了非null/NaN的策略。这样的查询,经过降采样后,时间线的数据都是对齐补齐的,也就是聚合函数所用到的插值不再需要。

    2)聚合函数可以支持渐进式迭代计算模式 (Incremental iterative aggregation), 比如sum, count ,avg, min, max, zerosum, mimmim, mimmax,我们可以采用incremental聚合的方式,而不需要把全部输入数据读入内存。这个执行算子采用了流水线的方式,每次从输入的operator获得一系列时间线,计算分组并更新聚合函数的部分值,完成后可以清理输入的时间线,其自身只用保留每个分组的聚合函数的值。

  • MTAgOp: 需要插值,并且输入算子无法帮助将时间线ID预先分组,这种方式回退到原来聚合引擎所采用的执行模式。

对于MTAggOp, 我们可以引入分组聚合的方法进行优化:

  • GroupedAggOp: 需要插值,但是输入算子能够保证已经将时间线的ID根据标识(tags)进行排序分组,这样在流水线处理中,只要materialize最多一个组的数据,这样的算子比起内存保留所有分组时间线,内存要求要低,同时支持不同组之间的并行聚合运算。

3.2.4 查询优化器和执行器

引入执行算子和pipeline执行模式后,我们可以在HiTSDB分成两大模块,查询优化器和执行器。优化器根据查询语义和执行算子的不同特点,产生不同的执行计划,优化查询处理。例如HiTSDB可以利用上面讨论的三个聚合运算算子,在不同的场景下,使用不同的执行算子,以降低查询执行时的内存开销和提高执行效率为目的。这样的处理方式相比于原来聚合引擎单一的执行模式,更加优化。

4. 数据迁移







请到「今天看啥」查看全文