专栏名称: 语落心生
后端->大数据研发
目录
相关文章推荐
硬派健身  ·  趣图:我不胖,我只是…… ·  3 天前  
硬派健身  ·  人厌狗嫌的行为,却让你健康又减肥? ·  3 天前  
辉哥奇谭  ·  以运动为核心规划每天时间 ·  3 天前  
時間的玩家TimeIsArt  ·  萨古鲁关于入睡和醒来的十条建议 ·  3 天前  
51好读  ›  专栏  ›  语落心生

Flink源码阅读(五) FLink SQL之解析流程

语落心生  · 掘金  ·  · 2021-03-06 11:19

正文

阅读 11

Flink源码阅读(五) FLink SQL之解析流程

Table/SQL模块是flink 为支持数仓开发人员,数据分析的人员的需求,基于calcite定义的一套语义规范。

本文以51信用卡开源的自定义SQL引擎为叙述主线,从旁解释相关源码

这一部分大致分为两小节描述

  • Flink SQL解析流程 (本篇)
  • 自研SQL引擎的实现方式 (Flink源码阅读(六) 自研SQL引擎的实现方法 )

对于一次Sql查询,一般经过以下流程:

  • 先由Parser解析生成SqlNode节点树;
  • Validator完成节点类型推导以及必要的表达式验证优化;
  • SqlToRelConverter将SqlNode节点树转化为代表逻辑计划的RelNode
  • 在查询优化器(QueryOptimizer)里内置了100+的转换规则,分别用于逻辑计划优化以及物理计划转换;
  • 生成的物理计划RelNode被代码实现器遍历处理,各子节点由对应的实现器生成执行代码,最后组装成一个执行Class;
  • Calcite会为每个SQL动态生成一个实现了Bindable接口的Class,然后动态编译创建实例后传入数据集执行计算。这里的数据集可以通过Schema进行灵活定义,在业务上针对每一次业务请求会创建一份Schema,存储当前请求处理过程的所有表数据,处理完成后堆内存后面会被回收。

Flink SQL解析流程

平常我们一般会配置TableEnv来执行SQL相关逻辑

TableEnvironment tableEnv = TableEnvironment.create(settings);`
复制代码

对TableEnvironment的实现类进行展开 对于Flink基于calcite 重写SQL的逻辑,需要重点关注的是以下四个对象

  • TableConfig : 当前{@link TableEnvironment}会话的配置,以调整Table&SQL API程序。
  • executor 它可以执行由{@link Planner}生成的{@link Transformation}图。也就是通过执行计划翻译成算子链图
  • parser: sql的语法解析器
  • planner: sql的执行计划

TableEnvironmentImpl.java

/**
	 * 调用TableEnv的底层实现类
	 * @param catalogManager 用于处理目录对象(例如表,视图,函数和类型)的管理器。它封装
	 *  *所有可用的目录并存储临时对象。
	 * @param moduleManager 负责加载/卸载模块,管理模块的生命周期以及解决模块对象。
	 * @param tableConfig 当前{@link TableEnvironment}会话的配置,以调整Table&SQL API程序。
	 * @param executor 它可以执行由{@link Planner}生成的{@link Transformation}图。
	 * @param functionCatalog 简单的函数目录,用于在目录中存储{@link FunctionDefinition}。
	 * @param planner 该接口有两个作用:
	 *  * <ul>
	 *  *通过{@link #getParser()}的<li> SQL解析器-将SQL字符串转换为Table API特定的对象
	 *  *例如{@link Operation} s的树</ li>
	 *  * <li>关系计划器-提供一种计划,优化和转换以下内容的树的方法
	 *  * {@link ModifyOperation}变成可运行的形式({@link Transformation})</ li>
	 * @param isStreamingMode 是否是流SQL
	 ***/**
protected TableEnvironmentImpl(
			CatalogManager catalogManager,
			ModuleManager moduleManager,
			TableConfig tableConfig,
			Executor executor,
			FunctionCatalog functionCatalog,
			Planner planner,
			boolean isStreamingMode) {
		this.catalogManager = catalogManager;
		this.moduleManager = moduleManager;
		this.execEnv = executor;

		this.tableConfig = tableConfig;

		this.functionCatalog = functionCatalog;
		this.planner = planner;
		this.parser = planner.getParser();
		this.operationTreeBuilder = OperationTreeBuilder.create(
			tableConfig,
			functionCatalog.asLookup(parser::parseIdentifier),
			catalogManager.getDataTypeFactory(),
			path -> {
				try {
					UnresolvedIdentifier unresolvedIdentifier = parser.parseIdentifier(path);
					Optional<CatalogQueryOperation> catalogQueryOperation = scanInternal(unresolvedIdentifier);
					return catalogQueryOperation.map(t -> ApiExpressionUtils.tableRef(path, t));
				} catch (SqlParserException ex) {
					// The TableLookup is used during resolution of expressions and it actually might not be an
					// identifier of a table. It might be a reference to some other object such as column, local
					// reference etc. This method should return empty optional in such cases to fallback for other
					// identifiers resolution.
					return Optional.empty();
				}
			},
			isStreamingMode
		);
	}
复制代码
Parser的创建

首先我们需要了解calcite相关概念

freemarker&javacc

  • freemarker通常用作前端的模板引擎,简单的说,就是通过#{}和一些列语法添数
  • javacc主要用于代码的生成,通过编译 后缀名 .jj的文件,生成我们想要的文件

Flink源码中解析器的模块如下所示

右侧的代码段为ParserImpl#parse 方法,也是flink 一条sql开始解析的入口 每次调用parse时重新创建parser和planner。这么设计的目的是catalog在每次调用时会变化,所以才动态创建对象。

@Override
public List<Operation> parse(String statement) {
    //基于calcite创建语法解析器
    CalciteParser parser = calciteParserSupplier.get();
    //实现执行计划类,即验证阶段
    FlinkPlannerImpl planner = validatorSupplier.get();
    // 将sql装换为SQLNode,例如:\
    //select id, cast(score as int), 'hello' from T where id < ?
	// 以上SQL中, 1. id, score, T 等为SqlIdentifier 
    // 2. cast()为SqlCall 
    // 3. int 为SqlDataTypeSpec 
    // 4. 'hello' 为SqlLiteral 
    // 5. '?' 为SqlDynamicParam
    // 这些都可以看作为一个SqlNode
    SqlNode parsed = parser.parse(statement);
    //根据SqlNode的不同类型 转换成不同的SqlNode,比如 cast()从SqlNode这个父级抽象转换为具体的类型sqlCall
    Operation operation = SqlToOperationConverter.convert(planner, catalogManager, parsed)
        .orElseThrow(() -> new TableException("Unsupported query: " + statement));
    return Collections.singletonList(operation);
}
复制代码

Continue..QwQ