代码-第6章 Table API&SQL-林子雨编著《Flink编程基础(Java版)》

大数据学习路线图

厦门大学林子雨编著《Flink编程基础(Java版)》教材中的命令行和代码(教材官网)
提供了教材中的所有章节的命令行和代码,可以直接复制粘贴去执行。
查看《Flink编程基础(Java版)》教材中的所有命令行和代码

需要注意的是,如果需要在代码中使用Table API,必须在pom.xml文件中加入相应的flink-table-api-java-bridge依赖库,具体如下:

<dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-java-bridge</artifactId>
      <version>1.17.0</version>
</dependency>

此外,如果要在本地用IDE(比如IntelliJ IDEA或Eclipse)调试Table API&SQL程序,则还需要加入如下依赖:

<dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner-loader</artifactId>
      <version>1.17.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-runtime</artifactId>
    <version>1.17.0</version>
</dependency>
<dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-files</artifactId>
      <version>1.17.0</version>
</dependency>

如果要实现用户自定义函数或者要与Kafka交互,则还需要加入以下依赖库:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-common</artifactId>
  <version>1.17.0</version> 
</dependency>

代码实例SQLDemo.java

package cn.edu.xmu;

import org.apache.flink.connector.datagen.table.DataGenConnectorOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
 * Minimal Table API and SQL example: rows produced by the {@code datagen} connector are
 * written to a {@code print} sink.
 */
public class SQLDemo {
    /**
     * Registers the source and sink tables and streams the source into the sink.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job cannot be submitted or fails while running
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        env.setParallelism(1);
        // Create the input table SourceTable: one STRING column, one generated row per second
        tableEnv.createTemporaryTable("SourceTable", TableDescriptor.forConnector("datagen")
                .schema(Schema.newBuilder()
                        .column("f0", DataTypes.STRING())
                        .build())
                .option(DataGenConnectorOptions.ROWS_PER_SECOND, 1L)
                .build());
        // Create the output table SinkTable with a SQL statement
        tableEnv.executeSql("CREATE TEMPORARY TABLE SinkTable(str STRING) WITH ('connector' = 'print')");
        // Create a Table object table1 with the Table API
        Table table1 = tableEnv.from("SourceTable");
        // Create a Table object table2 with a SQL query
        // (only shown as the SQL counterpart of table1; it is not used afterwards)
        Table table2 = tableEnv.sqlQuery("SELECT * FROM SourceTable");
        // Write table1 into the output table SinkTable
        TableResult tableResult = table1.executeInsert("SinkTable");
        // executeInsert() only submits the job; wait for it so that a local (IDE) run does not
        // exit before any row is printed. No row limit is configured for the source here, so
        // this is expected to block until the job is cancelled.
        tableResult.await();
    }
}

代码文件InputFromFileDemo.java

package cn.edu.xmu;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Reads a CSV file through the {@code filesystem} connector and prints every row.
 */
public class InputFromFileDemo {
    /**
     * Registers the CSV-backed source table and a print sink, then copies the source into the sink.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job cannot be submitted or fails while running
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        env.setParallelism(1);
        // Create the input table stockPriceTable backed by a local CSV file
        String sourceDDL =
                "create table stockPriceTable (" +
                        "stockId STRING," +
                        "price DOUBLE" +
                        ") with (" +
                        " 'connector' = 'filesystem', " +
                        " 'path' = 'file:///home/hadoop/stockprice.csv', " +
                        " 'format' = 'csv', " +
                        " 'csv.field-delimiter' = ',', " +
                        " 'csv.ignore-parse-errors' = 'true' " +
                        " )";
        tableEnv.executeSql(sourceDDL);
        // Create the output table SinkTable with a SQL statement
        tableEnv.executeSql("CREATE TEMPORARY TABLE SinkTable(stockId STRING,price DOUBLE) WITH ('connector' = 'print')");
        // Create a Table object table1 with the Table API
        Table table1 = tableEnv.from("stockPriceTable");
        // Write table1 into the output table SinkTable
        TableResult tableResult = table1.executeInsert("SinkTable");
        // executeInsert() only submits the job; wait for it to finish so that a local (IDE) run
        // does not exit before the rows are printed.
        tableResult.await();
    }
}

要想在IDEA中运行该程序,还需要在pom.xml中添加如下依赖:

<dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-csv</artifactId>
      <version>1.17.0</version>
</dependency>

在Linux中新建一个文件“/home/hadoop/stockprice.csv”(对应代码中的路径“file:///home/hadoop/stockprice.csv”),在文件中输入如下两行内容:

stock1,123
stock2,456

如果要从HDFS中读取文件,可以把代码中的文件路径替换成HDFS文件路径,例如:

'path' = 'hdfs://hadoop01:9000/stockprice.csv'

在Linux中启动MySQL,进入MySQL Shell交互式执行环境,执行如下命令创建数据库:

mysql> create database stockdb;
mysql> use stockdb;
mysql> create table stockprice (stockId varchar(20),price double);
mysql> insert into stockprice values("stock1",123);
mysql> insert into stockprice values("stock2",456);
mysql> select * from stockprice;

代码文件SQLDemo2.java

package cn.edu.xmu;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Reads the MySQL table {@code stockdb.stockprice} through the JDBC connector and prints every row.
 */
public class SQLDemo2 {
    /**
     * Registers the JDBC-backed source table and a print sink, then copies the source into the sink.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job cannot be submitted or fails while running
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        env.setParallelism(1);
        // Create the input table stockPriceTable backed by MySQL.
        // com.mysql.cj.jdbc.Driver is the driver class of MySQL Connector/J 8.x; the former
        // com.mysql.jdbc.Driver name is deprecated there.
        String sourceDDL =
                "create table stockPriceTable (" +
                        " stockId STRING, " +
                        " price DOUBLE " +
                        ") with (" +
                        " 'connector' = 'jdbc', " +
                        " 'url' = 'jdbc:mysql://localhost:3306/stockdb', " +
                        " 'table-name' = 'stockprice', " +
                        " 'driver' = 'com.mysql.cj.jdbc.Driver', " +
                        " 'username' = 'root', " +
                        " 'password' = '123456' )";
        tableEnv.executeSql(sourceDDL);
        // Create the output table SinkTable with a SQL statement
        tableEnv.executeSql("CREATE TEMPORARY TABLE SinkTable(stockId STRING,price DOUBLE) WITH ('connector' = 'print')");
        // Create a Table object table1 with the Table API
        Table table1 = tableEnv.from("stockPriceTable");
        // Write table1 into the output table SinkTable
        TableResult tableResult = table1.executeInsert("SinkTable");
        // executeInsert() only submits the job; wait for it to finish so that a local (IDE) run
        // does not exit before the rows are printed.
        tableResult.await();
    }
}

要想在IDEA中运行该程序,需要在pom.xml中添加如下依赖:

<dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-jdbc</artifactId>
      <version>3.1.1-1.17</version>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>8.0.33</version>
</dependency>

这里给出一个实例,要求使用Flink Table API从Kafka消费点击日志(JSON),转化为CSV格式之后输出到Kafka。
需要在pom.xml中添加如下依赖:

<dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-csv</artifactId>
      <version>1.17.0</version>
</dependency>
<dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-json</artifactId>
      <version>1.17.0</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka</artifactId>
  <version>1.17.0</version>
</dependency>

代码文件FlinkTableAPIKafka2Kafka.java

package cn.edu.xmu;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import static org.apache.flink.table.api.Expressions.$;

/**
 * Consumes a JSON click log from one Kafka topic and writes the same records to another
 * Kafka topic in CSV format, using the Table API.
 */
public class FlinkTableAPIKafka2Kafka {
    /** Kafka topic the JSON click log is read from. */
    public static final String input_topic = "clicklog_input";
    /** Kafka topic the CSV click log is written to. */
    public static final String out_topic = "clicklog_output";

    /**
     * Registers the Kafka source and sink tables and runs the copy job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job cannot be submitted or fails while running
     */
    public static void main(String[] args) throws Exception {
        // 1. Create the TableEnvironment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        env.setParallelism(1);

        // 2. Create the Kafka source table (schema is shared with the sink table)
        final Schema schema = Schema.newBuilder()
                .column("user", DataTypes.STRING())
                .column("url", DataTypes.STRING())
                .column("cTime", DataTypes.STRING())
                .build();

        tableEnv.createTemporaryTable("sourceTable", TableDescriptor.forConnector("kafka")
                .schema(schema)
                .format("json")
                .option("topic",input_topic)
                .option("properties.bootstrap.servers","hadoop01:9092")
                .option("properties.group.id","clicklog")
                // start from the latest offsets, i.e. only records produced after the job
                // has started are consumed (use "earliest-offset" to re-read the whole topic)
                .option("scan.startup.mode","latest-offset")
                .build());

        // 3. Create the Kafka sink table
        tableEnv.createTemporaryTable("sinkTable", TableDescriptor.forConnector("kafka")
                .schema(schema)
                .format("csv")
                .option("topic",out_topic)
                .option("properties.bootstrap.servers","hadoop01:9092")
                .build());

        // 4. Write the source into the sink. executeInsert() only submits the job, so wait on
        //    the result; otherwise a local (IDE) run may exit before any record is forwarded.
        tableEnv.from("sourceTable")
                .select($("user"), $("url"),$("cTime"))
                .executeInsert("sinkTable")
                .await();
    }
}
# Start ZooKeeper (runs in the foreground; keep this terminal open)
cd  /usr/local/kafka
./bin/zookeeper-server-start.sh  config/zookeeper.properties
# Start the Kafka broker in a second terminal
cd  /usr/local/kafka
./bin/kafka-server-start.sh  config/server.properties
# In a third terminal: create the input and output topics, then list all topics
cd  /usr/local/kafka
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic clicklog_input --replication-factor 1 --partitions 1
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic clicklog_output --replication-factor 1 --partitions 1
# --bootstrap-server is used here as well, consistent with the create commands
# (the --zookeeper option of kafka-topics.sh no longer exists in Kafka 3.x)
bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
# Watch the output topic (blocks; use its own terminal)
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic clicklog_output
# Feed the input topic (blocks; use its own terminal) and type the JSON record shown below
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic clicklog_input
{"user":"Mary","url":"./home","cTime":"2023-09-02 12:00:00"}

这里给出一个简单的Table API和SQL数据处理应用程序。假设已经存在一个文本文件“/home/hadoop/stockprice0.csv”,文件内容如下:

stock1,123
stock2,456
stock1,100
stock2,200

代码文件TableAPIandSQLDemo.java

package cn.edu.xmu;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import static org.apache.flink.table.api.Expressions.$;

/**
 * Runs the same kind of query against a CSV-backed table once with the Table API and
 * twice with SQL, printing each result.
 */
public class TableAPIandSQLDemo {
    /**
     * Registers the tables, then executes and prints three queries in sequence.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if a statement or query fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(streamEnv);
        streamEnv.setParallelism(1);

        // Input table sourceTable, declared with SQL DDL
        tEnv.executeSql(
                "create table sourceTable (stockId STRING,price DOUBLE) with (" +
                        " 'connector' = 'filesystem', " +
                        " 'path' = 'file:///home/hadoop/stockprice0.csv', " +
                        " 'format' = 'csv', " +
                        " 'csv.field-delimiter' = ',', " +
                        " 'csv.ignore-parse-errors' = 'true' " +
                        " )");
        // Output table sinkTable, declared with SQL DDL
        tEnv.executeSql("CREATE TEMPORARY TABLE sinkTable(stockId STRING,price DOUBLE) WITH ('connector' = 'print')");

        // Table object obtained through the Table API, also exposed to SQL as the view myTable1
        Table source = tEnv.from("sourceTable");
        tEnv.createTemporaryView("myTable1", source);

        // Query 1 (Table API): total price per stock
        Table pricePerStock = source
                .groupBy($("stockId"))
                .select($("stockId"), $("price").sum().as("sum-price"));
        pricePerStock.execute().print();

        // Queries 2 and 3 (SQL): the same filter on the catalog table and on the view
        printQuery(tEnv, "select * from sourceTable where price>200");
        printQuery(tEnv, "select * from myTable1 where price>200");
    }

    /** Executes the given SQL query and prints its result. */
    private static void printQuery(StreamTableEnvironment tEnv, String sql) {
        Table queryResult = tEnv.sqlQuery(sql);
        queryResult.execute().print();
    }
}

代码文件OutputToFileDemo1.java

package cn.edu.xmu;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Copies a CSV file into a CSV output directory, with both tables declared through SQL DDL.
 */
public class OutputToFileDemo1 {
    /**
     * Registers the source and sink tables and writes the source into the sink.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job cannot be submitted or fails while running
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        env.setParallelism(1);
        // Create the input table sourceTable
        String sourceDDL =
                "create table sourceTable (" +
                        "stockId STRING," +
                        "price DOUBLE" +
                        ") with (" +
                        " 'connector' = 'filesystem', " +
                        " 'path' = 'file:///home/hadoop/stockprice0.csv', " +
                        " 'format' = 'csv', " +
                        " 'csv.field-delimiter' = ',', " +
                        " 'csv.ignore-parse-errors' = 'true' " +
                        " )";
        tableEnv.executeSql(sourceDDL);
        String sinkDDL =
                "create table sinkTable (" +
                        "stockId STRING," +
                        "price DOUBLE" +
                        ") with (" +
                        " 'connector' = 'filesystem', " +
                        " 'path' = 'file:///home/hadoop/stockprice2', " +
                        " 'format' = 'csv', " +
                        " 'csv.field-delimiter' = ',', " +
                        " 'csv.ignore-parse-errors' = 'true' " +
                        " )";

        // Create the output table sinkTable with a SQL statement
        tableEnv.executeSql(sinkDDL);
        // Create a Table object table1 with the Table API
        Table table1 = tableEnv.from("sourceTable");
        // Write table1 into the output table sinkTable
        TableResult tableResult = table1.executeInsert("sinkTable");
        // executeInsert() only submits the job; wait for it to finish so that a local (IDE) run
        // does not exit before the output files are written.
        tableResult.await();
    }
}

代码文件 OutputToFileDemo2.java

package cn.edu.xmu;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Copies a CSV file into a CSV output directory; the sink is declared with a
 * {@code TableDescriptor} and written through a {@code TablePipeline}.
 */
public class OutputToFileDemo2 {
    /**
     * Registers the source and sink tables, prints the pipeline plan and runs the pipeline.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job cannot be submitted or fails while running
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        env.setParallelism(1);
        // Create the input table sourceTable
        String sourceDDL =
                "create table sourceTable (" +
                        "stockId STRING," +
                        "price DOUBLE" +
                        ") with (" +
                        " 'connector' = 'filesystem', " +
                        " 'path' = 'file:///home/hadoop/stockprice0.csv', " +
                        " 'format' = 'csv', " +
                        " 'csv.field-delimiter' = ',', " +
                        " 'csv.ignore-parse-errors' = 'true' " +
                        " )";
        tableEnv.executeSql(sourceDDL);
        Table table1 = tableEnv.from("sourceTable");
        // Declare the CSV sink table CsvSinkTable with the same two columns as the source
        final Schema schema = Schema.newBuilder()
                .column("stockId", DataTypes.STRING())
                .column("price", DataTypes.DOUBLE())
                .build();
        tableEnv.createTemporaryTable("CsvSinkTable", TableDescriptor.forConnector("filesystem")
                .schema(schema)
                .option("path", "file:///home/hadoop/stockprice")
                .format(FormatDescriptor.forFormat("csv")
                        .option("field-delimiter", ",")
                        .build())
                .build());
        // Prepare the insert-into pipeline that sends table1 to CsvSinkTable
        TablePipeline pipeline =table1.insertInto("CsvSinkTable");
        // Print the details of the execution plan
        pipeline.printExplain();
        // execute() only submits the job that sends table1 to the registered CsvSinkTable;
        // wait for it to finish so that a local (IDE) run does not exit before the output
        // files are written.
        pipeline.execute().await();
    }
}

代码文件OutputToMySQLDemo.java

package cn.edu.xmu;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Copies the rows of a CSV file into the MySQL table {@code stockdb.stockprice} through
 * the JDBC connector.
 */
public class OutputToMySQLDemo {
    /**
     * Registers the CSV source and the JDBC sink, then writes the source into the sink.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job cannot be submitted or fails while running
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        env.setParallelism(1);
        // Create the input table sourceTable
        String sourceDDL =
                "create table sourceTable (" +
                        "stockId STRING," +
                        "price DOUBLE" +
                        ") with (" +
                        " 'connector' = 'filesystem', " +
                        " 'path' = 'file:///home/hadoop/stockprice.csv', " +
                        " 'format' = 'csv', " +
                        " 'csv.field-delimiter' = ',', " +
                        " 'csv.ignore-parse-errors' = 'true' " +
                        " )";
        tableEnv.executeSql(sourceDDL);
        // com.mysql.cj.jdbc.Driver is the driver class of MySQL Connector/J 8.x; the former
        // com.mysql.jdbc.Driver name is deprecated there.
        String sinkDDL =
                "create table sinkTable (" +
                        " stockId STRING, " +
                        " price DOUBLE " +
                        ") with (" +
                        " 'connector' = 'jdbc', " +
                        " 'url' = 'jdbc:mysql://localhost:3306/stockdb', " +
                        " 'table-name' = 'stockprice', " +
                        " 'driver' = 'com.mysql.cj.jdbc.Driver', " +
                        " 'username' = 'root', " +
                        " 'password' = '123456' )";

        // Create the output table sinkTable with a SQL statement
        tableEnv.executeSql(sinkDDL);
        // Create a Table object table1 with the Table API
        Table table1 = tableEnv.from("sourceTable");
        // Write table1 into the output table sinkTable
        TableResult tableResult = table1.executeInsert("sinkTable");
        // executeInsert() only submits the job; wait for it to finish so that a local (IDE) run
        // does not exit before the rows reach MySQL.
        tableResult.await();
    }
}

代码文件DataStreamToTableDemo.java

package cn.edu.xmu;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * Round trip DataStream -> Table -> DataStream: three names are upper-cased with a SQL
 * query and printed.
 */
public class DataStreamToTableDemo {
    /**
     * Builds and runs the pipeline.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        env.setParallelism(1);

        // Source stream of plain strings
        DataStream<String> names = env.fromElements("Alice", "Bob", "John");

        // Stream -> Table, registered as the view InputTable
        Table namesTable = tableEnv.fromDataStream(names);
        tableEnv.createTemporaryView("InputTable", namesTable);

        // Query the view
        Table upperCased = tableEnv.sqlQuery("SELECT UPPER(f0) FROM InputTable");

        // Insert-only Table -> DataStream, printed by the DataStream API
        DataStream<Row> upperCasedStream = tableEnv.toDataStream(upperCased);
        upperCasedStream.print();

        env.execute();
    }
}

代码文件DataStreamToTableDemo2.java

package cn.edu.xmu;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * Converts a stream of (name, score) rows into a table, aggregates the score per name and
 * prints the resulting changelog stream.
 */
public class DataStreamToTableDemo2 {
    /**
     * Builds and runs the pipeline.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        env.setParallelism(1);

        // Source stream of two-field rows
        DataStream<Row> scores = env.fromElements(
                Row.of("Alice", 12),
                Row.of("Bob", 10),
                Row.of("Alice", 100));

        // Stream -> Table with explicit column names, registered as the view InputTable
        Table scoresTable = tableEnv.fromDataStream(scores).as("name", "score");
        tableEnv.createTemporaryView("InputTable", scoresTable);

        // The aggregation produces results that are updated as more rows arrive
        String totalPerName = "SELECT name, SUM(score) FROM InputTable GROUP BY name";
        Table totals = tableEnv.sqlQuery(totalPerName);

        // Updating Table -> changelog DataStream, printed by the DataStream API
        DataStream<Row> changelog = tableEnv.toChangelogStream(totals);
        changelog.print();

        env.execute();
    }
}

在创建表的DDL中定义事件时间的方法如下:

CREATE TABLE user_actions (
  user_name STRING,
  data STRING,
  user_action_time TIMESTAMP(3),
  -- declare user_action_time as the event-time attribute, with a watermark strategy
  -- that lags 5 seconds behind it
  WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (
  ...
);

在创建表的DDL中定义处理时间的方法如下:

CREATE TABLE user_actions (
  user_name STRING,
  data STRING,
  user_action_time AS PROCTIME() -- declare an additional column as the processing-time attribute
) WITH (
  ...
);

代码文件TableAPIDemo.java

package cn.edu.xmu;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import static org.apache.flink.table.api.Expressions.$;

/**
 * Word count with the Table API over a small in-memory stream.
 */
public class TableAPIDemo {
    /**
     * Builds the word-count query and prints its result.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the query fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        env.setParallelism(1);

        // Source stream of words
        DataStream<String> words = env.fromElements("Flink", "Spark", "Spark", "Flink");

        // Stream -> Table with the single column named "word"
        Table wordsTable = tableEnv.fromDataStream(words).as("word");

        // Count the occurrences of each word
        Table countsPerWord = wordsTable
                .groupBy($("word"))
                .select($("word"), $("word").count().as("cnt"));

        countsPerWord.execute().print();
    }
}

fromValues的用法和SQL中的VALUES从句的用法类似,它会从用户提供的行中生成一个表。具体用法如下:

// row(...) builds one literal row for fromValues
import static org.apache.flink.table.api.Expressions.row;
// Create a table from two literal rows
Table table = tableEnv.fromValues(
   row(1, "ABC"),
   row(2L, "ABCDE")
);

也可以在创建表对象的时候指定字段名称,具体用法如下:

// Snippet: create a table from literal rows with an explicit row type
// (columns "id" DECIMAL(10, 2) and "name" STRING)
Table table = tableEnv.fromValues(
                DataTypes.ROW(
                        DataTypes.FIELD("id", DataTypes.DECIMAL(10, 2)),
                        DataTypes.FIELD("name", DataTypes.STRING())
                ),
                row(1, "ABC"),
                row(2L, "ABCDE")
        );
// Snippet: select two columns, renaming price to stockPrice
Table stock = tableEnv.from("stockPriceTable");
Table result = stock.select($("stockId"), $("price"). as("stockPrice"));
// Snippet: select all columns
Table stock = tableEnv.from("stockPriceTable");
Table result = stock.select($("*"));
// Snippet: rename the columns of the table. Each new name is passed as its own argument:
// Table.as(String, String...) treats every argument as one column name, so a single
// comma-separated string would be taken as one name.
// NOTE(review): assumes stockPriceTable has exactly three columns here - confirm.
Table stock = tableEnv.from("stockPriceTable");
Table result = stock.as("myStockId", "myTimeStamp", "myPrice");
// Snippet: keep only the rows whose stockId equals "stock_1" (where)
Table stock = tableEnv.from("stockPriceTable");
Table result = stock.where($("stockId").isEqual("stock_1"));
// Snippet: the same row filter written with filter
Table stock = tableEnv.from("stockPriceTable");
Table result =stock.filter($("stockId").isEqual("stock_1"));
// Snippet: add a column computed by concatenating stockId and "_good"
Table stock = tableEnv.from("stockPriceTable");
Table result = stock.addColumns(concat($("stockId"), "_good"));
// Snippet: add the computed column under the name goodstock (addOrReplaceColumns)
Table stock = tableEnv.from("stockPriceTable");
Table result = stock.addOrReplaceColumns(concat($("stockId"), "_good").as("goodstock"));
// Snippet: drop the columns timeStamp and price
Table stock = tableEnv.from("stockPriceTable");
Table result = stock.dropColumns($("timeStamp"), $("price"));
// Snippet: rename stockId to id and price to stockprice
Table stock = tableEnv.from("stockPriceTable");
Table result = stock.renameColumns($("stockId").as("id"), $("price").as("stockprice"));
// Snippet: group by stockId and sum the price of each group
Table stock = tableEnv.from("stockPriceTable");
Table result = stock.groupBy($("stockId")).select($("stockId"), $("price").sum().as("price_sum"));
// Snippet: group-by window aggregation over 5-minute tumbling windows on timeStamp
Table stock = tableEnv.from("stockPriceTable");
Table result = stock
    .window(Tumble.over(lit(5).minutes()).on($("timeStamp")).as("w")) // define the window
    .groupBy($("stockId"), $("w")) // group by key and window
    // access the window properties and aggregate
    .select(
        $("stockId"),
        $("w").start(),
        $("w").end(),
        $("w").rowtime(),
        $("price").sum().as("price_sum")
    );

代码文件GroupByWindowAggregationDemo.java

package cn.edu.xmu;

import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import java.text.Format;
import java.text.SimpleDateFormat;
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;

/**
 * Group-by window aggregation: stock prices read from a socket are summed per stock in
 * 10-second tumbling event-time windows.
 */
public class GroupByWindowAggregationDemo {
    /**
     * Builds the windowed aggregation and prints its result.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        env.setParallelism(1);

        // Each input line is split on "," into: stock id, timestamp, price
        DataStreamSource<String> socketLines = env.socketTextStream("hadoop01", 9999);
        SingleOutputStreamOperator<StockPrice> prices = socketLines.map(line -> {
            String[] fields = line.split(",");
            return new StockPrice(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
        });
        SingleOutputStreamOperator<StockPrice> pricesWithWatermark =
                prices.assignTimestampsAndWatermarks(new MyWatermarkStrategy());

        // timeStamp becomes the rowtime attribute of the table
        Table stock = tableEnv.fromDataStream(
                pricesWithWatermark, $("timeStamp").rowtime(), $("stockId"), $("price"));

        Table pricePerWindow = stock
                // define the window
                .window(Tumble.over(lit(10).seconds()).on($("timeStamp")).as("w"))
                // group by key and window
                .groupBy($("stockId"), $("w"))
                // access the window properties and aggregate
                .select(
                        $("stockId"),
                        $("w").start(),
                        $("w").end(),
                        $("w").rowtime(),
                        $("price").sum().as("price_sum")
                );
        pricePerWindow.execute().print();
    }

    /**
     * Watermark strategy that emits a watermark on every event, 10 seconds behind the
     * largest timestamp seen so far, and logs each event to standard output.
     */
    public static class MyWatermarkStrategy implements WatermarkStrategy<StockPrice>{

        @Override
        public WatermarkGenerator<StockPrice> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new WatermarkGenerator<StockPrice>(){
                // maximum out-of-orderness: 10 seconds
                private final long allowedDelayMs = 10000L;
                private long highestTimestampSeen = 0L;
                private final Format timeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

                @Override
                public void onEvent(StockPrice event, long eventTimestamp, WatermarkOutput output) {
                    highestTimestampSeen = Math.max(eventTimestamp, highestTimestampSeen);
                    Watermark watermark = new Watermark(highestTimestampSeen - allowedDelayMs);
                    output.emitWatermark(watermark);
                    String logLine = "timestamp:" + event.stockId
                            + "," + event.timeStamp + "|" + timeFormat.format(event.timeStamp)
                            + "," + highestTimestampSeen + "|" + timeFormat.format(highestTimestampSeen)
                            + "," + watermark;
                    System.out.println(logLine);
                }

                @Override
                public void onPeriodicEmit(WatermarkOutput output) {
                    // Watermarks are emitted per event, so nothing is done periodically
                }
            };
        }

        @Override
        public TimestampAssigner<StockPrice> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
            // The event timestamp is taken from the arriving record itself
            return (SerializableTimestampAssigner<StockPrice>) (element, recordTimestamp) -> element.timeStamp;
        }
    }
}
nc -lk 9999
stock_1,1602031567000,8.14
stock_1,1602031568000,8.22
stock_1,1602031575000,8.14
stock_1,1602031577000,8.14
stock_1,1602031593000,8.14
// Snippet: distinct
Table stock = tableEnv.from("stockPriceTable");
Table result = stock.distinct();
// Snippet: inner join; the join condition is given in where()
Table left = tableEnv.from("MyTable").select($("a"), $("b"), $("c"));
Table right = tableEnv.from("MyTable").select($("d"), $("e"), $("f"));
Table result = left.join(right)
    .where($("a").isEqual($("d")))
    .select($("a"), $("b"), $("e"));
// Snippet: left, right and full outer joins on a = d
Table left = tableEnv.from("MyTable").select($("a"), $("b"), $("c"));
Table right = tableEnv.from("MyTable").select($("d"), $("e"), $("f"));

Table leftOuterResult = left.leftOuterJoin(right, $("a").isEqual($("d")))
                            .select($("a"), $("b"), $("e"));
Table rightOuterResult = left.rightOuterJoin(right, $("a").isEqual($("d")))
                            .select($("a"), $("b"), $("e"));
Table fullOuterResult = left.fullOuterJoin(right, $("a").isEqual($("d")))
                            .select($("a"), $("b"), $("e"));
// Snippet: union
Table left = tableEnv.from("orders1");
Table right = tableEnv.from("orders2");
left.union(right);
// Snippet: unionAll
Table left = tableEnv.from("orders1");
Table right = tableEnv.from("orders2");
left.unionAll(right);
// Snippet: intersect
Table left = tableEnv.from("orders1");
Table right = tableEnv.from("orders2");
left.intersect(right);
// Snippet: intersectAll
Table left = tableEnv.from("orders1");
Table right = tableEnv.from("orders2");
left.intersectAll(right);
// Snippet: minus
Table left = tableEnv.from("orders1");
Table right = tableEnv.from("orders2");
left.minus(right);
// Snippet: minusAll
Table left = tableEnv.from("orders1");
Table right = tableEnv.from("orders2");
left.minusAll(right);
Table left = tableEnv.from("Orders1")
Table right = tableEnv.from("Orders2");
Table result = left.select($("a"), $("b"), $("c")).where($("a").in(right));
// Snippet: sort by column a in ascending order
Table result = tab.orderBy($("a").asc());
// Snippet: insert the table Orders into the registered table OutOrders and run the pipeline
Table orders = tableEnv.from("Orders");
orders.insertInto("OutOrders").execute();

代码文件MapFunctionDemo.java

package cn.edu.xmu;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.types.Row;
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;

/**
 * Applies a user-defined scalar function to every row of a CSV-backed table with the
 * Table API {@code map} operation and prints the result.
 */
public class MapFunctionDemo {
    /**
     * Registers the source table and the function, then runs the map query.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the query fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        env.setParallelism(1);

        // Input table stockPriceTable backed by a local CSV file
        tableEnv.executeSql(
                "create table stockPriceTable (stockId STRING,myTimeStamp BIGINT,price DOUBLE) with (" +
                        " 'connector' = 'filesystem', " +
                        " 'path' = 'file:///home/hadoop/stockprice.csv', " +
                        " 'format' = 'csv', " +
                        " 'csv.field-delimiter' = ',', " +
                        " 'csv.ignore-parse-errors' = 'true' " +
                        " )");
        Table stockPrices = tableEnv.from("stockPriceTable");

        // Register the scalar function under the name "func"
        ScalarFunction mapper = new MyMapFunction();
        tableEnv.registerFunction("func", mapper);

        // Map every row through func(stockId) and name the two produced columns
        Table mapped = stockPrices
                .map(call("func", $("stockId")))
                .as("stockId", "myStockId");
        mapped.execute().print();
    }

    /**
     * Scalar function returning a two-field row: the input string and the input string
     * prefixed with "my-".
     */
    public static class MyMapFunction extends ScalarFunction {
        public Row eval(String a) {
            String prefixed = "my-" + a;
            return Row.of(a, prefixed);
        }

        /** Declares the result as a row of two strings. */
        @Override
        public TypeInformation<?> getResultType(Class<?>[] signature) {
            return Types.ROW(Types.STRING, Types.STRING);
        }
    }
}

在Linux中新建一个文件“/home/hadoop/stockprice.csv”(对应代码中的路径“file:///home/hadoop/stockprice.csv”),内容如下:

stock_1,1602031567000,8.17
stock_2,1602031568000,8.22
stock_1,1602031575000,8.14

代码文件FlatMapFunctionDemo.java

package cn.edu.xmu;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;

/**
 * Demonstrates the row-based {@code flatMap} operation of the Table API: a user-defined table
 * function splits the {@code stockId} column on {@code '#'} and emits one row per fragment.
 */
public class FlatMapFunctionDemo {
    /**
     * Declares the CSV-backed table {@code stockPriceTable}, flat-maps its {@code stockId}
     * column through {@link MyFlatMapFunction} and prints the result.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if setting up or running the job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        env.setParallelism(1);
        // Declare the input table stockPriceTable, backed by a local CSV file.
        String sourceDDL =
                "create table stockPriceTable (" +
                        "stockId STRING," +
                        "myTimeStamp BIGINT," +
                        "price DOUBLE" +
                        ") with (" +
                        " 'connector' = 'filesystem', " +
                        " 'path' = 'file:///home/hadoop/stockprice.csv', " +
                        " 'format' = 'csv', " +
                        " 'csv.field-delimiter' = ',', " +
                        " 'csv.ignore-parse-errors' = 'true' " +
                        " )";
        tableEnv.executeSql(sourceDDL);
        Table inputTable = tableEnv.from("stockPriceTable");
        // Parameterized (not raw) type keeps the registration call free of unchecked conversions.
        TableFunction<Row> func = new MyFlatMapFunction();
        tableEnv.registerFunction("func", func);
        // Each emitted row has two fields, exposed here as columns "a" and "b".
        Table result = inputTable
                .flatMap(call("func", $("stockId"))).as("a", "b");
        result.execute().print();
    }

    /**
     * Table function that splits a string on {@code '#'} and emits, for every fragment, a row
     * of (fragment, fragment length).
     */
    public static class MyFlatMapFunction extends TableFunction<Row> {
        /**
         * Emits one row per {@code '#'}-separated fragment of {@code str}.
         *
         * @param str the value to split; {@code null} values and values without {@code '#'}
         *            produce no output rows
         */
        public void eval(String str) {
            // A null field would otherwise fail the whole job with a NullPointerException.
            if (str == null || !str.contains("#")) {
                return;
            }
            for (String part : str.split("#")) {
                collect(Row.of(part, part.length()));
            }
        }

        /** Declares the emitted rows as (STRING, INT). */
        @Override
        public TypeInformation<Row> getResultType() {
            return Types.ROW(Types.STRING, Types.INT);
        }
    }
}

在Linux中新建一个文件“file:///home/hadoop/stockprice.csv”,内容如下:

stock#01,1602031567000,8.17
stock#02,1602031568000,8.22
stock#01,1602031575000,8.14

代码文件AggregateFunctionDemo.java

package cn.edu.xmu;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;

/**
 * Demonstrates the row-based {@code aggregate} operation of the Table API: a user-defined
 * aggregate function computes the minimum and maximum price per {@code stockId}.
 */
public class AggregateFunctionDemo {
    /**
     * Declares the CSV-backed table {@code stockPriceTable}, groups it by {@code stockId},
     * aggregates {@code price} with {@link MyMinMax} and prints the result.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if setting up or running the job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        env.setParallelism(1);
        // Declare the input table stockPriceTable, backed by a local CSV file.
        String sourceDDL =
                "create table stockPriceTable (" +
                        "stockId STRING," +
                        "myTimeStamp BIGINT," +
                        "price DOUBLE" +
                        ") with (" +
                        " 'connector' = 'filesystem', " +
                        " 'path' = 'file:///home/hadoop/stockprice.csv', " +
                        " 'format' = 'csv', " +
                        " 'csv.field-delimiter' = ',', " +
                        " 'csv.ignore-parse-errors' = 'true' " +
                        " )";
        tableEnv.executeSql(sourceDDL);
        Table inputTable = tableEnv.from("stockPriceTable");
        // Parameterized (not raw) type keeps the registration call free of unchecked conversions.
        AggregateFunction<Row, MyMinMaxAcc> myAggFunc = new MyMinMax();
        tableEnv.registerFunction("myAggFunc", myAggFunc);
        // The aggregate returns a two-field row, exposed as min_price and max_price.
        Table result = inputTable
                .groupBy($("stockId"))
                .aggregate(call("myAggFunc", $("price")).as("min_price", "max_price"))
                .select($("stockId"), $("min_price"), $("max_price"));
        result.execute().print();
    }

    /**
     * Accumulator for {@link MyMinMax}. Both fields start at the extreme opposite to what they
     * track, so the first accumulated value always replaces them.
     */
    public static class MyMinMaxAcc {
        public double min = Double.MAX_VALUE;
        // Starting at 0 would report a maximum of 0 for groups containing only negative values.
        public double max = -Double.MAX_VALUE;
    }

    /** Aggregate function returning (minimum, maximum) of the accumulated values as a row. */
    public static class MyMinMax extends AggregateFunction<Row, MyMinMaxAcc> {

        /**
         * Folds one value into the accumulator.
         *
         * @param acc   the accumulator to update
         * @param value the value to take into account
         */
        public void accumulate(MyMinMaxAcc acc, double value) {
            if (value < acc.min) {
                acc.min = value;
            }
            if (value > acc.max) {
                acc.max = value;
            }
        }

        /** Creates an empty accumulator. */
        @Override
        public MyMinMaxAcc createAccumulator() {
            return new MyMinMaxAcc();
        }

        /**
         * Restores the accumulator to the same state {@link #createAccumulator()} produces.
         * Resetting {@code min} to 0 would pin the minimum at 0 for all positive inputs.
         *
         * @param acc the accumulator to reset
         */
        public void resetAccumulator(MyMinMaxAcc acc) {
            acc.min = Double.MAX_VALUE;
            acc.max = -Double.MAX_VALUE;
        }

        /**
         * @param acc the accumulator to read
         * @return a row of (min, max)
         */
        @Override
        public Row getValue(MyMinMaxAcc acc) {
            return Row.of(acc.min, acc.max);
        }

        /** Declares the result as a row of two doubles. */
        @Override
        public TypeInformation<Row> getResultType() {
            return new RowTypeInfo(Types.DOUBLE, Types.DOUBLE);
        }
    }
}

在Linux中新建一个文件file:///home/hadoop/stockprice.csv,内容如下:

stock_1,1602031567000,8.17
stock_2,1602031568000,6.22
stock_1,1602031575000,5.14
stock_2,1602031573000,3.29

代码文件WindowAggregateFunctionDemo.java

package cn.edu.xmu;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.types.Row;
import static org.apache.flink.table.api.Expressions.*;

/**
 * Demonstrates a user-defined aggregate function applied per group and per tumbling
 * event-time window: minimum and maximum price per {@code stockId} in 5-second windows.
 */
public class WindowAggregateFunctionDemo {
    /**
     * Declares the CSV-backed table {@code stockPriceTable} with an event-time attribute,
     * aggregates {@code price} with {@link MyMinMax} per {@code stockId} and 5-second tumbling
     * window, and prints the result together with the window start and end.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if setting up or running the job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        env.setParallelism(1);
        // Declare the input table; ts_ltz is derived from the epoch-millisecond column ts and
        // carries a watermark that lags 5 seconds behind.
        String sourceDDL =
                "create table stockPriceTable (" +
                        "stockId STRING," +
                        "ts BIGINT," +
                        "price DOUBLE," +
                        "ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3)," +
                        "WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '5' SECOND" +
                        ") with (" +
                        " 'connector' = 'filesystem', " +
                        " 'path' = 'file:///home/hadoop/stockprice.csv', " +
                        " 'format' = 'csv', " +
                        " 'csv.field-delimiter' = ',', " +
                        " 'csv.ignore-parse-errors' = 'true' " +
                        " )";
        tableEnv.executeSql(sourceDDL);
        Table inputTable = tableEnv.from("stockPriceTable");
        // Parameterized (not raw) type keeps the registration call free of unchecked conversions.
        AggregateFunction<Row, MyMinMaxAcc> myAggFunc = new MyMinMax();
        tableEnv.registerFunction("myAggFunc", myAggFunc);
        // Group by stock and by 5-second tumbling window "w" over ts_ltz.
        Table result = inputTable
                .window(Tumble.over(lit(5).seconds())
                        .on($("ts_ltz"))
                        .as("w"))
                .groupBy($("stockId"), $("w"))
                .aggregate(call("myAggFunc", $("price")).as("min_price", "max_price"))
                .select($("stockId"), $("min_price"), $("max_price"), $("w").start(), $("w").end());
        result.execute().print();
    }

    /**
     * Accumulator for {@link MyMinMax}. Both fields start at the extreme opposite to what they
     * track, so the first accumulated value always replaces them.
     */
    public static class MyMinMaxAcc {
        public double min = Double.MAX_VALUE;
        // Starting at 0 would report a maximum of 0 for groups containing only negative values.
        public double max = -Double.MAX_VALUE;
    }

    /** Aggregate function returning (minimum, maximum) of the accumulated values as a row. */
    public static class MyMinMax extends AggregateFunction<Row, MyMinMaxAcc> {

        /**
         * Folds one value into the accumulator.
         *
         * @param acc   the accumulator to update
         * @param value the value to take into account
         */
        public void accumulate(MyMinMaxAcc acc, double value) {
            if (value < acc.min) {
                acc.min = value;
            }
            if (value > acc.max) {
                acc.max = value;
            }
        }

        /** Creates an empty accumulator. */
        @Override
        public MyMinMaxAcc createAccumulator() {
            return new MyMinMaxAcc();
        }

        /**
         * Restores the accumulator to the same state {@link #createAccumulator()} produces.
         * Resetting {@code min} to 0 would pin the minimum at 0 for all positive inputs.
         *
         * @param acc the accumulator to reset
         */
        public void resetAccumulator(MyMinMaxAcc acc) {
            acc.min = Double.MAX_VALUE;
            acc.max = -Double.MAX_VALUE;
        }

        /**
         * @param acc the accumulator to read
         * @return a row of (min, max)
         */
        @Override
        public Row getValue(MyMinMaxAcc acc) {
            return Row.of(acc.min, acc.max);
        }

        /** Declares the result as a row of two doubles. */
        @Override
        public TypeInformation<Row> getResultType() {
            return new RowTypeInfo(Types.DOUBLE, Types.DOUBLE);
        }
    }
}

在Linux中新建一个文件file:///home/hadoop/stockprice.csv,内容如下:

stock_1,1602031567000,8.14
stock_2,1602031568000,18.22
stock_2,1602031575000,8.14
stock_1,1602031577000,18.21
stock_1,1602031593000,8.98
cd /usr/local/flink
./bin/start-cluster.sh
./bin/sql-client.sh
Flink SQL> SHOW DATABASES;
Flink SQL> EXIT;
Flink SQL> show databases;
Flink SQL> exit;
Flink SQL> QUIT;
cd /usr/local/flink
./bin/stop-cluster.sh
SET 'sql-client.execution.result-mode' = 'table';
SET 'sql-client.execution.result-mode' = 'changelog';
SET 'sql-client.execution.result-mode' = 'tableau';
Flink SQL> SET 'sql-client.execution.result-mode' = 'table';
Flink SQL> SELECT  ' Hello World';
Flink SQL> SET 'sql-client.execution.result-mode' = 'changelog';
Flink SQL> SELECT  ' Hello World';
Flink SQL> SET 'sql-client.execution.result-mode' = 'tableau';
Flink SQL> SELECT  ' Hello World';
CREATE DATABASE mydatabase;
SHOW DATABASES;
SHOW CURRENT DATABASE;
USE mydatabase;
DROP DATABASE mydatabase;
CREATE DATABASE mydatabase;
USE mydatabase;
CREATE TABLE stockprice(
stockId STRING,
myTimeStamp BIGINT,
price DOUBLE
) WITH (
    'connector' = 'print'
);
SHOW TABLES;
DESC stockprice;
DROP TABLE stockprice;
CREATE TABLE source (
   f_sequence INT,
   f_random INT,
   f_random_str STRING
) WITH (
   'connector' = 'datagen', --连接器类型是datagen,会自动生成数据
   'rows-per-second'='5', --每秒生成几行数据
   'fields.f_sequence.kind'='sequence', --设置f_sequence字段的类型为序列类型
   'fields.f_sequence.start'='1',  --设置f_sequence字段的起始值是1
   'fields.f_sequence.end'='1000', --设置f_sequence字段的终止值是1000
 'fields.f_random.min'='1', --设置f_random字段的随机数的最小值
   'fields.f_random.max'='1000', --设置f_random字段的随机数的最大值
   'fields.f_random_str.length'='5' --设置f_random_str字段的随机字符串的长度
);
SELECT * FROM source;
SET 'sql-client.execution.result-mode' = 'tableau';
SELECT * FROM source;
SET 'sql-client.execution.result-mode' = 'changelog';
CREATE TABLE sink(
f_sequence INT,
f_random INT,
f_random_str STRING
) WITH (
    'connector' = 'print'
);
INSERT INTO sink SELECT * FROM source;

在Linux系统中新建一个文件“file:///home/hadoop/stockprice.csv”,内容如下:

stock_1,1602031567000,8.14
stock_2,1602031568000,18.22
stock_2,1602031575000,8.14
stock_1,1602031577000,18.21
stock_1,1602031593000,8.98
CREATE TABLE stockPriceTable (
      stockId STRING,
      ts BIGINT,
      price DOUBLE,
      ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
      WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '5' SECOND
) WITH (
      'connector' = 'filesystem',
      'path' = 'file:///home/hadoop/stockprice.csv', 
      'format' = 'csv', 
      'csv.field-delimiter' = ',', 
      'csv.ignore-parse-errors' = 'true' 
);
SET 'sql-client.execution.result-mode' = 'tableau';
SELECT stockId,price FROM stockPriceTable;
SELECT * FROM stockPriceTable;
SELECT stockId, price AS stockprice FROM stockPriceTable;
SELECT * FROM stockPriceTable WHERE stockId = 'stock_1';
SELECT * FROM stockPriceTable WHERE price > 10;
SELECT stockId, AVG(price) as avg_price
FROM stockPriceTable
GROUP BY stockId;
SELECT stockId, AVG(price)
FROM stockPriceTable
GROUP BY  stockId, TUMBLE(ts_ltz, INTERVAL '5' SECOND);
SELECT
stockId,
sum(price) as priceSum,
window_start, 
window_end
FROM TABLE(
    TUMBLE(TABLE stockPriceTable, DESCRIPTOR(ts_ltz), INTERVAL '5' SECOND )
)
GROUP BY window_start, window_end, stockId;
SELECT
stockId,
sum(price) as priceSum,
window_start, 
window_end
FROM TABLE(
   TUMBLE(
     DATA => TABLE stockPriceTable,
     TIMECOL => DESCRIPTOR(ts_ltz),
     SIZE => INTERVAL '5' SECOND))
GROUP BY window_start, window_end, stockId;
SELECT
stockId,
sum(price) as priceSum,
window_start, 
window_end
FROM TABLE(
    HOP(TABLE stockPriceTable, DESCRIPTOR(ts_ltz), INTERVAL '5' SECOND, INTERVAL '10' SECOND)
)
GROUP BY window_start, window_end, stockId;
SELECT 
stockId,
sum(price) as priceSum,
window_start, 
window_end
FROM TABLE(
    HOP(
      DATA => TABLE stockPriceTable,
      TIMECOL => DESCRIPTOR(ts_ltz),
      SLIDE => INTERVAL '5' SECOND,
      SIZE => INTERVAL '10' SECOND))
GROUP BY window_start, window_end, stockId;
SELECT
stockId,
ts_ltz,
COUNT(stockId) OVER (PARTITION BY stockId ORDER BY ts_ltz range BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW) cnt
FROM stockPriceTable;
SELECT
stockId,
ts_ltz,
COUNT(stockId) OVER w AS cnt
FROM stockPriceTable
WINDOW w AS (
PARTITION BY stockId
ORDER BY ts_ltz
RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW
);
SELECT
stockId,
ts_ltz,
AVG(price) OVER (PARTITION BY stockId ORDER BY ts_ltz ROWS BETWEEN 5 PRECEDING AND CURRENT ROW) avgPrice
FROM stockPriceTable;
SELECT
stockId,
ts_ltz,
AVG(price) OVER w AS avgPrice
FROM stockPriceTable
WINDOW w AS (
PARTITION BY stockId
ORDER BY ts_ltz
ROWS BETWEEN 5 PRECEDING AND CURRENT ROW
);
SELECT
stockId,
price,
rownum
FROM
(
SELECT
stockId,
ts_ltz,
price,
ROW_NUMBER() OVER (PARTITION BY stockId ORDER BY price DESC) AS rownum
FROM stockPriceTable
)
WHERE rownum<=2;
SELECT DISTINCT stockId FROM stockPriceTable;
SELECT AVG(price)
FROM stockPriceTable
GROUP BY stockId
HAVING AVG(price) > 20;
SELECT *
FROM stockPriceTable INNER JOIN stock_info
ON stockPriceTable.stockId = stock_info.stockId;
SELECT * FROM stockPriceTable LEFT JOIN stock_info
ON stockPriceTable.stockId = stock_info.stockId;

SELECT * FROM stockPriceTable RIGHT JOIN stock_info
ON stockPriceTable.stockId = stock_info.stockId;

SELECT * FROM stockPriceTable FULL OUTER JOIN stock_info
ON stockPriceTable.stockId = stock_info.stockId;
SELECT *
FROM (
    (SELECT stockId FROM stockPriceTable WHERE stockId='stock_1')
    UNION
    (SELECT stockId FROM stockPriceTable WHERE stockId='stock_2')
);
SELECT *
FROM (
    (SELECT stockId FROM stockPriceTable WHERE stockId='stock_1')
    UNION ALL
    (SELECT stockId FROM stockPriceTable WHERE stockId='stock_2')
);
SELECT *
FROM (
    (SELECT stockId FROM stockPriceTable WHERE price > 10.0)
    INTERSECT
    (SELECT stockId FROM stockPriceTable WHERE stockId='stock_1')
);

SELECT *
FROM (
    (SELECT stockId FROM stockPriceTable WHERE price > 10.0)
    EXCEPT
    (SELECT stockId FROM stockPriceTable WHERE stockId='stock_1')
);
SELECT stockId, price
FROM stockPriceTable
WHERE stockId IN (
    SELECT stockId FROM newstock
);
SELECT stockId, price
FROM stockPriceTable
WHERE EXISTS (
    SELECT stockId FROM newstock
    WHERE newstock.stockId = stockPriceTable.stockId
);
SELECT *
FROM stockPriceTable
ORDER BY ts_ltz;
SELECT *
FROM stockPriceTable
ORDER BY ts_ltz
LIMIT 3;
https://mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc/3.1.1-1.17
https://mvnrepository.com/artifact/com.mysql/mysql-connector-j/8.0.33
mysql> CREATE DATABASE stockdb;
mysql> USE stockdb;
mysql> CREATE TABLE stockprice (stockId VARCHAR(20),price DOUBLE);
mysql> INSERT INTO stockprice VALUES("stock1",123);
mysql> INSERT INTO stockprice VALUES("stock2",456);
mysql> SELECT * FROM stockprice;
SET 'sql-client.execution.result-mode' = 'tableau';
SHOW CATALOGS;
USE catalog default_catalog;
SHOW DATABASES;
CREATE CATALOG mysql_catalog WITH (
'type' = 'jdbc',
'default-database' = 'stockdb',
'username' = 'root',
'password' = '123456',
'base-url' = 'jdbc:mysql://localhost:3306'
);
USE catalog mysql_catalog;
SHOW DATABASES;
USE stockdb;
SHOW TABLES;
SELECT * FROM stockprice;
INSERT INTO stockprice VALUES('stock1',789);
mysql> SELECT * FROM stockprice;

在Linux中新建一个文件“file:///home/hadoop/stockprice.csv”,内容如下:

stock_1,1602031567000,8.14
stock_2,1602031568000,18.22
stock_2,1602031575000,8.14
stock_1,1602031577000,18.21
stock_1,1602031593000,8.98

代码文件ScalarFunctionDemo.java

package cn.edu.xmu;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.types.Row;
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;

/**
 * Demonstrates a user-defined scalar function, called once through the Table API and once
 * through SQL.
 */
public class ScalarFunctionDemo {
    /** DDL declaring the input table stockPriceTable, backed by a local CSV file. */
    private static final String SOURCE_DDL =
            "create table stockPriceTable ("
                    + "stockId STRING,"
                    + "myTimeStamp BIGINT,"
                    + "price DOUBLE"
                    + ") with ("
                    + " 'connector' = 'filesystem', "
                    + " 'path' = 'file:///home/hadoop/stockprice.csv', "
                    + " 'format' = 'csv', "
                    + " 'csv.field-delimiter' = ',', "
                    + " 'csv.ignore-parse-errors' = 'true' "
                    + " )";

    /**
     * Declares the input table, registers {@link SubstringFunction} and prints the result of
     * applying it to {@code stockId}, first via the Table API and then via SQL.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if setting up or running the job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(streamEnv);
        streamEnv.setParallelism(1);

        tEnv.executeSql(SOURCE_DDL);
        Table stockPrices = tEnv.from("stockPriceTable");

        // Register the function under the name used by both calls below.
        tEnv.createTemporarySystemFunction("SubstringFunction", SubstringFunction.class);

        // Call the registered function from the Table API.
        Table viaTableApi = stockPrices.select(call("SubstringFunction", $("stockId"), 6, 7));
        viaTableApi.execute().print();

        // Call the registered function from SQL.
        String query = "SELECT SubstringFunction(stockId, 6, 7) FROM stockPriceTable";
        Table viaSql = tEnv.sqlQuery(query);
        viaSql.execute().print();
    }

    /** User-defined scalar function exposing {@link String#substring(int, int)}. */
    public static class SubstringFunction extends ScalarFunction {
        /**
         * @param s     the string to cut
         * @param begin start index, inclusive
         * @param end   end index, exclusive
         * @return the substring of {@code s} between {@code begin} and {@code end}
         */
        public String eval(String s, Integer begin, Integer end) {
            int from = begin;
            int to = end;
            return s.substring(from, to);
        }
    }
}

代码文件TableFunctionDemo.java

package cn.edu.xmu;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;

/**
 * Demonstrates a user-defined table function joined laterally against its input table: every
 * {@code stockId} is split on {@code '_'} and each fragment becomes a row.
 */
public class TableFunctionDemo {
    /**
     * Declares the input table, registers {@link MySplitFunction}, left-outer-joins each row
     * with the rows the function emits for its {@code stockId}, and prints the result.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if setting up or running the job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(streamEnv);
        streamEnv.setParallelism(1);

        createSourceTable(tEnv);
        Table stockPrices = tEnv.from("stockPriceTable");

        tEnv.createTemporarySystemFunction("SplitFunction", MySplitFunction.class);

        // "word" and "length" are the field names declared by the function's output hint.
        Table joined = stockPrices.leftOuterJoinLateral(call("SplitFunction", $("stockId")));
        Table result = joined.select($("stockId"), $("word"), $("length"));
        result.execute().print();
    }

    /** Declares the input table stockPriceTable, backed by a local CSV file. */
    private static void createSourceTable(StreamTableEnvironment tableEnv) {
        tableEnv.executeSql(
                "create table stockPriceTable (" +
                        "stockId STRING," +
                        "myTimeStamp BIGINT," +
                        "price DOUBLE" +
                        ") with (" +
                        " 'connector' = 'filesystem', " +
                        " 'path' = 'file:///home/hadoop/stockprice.csv', " +
                        " 'format' = 'csv', " +
                        " 'csv.field-delimiter' = ',', " +
                        " 'csv.ignore-parse-errors' = 'true' " +
                        " )");
    }

    /**
     * Table function that splits a string on {@code '_'} and emits (fragment, fragment length)
     * for every fragment. The output row type is declared through the annotation.
     */
    @FunctionHint(output = @DataTypeHint("ROW<word STRING, length INT>"))
    public static class MySplitFunction extends TableFunction<Row> {
        /**
         * @param str the value to split
         */
        public void eval(String str) {
            String[] parts = str.split("_");
            for (int i = 0; i < parts.length; i++) {
                String part = parts[i];
                // collect(...) emits one output row.
                collect(Row.of(part, part.length()));
            }
        }
    }
}

代码文件TableFunctionDemo.java

package cn.edu.xmu;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.AggregateFunction;
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;

/**
 * Demonstrates a user-defined aggregate function that counts the rows of each group.
 *
 * <p>NOTE(review): despite its name this class exercises an {@code AggregateFunction}, not a
 * table function, and it has the same name and package as the preceding table-function
 * listing; the two cannot live in one source tree unless one of them is renamed.
 */
public class TableFunctionDemo {
    /**
     * Declares the input table, registers {@link MyCountFunction}, counts the rows per
     * {@code stockId} and prints the result.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if setting up or running the job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(streamEnv);
        streamEnv.setParallelism(1);

        // Declare the input table stockPriceTable, backed by a local CSV file.
        tEnv.executeSql(String.join("",
                "create table stockPriceTable (",
                "stockId STRING,",
                "myTimeStamp BIGINT,",
                "price DOUBLE",
                ") with (",
                " 'connector' = 'filesystem', ",
                " 'path' = 'file:///home/hadoop/stockprice.csv', ",
                " 'format' = 'csv', ",
                " 'csv.field-delimiter' = ',', ",
                " 'csv.ignore-parse-errors' = 'true' ",
                " )"));
        Table stockPrices = tEnv.from("stockPriceTable");

        // Register the function.
        tEnv.createTemporarySystemFunction("MyCountFunction", MyCountFunction.class);

        // Call it from the Table API; it takes no arguments.
        Table counts = stockPrices
                .groupBy($("stockId"))
                .select($("stockId"), call("MyCountFunction"));
        counts.execute().print();
    }

    /** Accumulator holding the running row count. */
    public static class MyCountAccumulator {
        public long count = 0L;
    }

    /** Aggregate function that returns how many times {@code accumulate} was invoked. */
    public static class MyCountFunction extends AggregateFunction<Long, MyCountAccumulator> {
        /** Creates an accumulator whose count is zero. */
        @Override
        public MyCountAccumulator createAccumulator() {
            return new MyCountAccumulator();
        }

        /** Increments the count by one; invoked once per arriving row. */
        public void accumulate(MyCountAccumulator myCountAccumulator) {
            myCountAccumulator.count += 1;
        }

        /** Adds the counts of all accumulators in {@code it} to {@code myCountAccumulator}. */
        public void merge(MyCountAccumulator myCountAccumulator, Iterable<MyCountAccumulator> it) {
            for (MyCountAccumulator other : it) {
                myCountAccumulator.count = myCountAccumulator.count + other.count;
            }
        }

        /** Returns the current count. */
        @Override
        public Long getValue(MyCountAccumulator myCountAccumulator) {
            return myCountAccumulator.count;
        }
    }
}

代码文件TableAggregateFunctionDemo.java

package cn.edu.xmu;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.util.Collector;
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;

/**
 * Demonstrates a user-defined table aggregate function that emits the two largest values of
 * a column together with their rank.
 */
public class TableAggregateFunctionDemo {
    /**
     * Builds a one-column table from a fixed set of integers, applies {@link Top2} through
     * {@code flatAggregate} and prints the (value, rank) rows.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        env.setParallelism(1);
        DataStreamSource<Integer> numDS = env.fromElements(4, 7, 1, 3, 12, 9, 5);
        Table numTable = tableEnv.fromDataStream(numDS).as("num");
        // Register the function.
        tableEnv.createTemporarySystemFunction("Top2", Top2.class);
        // Call it from the Table API. The function emits pairs; as(...) names the first
        // element "value" and the second element "rank".
        Table result = numTable
                .flatAggregate(
                        call("Top2", $("num")).as("value", "rank")
                )
                .select($("value"), $("rank"));
        result.execute().print();
    }

    /**
     * Table aggregate function that keeps the two largest values seen so far.
     *
     * <p>The first type argument is the output type: a pair (value, rank), e.g. (12, 1) means
     * 12 is ranked first. The second type argument is the accumulator: a pair holding the
     * largest value in {@code f0} and the second largest in {@code f1}.
     *
     * <p>{@link Integer#MIN_VALUE} marks an empty slot, so zero and negative inputs are ranked
     * like any other value; only {@code Integer.MIN_VALUE} itself can never be ranked.
     */
    public static class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {

        /** Creates an accumulator with both slots empty. */
        @Override
        public Tuple2<Integer, Integer> createAccumulator() {
            return Tuple2.of(Integer.MIN_VALUE, Integer.MIN_VALUE);
        }

        /**
         * Invoked once per arriving value; keeps the two largest values in {@code acc}.
         *
         * @param acc the accumulator to update
         * @param num the arriving value
         */
        public void accumulate(Tuple2<Integer, Integer> acc, Integer num) {
            if (num > acc.f0) {
                // The new value becomes rank 1; the previous rank 1 drops to rank 2.
                acc.f1 = acc.f0;
                acc.f0 = num;
            } else if (num > acc.f1) {
                // The new value becomes rank 2; the previous rank 2 is discarded.
                acc.f1 = num;
            }
        }

        /**
         * Emits the ranked values as (value, rank) pairs, skipping slots that are still empty.
         *
         * @param acc the accumulator to read
         * @param out collector receiving the output pairs
         */
        public void emitValue(Tuple2<Integer, Integer> acc, Collector<Tuple2<Integer, Integer>> out) {
            if (acc.f0 != Integer.MIN_VALUE) {
                out.collect(Tuple2.of(acc.f0, 1));
            }
            if (acc.f1 != Integer.MIN_VALUE) {
                out.collect(Tuple2.of(acc.f1, 2));
            }
        }
    }
}