SQL验证完成后,需要将SQL语法树转换成Operation的集合
1
2
3
4
5
6
|
public List<Operation> parse(String statement) {
...
return Collections.singletonList(
SqlNodeToOperationConversion.convert(planner, catalogManager, parsed.get(0))
.orElseThrow(() -> new TableException("Unsupported query: " + statement)));
}
|
在这里将已经验证好的SQL 转换成Oper的过程,就会使用到SqlNodeToOperationConversion.convert
SqlNodeToOperationConversion.convert
1
2
3
4
5
6
|
public static Optional<Operation> convert(
FlinkPlannerImpl flinkPlanner, CatalogManager catalogManager, SqlNode sqlNode) {
// validate the query
final SqlNode validated = flinkPlanner.validate(sqlNode);
return convertValidatedSqlNode(flinkPlanner, catalogManager, validated);
}
|
convertValidatedSqlNode 就是转换Operation的入口,一共有三个参数
- flinkPlanner: FlinkPlannerImpl 优化器,将SQL语法树生成flink的执行计划
- catalogManager: CatalogManager 当前作业的库表等信息的管理
- validated: SqlNode 已经检验好的SQL语法树
convert 参数
FlinkPlannerImpl
在实际运行中, flinkPlanner 是通过 FlinkPlannerImpl 实例实现的,具体是在 PlannerContext进行实现的
1
2
3
4
5
6
7
8
9
10
|
package org.apache.flink.table.planner.delegation;
public class PlannerContext {
...
public FlinkPlannerImpl createFlinkPlanner() {
return new FlinkPlannerImpl(
createFrameworkConfig(), this::createCatalogReader, typeFactory, cluster);
}
...
}
|
- 一套flink 使用的schema,类型系统以及SQL优化器等组件的framework配置
- 一个读取flink的catalog的入口实例
- 生成flink类型的工厂实例
- 专门为Flink服务的 calcite 关系优化cluster
planner 的作用就是用于将 FlinkSQL 转换成Flink 可以识别的DAG作业,运行到Flink 环境中
CatalogManager
CatalogManager 隐藏的很深,在最初创建的时候他就已经生成了。如果没有特殊要求,catalog 回创建一个名为 “default_catalog” 的catalog,在此之下,还会创建一个 名为 “default_database”的数据库
通常情况下,这里的catalog是不需要我们修改的,在flink UI 界面里我们也经常可以看到,所有的表都是放在这个“default_catlog.default_database"下的
流式任务的创建的代码是在 org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl 下的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
package org.apache.flink.table.api.bridge.scala.internal
...
object StreamTableEnvironmentImpl {
...
val catalogManager = CatalogManager.newBuilder
.classLoader(userClassLoader)
.config(tableConfig)
.defaultCatalog(
settings.getBuiltInCatalogName,
new GenericInMemoryCatalog(settings.getBuiltInCatalogName, settings.getBuiltInDatabaseName))
.executionConfig(executionEnvironment.getConfig)
.catalogModificationListeners(TableFactoryUtil
.findCatalogModificationListenerList(tableConfig.getConfiguration, userClassLoader))
.catalogStoreHolder(
CatalogStoreHolder
.newBuilder()
.catalogStore(catalogStore)
.factory(catalogStoreFactory)
.config(tableConfig)
.classloader(userClassLoader)
.build())
.build
...
}
|
这里的所有参数都是默认值
SQL验证
写入的SQL语句 会先进行验证,通过Flink自己写的Planner,设置自己的规则然后借助Calicte 进行验证
具体验证过程: FlinkSQL - 验证过程
convertValidatedSqlNode
从这里开始SqlNode进入后就要转换成Operation了
1
2
3
4
5
6
7
8
9
|
package org.apache.flink.table.planner.operations;
public class SqlNodeToOperationConversion {
...
private static Optional<Operation> convertValidatedSqlNode(
FlinkPlannerImpl flinkPlanner, CatalogManager catalogManager, SqlNode validated) {
...
}
...
}
|
该方法接收3个参数:
- flinkPlanner: 就是从上层传进来的执行计划
- catalogManager: 用来管理数据的schema
- validated: 验证后的Sql语句
1
2
3
4
5
6
7
8
9
10
11
|
private static Optional<Operation> convertValidatedSqlNode(
FlinkPlannerImpl flinkPlanner, CatalogManager catalogManager, SqlNode validated) {
beforeConversion();
// delegate conversion to the registered converters first
SqlNodeConvertContext context = new SqlNodeConvertContext(flinkPlanner, catalogManager);
Optional<Operation> operation = SqlNodeConverters.convertSqlNode(validated, context);
if (operation.isPresent()) {
return operation;
}
...
}
|
首先做一些前置工作
- beforeConversion: 清除行级修改上下文,在普通的SQL作业中这一步没什么变化
- 如果SQL之前注册过,那么就返回之前的SQL,这里因为第一次启动,之前也没有任务,所以也不会直接返回
Q&A
- 为什么 parse 方法返回的是 Operation的集合呢?