Chapter 7 Code - 《Flink编程基础(Scala版)》, by 林子雨


The printed rendering of the code in the textbook 《Flink编程基础(Scala版)》 (textbook website), written by 林子雨 and 陶继平, may make the code hard to read. To help readers understand the code correctly, or copy it directly for hands-on experiments, all of the code accompanying the book is provided here.

Chapter 7 Table API & SQL

//obtain the execution environment
val env = ...
//obtain the TableEnvironment object
val tableEnv = ...
//register a table (input data)
tableEnv.connect(...).createTemporaryTable("table1")
//register a table (output data)
tableEnv.connect(...).createTemporaryTable("outputTable")
//define a query: create a Table object from a Table API query
val tapiResult = tableEnv.from("table1").select(...)
//define a query: create a Table object from a SQL query
val sqlResult = tableEnv.sqlQuery("SELECT ... FROM table1 ... ")
//emit the result
tapiResult.insertInto("outputTable")
//trigger program execution
tableEnv.execute("Table API and SQL")
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-scala-bridge_2.12</artifactId>
  <version>1.11.2</version>  
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-planner-blink_2.12</artifactId>
  <version>1.11.2</version>  
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-scala_2.12</artifactId>
  <version>1.11.2</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-scala_2.12</artifactId>
  <version>1.11.2</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-common</artifactId>
  <version>1.11.2</version> 
</dependency>
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings)
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

val bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
val bbTableEnv = TableEnvironment.create(bbSettings)
//create the TableEnvironment
val tableEnv = ...

//query an external table through the environment object and create a Table from the result
val projTable: Table = tableEnv.from("X").select(...)

//register the Table projTable as the temporary view "projectedTable"
tableEnv.createTemporaryView("projectedTable", projTable)
tableEnvironment
  .connect(...)  //specify the descriptor of the table connector
  .withFormat(...) //specify the data format
  .withSchema(...) //specify the table schema
  .inAppendMode() //specify the update mode
  .createTemporaryTable("MyTable") //register the table
bsTableEnv.connect(
      new FileSystem()
        .path("file:///home/hadoop/stockprice.txt")
    )
CREATE TABLE MyUserTable (
  id BIGINT,
  name STRING,
  age INT,
  status BOOLEAN,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://localhost:3306/mydatabase',
   'table-name' = 'users'
);
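For reference, the DDL above can be registered and queried directly from a program with executeSql(). The sketch below assumes that the flink-connector-jdbc and MySQL driver dependencies listed later in this section are on the classpath and that the users table already exists in the mydatabase database; the object name JdbcDDLDemo is made up for illustration.

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._

object JdbcDDLDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tableEnv = StreamTableEnvironment.create(env, settings)

    //register the JDBC table defined by the DDL above
    tableEnv.executeSql(
      """
        |CREATE TABLE MyUserTable (
        |  id BIGINT,
        |  name STRING,
        |  age INT,
        |  status BOOLEAN,
        |  PRIMARY KEY (id) NOT ENFORCED
        |) WITH (
        |  'connector' = 'jdbc',
        |  'url' = 'jdbc:mysql://localhost:3306/mydatabase',
        |  'table-name' = 'users'
        |)
        |""".stripMargin)

    //query the registered table and print the result
    val users = tableEnv.sqlQuery("SELECT id, name FROM MyUserTable")
    users.toRetractStream[(Long, String)].print()

    env.execute("JdbcDDLDemo")
  }
}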
.withFormat(
  new Csv()
    .field("field1", Types.STRING)    //specify the field name and type by position (required)
    .field("field2", Types.TIMESTAMP) //specify the field name and type by position (required)
    .fieldDelimiter(",")             //specify the column delimiter, "," by default (optional)
    .lineDelimiter("\n")            //specify the line delimiter, "\n" by default (optional)
    .quoteCharacter('"')            //specify a single character for quoting strings, none by default (optional)
    .commentPrefix('#')            //specify the comment prefix, none by default (optional)
    .ignoreFirstLine()              //ignore the first line (optional)
    .ignoreParseErrors()            //ignore records that fail to parse (optional)
)
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-csv</artifactId>
    <version>1.11.2</version>
</dependency>
.withSchema(
  new Schema()
    .field("MyField1", Types.INT)     // 根据顺序指定第1个字段的名称和类型
    .field("MyField2", Types.STRING)  // 根据顺序指定第2个字段的名称和类型
    .field("MyField3", Types.BOOLEAN) // 根据顺序指定第3个字段的名称和类型
)
.connect(...)
  .inAppendMode()  //only INSERT changes are exchanged with the external system
  .inUpsertMode()  //INSERT, UPDATE and DELETE changes are exchanged with the external system
  .inRetractMode()  //only INSERT and DELETE changes are exchanged with the external system
stock_2,1602031562148,43.5
stock_1,1602031562148,22.9
stock_0,1602031562153,8.3
stock_2,1602031562153,42.1
stock_1,1602031562158,22.2
package cn.edu.xmu.dblab

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.descriptors._

case class StockPrice(stockId:String,timeStamp:Long,price:Double)

object TableAPITest {
  def main(args: Array[String]): Unit = {

    //obtain the execution environment
    val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment

    //set the parallelism to 1
    bsEnv.setParallelism(1)

    //obtain the EnvironmentSettings
    val bsSettings = EnvironmentSettings
      .newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()

    //obtain the TableEnvironment
    val bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings)

    //create the data source
    val stockTable = bsTableEnv.connect(
      new FileSystem()
        .path("file:///home/hadoop/stockprice.txt")
    ).withFormat(new Csv())
      .withSchema(new Schema()
        .field("stockId", DataTypes.STRING())
        .field("timeStamp", DataTypes.BIGINT())
        .field("price", DataTypes.DOUBLE())
      ).createTemporaryTable("stocktable")

    //query with the Table API
    val stock = bsTableEnv.from("stocktable").select($"stockId", $"timeStamp", $"price")

    //print the result
    stock.toAppendStream[(String, Long, Double)].print()

    //trigger program execution
    bsEnv.execute("TableAPITest")
  }
}
package cn.edu.xmu.dblab

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema}

object MySQLConnector{
  def main(args: Array[String]): Unit = {

    //obtain the execution environment
    val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment

    //set the parallelism to 1
    bsEnv.setParallelism(1)

    //obtain the EnvironmentSettings
    val bsSettings = EnvironmentSettings
      .newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()

    //obtain the TableEnvironment
    val bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings)

    //create a data stream
    val dataStream = bsEnv.fromElements(Tuple3("95003","3",97))

    //convert the data stream into a table (this topic is covered in Section 7.1.7)
    val table1 = bsTableEnv.fromDataStream(dataStream)

    //create the table student
    val sinkDDL: String =
      """
        |create table student (
        | sno varchar(20) not null,
        | cno varchar(20) not null,
        | grade int
        |) with (
        | 'connector.type' = 'jdbc',
        | 'connector.url' = 'jdbc:mysql://localhost:3306/flink',
        | 'connector.table' = 'student',
        | 'connector.driver' = 'com.mysql.jdbc.Driver',
        | 'connector.username' = 'root',
        | 'connector.password' = '123456'
        | )
        """.stripMargin
    //execute the SQL statement
    bsTableEnv.executeSql(sinkDDL)

    //get a Table object for the registered table student
    val mystudent = bsTableEnv.from("student")

    //execute a SQL query (this topic is covered in Section 7.1.5)
    val result = bsTableEnv.sqlQuery(s"select sno,cno,grade from $mystudent")

    //print the result
    result.toRetractStream[(String,String,Int)].print()

    //insert the data into the student table
    table1.executeInsert("student")

    //trigger program execution
    bsEnv.execute("MySQLConnector")
  }
}
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-jdbc_2.12</artifactId>
  <version>1.11.2</version>
</dependency>
<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>5.1.40</version>
</dependency>
//obtain the TableEnvironment
val tableEnv = ...

//register a table named Orders

//scan the registered Orders table
val orders = tableEnv.from("Orders")

//compute the revenue of all customers from France
val revenue = orders
  .filter($"cCountry" === "FRANCE")
  .groupBy($"cID", $"cName")
  .select($"cID", $"cName", $"revenue".sum as "revSum")

//emit or convert the result table
//execute the query
//obtain the TableEnvironment
val tableEnv = ...

//register a table named Orders

//compute the revenue of all customers from France
val revenue = tableEnv.sqlQuery("""
  |SELECT cID, cName, SUM(revenue) AS revSum
  |FROM Orders
  |WHERE cCountry = 'FRANCE'
  |GROUP BY cID, cName
  """.stripMargin)

//emit or convert the result table
//execute the query
package cn.edu.xmu.dblab

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.descriptors._

case class StockPrice(stockId:String,timeStamp:Long,price:Double)

object TableAPIAndSQLTest {
  def main(args: Array[String]): Unit = {

    //obtain the execution environment
    val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment

    //set the parallelism to 1
    bsEnv.setParallelism(1)

    //obtain the EnvironmentSettings
    val bsSettings = EnvironmentSettings
      .newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()

    //obtain the TableEnvironment
    val bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings)

    //create the data source
    val stockTable = bsTableEnv.connect(
      new FileSystem()
        .path("file:///home/hadoop/stockprice.txt")
    ).withFormat(new Csv())
      .withSchema(new Schema()
        .field("stockId", DataTypes.STRING())
        .field("timeStamp", DataTypes.BIGINT())
        .field("price", DataTypes.DOUBLE())
      ).createTemporaryTable("stocktable")

    //query with the Table API
    val stock = bsTableEnv.from("stocktable")
    val stock1 = stock.select($"stockId", $"price").filter('stockId === "stock_1")

    //register the table as a view
    bsTableEnv.createTemporaryView("stockSQLTable", stock)

    //define the SQL statement
    val sql=
      """
        |select stockId,price from stockSQLTable
        |where stockId='stock_2'
        |""".stripMargin

    //execute the SQL query
    val stock2 = bsTableEnv.sqlQuery(sql)

    //print the results
    stock1.toAppendStream[(String, Double)].print("stock_1")
    stock2.toAppendStream[(String, Double)].print("stock_2")

    //trigger program execution
    bsEnv.execute("TableAPIAndSQLTest")
  }
}
//obtain the TableEnvironment
val tableEnv = ...

//create an output table
val schema = new Schema()
    .field("a", DataTypes.INT())
    .field("b", DataTypes.STRING())
    .field("c", DataTypes.BIGINT())

tableEnv.connect(new FileSystem().path("/path/to/file"))
    .withFormat(new Csv().fieldDelimiter('|').deriveSchema())
    .withSchema(schema)
    .createTemporaryTable("CsvSinkTable")

//compute a result table with the Table API or SQL
val result: Table = ...

//write the result table into the registered TableSink
result.executeInsert("CsvSinkTable")
package cn.edu.xmu.dblab

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.descriptors._

case class StockPrice(stockId:String,timeStamp:Long,price:Double)

object TableSinkTest {
  def main(args: Array[String]): Unit = {

    //obtain the execution environment
    val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment

    //set the parallelism to 1
    bsEnv.setParallelism(1)

    //obtain the EnvironmentSettings
    val bsSettings = EnvironmentSettings
      .newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()

    //obtain the TableEnvironment
    val bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings)

    //create the data source
    val stockTable = bsTableEnv.connect(
      new FileSystem()
        .path("file:///home/hadoop/stockprice.csv")
    ).withFormat(new Csv())
      .withSchema(new Schema()
        .field("stockId", DataTypes.STRING())
        .field("timeStamp", DataTypes.BIGINT())
        .field("price", DataTypes.DOUBLE())
      ).createTemporaryTable("stocktable")

    //query with the Table API
    val stock = bsTableEnv.from("stocktable")
    val stock1 = stock.select("stockId,price").filter('stockId === "stock_1")

    //create an output table
    val schema = new Schema()
      .field("stockId", DataTypes.STRING())
      .field("price", DataTypes.DOUBLE())

    bsTableEnv.connect(new FileSystem().path("file:///home/hadoop/output.csv"))
      .withFormat(new Csv().fieldDelimiter('|').deriveSchema())
      .withSchema(schema)
      .createTemporaryTable("CsvSinkTable")

    //emit the query result stock1 into the registered TableSink
    stock1.executeInsert("CsvSinkTable")

    //print the result
    stock1.toAppendStream[(String, Double)].print("stock_1")

    //trigger program execution
    bsEnv.execute("TableSinkTest")
  }
}
stock_1|22.9
stock_1|22.2
// obtain the TableEnvironment
val tableEnv: StreamTableEnvironment = ...

//create a DataStream
val stream: DataStream[(Long, String)] = ...

// register the DataStream as the view "myTable", whose two fields are "f0" and "f1"
tableEnv.createTemporaryView("myTable", stream)

// register the DataStream as the view "myTable2", whose two fields are "myLong" and "myString"
tableEnv.createTemporaryView("myTable2", stream, 'myLong, 'myString)
//obtain the TableEnvironment
val tableEnv = ...

//create a DataStream
val stream: DataStream[(Long, String)] = ...

//convert the DataStream into a Table with the default field names "_1" and "_2"
val table1: Table = tableEnv.fromDataStream(stream)

//convert the DataStream into a Table with the two field names "myLong" and "myString"
val table2: Table = tableEnv.fromDataStream(stream, $"myLong", $"myString")
//obtain the TableEnvironment
val tableEnv: StreamTableEnvironment = ...

//create a table with the two fields (String name, Integer age)
val table: Table = ...

//convert the table into a DataStream whose elements have type Row
val dsRow: DataStream[Row] = tableEnv.toAppendStream[Row](table)

//convert the table into a DataStream whose elements have type Tuple2[String, Int]
val dsTuple: DataStream[(String, Int)] = tableEnv.toAppendStream[(String, Int)](table)
//obtain the TableEnvironment
val tableEnv = BatchTableEnvironment.create(env)

//create a table with the two fields (String name, Integer age)
val table: Table = ...

//convert the table into a DataSet whose elements have type Row
val dsRow: DataSet[Row] = tableEnv.toDataSet[Row](table)

//convert the table into a DataSet whose elements have type Tuple2[String, Int]
val dsTuple: DataSet[(String, Int)] = tableEnv.toDataSet[(String, Int)](table)
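A runnable sketch of this DataSet conversion is given below. Note that BatchTableEnvironment is only provided by the legacy planner, so the sketch assumes the flink-table-planner_2.12 dependency in addition to the dependencies listed earlier; the in-memory test data is made up for illustration.

import org.apache.flink.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala.BatchTableEnvironment
import org.apache.flink.types.Row

object BatchTableToDataSetDemo {
  def main(args: Array[String]): Unit = {
    //the DataSet bridge requires the legacy (old) planner
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = BatchTableEnvironment.create(env)

    //a small in-memory table with the two fields (name STRING, age INT)
    val ds: DataSet[(String, Int)] = env.fromElements(("tom", 25), ("jerry", 22))
    val table: Table = tableEnv.fromDataSet(ds, $"name", $"age")

    //convert the table back into DataSets
    val dsRow: DataSet[Row] = tableEnv.toDataSet[Row](table)
    val dsTuple: DataSet[(String, Int)] = tableEnv.toDataSet[(String, Int)](table)

    //print() on a DataSet triggers execution, so no explicit execute() call is needed
    dsTuple.print()
  }
}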
//obtain the TableEnvironment
val tableEnv: StreamTableEnvironment = ...

//create a DataStream
val stream: DataStream[(Long, Int)] = ...

//convert the DataStream into a Table with the default field names "_1" and "_2"
val table: Table = tableEnv.fromDataStream(stream)

//convert the DataStream into a Table with only the single field name "myLong"
val table: Table = tableEnv.fromDataStream(stream, $"myLong")

//convert the DataStream into a Table with the two field names "myLong" and "myInt"
val table: Table = tableEnv.fromDataStream(stream, $"myLong", $"myInt")
//obtain the TableEnvironment
val tableEnv: StreamTableEnvironment = ...

//create a DataStream
val stream: DataStream[(Long, Int)] = ...

//convert the DataStream into a Table with the default field names "_1" and "_2"
val table: Table = tableEnv.fromDataStream(stream)

//convert the DataStream into a Table with only the field "_2"
val table: Table = tableEnv.fromDataStream(stream, $"_2")

//convert the DataStream into a Table with the two fields in swapped order
val table: Table = tableEnv.fromDataStream(stream, $"_2", $"_1")

//convert the DataStream into a Table with the fields swapped and renamed to "myInt" and "myLong"
val table: Table = tableEnv.fromDataStream(stream, $"_2" as "myInt", $"_1" as "myLong")
//obtain the TableEnvironment
val tableEnv: StreamTableEnvironment = ...

//create a DataStream
val stream: DataStream[Long] = ...

//convert the DataStream into a Table with the default field name "f0"
val table: Table = tableEnv.fromDataStream(stream)

//convert the DataStream into a Table with the field name "myLong"
val table: Table = tableEnv.fromDataStream(stream, $"myLong")
//obtain the TableEnvironment
val tableEnv: StreamTableEnvironment = ...

//create a DataStream
val stream: DataStream[(Long, String)] = ...

//convert the DataStream into a Table with the default field names "_1" and "_2"
val table: Table = tableEnv.fromDataStream(stream)

//convert the DataStream into a Table with the field names "myLong" and "myString" (position-based)
val table: Table = tableEnv.fromDataStream(stream, $"myLong", $"myString")

//convert the DataStream into a Table with the reordered fields "_2" and "_1" (name-based)
val table: Table = tableEnv.fromDataStream(stream, $"_2", $"_1")

//convert the DataStream into a Table with the projected field "_2" (name-based)
val table: Table = tableEnv.fromDataStream(stream, $"_2")

//convert the DataStream into a Table with the fields reordered and renamed to "myString" and "myLong" (name-based)
val table: Table = tableEnv.fromDataStream(stream, $"_2" as "myString", $"_1" as "myLong")

//define a case class
case class Person(name: String, age: Int)
val streamCC: DataStream[Person] = ...

//convert the DataStream into a Table with the default field names 'name and 'age
val table = tableEnv.fromDataStream(streamCC)

//convert the DataStream into a Table with the field names 'myName and 'myAge (position-based)
val table = tableEnv.fromDataStream(streamCC, $"myName", $"myAge")

//convert the DataStream into a Table with the fields reordered and renamed to "myAge" and "myName" (name-based)
val table: Table = tableEnv.fromDataStream(streamCC, $"age" as "myAge", $"name" as "myName")
// obtain the TableEnvironment
val tableEnv: StreamTableEnvironment = ...

// Person is a POJO with the fields "name" and "age"
val stream: DataStream[Person] = ...

// convert the DataStream into a Table with the default field names "age" and "name" (fields are ordered by name)
val table: Table = tableEnv.fromDataStream(stream)

// convert the DataStream into a Table with the renamed fields "myAge" and "myName" (name-based)
val table: Table = tableEnv.fromDataStream(stream, $"age" as "myAge", $"name" as "myName")

// convert the DataStream into a Table with the projected field "name" (name-based)
val table: Table = tableEnv.fromDataStream(stream, $"name")

// convert the DataStream into a Table with the projected and renamed field "myName" (name-based)
val table: Table = tableEnv.fromDataStream(stream, $"name" as "myName")
//obtain the TableEnvironment
val tableEnv: StreamTableEnvironment = ...

// a DataStream of Rows with the two fields "name" and "age", declared via RowTypeInfo
val stream: DataStream[Row] = ...

// convert the DataStream into a Table with the default field names "name" and "age"
val table: Table = tableEnv.fromDataStream(stream)

// convert the DataStream into a Table with the renamed field names "myName" and "myAge" (position-based)
val table: Table = tableEnv.fromDataStream(stream, $"myName", $"myAge")

// convert the DataStream into a Table with the renamed fields "myName" and "myAge" (name-based)
val table: Table = tableEnv.fromDataStream(stream, $"name" as "myName", $"age" as "myAge")

// convert the DataStream into a Table with the projected field "name" (name-based)
val table: Table = tableEnv.fromDataStream(stream, $"name")

// convert the DataStream into a Table with the projected and renamed field "myName" (name-based)
val table: Table = tableEnv.fromDataStream(stream, $"name" as "myName")
package cn.edu.xmu.dblab

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.api._

case class StockPrice(stockId:String,timeStamp:Long,price:Double)

object TableAPIAndSQLDemo {
  def main(args: Array[String]): Unit = {

    //obtain the execution environment
    val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment

    //set the parallelism to 1
    bsEnv.setParallelism(1)

    //obtain the EnvironmentSettings
    val bsSettings = EnvironmentSettings
      .newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()

    //obtain the TableEnvironment
    val bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings)

    //create the data source
    val inputData = bsEnv.readTextFile("file:///home/hadoop/stockprice.txt")

    //define the transformation logic for the data stream
    val dataStream = inputData.map(line => {
      val arr = line.split(",")
      StockPrice(arr(0), arr(1).toLong, arr(2).toDouble)
    })

    //create a table from the DataStream
    val stockTable = bsTableEnv.fromDataStream(dataStream, $"stockId", $"timeStamp", $"price")

    //query with the Table API
    val stock1 = stockTable.select($"stockId", $"price").filter('stockId === "stock_1")

    //register the table as a view
    bsTableEnv.createTemporaryView("stocktable", stockTable)

    //define the SQL statement
    val sql =
      """
        |select stockId,price from stocktable
        |where stockId='stock_2'
        |""".stripMargin

    //execute the SQL query
    val stock2 = bsTableEnv.sqlQuery(sql)

    //print the results
    stock1.toAppendStream[(String, Double)].print("stock_1")
    stock2.toAppendStream[(String, Double)].print("stock_2")

    //trigger program execution
    bsEnv.execute("TableAPIAndSQLDemo")
  }
}
//Option 1

//generate timestamps and watermarks from the events in the stream
val stream: DataStream[(String, String)] = inputStream.assignTimestampsAndWatermarks(...)

//declare an additional logical field as the event-time attribute
val table = tEnv.fromDataStream(stream, $"user_name", $"data", $"user_action_time".rowtime)

//Option 2

//extract the event time from the first field and generate watermarks
val stream: DataStream[(Long, String, String)] = inputStream.assignTimestampsAndWatermarks(...)

//the first field is already used for timestamp extraction, so no extra field is needed for the event time
val table = tEnv.fromDataStream(stream, $"user_action_time".rowtime, $"user_name", $"data")

//usage

val windowedTable = table.window(Tumble over 10.minutes on $"user_action_time" as "userActionWindow") //the meaning of this line is explained in Section 7.2.4
val stream: DataStream[(String, String)] = ...

//append an additional field as the processing-time attribute
val table = tEnv.fromDataStream(stream, $"user_name", $"data", $"user_action_time".proctime)

val windowedTable = table.window(Tumble over 10.minutes on $"user_action_time" as "userActionWindow")  //the meaning of this line is explained in Section 7.2.4
package cn.edu.xmu.dblab

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.api._

import scala.collection.mutable

object TableAPIDemo {
  def main(args: Array[String]): Unit = {

    //obtain the execution environment
    val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment

    //set the parallelism to 1
    bsEnv.setParallelism(1)

    //obtain the EnvironmentSettings
    val bsSettings = EnvironmentSettings
      .newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()

    //obtain the TableEnvironment
    val bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings)

    //generate test data
    val data = Seq("Flink", "Spark", "HBase", "Spark", "Hadoop", "Flink", "Hive")

    //create the data source
    val source = bsEnv.fromCollection(data).toTable(bsTableEnv, 'word)

    //core word-count logic
    val result = source
      .groupBy('word)  //group by word
      .select('word, 'word.count)  //count occurrences of each word

    //print the result
    result.toRetractStream[(String, Long)].print()

    //trigger program execution
    bsEnv.execute()
  }
}
val stock: Table = tableEnv.from("stocktable")
val table = tableEnv.fromValues(
   row(1, "ABC"),
   row(2L, "ABCDE"))
val stock = tableEnv.from("stocktable")
val result = stock.select($"stockId", $"price" as "stockPrice")
val stock = tableEnv.from("stocktable")
val result = stock.select($"*")
val stock = tableEnv.from("stocktable").as("myStockId","myTimeStamp","myPrice")
val stock = tableEnv.from("stocktable")
val result = stock.where($"stockId" === "stock_1")
val stock = tableEnv.from("stocktable")
val result = stock.filter($"stockId" === "stock_1")
val stock = tableEnv.from("stocktable")
val result = stock.addColumns(concat($"stockId", "_good"))
(stock_2,1602031562148,43.5,stock_2_good)
(stock_1,1602031562148,22.9,stock_1_good)
(stock_0,1602031562153,8.3,stock_0_good)
(stock_2,1602031562153,42.1,stock_2_good)
(stock_1,1602031562158,22.2,stock_1_good)
val stock = tableEnv.from("stocktable")
val result = stock.addOrReplaceColumns(concat($"stockId", "_good") as "goodstock")
val stock = tableEnv.from("stocktable")
val result = stock.dropColumns($"price")
val stock = tableEnv.from("stocktable")
val result = stock.renameColumns($"stockId" as "id", $"price" as "stockprice")
val stock = tableEnv.from("stocktable")
val result = stock.groupBy($"stockId").select($"stockId", $"price".sum().as("price_sum"))
val stock = tableEnv.from("stocktable")
val result: Table = stock
      .window(Tumble over 5.seconds() on $"timeStamp" as "w") //define the window
      .groupBy($"stockId", $"w") //group by the key and the window
      .select($"stockId", $"w".start, $"w".end, $"w".rowtime, $"price".sum as "price_sum")
val stock = tableEnv.from("stocktable")
val result: Table = stock
      .window(Slide over 5.seconds() every 1.seconds() on $"timeStamp" as "w") //define the window
      .groupBy($"stockId", $"w") //group by the key and the window
      .select($"stockId", $"w".start, $"w".end, $"w".rowtime, $"price".sum as "price_sum")
val stock = tableEnv.from("stocktable")
val result: Table = stock
      .window(Session withGap 5.seconds() on $"timeStamp" as "w") //define the window
      .groupBy($"stockId", $"w") //group by the key and the window
      .select($"stockId", $"w".start, $"w".end, $"w".rowtime, $"price".sum as "price_sum")
package cn.edu.xmu.cn

import java.text.SimpleDateFormat

import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, TimestampAssigner, TimestampAssignerSupplier, Watermark, WatermarkGenerator, WatermarkGeneratorSupplier, WatermarkOutput, WatermarkStrategy}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.api._

case class StockPrice(stockId:String,timeStamp:Long,price:Double)

object GroupByWindowAggregation {
  def main(args: Array[String]): Unit = {

    //obtain the execution environment
    val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment

    //set the time characteristic to event time
    bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    //set the parallelism to 1
    bsEnv.setParallelism(1)

    //obtain the EnvironmentSettings
    val bsSettings = EnvironmentSettings
      .newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()

    //obtain the TableEnvironment
    val bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings)

    //create the data source
    val source = bsEnv.socketTextStream("localhost", 9999)

    //define the transformation logic for the data stream
    val stockDataStream = source
      .map(s => s.split(","))
      .map(s=>StockPrice(s(0).toString,s(1).toLong,s(2).toDouble))

    //assign timestamps and watermarks to the data stream
    val watermarkDataStream = stockDataStream.assignTimestampsAndWatermarks(new MyWatermarkStrategy)

    //create a table from the DataStream
    val stockTable = bsTableEnv.fromDataStream(watermarkDataStream, $"stockId", $"timeStamp".rowtime, $"price")

    //query with the Table API
    val result: Table = stockTable
      .window(Tumble over 5.seconds() on $"timeStamp" as "w") // define the window
      .groupBy($"stockId", $"w") // group by the key and the window
      .select($"stockId",$"price".sum as "price_sum")

    //print the result
    result.toRetractStream[(String, Double)].print()

    //trigger program execution
    bsEnv.execute("TableAPIandSQL")
  }

  //define the watermark generation strategy
  class MyWatermarkStrategy extends WatermarkStrategy[StockPrice] {
    override def createTimestampAssigner(context:TimestampAssignerSupplier.Context):TimestampAssigner[StockPrice]={
      new SerializableTimestampAssigner[StockPrice] {
        override def extractTimestamp(element: StockPrice, recordTimestamp: Long): Long = {
          element.timeStamp //extract the timestamp from the arriving record
        }
      }
    }

    override def createWatermarkGenerator(context:WatermarkGeneratorSupplier.Context): WatermarkGenerator[StockPrice] ={
      new WatermarkGenerator[StockPrice](){
        val maxOutOfOrderness = 10000L //maximum delay (out-of-orderness) of 10 seconds
        var currentMaxTimestamp: Long = 0L
        var a: Watermark = null
        val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")

        override def onEvent(element: StockPrice, eventTimestamp: Long, output:WatermarkOutput): Unit = {
          currentMaxTimestamp = Math.max(eventTimestamp, currentMaxTimestamp)
          a = new Watermark(currentMaxTimestamp - maxOutOfOrderness)
          output.emitWatermark(a)
          println("timestamp:" + element.stockId + "," + element.timeStamp + "|" + format.format(element.timeStamp) + "," + currentMaxTimestamp + "|" + format.format(currentMaxTimestamp) + "," + a.toString)
        }

        override def onPeriodicEmit(output:WatermarkOutput): Unit = {
          // watermarks are not emitted periodically, so nothing is done here
        }
      }
    }
  }
}
nc -lk 9999
stock_1,1602031567000,8.14
stock_1,1602031568000,8.22
stock_1,1602031575000,8.14
stock_1,1602031577000,8.14
stock_1,1602031593000,8.14
val stock = tableEnv.from("stocktable")
val result = stock.distinct()
val left = ds1.toTable(tableEnv, $"a", $"b", $"c")
val right = ds2.toTable(tableEnv, $"d", $"e", $"f")
val result = left.join(right).where($"a" === $"d").select($"a", $"b", $"e")
val left = tableEnv.fromDataSet(ds1, $"a", $"b", $"c")
val right = tableEnv.fromDataSet(ds2, $"d", $"e", $"f")

val leftOuterResult = left.leftOuterJoin(right, $"a" === $"d").select($"a", $"b", $"e")
val rightOuterResult = left.rightOuterJoin(right, $"a" === $"d").select($"a", $"b", $"e")
val fullOuterResult = left.fullOuterJoin(right, $"a" === $"d").select($"a", $"b", $"e")
val left = ds1.toTable(tableEnv, $"a", $"b", $"c")
val right = ds2.toTable(tableEnv, $"a", $"b", $"c")
val result = left.union(right)
val left = ds1.toTable(tableEnv, $"a", $"b", $"c")
val right = ds2.toTable(tableEnv, $"a", $"b", $"c")
val result = left.unionAll(right)
val left = ds1.toTable(tableEnv, $"a", $"b", $"c")
val right = ds2.toTable(tableEnv, $"e", $"f", $"g")
val result = left.intersect(right)
val left = ds1.toTable(tableEnv, $"a", $"b", $"c")
val right = ds2.toTable(tableEnv, $"e", $"f", $"g")
val result = left.intersectAll(right)
val left = ds1.toTable(tableEnv, $"a", $"b", $"c")
val right = ds2.toTable(tableEnv, $"a", $"b", $"c")
val result = left.minus(right)
val left = ds1.toTable(tableEnv, $"a", $"b", $"c")
val right = ds2.toTable(tableEnv, $"a", $"b", $"c")
val result = left.minusAll(right)
val left = ds1.toTable(tableEnv, $"a", $"b", $"c")
val right = ds2.toTable(tableEnv, $"a")
val result = left.select($"a", $"b", $"c").where($"a".in(right))
val in = ds.toTable(tableEnv, $"a", $"b", $"c")
val result = in.orderBy($"a".asc)
val stock: Table = bsTableEnv.from("stocktable")
stock.executeInsert("OutStocks")
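The snippet above assumes that an output table named OutStocks has already been registered in the catalog. One possible registration, mirroring the CSV file-system sink used in TableSinkTest earlier in this chapter, is sketched below; the path and schema are assumptions for illustration, and bsTableEnv and the descriptor classes are the same as in the preceding examples.

//register a CSV file-system sink named "OutStocks" (hypothetical path and schema)
bsTableEnv.connect(new FileSystem().path("file:///home/hadoop/outstocks.csv"))
  .withFormat(new Csv())
  .withSchema(new Schema()
    .field("stockId", DataTypes.STRING())
    .field("timeStamp", DataTypes.BIGINT())
    .field("price", DataTypes.DOUBLE())
  )
  .createTemporaryTable("OutStocks")

//after this registration, the executeInsert call above writes every row of stocktable into the sink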
package cn.edu.xmu.dblab

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.descriptors._
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.types.Row

case class StockPrice(stockId:String,timeStamp:Long,price:Double)
object TableAPIMap {
  def main(args: Array[String]): Unit = {

    //obtain the execution environment
    val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment

    //set the parallelism to 1
    bsEnv.setParallelism(1)

    //obtain the EnvironmentSettings
    val bsSettings = EnvironmentSettings
      .newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()

    //obtain the TableEnvironment
    val bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings)

    //create the data source
    val stockTable = bsTableEnv.connect(
      new FileSystem()
        .path("file:///home/hadoop/stockprice.txt")
    ).withFormat(new Csv())
      .withSchema(new Schema()
        .field("stockId", DataTypes.STRING())
        .field("timeStamp", DataTypes.BIGINT())
        .field("price", DataTypes.DOUBLE())
      ).createTemporaryTable("stocktable")

    //query with the Table API
    val stock = bsTableEnv.from("stocktable")
    val func = new MyMapFunction()
    val result = stock.map(func($"stockId")).as("a","b")
    result.toRetractStream[(String,String)].print()

    //trigger program execution
    bsEnv.execute("TableAPIandSQL")
  }

  class MyMapFunction extends ScalarFunction {
    def eval(a: String): Row = {
      Row.of(a, "my-" + a)
    }
    override def getResultType(signature: Array[Class[_]]): TypeInformation[_] =
      Types.ROW(Types.STRING, Types.STRING())
  }
}
package cn.edu.xmu.dblab

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.descriptors._
import org.apache.flink.table.functions.{ScalarFunction, TableFunction}
import org.apache.flink.types.Row

case class StockPrice(stockId:String,timeStamp:Long,price:Double)
object TableAPIFlatMap {
  def main(args: Array[String]): Unit = {

    //obtain the execution environment
    val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment

    //set the parallelism to 1
    bsEnv.setParallelism(1)

    //obtain the EnvironmentSettings
    val bsSettings = EnvironmentSettings
      .newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()

    //obtain the TableEnvironment
    val bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings)

    //create the data source
    val stockTable = bsTableEnv.connect(
      new FileSystem()
        .path("file:///home/hadoop/stockprice2.txt")
    ).withFormat(new Csv())
      .withSchema(new Schema()
        .field("stockId", DataTypes.STRING())
        .field("timeStamp", DataTypes.BIGINT())
        .field("price", DataTypes.DOUBLE())
      ).createTemporaryTable("stocktable")

    //query with the Table API
    val stock = bsTableEnv.from("stocktable")
    val func = new MyFlatMapFunction()
    val result = stock.flatMap(func($"stockId")).as("a","b")
    result.toRetractStream[(String,Int)].print()

    //trigger program execution
    bsEnv.execute("TableAPIandSQL")
  }

  class MyFlatMapFunction extends TableFunction[Row] {
    def eval(str: String): Unit = {
      if (str.contains("#")) {
        str.split("#").foreach({ s =>
          val row = new Row(2)
          row.setField(0, s)
          row.setField(1, s.length)
          collect(row)
        })
      }
    }
    override def getResultType: TypeInformation[Row] = {
      Types.ROW(Types.STRING, Types.INT)
    }
  }
}
stock#01,1602031567000,8.17
stock#02,1602031568000,8.22
stock#01,1602031575000,8.14
package cn.edu.xmu.dblab

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.descriptors._
import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, TableFunction}
import org.apache.flink.types.Row

case class StockPrice(stockId:String,timeStamp:Long,price:Double)

object TableAPIAggregate {
  def main(args: Array[String]): Unit = {

    //obtain the execution environment
    val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment

    //set the parallelism to 1
    bsEnv.setParallelism(1)

    //obtain the EnvironmentSettings
    val bsSettings = EnvironmentSettings
      .newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()

    //obtain the TableEnvironment
    val bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings)

    //create the data source
    val stockTable = bsTableEnv.connect(
      new FileSystem()
        .path("file:///home/hadoop/stockprice.txt")
    ).withFormat(new Csv())
      .withSchema(new Schema()
        .field("stockId", DataTypes.STRING())
        .field("timeStamp", DataTypes.BIGINT())
        .field("price", DataTypes.DOUBLE())
      ).createTemporaryTable("stocktable")

    //query with the Table API
    val stock = bsTableEnv.from("stocktable")
    val myAggFunc = new MyMinMax()
    val result = stock
      .groupBy($"stockId")
      .aggregate(myAggFunc($"price") as ("x", "y"))
      .select($"stockId", $"x", $"y")
    result.toRetractStream[(String,Double,Double)].print()

    //trigger program execution
    bsEnv.execute("TableAPIandSQL")
  }

  case class MyMinMaxAcc(var min: Double, var max: Double)

  class MyMinMax extends AggregateFunction[Row, MyMinMaxAcc] {
    def accumulate(acc: MyMinMaxAcc, value: Double): Unit = {
      if (value < acc.min) {
        acc.min = value
      }
      if (value > acc.max) {
        acc.max = value
      }
    }
    override def createAccumulator(): MyMinMaxAcc = MyMinMaxAcc(0.0, 0.0)
    def resetAccumulator(acc: MyMinMaxAcc): Unit = {
      acc.min = 0.0
      acc.max = 0.0
    }
    override def getValue(acc: MyMinMaxAcc): Row = {
      Row.of(java.lang.Double.valueOf(acc.min), java.lang.Double.valueOf(acc.max))
    }
    override def getResultType: TypeInformation[Row] = {
      new RowTypeInfo(Types.DOUBLE(), Types.DOUBLE())
    }
  }
}
package cn.edu.xmu.dblab

import java.text.SimpleDateFormat
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, TimestampAssigner, TimestampAssignerSupplier, Watermark, WatermarkGenerator, WatermarkGeneratorSupplier, WatermarkOutput, WatermarkStrategy}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.functions.AggregateFunction
import org.apache.flink.types.Row

case class StockPrice(stockId:String,timeStamp:Long,price:Double)

object Test2 {
  def main(args: Array[String]): Unit = {

    //obtain the execution environment
    val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment

    //set the time characteristic to event time
    bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    //set the parallelism to 1
    bsEnv.setParallelism(1)

    //obtain the EnvironmentSettings
    val bsSettings = EnvironmentSettings
      .newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()

    //obtain the TableEnvironment
    val bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings)

    //create the data source
    val source = bsEnv.socketTextStream("localhost", 9999)

    //define the transformation logic for the data stream
    val stockDataStream = source
      .map(s => s.split(","))
      .map(s=>StockPrice(s(0).toString,s(1).toLong,s(2).toDouble))

    //assign timestamps and watermarks to the data stream
    val watermarkDataStream = stockDataStream.assignTimestampsAndWatermarks(new MyWatermarkStrategy)

    //create a table from the DataStream
    val stockTable=bsTableEnv.fromDataStream(watermarkDataStream,$"stockId",$"timeStamp".rowtime,$"price")

    //query with the Table API
    val myAggFunc = new MyMinMax()
    val result = stockTable
      .window(Tumble over 5.seconds on $"timeStamp" as "w")
      .groupBy($"stockId",$"w")
      .aggregate(myAggFunc($"price") as ("x", "y"))
      .select($"stockId", $"x", $"y")

    //print the result
    result.toRetractStream[(String,Double,Double)].print()

    //trigger program execution
    bsEnv.execute("TableAPIandSQL")
  }

  case class MyMinMaxAcc(var min: Double, var max: Double)

  class MyMinMax extends AggregateFunction[Row, MyMinMaxAcc] {

    def accumulate(acc: MyMinMaxAcc, value: Double): Unit = {
      if (value < acc.min) {
        acc.min = value
      }
      if (value > acc.max) {
        acc.max = value
      }
    }

    override def createAccumulator(): MyMinMaxAcc = MyMinMaxAcc(0.0, 0.0)

    def resetAccumulator(acc: MyMinMaxAcc): Unit = {
      acc.min = 0.0
      acc.max = 0.0
    }

    override def getValue(acc: MyMinMaxAcc): Row = {
      Row.of(java.lang.Double.valueOf(acc.min), java.lang.Double.valueOf(acc.max))
    }

    override def getResultType: TypeInformation[Row] = {
      new RowTypeInfo(Types.DOUBLE(), Types.DOUBLE())
    }
  }

  //define the watermark generation strategy
  class MyWatermarkStrategy extends WatermarkStrategy[StockPrice] {
    override def createTimestampAssigner(context:TimestampAssignerSupplier.Context):TimestampAssigner[StockPrice]={
      new SerializableTimestampAssigner[StockPrice] {
        override def extractTimestamp(element: StockPrice, recordTimestamp: Long): Long = {
          element.timeStamp //extract the timestamp from the arriving record
        }
      }
    }

    override def createWatermarkGenerator(context:WatermarkGeneratorSupplier.Context): WatermarkGenerator[StockPrice] ={
      new WatermarkGenerator[StockPrice](){
        val maxOutOfOrderness = 10000L //maximum delay (out-of-orderness) of 10 seconds
        var currentMaxTimestamp: Long = 0L
        var a: Watermark = null
        val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")

        override def onEvent(element: StockPrice, eventTimestamp: Long, output:WatermarkOutput): Unit = {
          currentMaxTimestamp = Math.max(eventTimestamp, currentMaxTimestamp)
          a = new Watermark(currentMaxTimestamp - maxOutOfOrderness)
          output.emitWatermark(a)
          println("timestamp:" + element.stockId + "," + element.timeStamp + "|" + format.format(element.timeStamp) + "," + currentMaxTimestamp + "|" + format.format(currentMaxTimestamp) + "," + a.toString)
        }

        override def onPeriodicEmit(output:WatermarkOutput): Unit = {
          // watermarks are not emitted periodically, so nothing is done here
        }
      }
    }
  }
}
stock_1,1602031567000,8.14
stock_2,1602031568000,18.22
stock_2,1602031575000,8.14
stock_1,1602031577000,18.21
stock_1,1602031593000,8.98
timestamp:stock_1,1602031567000|2020-10-07 08:46:07.000,1602031567000|2020-10-07 08:46:07.000,Watermark @ 1602031557000 (2020-10-07 08:45:57.000)
timestamp:stock_2,1602031568000|2020-10-07 08:46:08.000,1602031568000|2020-10-07 08:46:08.000,Watermark @ 1602031558000 (2020-10-07 08:45:58.000)
timestamp:stock_2,1602031575000|2020-10-07 08:46:15.000,1602031575000|2020-10-07 08:46:15.000,Watermark @ 1602031565000 (2020-10-07 08:46:05.000)
timestamp:stock_1,1602031577000|2020-10-07 08:46:17.000,1602031577000|2020-10-07 08:46:17.000,Watermark @ 1602031567000 (2020-10-07 08:46:07.000)
timestamp:stock_1,1602031593000|2020-10-07 08:46:33.000,1602031593000|2020-10-07 08:46:33.000,Watermark @ 1602031583000 (2020-10-07 08:46:23.000)
(true,(stock_1,0.0,8.14))
(true,(stock_2,0.0,18.22))
(true,(stock_1,0.0,18.21))
(true,(stock_2,0.0,8.14))
package cn.edu.xmu.dblab

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.descriptors._
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.types.Row

case class StockPrice(stockId:String,timeStamp:Long,price:Double)

object FlinkSQLSelect {
  def main(args: Array[String]): Unit = {

    //obtain the execution environment
    val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment

    //set the parallelism to 1
    bsEnv.setParallelism(1)

    //obtain the EnvironmentSettings
    val bsSettings = EnvironmentSettings
      .newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()

    //obtain the TableEnvironment
    val bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings)

    //create the data source
    val stockTable = bsTableEnv.connect(
      new FileSystem()
        .path("file:///home/hadoop/stockprice.txt")
    ).withFormat(new Csv())
      .withSchema(new Schema()
        .field("stockId", DataTypes.STRING())
        .field("timeStamp", DataTypes.BIGINT())
        .field("price", DataTypes.DOUBLE())
      ).createTemporaryTable("stocktable")

    //create the output table
    val outTable = bsTableEnv.connect(
      new FileSystem()
        .path("file:///home/hadoop/out.txt")
    ).withFormat(new Csv())
      .withSchema(new Schema()
        .field("stockId", DataTypes.STRING())
        .field("price", DataTypes.DOUBLE())
      ).createTemporaryTable("outtable")

    //use SQL to insert the query result into the output table
    bsTableEnv.executeSql(
      "INSERT INTO outtable SELECT stockId,price FROM stocktable WHERE stockId LIKE '%stock_1%'")

    //query with SQL
    val stock = bsTableEnv.from("stocktable")
    val result = bsTableEnv.sqlQuery(s"SELECT stockId,price FROM $stock")
    result.toRetractStream[(String,Double)].print()

    //trigger program execution
    bsEnv.execute("TableAPIandSQL")
  }
}
SELECT * FROM stock

SELECT stockId, price AS stockprice FROM stock
SELECT * FROM stock WHERE stockId = 'stock_1'
SELECT * FROM stock WHERE price > 10
SELECT stockId, AVG(price) as avg_price
FROM stock
GROUP BY stockId
SELECT stockId, AVG(price)
FROM stock
GROUP BY TUMBLE(timeStamp, INTERVAL '1' DAY), stockId
SELECT DISTINCT stockId FROM stock
SELECT AVG(price)
FROM stock
GROUP BY stockId
HAVING AVG(price) > 20
SELECT *
FROM stock INNER JOIN stock_info ON stock.stockId = stock_info.stockId
SELECT * FROM stock LEFT JOIN stock_info ON stock.stockId = stock_info.stockId
SELECT * FROM stock RIGHT JOIN stock_info ON stock.stockId = stock_info.stockId
SELECT * FROM stock FULL OUTER JOIN stock_info ON stock.stockId = stock_info.stockId
SELECT *
FROM (
    (SELECT stockId FROM stock WHERE stockId='stock_1')
  UNION
    (SELECT stockId FROM stock WHERE stockId='stock_2')
)
SELECT *
FROM (
    (SELECT stockId FROM stock WHERE stockId='stock_1')
  UNION ALL
    (SELECT stockId FROM stock WHERE stockId='stock_2')
)
SELECT *
FROM (
    (SELECT stockId FROM stock WHERE price > 10.0)
  INTERSECT
    (SELECT stockId FROM stock WHERE stockId='stock_1')
)
SELECT *
FROM (
    (SELECT stockId FROM stock WHERE price > 10.0)
  EXCEPT
    (SELECT stockId FROM stock WHERE stockId='stock_1')
)
SELECT stockId, price
FROM stock
WHERE stockId IN (
    SELECT stockId FROM newstock
)
SELECT stockId, price
FROM stock
WHERE EXISTS (
    SELECT * FROM newstock WHERE newstock.stockId = stock.stockId
)
SELECT *
FROM stock
ORDER BY timeStamp
SELECT *
FROM stock
ORDER BY timeStamp
LIMIT 3
package cn.edu.xmu.dblab

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.descriptors._
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.types.Row

case class StockPrice(stockId:String,timeStamp:Long,price:Double)

object ScalarFunctionDemo {
  def main(args: Array[String]): Unit = {

    //obtain the execution environment
    val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment

    //set the parallelism to 1
    bsEnv.setParallelism(1)

    //obtain the EnvironmentSettings
    val bsSettings = EnvironmentSettings
      .newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()

    //obtain the TableEnvironment
    val bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings)

    //create the data source
    val stockTable = bsTableEnv.connect(
      new FileSystem()
        .path("file:///home/hadoop/stockprice.txt")
    ).withFormat(new Csv())
      .withSchema(new Schema()
        .field("stockId", DataTypes.STRING())
        .field("timeStamp", DataTypes.BIGINT())
        .field("price", DataTypes.DOUBLE())
      ).createTemporaryTable("stocktable")
    val stock = bsTableEnv.from("stocktable")

    //call the function "inline" in the Table API without registering it
    val result1 = stock.select(call(classOf[SubstringFunction], $"stockId", 6, 7))

    //register the function
    bsTableEnv.createTemporarySystemFunction("SubstringFunction", classOf[SubstringFunction])

    //call the registered function in the Table API
    val result2 = stock.select(call("SubstringFunction", $"stockId", 6, 7))

    //call the registered function in SQL
    val result3 = bsTableEnv.sqlQuery("SELECT SubstringFunction(stockId, 6, 7) FROM stocktable")

    //print the results
    result1.toAppendStream[Row].print("result1")
    result2.toAppendStream[Row].print("result2")
    result3.toAppendStream[Row].print("result3")
    bsEnv.execute("ScalarFunctionDemo ")
  }

  //user-defined scalar function
  class SubstringFunction extends ScalarFunction {
    def eval(s: String, begin: Integer, end: Integer): String = {
      s.substring(begin, end)
    }
  }
}
package cn.edu.xmu.dblab

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.annotation.{DataTypeHint, FunctionHint}
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.descriptors._
import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, TableFunction}
import org.apache.flink.types.Row

case class StockPrice(stockId:String,timeStamp:Long,price:Double)

object TableFunctionDemo {
  def main(args: Array[String]): Unit = {

    //obtain the execution environment
    val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment

    //set the parallelism to 1
    bsEnv.setParallelism(1)

    //obtain the EnvironmentSettings
    val bsSettings = EnvironmentSettings
      .newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()

    //obtain the TableEnvironment
    val bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings)

    //create the data source
    val stockTable = bsTableEnv.connect(
      new FileSystem()
        .path("file:///home/hadoop/stockprice.txt")
    ).withFormat(new Csv())
      .withSchema(new Schema()
        .field("stockId", DataTypes.STRING())
        .field("timeStamp", DataTypes.BIGINT())
        .field("price", DataTypes.DOUBLE())
      ).createTemporaryTable("stocktable")

    //query with the Table API
    val stock = bsTableEnv.from("stocktable")
    val result = stock
       //.joinLateral(call(classOf[MySplitFunction], $"stockId"))
        .leftOuterJoinLateral(call(classOf[MySplitFunction], $"stockId"))
        .select($"stockId", $"word", $"length")
    result.toRetractStream[(String, String, Int)].print()

    //trigger program execution
    bsEnv.execute("TableFunctionDemo")
  }

  //specify the result type via an annotation
  @FunctionHint(output = new DataTypeHint("ROW<word STRING, length INT>"))
  class MySplitFunction extends TableFunction[Row] {
    def eval(str: String): Unit = {
      //emit rows with collect(...)
      str.split("_").foreach(s => collect(Row.of(s, Int.box(s.length))))
    }
  }
}
package cn.edu.xmu.dblab

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.descriptors._
import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, TableFunction}
import org.apache.flink.types.Row

case class StockPrice(stockId:String,timeStamp:Long,price:Double)

object SelfAggFunc{
  def main(args: Array[String]): Unit = {

    //obtain the execution environment
    val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment

    //set the parallelism to 1
    bsEnv.setParallelism(1)

    //obtain the EnvironmentSettings
    val bsSettings = EnvironmentSettings
      .newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()

    //obtain the TableEnvironment
    val bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings)

    //create the data source
    val stockTable = bsTableEnv.connect(
      new FileSystem()
        .path("file:///home/hadoop/stockprice.txt")
    ).withFormat(new Csv())
      .withSchema(new Schema()
        .field("stockId", DataTypes.STRING())
        .field("timeStamp", DataTypes.BIGINT())
        .field("price", DataTypes.DOUBLE())
      ).createTemporaryTable("stocktable")

    //query with the Table API
    val stock = bsTableEnv.from("stocktable")
    val myCountFunction = new MyCountFunction()
    val result = stock
      .groupBy($"stockId")
      .aggregate(myCountFunction() as ("x"))
      .select($"stockId", $"x")
    result.toRetractStream[(String,Long)].print()

    //trigger program execution
    bsEnv.execute("SelfAggFunc")
  }

  case class MyCountAccumulator(var count:Long)

  class MyCountFunction extends AggregateFunction[Row, MyCountAccumulator] {
    def accumulate(acc: MyCountAccumulator): Unit = {
      acc.count = acc.count + 1
    }

    override def createAccumulator(): MyCountAccumulator = MyCountAccumulator(0)

    override def getValue(acc: MyCountAccumulator): Row = Row.of(java.lang.Long.valueOf(acc.count))

    override def getResultType: TypeInformation[Row] = {
      new RowTypeInfo(Types.LONG())
    }
  }
}