Table/SQL模块是flink 为支持数仓开发人员,数据分析的人员的需求,基于calcite定义的一套语义规范。
本文以51信用卡
开源的自定义SQL引擎为叙述主线,从旁解释相关源码
这一部分大致分为两小节描述
- Flink SQL解析流程 (本篇)
- 自研SQL引擎的实现方式 (Flink源码阅读(六) 自研SQL引擎的实现方法 )
对于一次Sql查询,一般经过以下流程:
- 先由
Parser
解析生成SqlNode节点树; - 由
Validator
完成节点类型推导以及必要的表达式验证优化; - 由
SqlToRelConverter
将SqlNode节点树转化为代表逻辑计划的RelNode
; - 在查询优化器
(QueryOptimizer)
里内置了100+的转换规则,分别用于逻辑计划优化以及物理计划转换; - 生成的物理计划
RelNode
被代码实现器遍历处理,各子节点由对应的实现器生成执行代码,最后组装成一个执行Class; Calcite
会为每个SQL
动态生成一个实现了Bindable
接口的Class
,然后动态编译创建实例后传入数据集执行计算。这里的数据集可以通过Schema
进行灵活定义,在业务上针对每一次业务请求会创建一份Schema
,存储当前请求处理过程的所有表数据,处理完成后堆内存后面会被回收。
Flink SQL解析流程
平常我们一般会配置TableEnv来执行SQL相关逻辑
TableEnvironment tableEnv = TableEnvironment.create(settings);`
复制代码
对TableEnvironment的实现类进行展开 对于Flink基于calcite 重写SQL的逻辑,需要重点关注的是以下四个对象
- TableConfig : 当前{@link TableEnvironment}会话的配置,以调整Table&SQL API程序。
- executor 它可以执行由{@link Planner}生成的{@link Transformation}图。也就是通过执行计划翻译成算子链图
- parser: sql的语法解析器
- planner: sql的执行计划
TableEnvironmentImpl.java
/**
* 调用TableEnv的底层实现类
* @param catalogManager 用于处理目录对象(例如表,视图,函数和类型)的管理器。它封装
* *所有可用的目录并存储临时对象。
* @param moduleManager 负责加载/卸载模块,管理模块的生命周期以及解决模块对象。
* @param tableConfig 当前{@link TableEnvironment}会话的配置,以调整Table&SQL API程序。
* @param executor 它可以执行由{@link Planner}生成的{@link Transformation}图。
* @param functionCatalog 简单的函数目录,用于在目录中存储{@link FunctionDefinition}。
* @param planner 该接口有两个作用:
* * <ul>
* *通过{@link #getParser()}的<li> SQL解析器-将SQL字符串转换为Table API特定的对象
* *例如{@link Operation} s的树</ li>
* * <li>关系计划器-提供一种计划,优化和转换以下内容的树的方法
* * {@link ModifyOperation}变成可运行的形式({@link Transformation})</ li>
* @param isStreamingMode 是否是流SQL
***/**
protected TableEnvironmentImpl(
CatalogManager catalogManager,
ModuleManager moduleManager,
TableConfig tableConfig,
Executor executor,
FunctionCatalog functionCatalog,
Planner planner,
boolean isStreamingMode) {
this.catalogManager = catalogManager;
this.moduleManager = moduleManager;
this.execEnv = executor;
this.tableConfig = tableConfig;
this.functionCatalog = functionCatalog;
this.planner = planner;
this.parser = planner.getParser();
this.operationTreeBuilder = OperationTreeBuilder.create(
tableConfig,
functionCatalog.asLookup(parser::parseIdentifier),
catalogManager.getDataTypeFactory(),
path -> {
try {
UnresolvedIdentifier unresolvedIdentifier = parser.parseIdentifier(path);
Optional<CatalogQueryOperation> catalogQueryOperation = scanInternal(unresolvedIdentifier);
return catalogQueryOperation.map(t -> ApiExpressionUtils.tableRef(path, t));
} catch (SqlParserException ex) {
// The TableLookup is used during resolution of expressions and it actually might not be an
// identifier of a table. It might be a reference to some other object such as column, local
// reference etc. This method should return empty optional in such cases to fallback for other
// identifiers resolution.
return Optional.empty();
}
},
isStreamingMode
);
}
复制代码
Parser的创建
首先我们需要了解calcite相关概念
freemarker&javacc
- freemarker通常用作前端的模板引擎,简单的说,就是通过#{}和一些列语法添数
- javacc主要用于代码的生成,通过编译 后缀名 .jj的文件,生成我们想要的文件
Flink源码中解析器的模块如下所示
右侧的代码段为ParserImpl#parse
方法,也是flink 一条sql开始解析的入口
每次调用parse时重新创建parser和planner。这么设计的目的是catalog在每次调用时会变化,所以才动态创建对象。
@Override
public List<Operation> parse(String statement) {
//基于calcite创建语法解析器
CalciteParser parser = calciteParserSupplier.get();
//实现执行计划类,即验证阶段
FlinkPlannerImpl planner = validatorSupplier.get();
// 将sql装换为SQLNode,例如:\
//select id, cast(score as int), 'hello' from T where id < ?
// 以上SQL中, 1. id, score, T 等为SqlIdentifier
// 2. cast()为SqlCall
// 3. int 为SqlDataTypeSpec
// 4. 'hello' 为SqlLiteral
// 5. '?' 为SqlDynamicParam
// 这些都可以看作为一个SqlNode
SqlNode parsed = parser.parse(statement);
//根据SqlNode的不同类型 转换成不同的SqlNode,比如 cast()从SqlNode这个父级抽象转换为具体的类型sqlCall
Operation operation = SqlToOperationConverter.convert(planner, catalogManager, parsed)
.orElseThrow(() -> new TableException("Unsupported query: " + statement));
return Collections.singletonList(operation);
}
复制代码
Continue..QwQ