FlinkSQL - 开始

参数 说明
flink 版本 1.17
java 版本 1.8

测试SQL

1
2
3
select * 
from  tableA 
where amount > 2

运行环境

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        final DataStream<Order> orderA =
                env.fromCollection(
                        Arrays.asList(
                                new Order(1L, "beer", 3),
                                new Order(1L, "diaper", 4),
                                new Order(3L, "rubber", 2)));

        final Table tableA = tableEnv.fromDataStream(orderA);

        final Table result =
                tableEnv.sqlQuery(
                    "select * from " + tableA + " where amount > 2"
                );

        tableEnv.toDataStream(result, Row.class).print();
        env.execute();

运行结果为

1
2
(true,+I[1, beer, 3, 2, pen, 3])
(true,+I[1, beer, 3, 2, rubber, 3])

SQL流程

一条SQL语句 通过Calcite 转换成 物理计划,物理计划通过代码生成计划转换成Flink Transformation 从而最终转换成 Flink 的执行图

从代码来看

tableEnv.sqlQuery()

将sql 语句转换成了 逻辑计划 -> 物理计划

env.execute()

生成StreamGraph 最终执行语句

sqlQuery

sqlQuery 会把 输入的 SQL 语句转换成Operation

Operation 就是对于表的所有操作(DML, DDL, DQL, DCL)

Covers all sort of Table operations such as queries(DQL), modifications(DML), definitions(DDL), or control actions(DCL). This is the output of Planner.getParser() and Parser.parse(String).

在这里,Operation就是一个PlannerQueryOperation,里面包含了RelNode的信息。Operation 会包装成Table 对象返回

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
    public Table sqlQuery(String query) {
        /**
         * 这里会解析Sql语句转换成关系代数
         */
        List<Operation> operations = getParser().parse(query);

        if (operations.size() != 1) {
            throw new ValidationException(
                    "Unsupported SQL query! sqlQuery() only accepts a single SQL query.");
        }

        Operation operation = operations.get(0);

        if (operation instanceof QueryOperation && !(operation instanceof ModifyOperation)) {
            /**
             * 这里将转换的Operation转换成Flink Table API可以识别的Table对象
             */
            return createTable((QueryOperation) operation);
        } else {
            throw new ValidationException(
                    "Unsupported SQL query! sqlQuery() only accepts a single SQL query of type "
                            + "SELECT, UNION, INTERSECT, EXCEPT, VALUES, and ORDER_BY.");
        }
    }
0%