当Sql语句转换为SqlNode 之后 就会进入下一个阶段:将SqlNode 转换成 Operations
@Override
public List<Operation> parse(String statement) {
CalciteParser parser = calciteParserSupplier.get();
FlinkPlannerImpl planner = validatorSupplier.get();
Optional<Operation> command = EXTENDED_PARSER.parse(statement);
if (command.isPresent()) {
return Collections.singletonList(command.get());
}
// parse the sql query
// use parseSqlList here because we need to support statement end with ';' in sql client.
SqlNodeList sqlNodeList = parser.parseSqlList(statement);
List<SqlNode> parsed = sqlNodeList.getList();
Preconditions.checkArgument(parsed.size() == 1, "only single statement supported");
return Collections.singletonList(
SqlNodeToOperationConversion.convert(planner, catalogManager, parsed.get(0))
.orElseThrow(() -> new TableException("Unsupported query: " + statement)));
}
其中 SqlNodeToOperationConversion.convert方法,就算是将 SqlNode转换成 Operation
SqlNod会先进行验证工作,然后再进行转换
public static Optional<Operation> convert(
FlinkPlannerImpl flinkPlanner, CatalogManager catalogManager, SqlNode sqlNode) {
// validate the query
final SqlNode validated = flinkPlanner.validate(sqlNode);
return convertValidatedSqlNode(flinkPlanner, catalogManager, validated);
}
为什么 SqlNode 需要验证#
目的当然是为了确保SQL语句的正确性和合法性,避免在后续的执行过程中出现错误 举例来说,假设有以下的SQL语句:
SELECT name, age FROM student WHERE age > 18 AND gender = 'male'
在进行转换成Operation的过程中,需要先进行validate,检查SQL语句是否符合语法规则和语义规则。如果SQL语句中存在语法错误或者语义错误,那么validate过程会抛出异常,提示用户需要修改SQL语句 例如,如果SQL语句中存在以下错误:
SELECT name, age FROM student WHERE age > 'two' AND gender = 'male'
其中,age的类型是整数,但是在SQL语句中使用了字符串类型的’two’进行比较,这是一个语义错误。在进行validate的过程中,会检测到这个错误并抛出异常,提示用户需要修改SQL语句中的比较条件 因此,先进行validate的过程可以帮助用户在SQL语句执行之前就发现错误,避免在后续的执行过程中出现问题,提高SQL语句的执行效率和准确性
validate#
validate的过程的输入和输出都是SqlNode,除了检查类型之外还会更改SqlNode的节点信息
通过FlinkPlanner 对SqlNode进行检查
def validate(sqlNode: SqlNode): SqlNode = {
val validator = getOrCreateSqlValidator()
validate(sqlNode, validator)
}
FlinkCalciteSqlValidator 实现 了Calicte 的SqlValidatorImpl 作为validator,就是用来检查FlinkSql的Sql语法的
/** Extends Calcite's {@link SqlValidator} by Flink-specific behavior. */
@Internal
public final class FlinkCalciteSqlValidator extends SqlValidatorImpl {
// Enables CallContext#getOutputDataType() when validating SQL expressions.
private SqlNode sqlNodeForExpectedOutputType;
private RelDataType expectedOutputType;
public FlinkCalciteSqlValidator(
SqlOperatorTable opTab,
SqlValidatorCatalogReader catalogReader,
RelDataTypeFactory typeFactory,
SqlValidator.Config config) {
super(opTab, catalogReader, typeFactory, config);
}
创建 validator#
第一次写入Sql时,会新建一个validator,后面再创建就会使用一个单例来处理
def getOrCreateSqlValidator(): FlinkCalciteSqlValidator = {
if (validator == null) {
val catalogReader = catalogReaderSupplier.apply(false)
validator = createSqlValidator(catalogReader)
}
validator
}
其中,catalogReaderSupplier是一个函数,目的是读取和解析数据库中的元信息,比如数据库,表,列等信息
函数的输入是一个bool值,用来确定大小写是否敏感;输出是FlinkCalciteCatalogReader
FlinkCalciteCatalogReader相比于CalciteCatalogReader,包含了Flink特有的信息,例如:Flink的UDF函数信息和Flink的Table信息
FlinkCalciteSqlValidator#
通过数据元信息,就可以创建 SqlValidator
private def createSqlValidator(catalogReader: CalciteCatalogReader) = {
val validator = new FlinkCalciteSqlValidator(
operatorTable,
catalogReader,
typeFactory,
SqlValidator.Config.DEFAULT
.withIdentifierExpansion(true)
.withDefaultNullCollation(FlinkPlannerImpl.defaultNullCollation)
.withTypeCoercionEnabled(false)
) // Disable implicit type coercion for now.
validator
}
创建SqlValidator 需要以下信息
- SqlOperatorTable, 定义了Sql操作符和方法,也可以查找这些操作符和方法
这个SqlOperatorTable是在创建 StreamTableEnvironment的时候就创建了,具体是在
// org.apache.flink.table.planner.delegation.PlannerContext
public FrameworkConfig createFrameworkConfig() {
return Frameworks.newConfigBuilder()
.defaultSchema(rootSchema.plus())
.parserConfig(getSqlParserConfig())
.costFactory(new FlinkCostFactory())
.typeSystem(typeSystem)
.convertletTable(FlinkConvertletTable.INSTANCE)
.sqlToRelConverterConfig(getSqlToRelConverterConfig())
.operatorTable(getSqlOperatorTable(getCalciteConfig())) // 这里创建
// set the executor to evaluate constant expressions
.executor(
new ExpressionReducer(
context.getTableConfig(), context.getClassLoader(), false))
.context(context)
.traitDefs(traitDefs)
.build();
}
一般来说 opTab 里包含的都是 Calcite自带的操作符 比如 not like, is not null 这类
- SqlValidatorCatalogReader, 就是上面所说的FlinkCalciteCatalogReader,保存的SQL的元数据
- RelDataTypeFactory, FlinkSQL的类型,连接 Flink 的 LogicalType 和 Calcite 的 RelDataType。 RelDataType 是 RelNode关系代数中的数据类型,使用Calcite解析SQL的时候内部会转换成这种类型
/**
* Flink specific type factory that represents the interface between Flink's [[LogicalType]] and
* Calcite's [[RelDataType]].
*/
class FlinkTypeFactory(
classLoader: ClassLoader,
typeSystem: RelDataTypeSystem = FlinkTypeSystem.INSTANCE)
extends JavaTypeFactoryImpl(typeSystem)
with ExtendedRelTypeFactory {
private val seenTypes = mutable.HashMap[LogicalType, RelDataType]()
/**
* Create a calcite field type in table schema from [[LogicalType]]. It use PEEK_FIELDS_NO_EXPAND
* when type is a nested struct type (Flink [[RowType]]).
*
* @param t
* flink logical type.
* @return
* calcite [[RelDataType]].
*/
def createFieldTypeFromLogicalType(t: LogicalType): RelDataType = {
def newRelDataType(): RelDataType = t.getTypeRoot match {
case LogicalTypeRoot.NULL => createSqlType(NULL)
case LogicalTypeRoot.BOOLEAN => createSqlType(BOOLEAN)
case LogicalTypeRoot.TINYINT => createSqlType(TINYINT)
case LogicalTypeRoot.SMALLINT => createSqlType(SMALLINT)
case LogicalTypeRoot.INTEGER => createSqlType(INTEGER)
case LogicalTypeRoot.BIGINT => createSqlType(BIGINT)
case LogicalTypeRoot.FLOAT => createSqlType(FLOAT)
case LogicalTypeRoot.DOUBLE => createSqlType(DOUBLE)
case LogicalTypeRoot.VARCHAR => createSqlType(VARCHAR, t.asInstanceOf[VarCharType].getLength)
case LogicalTypeRoot.CHAR => createSqlType(CHAR, t.asInstanceOf[CharType].getLength)
// temporal types
case LogicalTypeRoot.DATE => createSqlType(DATE)
case LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE => createSqlType(TIME)
// interval types
case LogicalTypeRoot.INTERVAL_YEAR_MONTH =>
createSqlIntervalType(
new SqlIntervalQualifier(TimeUnit.YEAR, TimeUnit.MONTH, SqlParserPos.ZERO))
case LogicalTypeRoot.INTERVAL_DAY_TIME =>
createSqlIntervalType(
new SqlIntervalQualifier(TimeUnit.DAY, TimeUnit.SECOND, SqlParserPos.ZERO))
case LogicalTypeRoot.BINARY => createSqlType(BINARY, t.asInstanceOf[BinaryType].getLength)
case LogicalTypeRoot.VARBINARY =>
createSqlType(VARBINARY, t.asInstanceOf[VarBinaryType].getLength)
case LogicalTypeRoot.DECIMAL =>
t match {
case decimalType: DecimalType =>
createSqlType(DECIMAL, decimalType.getPrecision, decimalType.getScale)
case legacyType: LegacyTypeInformationType[_]
if legacyType.getTypeInformation == BasicTypeInfo.BIG_DEC_TYPE_INFO =>
createSqlType(DECIMAL, 38, 18)
}
case LogicalTypeRoot.ROW =>
val rowType = t.asInstanceOf[RowType]
buildStructType(
rowType.getFieldNames,
rowType.getChildren,
// fields are not expanded in "SELECT *"
StructKind.PEEK_FIELDS_NO_EXPAND)
case LogicalTypeRoot.STRUCTURED_TYPE =>
t match {
case structuredType: StructuredType => StructuredRelDataType.create(this, structuredType)
case legacyTypeInformationType: LegacyTypeInformationType[_] =>
createFieldTypeFromLogicalType(
PlannerTypeUtils.removeLegacyTypes(legacyTypeInformationType))
}
case LogicalTypeRoot.ARRAY =>
val arrayType = t.asInstanceOf[ArrayType]
createArrayType(createFieldTypeFromLogicalType(arrayType.getElementType), -1)
case LogicalTypeRoot.MAP =>
val mapType = t.asInstanceOf[MapType]
createMapType(
createFieldTypeFromLogicalType(mapType.getKeyType),
createFieldTypeFromLogicalType(mapType.getValueType))
case LogicalTypeRoot.MULTISET =>
val multisetType = t.asInstanceOf[MultisetType]
createMultisetType(createFieldTypeFromLogicalType(multisetType.getElementType), -1)
case LogicalTypeRoot.RAW =>
t match {
case rawType: RawType[_] =>
new RawRelDataType(rawType)
case genericType: TypeInformationRawType[_] =>
new GenericRelDataType(genericType, true, getTypeSystem)
case legacyType: LegacyTypeInformationType[_] =>
createFieldTypeFromLogicalType(PlannerTypeUtils.removeLegacyTypes(legacyType))
}
case LogicalTypeRoot.SYMBOL =>
createSqlType(SqlTypeName.SYMBOL)
case _ @t =>
throw new TableException(s"Type is not supported: $t")
}
// Kind in TimestampType do not affect the hashcode and equals, So we can't put it to seenTypes
val relType = t.getTypeRoot match {
case LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE =>
val timestampType = t.asInstanceOf[TimestampType]
timestampType.getKind match {
case TimestampKind.ROWTIME => createRowtimeIndicatorType(t.isNullable, false)
case TimestampKind.REGULAR => createSqlType(TIMESTAMP, timestampType.getPrecision)
case TimestampKind.PROCTIME =>
throw new TableException(
s"Processing time indicator only supports" +
s" LocalZonedTimestampType, but actual is TimestampType." +
s" This is a bug in planner, please file an issue.")
}
case LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE =>
val lzTs = t.asInstanceOf[LocalZonedTimestampType]
lzTs.getKind match {
case TimestampKind.PROCTIME => createProctimeIndicatorType(t.isNullable)
case TimestampKind.ROWTIME => createRowtimeIndicatorType(t.isNullable, true)
case TimestampKind.REGULAR =>
createSqlType(TIMESTAMP_WITH_LOCAL_TIME_ZONE, lzTs.getPrecision)
}
case _ =>
seenTypes.get(t) match {
case Some(retType: RelDataType) => retType
case None =>
val refType = newRelDataType()
seenTypes.put(t, refType)
refType
}
}
createTypeWithNullability(relType, t.isNullable)
}
- SqlValidator.Config, SqlValidator的配置信息,这里默认使用的是 ImmutableSqlValidator
SqlValidator.Config DEFAULT = ImmutableSqlValidator.Config.builder()
.withTypeCoercionFactory(TypeCoercions::createTypeCoercion)
.build();
开始验证#
创建validator 后 根据validate 来验证 sqlNode
graph TB
start(("开始")) --> A("预处理重写SqlNode") --> B("如果有扩展节点,先处理扩展验证")
B --> C{"是否是DDL等"}
C --Y--> ends("结束")
C --N-->D("根据不同的Sql语句进行验证") -->E("返回SqlNode")-->ends
重写#
为什么要重写?
重写相当于是在SqlNode进行一些预处理主要作用是检查查询语句是否符合Flink Table的语法规范,并对查询语句进行一些必要的转换和优化,以便更好地支持Flink的执行引擎
Flink中重写是依靠PreValidateReWriter来实现的
sqlNode.accept(new PreValidateReWriter(validator, typeFactory))
如果是简单的SQL其实不需要重写
override def visit(call: SqlCall): Unit = {
call match {
case e: SqlRichExplain =>
e.getStatement match {
case r: RichSqlInsert => rewriteInsert(r)
case _ => // do nothing
}
case r: RichSqlInsert => rewriteInsert(r)
case _ => // do nothing
}
}
validator验证SqlNode#
如果不是特殊的SQL,应该都会使用 validator来验证SqlNode
case _ =>
validator.validate(sqlNode)
// org.apache.calcite.sql.validate.SqlValidatorImpl;
@Override
public SqlNode validate(SqlNode topNode) {
SqlValidatorScope scope = new EmptyScope(this);
scope = new CatalogScope(scope, ImmutableList.of("CATALOG"));
final SqlNode topNode2 = validateScopedExpression(topNode, scope);
final RelDataType type = getValidatedNodeType(topNode2);
Util.discard(type);
return topNode2;
}
SqlValidatorScope#
SqlValidatorScope是Calcite中的一个接口,用于表示SQL语句中的作用域。它包含了当前作用域中可见的所有表、列、函数等信息。 具体来说,SqlValidatorScope中包含了以下信息:
- 当前作用域中可见的所有表,包括别名和表的元数据信息。
- 当前作用域中可见的所有列,包括别名和列的元数据信息。
- 当前作用域中可见的所有函数,包括函数的元数据信息和参数信息。
- 当前作用域中可见的所有变量,包括变量的类型和值。
举例说明,假设有以下SQL语句:
SELECT a.name, b.salary FROM employee a JOIN salary b ON a.id = b.id WHERE b.salary > 5000;
在这个SQL语句中,作用域可以分为以下几个部分:
- SELECT子句中的作用域,包括a.name和b.salary两个列。
- FROM子句中的作用域,包括employee和salary两个表。
- JOIN子句中的作用域,包括a和b两个表的别名。
- WHERE子句中的作用域,包括b.salary列和5000常量。
在每个作用域中,SqlValidatorScope都会包含相应的表、列、函数等信息,以便进行语法和语义的验证。
SqlValidatorScope 本身也是一个树形结构,根节点是一个空scope
validateScopedExpression#
验证过程的第一步就是重写SqlNode,将SqlNode中的不确定的地方都进行更新
然后如果是Select这种简单的SQL,会将查询进行注册
graph TB
A(("开始")) --> B("SqlNode重写") -->C("注册查询")
X --> Z(("结束"))
private SqlNode validateScopedExpression(SqlNode topNode, SqlValidatorScope scope) {
SqlNode outermostNode = performUnconditionalRewrites(topNode, false);
cursorSet.add(outermostNode);
top = outermostNode;
TRACER.trace("After unconditional rewrite: {}", outermostNode);
if (outermostNode.isA(SqlKind.TOP_LEVEL)) {
registerQuery(scope, null, outermostNode, outermostNode, null, false);
}
outermostNode.validate(this, scope);
if (!outermostNode.isA(SqlKind.TOP_LEVEL)) {
// force type derivation so that we can provide it to the
// caller later without needing the scope
deriveType(scope, outermostNode);
}
TRACER.trace("After validation: {}", outermostNode);
return outermostNode;
}
performUnconditionalRewrites
graph TB
A(("开始")) --> B("如果SqlNode是SqlCall") -->|是|C("获取SqlNode所有子SqlNode(算子)") --> C1("遍历算子,递归调用得到重写的算子,并更新") -->|递归| B
C1 --> C2("如果SqlNode的操作符是函数") -->|是|C3("查找内部函数并替换") --> C4("如果配置重写,则操作符更新")
B -->|否|D{"是否是SqlNodeList"} -->|是|E("遍历List,递归调用重写,并更新")-->|递归| B
C4 --> D
E -->F{"判断SqlNode的Kind类型"}
F -->|"VALUES,ORDER_BY等其他关键字"|G("根据不同的关键字规则重写") -->H("返回SqlNode") --> Z(("结束"))
F -->|"其他(如Select)"|H
这里大部分都是递归调用,能够实际重写的大部分都是函数调用
if (call.getOperator() instanceof SqlUnresolvedFunction) {
assert call instanceof SqlBasicCall;
final SqlUnresolvedFunction function = (SqlUnresolvedFunction) call.getOperator();
// This function hasn't been resolved yet. Perform
// a half-hearted resolution now in case it's a
// builtin function requiring special casing. If it's
// not, we'll handle it later during overload resolution.
final List<SqlOperator> overloads = new ArrayList<>();
opTab.lookupOperatorOverloads(
function.getNameAsId(),
function.getFunctionType(),
SqlSyntax.FUNCTION,
overloads,
catalogReader.nameMatcher());
if (overloads.size() == 1) {
((SqlBasicCall) call).setOperator(overloads.get(0));
}
}
SqlNode的lookupOperatorOverloads方法主要是用于查找可用的操作符重载。具体来说,它会根据传入的操作符名称和参数类型,查找符合条件的操作符重载方法。
举个例子,假设有以下的SqlNode:
SELECT * FROM table WHERE column1 + column2 = 10
在这个SqlNode中,有一个加法操作符“+”,它的左右两边分别是column1和column2这两个列。因此,lookupOperatorOverloads方法会首先根据“+”操作符名称查找可用的操作符重载方法。如果找到了多个重载方法,它会根据参数类型进一步筛选出符合条件的重载方法。
假设我们定义了以下的操作符重载方法:
public static int operator +(int a, int b) {...}
public static double operator +(double a, double b) {...}
public static string operator +(string a, string b) {...}
在这种情况下,lookupOperatorOverloads方法会根据column1和column2的数据类型来选择合适的重载方法。
- 如果column1和column2都是int类型,那么会选择第一个重载方法;
- 如果column1和column2都是double类型,那么会选择第二个重载方法;
- 如果column1和column2都是string类型,那么会选择第三个重载方法。
如果找不到符合条件的操作符重载方法,lookupOperatorOverloads方法会抛出异常。
registerQuery
graph TB
A(("开始")) --> B("区分不同的nodeKind") -->|SELECT|C("创建并注册Select命名空间") -->D("创建Select作用域")
D --> E("注册where子句") --> F("注册from子句") --> G("创建并注册From Identifier命名空间") --> H("设置TableScope") --> I("Select的SqlNode设置新的From的Node")
I --> J("如果有聚合方法,需要分别注册groupByScope, HavingScope") --> K("如果有orderby还需要注册OrderByScope")
K --> Z(("结束"))
private void registerQuery(
SqlValidatorScope parentScope,
@Nullable SqlValidatorScope usingScope,
SqlNode node,
SqlNode enclosingNode,
@Nullable String alias,
boolean forceNullable,
boolean checkUpdate) {
...
}
enclosingNode和Node都是AST节点,但是它们的含义和使用方式有所不同。
enclosingNode表示当前节点所在的最近的语法结构节点,例如SELECT语句中的FROM子句,WHERE子句等。enclosingNode可以通过调用getEnclosingNode方法获取。
Node表示当前节点本身,例如SELECT语句中的SELECT子句,FROM子句等。Node可以通过调用getNode方法获取。
举个例子,假设有以下SQL语句:
SELECT a, b FROM table1 WHERE a > 10
在validate过程中,对于SELECT子句中的a和b,它们的enclosingNode都是SELECT语句本身,而它们的Node分别是ColumnRef节点,表示一个列引用。
对于WHERE子句中的a > 10,它的enclosingNode是SELECT语句的WHERE子句,而它的Node是一个比较操作符节点,表示一个大于号操作。
在validate过程中,Calcite会对每个节点进行类型检查、语法检查等操作,以确保SQL语句的正确性。在这个过程中,enclosingNode和Node都会被用到,以确定当前节点的上下文和语义。
validate
- 检查验证状态,如果没有验证,则开始验证,然后验证类型,Select子句会调用SelectNamespace的验证,最后根据不同的子句分别验证
- 验证对表的访问(只有from子句才会真正被验证)
- 验证快照到表
- 验证Select和其子句的形态 Relation 还是 Stream
Modality指的是数据流的模式,即数据流的类型和结构。在Relational模式下,数据流是以表格的形式呈现,每个数据流都有一个固定的列集合和数据类型。而在Streaming模式下,数据流是以流的形式呈现,数据是按照时间顺序到达的,每个数据流都是一个无限的数据流。
举例来说,如果我们有一个查询语句:
SELECT name, age FROM employees WHERE age > 30
- 在Relational模式下,我们需要知道employees表格的列集合和数据类型,以及age列的数据类型是什么,才能验证这个查询语句是否合法。
- 而在Streaming模式下,我们需要知道employees表格的列集合和数据类型,以及数据流是按照时间顺序到达的,才能验证这个查询语句是否合法。
验证过程中我们需要根据数据流的模式来验证查询语句是否合法。如果数据流的模式与查询语句不匹配,就会抛出异常。
例如,在Relational模式下,如果查询语句中引用了不存在的列,就会抛出异常;在Streaming模式下,如果查询语句中使用了聚合函数,就会抛出异常,因为数据流是无限的,无法计算聚合函数的结果。
举例#
以
select * from tableA where amount > 2
为例,
graph BT
C("SelectScope") --> B("CatalogScope") --> A("EmptyScope")
E("tableA 命名空间") --> D("TableScope") --> B