Command lines and code from the textbook Flink编程基础(Java版) (Fundamentals of Flink Programming, Java Edition) by Lin Ziyu, Xiamen University (textbook official website)
This page provides the command lines and code for every chapter of the textbook; they can be copied and pasted directly for execution.
View all command lines and code from the textbook Flink编程基础(Java版)
Note that if you want to use the Table API in your code, you must add the corresponding flink-table-api-java-bridge dependency to the pom.xml file, as follows:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>1.17.0</version>
</dependency>
In addition, if you want to debug Table API & SQL programs locally in an IDE (such as IntelliJ IDEA or Eclipse), you also need to add the following dependencies:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-loader</artifactId>
<version>1.17.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
<version>1.17.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>1.17.0</version>
</dependency>
If you want to implement user-defined functions or interact with Kafka, the following dependency is also required:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>1.17.0</version>
</dependency>
Code example SQLDemo.java
package cn.edu.xmu;
import org.apache.flink.connector.datagen.table.DataGenConnectorOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class SQLDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
env.setParallelism(1);
// Create an input table SourceTable
tableEnv.createTemporaryTable("SourceTable", TableDescriptor.forConnector("datagen")
.schema(Schema.newBuilder()
.column("f0", DataTypes.STRING())
.build())
.option(DataGenConnectorOptions.ROWS_PER_SECOND, 1L)
.build());
// Create an output table SinkTable using a SQL statement
tableEnv.executeSql("CREATE TEMPORARY TABLE SinkTable(str STRING) WITH ('connector' = 'print')");
// Create a Table object table1 using the Table API
Table table1 = tableEnv.from("SourceTable");
// Create a Table object table2 using a SQL statement
Table table2 = tableEnv.sqlQuery("SELECT * FROM SourceTable");
// Write the Table object table1 into the output table SinkTable
TableResult tableResult = table1.executeInsert("SinkTable");
}
}
Code file InputFromFileDemo.java
package cn.edu.xmu;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class InputFromFileDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
env.setParallelism(1);
// Create an input table stockPriceTable
String sourceDDL =
"create table stockPriceTable (" +
"stockId STRING," +
"price DOUBLE" +
") with (" +
" 'connector' = 'filesystem', " +
" 'path' = 'file:///home/hadoop/stockprice.csv', " +
" 'format' = 'csv', " +
" 'csv.field-delimiter' = ',', " +
" 'csv.ignore-parse-errors' = 'true' " +
" )";
tableEnv.executeSql(sourceDDL);
// Create an output table SinkTable using a SQL statement
tableEnv.executeSql("CREATE TEMPORARY TABLE SinkTable(stockId STRING,price DOUBLE) WITH ('connector' = 'print')");
// Create a Table object table1 using the Table API
Table table1 = tableEnv.from("stockPriceTable");
// Write the Table object table1 into the output table SinkTable
TableResult tableResult = table1.executeInsert("SinkTable");
}
}
To run this program in IDEA, you also need to add the following dependency to pom.xml:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>1.17.0</version>
</dependency>
Create a file "/home/hadoop/stockprice.csv" in Linux and enter the following two lines into it:
stock1,123
stock2,456
To read the file from HDFS instead, replace the file path in the code with an HDFS path (this generally also requires the Hadoop client dependencies to be available on the classpath), for example:
'path' = 'hdfs://hadoop01:9000/stockprice.csv'
Start MySQL in Linux, enter the interactive MySQL shell, and run the following commands to create a database and a table:
mysql> create database stockdb;
mysql> use stockdb;
mysql> create table stockprice (stockId varchar(20),price double);
mysql> insert into stockprice values("stock1",123);
mysql> insert into stockprice values("stock2",456);
mysql> select * from stockprice;
Code file SQLDemo2.java
package cn.edu.xmu;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class SQLDemo2 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
env.setParallelism(1);
// Create an input table stockPriceTable
String sourceDDL =
"create table stockPriceTable (" +
" stockId STRING, " +
" price DOUBLE " +
") with (" +
" 'connector' = 'jdbc', " +
" 'url' = 'jdbc:mysql://localhost:3306/stockdb', " +
" 'table-name' = 'stockprice', " +
" 'driver' = 'com.mysql.jdbc.Driver', " +
" 'username' = 'root', " +
" 'password' = '123456' )";
tableEnv.executeSql(sourceDDL);
// Create an output table SinkTable using a SQL statement
tableEnv.executeSql("CREATE TEMPORARY TABLE SinkTable(stockId STRING,price DOUBLE) WITH ('connector' = 'print')");
// Create a Table object table1 using the Table API
Table table1 = tableEnv.from("stockPriceTable");
// Write the Table object table1 into the output table SinkTable
TableResult tableResult = table1.executeInsert("SinkTable");
}
}
To run this program in IDEA, you need to add the following dependencies to pom.xml:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
<version>3.1.1-1.17</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.33</version>
</dependency>
Here is an example that uses the Flink Table API to consume click logs (in JSON) from Kafka, convert them to CSV format, and write them back to Kafka.
The following dependencies need to be added to pom.xml:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>1.17.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>1.17.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>1.17.0</version>
</dependency>
Code file FlinkTableAPIKafka2Kafka.java
package cn.edu.xmu;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import static org.apache.flink.table.api.Expressions.$;
public class FlinkTableAPIKafka2Kafka {
public static final String input_topic = "clicklog_input";
public static final String out_topic = "clicklog_output";
public static void main(String[] args) {
//1. Create the TableEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
env.setParallelism(1);
//2. Create the Kafka source table
final Schema schema = Schema.newBuilder()
.column("user", DataTypes.STRING())
.column("url", DataTypes.STRING())
.column("cTime", DataTypes.STRING())
.build();
tableEnv.createTemporaryTable("sourceTable", TableDescriptor.forConnector("kafka")
.schema(schema)
.format("json")
.option("topic",input_topic)
.option("properties.bootstrap.servers","hadoop01:9092")
.option("properties.group.id","clicklog")
.option("scan.startup.mode","latest-offset") //每次都从最早的offsets开始
.build());
//3. Create the Kafka sink table
tableEnv.createTemporaryTable("sinkTable", TableDescriptor.forConnector("kafka")
.schema(schema)
.format("csv")
.option("topic",out_topic)
.option("properties.bootstrap.servers","hadoop01:9092")
.build());
//4. Output
tableEnv.from("sourceTable")
.select($("user"), $("url"),$("cTime"))
.executeInsert("sinkTable");
}
}
Start ZooKeeper and the Kafka server in Linux, create the topics clicklog_input and clicklog_output, and then start a console consumer and a console producer:
cd /usr/local/kafka
./bin/zookeeper-server-start.sh config/zookeeper.properties
cd /usr/local/kafka
./bin/kafka-server-start.sh config/server.properties
cd /usr/local/kafka
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic clicklog_input --replication-factor 1 --partitions 1
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic clicklog_output --replication-factor 1 --partitions 1
bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic clicklog_output
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic clicklog_input
{"user":"Mary","url":"./home","cTime":"2023-09-02 12:00:00"}
Here is a simple Table API and SQL data processing application. Assume a text file "/home/hadoop/stockprice0.csv" already exists with the following content:
stock1,123
stock2,456
stock1,100
stock2,200
Code file TableAPIandSQLDemo.java
package cn.edu.xmu;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import static org.apache.flink.table.api.Expressions.$;
public class TableAPIandSQLDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
env.setParallelism(1);
// Create an input table sourceTable using a SQL statement
String sourceDDL =
"create table sourceTable (" +
"stockId STRING," +
"price DOUBLE" +
") with (" +
" 'connector' = 'filesystem', " +
" 'path' = 'file:///home/hadoop/stockprice0.csv', " +
" 'format' = 'csv', " +
" 'csv.field-delimiter' = ',', " +
" 'csv.ignore-parse-errors' = 'true' " +
" )";
tableEnv.executeSql(sourceDDL);
// Create an output table sinkTable using a SQL statement
tableEnv.executeSql("CREATE TEMPORARY TABLE sinkTable(stockId STRING,price DOUBLE) WITH ('connector' = 'print')");
// Create a Table object table1 using the Table API
Table table1 = tableEnv.from("sourceTable");
// Register table1 as a temporary view
tableEnv.createTemporaryView("myTable1", table1);
// Query using the Table API
Table result1 = table1.groupBy($("stockId")).select($("stockId"), $("price").sum().as("sum-price"));
result1.execute().print();
// Query using SQL statements
Table result2 = tableEnv.sqlQuery("select * from sourceTable where price>200");
result2.execute().print();
Table result3 = tableEnv.sqlQuery("select * from myTable1 where price>200");
result3.execute().print();
}
}
Code file OutputToFileDemo1.java
package cn.edu.xmu;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class OutputToFileDemo1 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
env.setParallelism(1);
// Create an input table sourceTable
String sourceDDL =
"create table sourceTable (" +
"stockId STRING," +
"price DOUBLE" +
") with (" +
" 'connector' = 'filesystem', " +
" 'path' = 'file:///home/hadoop/stockprice0.csv', " +
" 'format' = 'csv', " +
" 'csv.field-delimiter' = ',', " +
" 'csv.ignore-parse-errors' = 'true' " +
" )";
tableEnv.executeSql(sourceDDL);
String sinkDDL =
"create table sinkTable (" +
"stockId STRING," +
"price DOUBLE" +
") with (" +
" 'connector' = 'filesystem', " +
" 'path' = 'file:///home/hadoop/stockprice2', " +
" 'format' = 'csv', " +
" 'csv.field-delimiter' = ',', " +
" 'csv.ignore-parse-errors' = 'true' " +
" )";
// Create an output table sinkTable using a SQL statement
tableEnv.executeSql(sinkDDL);
// Create a Table object table1 using the Table API
Table table1 = tableEnv.from("sourceTable");
// Write the Table object table1 into the output table sinkTable
TableResult tableResult = table1.executeInsert("sinkTable");
}
}
Code file OutputToFileDemo2.java
package cn.edu.xmu;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class OutputToFileDemo2 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
env.setParallelism(1);
// Create an input table sourceTable
String sourceDDL =
"create table sourceTable (" +
"stockId STRING," +
"price DOUBLE" +
") with (" +
" 'connector' = 'filesystem', " +
" 'path' = 'file:///home/hadoop/stockprice0.csv', " +
" 'format' = 'csv', " +
" 'csv.field-delimiter' = ',', " +
" 'csv.ignore-parse-errors' = 'true' " +
" )";
tableEnv.executeSql(sourceDDL);
Table table1 = tableEnv.from("sourceTable");
final Schema schema = Schema.newBuilder()
.column("stockId", DataTypes.STRING())
.column("price", DataTypes.DOUBLE())
.build();
tableEnv.createTemporaryTable("CsvSinkTable", TableDescriptor.forConnector("filesystem")
.schema(schema)
.option("path", "file:///home/hadoop/stockprice")
.format(FormatDescriptor.forFormat("csv")
.option("field-delimiter", ",")
.build())
.build());
// Prepare the insert-into pipeline that writes table1 into CsvSinkTable
TablePipeline pipeline = table1.insertInto("CsvSinkTable");
// Print the details of the execution plan
pipeline.printExplain();
// Call the pipeline's execute() method to send the data of table1 to the registered CsvSinkTable
pipeline.execute();
}
}
Code file OutputToMySQLDemo.java
package cn.edu.xmu;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class OutputToMySQLDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
env.setParallelism(1);
// Create an input table sourceTable
String sourceDDL =
"create table sourceTable (" +
"stockId STRING," +
"price DOUBLE" +
") with (" +
" 'connector' = 'filesystem', " +
" 'path' = 'file:///home/hadoop/stockprice.csv', " +
" 'format' = 'csv', " +
" 'csv.field-delimiter' = ',', " +
" 'csv.ignore-parse-errors' = 'true' " +
" )";
tableEnv.executeSql(sourceDDL);
String sinkDDL =
"create table sinkTable (" +
" stockId STRING, " +
" price DOUBLE " +
") with (" +
" 'connector' = 'jdbc', " +
" 'url' = 'jdbc:mysql://localhost:3306/stockdb', " +
" 'table-name' = 'stockprice', " +
" 'driver' = 'com.mysql.jdbc.Driver', " +
" 'username' = 'root', " +
" 'password' = '123456' )";
// Create an output table sinkTable using a SQL statement
tableEnv.executeSql(sinkDDL);
// Create a Table object table1 using the Table API
Table table1 = tableEnv.from("sourceTable");
// Write the Table object table1 into the output table sinkTable
TableResult tableResult = table1.executeInsert("sinkTable");
}
}
Code file DataStreamToTableDemo.java
package cn.edu.xmu;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
public class DataStreamToTableDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
env.setParallelism(1);
// Create a DataStream
DataStream<String> dataStream = env.fromElements("Alice", "Bob", "John");
// Convert the DataStream into a Table object
Table inputTable = tableEnv.fromDataStream(dataStream);
// Register the Table object as a view
tableEnv.createTemporaryView("InputTable", inputTable);
// Query the view
Table resultTable = tableEnv.sqlQuery("SELECT UPPER(f0) FROM InputTable");
// Convert the insert-only Table object into a DataStream
DataStream<Row> resultStream = tableEnv.toDataStream(resultTable);
// Use the DataStream API to run the streaming computation
resultStream.print();
env.execute();
}
}
Code file DataStreamToTableDemo2.java
package cn.edu.xmu;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
public class DataStreamToTableDemo2 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
env.setParallelism(1);
// Create a DataStream
DataStream<Row> dataStream = env.fromElements(
Row.of("Alice", 12),
Row.of("Bob", 10),
Row.of("Alice", 100));
// Convert the DataStream into a Table object
Table inputTable = tableEnv.fromDataStream(dataStream).as("name", "score");
// Register the Table as a view and query it
// The query contains an aggregation whose results will be continuously updated
tableEnv.createTemporaryView("InputTable", inputTable);
Table resultTable = tableEnv.sqlQuery(
"SELECT name, SUM(score) FROM InputTable GROUP BY name");
// Convert the updating Table object into a changelog DataStream
DataStream<Row> resultStream = tableEnv.toChangelogStream(resultTable);
// Use the DataStream API to run the streaming computation
resultStream.print();
env.execute();
}
}
The event-time attribute is defined in the table-creation DDL as follows:
CREATE TABLE user_actions (
user_name STRING,
data STRING,
user_action_time TIMESTAMP(3),
-- Declare user_action_time as the event-time attribute and use a watermark strategy with a 5-second delay
WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (
...
);
The processing-time attribute is defined in the table-creation DDL as follows:
CREATE TABLE user_actions (
user_name STRING,
data STRING,
user_action_time AS PROCTIME() -- declare an additional field as the processing-time attribute
) WITH (
...
);
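The same time attributes can also be declared when building a table schema with the Table API. The following is a minimal sketch (the table and field names are taken from the DDL above; everything else is illustrative) using Schema.newBuilder():
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
// Event time: declare user_action_time as a regular column and attach a watermark with a 5-second delay
Schema eventTimeSchema = Schema.newBuilder()
        .column("user_name", DataTypes.STRING())
        .column("data", DataTypes.STRING())
        .column("user_action_time", DataTypes.TIMESTAMP(3))
        .watermark("user_action_time", "user_action_time - INTERVAL '5' SECOND")
        .build();
// Processing time: declare an additional computed column based on PROCTIME()
Schema procTimeSchema = Schema.newBuilder()
        .column("user_name", DataTypes.STRING())
        .column("data", DataTypes.STRING())
        .columnByExpression("user_action_time", "PROCTIME()")
        .build();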
Code file TableAPIDemo.java
package cn.edu.xmu;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import static org.apache.flink.table.api.Expressions.$;
public class TableAPIDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
env.setParallelism(1);
// Create a DataStream
DataStream<String> dataStream = env.fromElements("Flink", "Spark", "Spark", "Flink");
// Convert the DataStream into a Table object
Table inputTable = tableEnv.fromDataStream(dataStream).as("word");
Table wordCount = inputTable.groupBy($("word"))
.select($("word"), $("word").count().as("cnt"));
wordCount.execute().print();
}
}
fromValues is used in much the same way as the VALUES clause in SQL: it generates a table from rows supplied by the user. It is used as follows:
import static org.apache.flink.table.api.Expressions.row;
Table table = tableEnv.fromValues(
row(1, "ABC"),
row(2L, "ABCDE")
);
Field names and types can also be specified when creating the table object, as follows:
Table table = tableEnv.fromValues(
DataTypes.ROW(
DataTypes.FIELD("id", DataTypes.DECIMAL(10, 2)),
DataTypes.FIELD("name", DataTypes.STRING())
),
row(1, "ABC"),
row(2L, "ABCDE")
);
Table stock = tableEnv.from("stockPriceTable");
Table result = stock.select($("stockId"), $("price"). as("stockPrice"));
Table stock = tableEnv.from("stockPriceTable");
Table result = stock.select($("*"));
Table stock = tableEnv.from("stockPriceTable");
Table result = stock.as("myStockId", "myTimeStamp", "myPrice");
Table stock = tableEnv.from("stockPriceTable");
Table result = stock.where($("stockId").isEqual("stock_1"));
Table stock = tableEnv.from("stockPriceTable");
Table result = stock.filter($("stockId").isEqual("stock_1"));
Table stock = tableEnv.from("stockPriceTable");
Table result = stock.addColumns(concat($("stockId"), "_good"));
Table stock = tableEnv.from("stockPriceTable");
Table result = stock.addOrReplaceColumns(concat($("stockId"), "_good").as("goodstock"));
Table stock = tableEnv.from("stockPriceTable");
Table result = stock.dropColumns($("timeStamp"), $("price"));
Table stock = tableEnv.from("stockPriceTable");
Table result = stock.renameColumns($("stockId").as("id"), $("price").as("stockprice"));
Table stock = tableEnv.from("stockPriceTable");
Table result = stock.groupBy($("stockId")).select($("stockId"), $("price").sum().as("price_sum"));
Table stock = tableEnv.from("stockPriceTable");
Table result = stock
.window(Tumble.over(lit(5).minutes()).on($("timeStamp")).as("w")) // define the window
.groupBy($("stockId"), $("w")) // group by key and window
// access window properties and aggregate
.select(
$("stockId"),
$("w").start(),
$("w").end(),
$("w").rowtime(),
$("price").sum().as("price_sum")
);
Code file GroupByWindowAggregationDemo.java
package cn.edu.xmu;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import java.text.Format;
import java.text.SimpleDateFormat;
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;
public class GroupByWindowAggregationDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
env.setParallelism(1);
DataStreamSource<String> source = env.socketTextStream("hadoop01", 9999);
SingleOutputStreamOperator<StockPrice> stockPriceDS = source.map(s -> {
String[] element = s.split(",");
return new StockPrice(element[0], Long.valueOf(element[1]), Double.valueOf(element[2]));
});
SingleOutputStreamOperator<StockPrice> stockPriceDSWithWatermark = stockPriceDS.assignTimestampsAndWatermarks(new MyWatermarkStrategy());
Table stock = tableEnv.fromDataStream(stockPriceDSWithWatermark, $("timeStamp").rowtime(), $("stockId"), $("price"));
Table result = stock
.window(Tumble.over(lit(10).seconds()).on($("timeStamp")).as("w")) // define the window
.groupBy($("stockId"), $("w")) // group by key and window
// access window properties and aggregate
.select(
$("stockId"),
$("w").start(),
$("w").end(),
$("w").rowtime(),
$("price").sum().as("price_sum")
);
result.execute().print();
}
public static class MyWatermarkStrategy implements WatermarkStrategy<StockPrice>{
@Override
public WatermarkGenerator<StockPrice> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
return new WatermarkGenerator<StockPrice>(){
long maxOutOfOrderness = 10000L; //set the maximum out-of-orderness (delay) to 10 seconds
long currentMaxTimestamp = 0L;
Watermark a = null;
Format format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
@Override
public void onEvent(StockPrice event, long eventTimestamp, WatermarkOutput output) {
currentMaxTimestamp = Math.max(eventTimestamp, currentMaxTimestamp);
a = new Watermark(currentMaxTimestamp - maxOutOfOrderness);
output.emitWatermark(a);
System.out.println("timestamp:" + event.stockId + "," + event.timeStamp + "|" + format.format(event.timeStamp) + "," + currentMaxTimestamp + "|" + format.format(currentMaxTimestamp) + "," + a.toString());
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
// Periodic watermark emission is not used, so nothing is done here
}
};
}
@Override
public TimestampAssigner<StockPrice> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
return new SerializableTimestampAssigner<StockPrice>() {
@Override
public long extractTimestamp(StockPrice element, long recordTimestamp) {
return element.timeStamp; //extract the timestamp from the arriving record
}
};
}
}
}
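The demo above references a StockPrice class that is defined in an earlier chapter of the textbook and is not repeated here; a minimal sketch of such a POJO (assuming the public fields stockId, timeStamp and price used by the code above) is:
package cn.edu.xmu;
public class StockPrice {
    public String stockId;   // stock identifier
    public Long timeStamp;   // event timestamp in milliseconds
    public Double price;     // stock price
    public StockPrice() {}   // no-argument constructor required for a Flink POJO
    public StockPrice(String stockId, Long timeStamp, Double price) {
        this.stockId = stockId;
        this.timeStamp = timeStamp;
        this.price = price;
    }
}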
Start a netcat data source in a Linux terminal and type the following test data:
nc -lk 9999
stock_1,1602031567000,8.14
stock_1,1602031568000,8.22
stock_1,1602031575000,8.14
stock_1,1602031577000,8.14
stock_1,1602031593000,8.14
Table stock = tableEnv.from("stockPriceTable");
Table result = stock.distinct();
Table left = tableEnv.from("MyTable").select($("a"), $("b"), $("c"));
Table right = tableEnv.from("MyTable").select($("d"), $("e"), $("f"));
Table result = left.join(right)
.where($("a").isEqual($("d")))
.select($("a"), $("b"), $("e"));
Table left = tableEnv.from("MyTable").select($("a"), $("b"), $("c"));
Table right = tableEnv.from("MyTable").select($("d"), $("e"), $("f"));
Table leftOuterResult = left.leftOuterJoin(right, $("a").isEqual($("d")))
.select($("a"), $("b"), $("e"));
Table rightOuterResult = left.rightOuterJoin(right, $("a").isEqual($("d")))
.select($("a"), $("b"), $("e"));
Table fullOuterResult = left.fullOuterJoin(right, $("a").isEqual($("d")))
.select($("a"), $("b"), $("e"));
Table left = tableEnv.from("orders1");
Table right = tableEnv.from("orders2");
left.union(right);
Table left = tableEnv.from("orders1");
Table right = tableEnv.from("orders2");
left.unionAll(right);
Table left = tableEnv.from("orders1");
Table right = tableEnv.from("orders2");
left.intersect(right);
Table left = tableEnv.from("orders1");
Table right = tableEnv.from("orders2");
left.intersectAll(right);
Table left = tableEnv.from("orders1");
Table right = tableEnv.from("orders2");
left.minus(right);
Table left = tableEnv.from("orders1");
Table right = tableEnv.from("orders2");
left.minusAll(right);
Table left = tableEnv.from("Orders1")
Table right = tableEnv.from("Orders2");
Table result = left.select($("a"), $("b"), $("c")).where($("a").in(right));
Table result = tab.orderBy($("a").asc());
Table orders = tableEnv.from("Orders");
orders.insertInto("OutOrders").execute();
Code file MapFunctionDemo.java
package cn.edu.xmu;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.types.Row;
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;
public class MapFunctionDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
env.setParallelism(1);
// Create an input table stockPriceTable
String sourceDDL =
"create table stockPriceTable (" +
"stockId STRING," +
"myTimeStamp BIGINT," +
"price DOUBLE" +
") with (" +
" 'connector' = 'filesystem', " +
" 'path' = 'file:///home/hadoop/stockprice.csv', " +
" 'format' = 'csv', " +
" 'csv.field-delimiter' = ',', " +
" 'csv.ignore-parse-errors' = 'true' " +
" )";
tableEnv.executeSql(sourceDDL);
Table inputTable = tableEnv.from("stockPriceTable");
ScalarFunction func = new MyMapFunction();
tableEnv.registerFunction("func", func);
Table result = inputTable
.map(call("func", $("stockId"))).as("stockId", "myStockId");
result.execute().print();
}
public static class MyMapFunction extends ScalarFunction {
public Row eval(String a) {
return Row.of(a, "my-" + a);
}
@Override
public TypeInformation<?> getResultType(Class<?>[] signature) {
return Types.ROW(Types.STRING, Types.STRING);
}
}
}
Create a file "/home/hadoop/stockprice.csv" in Linux with the following content:
stock_1,1602031567000,8.17
stock_2,1602031568000,8.22
stock_1,1602031575000,8.14
Code file FlatMapFunctionDemo.java
package cn.edu.xmu;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;
public class FlatMapFunctionDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
env.setParallelism(1);
// Create an input table stockPriceTable
String sourceDDL =
"create table stockPriceTable (" +
"stockId STRING," +
"myTimeStamp BIGINT," +
"price DOUBLE" +
") with (" +
" 'connector' = 'filesystem', " +
" 'path' = 'file:///home/hadoop/stockprice.csv', " +
" 'format' = 'csv', " +
" 'csv.field-delimiter' = ',', " +
" 'csv.ignore-parse-errors' = 'true' " +
" )";
tableEnv.executeSql(sourceDDL);
Table inputTable = tableEnv.from("stockPriceTable");
TableFunction func = new MyFlatMapFunction();
tableEnv.registerFunction("func", func);
Table result = inputTable
.flatMap(call("func", $("stockId"))).as("a", "b");
result.execute().print();
}
public static class MyFlatMapFunction extends TableFunction<Row> {
public void eval(String str) {
if (str.contains("#")) {
String[] array = str.split("#");
for (int i = 0; i < array.length; ++i) {
collect(Row.of(array[i], array[i].length()));
}
}
}
@Override
public TypeInformation<Row> getResultType() {
return Types.ROW(Types.STRING, Types.INT);
}
}
}
Create a file "/home/hadoop/stockprice.csv" in Linux with the following content:
stock#01,1602031567000,8.17
stock#02,1602031568000,8.22
stock#01,1602031575000,8.14
Code file AggregateFunctionDemo.java
package cn.edu.xmu;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;
public class AggregateFunctionDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
env.setParallelism(1);
// Create an input table stockPriceTable
String sourceDDL =
"create table stockPriceTable (" +
"stockId STRING," +
"myTimeStamp BIGINT," +
"price DOUBLE" +
") with (" +
" 'connector' = 'filesystem', " +
" 'path' = 'file:///home/hadoop/stockprice.csv', " +
" 'format' = 'csv', " +
" 'csv.field-delimiter' = ',', " +
" 'csv.ignore-parse-errors' = 'true' " +
" )";
tableEnv.executeSql(sourceDDL);
Table inputTable = tableEnv.from("stockPriceTable");
AggregateFunction myAggFunc = new MyMinMax();
tableEnv.registerFunction("myAggFunc", myAggFunc);
Table result = inputTable
.groupBy($("stockId"))
.aggregate(call("myAggFunc", $("price")).as("min_price", "max_price"))
.select($("stockId"), $("min_price"), $("max_price"));
result.execute().print();
}
public static class MyMinMaxAcc {
public double min = Double.MAX_VALUE;
public double max = 0D;
}
public static class MyMinMax extends AggregateFunction<Row, MyMinMaxAcc> {
public void accumulate(MyMinMaxAcc acc, double value) {
if (value < acc.min) {
acc.min = value;
}
if (value > acc.max) {
acc.max = value;
}
}
@Override
public MyMinMaxAcc createAccumulator() {
return new MyMinMaxAcc();
}
public void resetAccumulator(MyMinMaxAcc acc) {
acc.min = Double.MAX_VALUE;
acc.max = 0D;
}
@Override
public Row getValue(MyMinMaxAcc acc) {
return Row.of(acc.min, acc.max);
}
@Override
public TypeInformation<Row> getResultType() {
return new RowTypeInfo(Types.DOUBLE, Types.DOUBLE);
}
}
}
Create a file "/home/hadoop/stockprice.csv" in Linux with the following content:
stock_1,1602031567000,8.17
stock_2,1602031568000,6.22
stock_1,1602031575000,5.14
stock_2,1602031573000,3.29
Code file WindowAggregateFunctionDemo.java
package cn.edu.xmu;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.types.Row;
import static org.apache.flink.table.api.Expressions.*;
public class WindowAggregateFunctionDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
env.setParallelism(1);
// Create an input table stockPriceTable
String sourceDDL =
"create table stockPriceTable (" +
"stockId STRING," +
"ts BIGINT," +
"price DOUBLE," +
"ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3)," +
"WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '5' SECOND" +
") with (" +
" 'connector' = 'filesystem', " +
" 'path' = 'file:///home/hadoop/stockprice.csv', " +
" 'format' = 'csv', " +
" 'csv.field-delimiter' = ',', " +
" 'csv.ignore-parse-errors' = 'true' " +
" )";
tableEnv.executeSql(sourceDDL);
Table inputTable = tableEnv.from("stockPriceTable");
AggregateFunction myAggFunc = new MyMinMax();
tableEnv.registerFunction("myAggFunc", myAggFunc);
Table result = inputTable
.window(Tumble.over(lit(5).seconds())
.on($("ts_ltz"))
.as("w"))
.groupBy($("stockId"), $("w"))
.aggregate(call("myAggFunc", $("price")).as("min_price", "max_price"))
.select($("stockId"), $("min_price"), $("max_price"),$("w").start(), $("w").end());
result.execute().print();
}
public static class MyMinMaxAcc {
public double min = Double.MAX_VALUE;
public double max = 0D;
}
public static class MyMinMax extends AggregateFunction<Row, MyMinMaxAcc> {
public void accumulate(MyMinMaxAcc acc, double value) {
if (value < acc.min) {
acc.min = value;
}
if (value > acc.max) {
acc.max = value;
}
}
@Override
public MyMinMaxAcc createAccumulator() {
return new MyMinMaxAcc();
}
public void resetAccumulator(MyMinMaxAcc acc) {
acc.min = Double.MAX_VALUE;
acc.max = 0D;
}
@Override
public Row getValue(MyMinMaxAcc acc) {
return Row.of(acc.min, acc.max);
}
@Override
public TypeInformation<Row> getResultType() {
return new RowTypeInfo(Types.DOUBLE, Types.DOUBLE);
}
}
}
Create a file "/home/hadoop/stockprice.csv" in Linux with the following content:
stock_1,1602031567000,8.14
stock_2,1602031568000,18.22
stock_2,1602031575000,8.14
stock_1,1602031577000,18.21
stock_1,1602031593000,8.98
cd /usr/local/flink
./bin/start-cluster.sh
./bin/sql-client.sh
Flink SQL> SHOW DATABASES;
Flink SQL> EXIT;
Flink SQL> show databases;
Flink SQL> exit;
Flink SQL> QUIT;
cd /usr/local/flink
./bin/stop-cluster.sh
SET 'sql-client.execution.result-mode' = 'table';
SET 'sql-client.execution.result-mode' = 'changelog';
SET 'sql-client.execution.result-mode' = 'tableau';
Flink SQL> SET 'sql-client.execution.result-mode' = 'table';
Flink SQL> SELECT ' Hello World';
Flink SQL> SET 'sql-client.execution.result-mode' = 'changelog';
Flink SQL> SELECT ' Hello World';
Flink SQL> SET 'sql-client.execution.result-mode' = 'tableau';
Flink SQL> SELECT ' Hello World';
CREATE DATABASE mydatabase;
SHOW DATABASES;
SHOW CURRENT DATABASE;
USE mydatabase;
DROP DATABASE mydatabase;
CREATE DATABASE mydatabase;
USE mydatabase;
CREATE TABLE stockprice(
stockId STRING,
myTimeStamp BIGINT,
price DOUBLE
) WITH (
'connector' = 'print'
);
SHOW TABLES;
DESC stockprice;
DROP TABLE stockprice;
CREATE TABLE source (
f_sequence INT,
f_random INT,
f_random_str STRING
) WITH (
'connector' = 'datagen', --the connector type is datagen, which generates data automatically
'rows-per-second'='5', --number of rows to generate per second
'fields.f_sequence.kind'='sequence', --the f_sequence field is of sequence kind
'fields.f_sequence.start'='1', --the starting value of f_sequence is 1
'fields.f_sequence.end'='1000', --the ending value of f_sequence is 1000
'fields.f_random.min'='1', --the minimum random value of f_random
'fields.f_random.max'='1000', --the maximum random value of f_random
'fields.f_random_str.length'='5' --the length of the random string in f_random_str
);
SELECT * FROM source;
SET sql-client.execution.result-mode=tableau;
SELECT * FROM source;
SET 'sql-client.execution.result-mode' = 'changelog';
CREATE TABLE sink(
f_sequence INT,
f_random INT,
f_random_str STRING
) WITH (
'connector' = 'print'
);
INSERT INTO sink SELECT * FROM source;
Create a file "/home/hadoop/stockprice.csv" in Linux with the following content:
stock_1,1602031567000,8.14
stock_2,1602031568000,18.22
stock_2,1602031575000,8.14
stock_1,1602031577000,18.21
stock_1,1602031593000,8.98
CREATE TABLE stockPriceTable (
stockId STRING,
ts BIGINT,
price DOUBLE,
ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '5' SECOND
) WITH (
'connector' = 'filesystem',
'path' = 'file:///home/hadoop/stockprice.csv',
'format' = 'csv',
'csv.field-delimiter' = ',',
'csv.ignore-parse-errors' = 'true'
);
SET 'sql-client.execution.result-mode' = 'tableau';
SELECT stockId,price FROM stockPriceTable;
SELECT * FROM stockPriceTable;
SELECT stockId, price AS stockprice FROM stockPriceTable;
SELECT * FROM stockPriceTable WHERE stockId = 'stock_1';
SELECT * FROM stockPriceTable WHERE price > 10;
SELECT stockId, AVG(price) as avg_price
FROM stockPriceTable
GROUP BY stockId;
SELECT stockId, AVG(price)
FROM stockPriceTable
GROUP BY stockId, TUMBLE(ts_ltz, INTERVAL '5' SECOND);
SELECT
stockId,
sum(price) as priceSum,
window_start,
window_end
FROM TABLE(
TUMBLE(TABLE stockPriceTable, DESCRIPTOR(ts_ltz), INTERVAL '5' SECOND )
)
GROUP BY window_start, window_end, stockId;
SELECT
stockId,
sum(price) as priceSum,
window_start,
window_end
FROM TABLE(
TUMBLE(
DATA => TABLE stockPriceTable,
TIMECOL => DESCRIPTOR(ts_ltz),
SIZE => INTERVAL '5' SECOND))
GROUP BY window_start, window_end, stockId;
SELECT
stockId,
sum(price) as priceSum,
window_start,
window_end
FROM TABLE(
HOP(TABLE stockPriceTable, DESCRIPTOR(ts_ltz), INTERVAL '5' SECOND, INTERVAL '10' SECOND)
)
GROUP BY window_start, window_end, stockId;
SELECT
stockId,
sum(price) as priceSum,
window_start,
window_end
FROM TABLE(
HOP(
DATA => TABLE stockPriceTable,
TIMECOL => DESCRIPTOR(ts_ltz),
SLIDE => INTERVAL '5' SECOND,
SIZE => INTERVAL '10' SECOND))
GROUP BY window_start, window_end, stockId;
SELECT
stockId,
ts_ltz,
COUNT(stockId) OVER (PARTITION BY stockId ORDER BY ts_ltz RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW) cnt
FROM stockPriceTable;
SELECT
stockId,
ts_ltz,
COUNT(stockId) OVER w AS cnt
FROM stockPriceTable
WINDOW w AS (
PARTITION BY stockId
ORDER BY ts_ltz
RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW
);
SELECT
stockId,
ts_ltz,
AVG(price) OVER (PARTITION BY stockId ORDER BY ts_ltz ROWS BETWEEN 5 PRECEDING AND CURRENT ROW) avgPrice
FROM stockPriceTable;
SELECT
stockId,
ts_ltz,
AVG(price) OVER w AS avgPrice
FROM stockPriceTable
WINDOW w AS (
PARTITION BY stockId
ORDER BY ts_ltz
ROWS BETWEEN 5 PRECEDING AND CURRENT ROW
);
SELECT
stockId,
price,
rownum
FROM
(
SELECT
stockId,
ts_ltz,
price,
ROW_NUMBER() OVER (PARTITION BY stockId ORDER BY price DESC) AS rownum
FROM stockPriceTable
)
WHERE rownum<=2;
SELECT DISTINCT stockId FROM stockPriceTable;
SELECT AVG(price)
FROM stockPriceTable
GROUP BY stockId
HAVING AVG(price) > 20;
SELECT *
FROM stockPriceTable INNER JOIN stock_info
ON stockPriceTable.stockId = stock_info.stockId;
SELECT * FROM stockPriceTable LEFT JOIN stock_info
ON stockPriceTable.stockId = stock_info.stockId;
SELECT * FROM stockPriceTable RIGHT JOIN stock_info
ON stockPriceTable.stockId = stock_info.stockId;
SELECT * FROM stockPriceTable FULL OUTER JOIN stock_info
ON stockPriceTable.stockId = stock_info.stockId;
SELECT *
FROM (
(SELECT stockId FROM stockPriceTable WHERE stockId='stock_1')
UNION
(SELECT stockId FROM stockPriceTable WHERE stockId='stock_2')
);
SELECT *
FROM (
(SELECT stockId FROM stockPriceTable WHERE stockId='stock_1')
UNION ALL
(SELECT stockId FROM stockPriceTable WHERE stockId='stock_2')
);
SELECT *
FROM (
(SELECT stockId FROM stockPriceTable WHERE price > 10.0)
INTERSECT
(SELECT stockId FROM stockPriceTable WHERE stockId='stock_1')
);
SELECT *
FROM (
(SELECT stockId FROM stockPriceTable WHERE price > 10.0)
EXCEPT
(SELECT stockId FROM stockPriceTable WHERE stockId='stock_1')
);
SELECT stockId, price
FROM stockPriceTable
WHERE stockId IN (
SELECT stockId FROM newstock
);
SELECT stockId, price
FROM stockPriceTable
WHERE EXISTS (
SELECT stockId FROM newstock WHERE newstock.stockId = stockPriceTable.stockId
);
SELECT *
FROM stockPriceTable
ORDER BY ts_ltz;
SELECT *
FROM stockPriceTable
ORDER BY ts_ltz
LIMIT 3;
To use the JDBC connector and the JDBC catalog in the SQL Client, download the following two JAR packages, place them in Flink's lib directory, and restart the Flink cluster:
https://mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc/3.1.1-1.17
https://mvnrepository.com/artifact/com.mysql/mysql-connector-j/8.0.33
mysql> CREATE DATABASE stockdb;
mysql> USE stockdb;
mysql> CREATE TABLE stockprice (stockId VARCHAR(20),price DOUBLE);
mysql> INSERT INTO stockprice VALUES("stock1",123);
mysql> INSERT INTO stockprice VALUES("stock2",456);
mysql> SELECT * FROM stockprice;
SET sql-client.execution.result-mode=tableau;
SHOW CATALOGS;
USE CATALOG default_catalog;
SHOW DATABASES;
CREATE CATALOG mysql_catalog WITH (
'type' = 'jdbc',
'default-database' = 'stockdb',
'username' = 'root',
'password' = '123456',
'base-url' = 'jdbc:mysql://localhost:3306'
);
USE CATALOG mysql_catalog;
SHOW DATABASES;
USE stockdb;
SHOW TABLES;
SELECT * FROM stockprice;
INSERT INTO stockprice VALUES('stock1',789);
mysql> SELECT * FROM stockprice;
Create a file "/home/hadoop/stockprice.csv" in Linux with the following content:
stock_1,1602031567000,8.14
stock_2,1602031568000,18.22
stock_2,1602031575000,8.14
stock_1,1602031577000,18.21
stock_1,1602031593000,8.98
Code file ScalarFunctionDemo.java
package cn.edu.xmu;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.types.Row;
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;
public class ScalarFunctionDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
env.setParallelism(1);
// Create an input table stockPriceTable
String sourceDDL =
"create table stockPriceTable (" +
"stockId STRING," +
"myTimeStamp BIGINT," +
"price DOUBLE" +
") with (" +
" 'connector' = 'filesystem', " +
" 'path' = 'file:///home/hadoop/stockprice.csv', " +
" 'format' = 'csv', " +
" 'csv.field-delimiter' = ',', " +
" 'csv.ignore-parse-errors' = 'true' " +
" )";
tableEnv.executeSql(sourceDDL);
Table inputTable = tableEnv.from("stockPriceTable");
// Register the function
tableEnv.createTemporarySystemFunction("SubstringFunction", SubstringFunction.class);
// Call the registered function in the Table API
Table table1 = inputTable.select(call("SubstringFunction",$("stockId"),6,7));
table1.execute().print();
// Call the registered function in SQL
Table table2 = tableEnv.sqlQuery("SELECT SubstringFunction(stockId, 6, 7) FROM stockPriceTable");
table2.execute().print();
}
// User-defined function
public static class SubstringFunction extends ScalarFunction {
public String eval(String s, Integer begin, Integer end){
return s.substring(begin, end);
}
}
}
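For reference, a scalar function can also be called inline in the Table API without registering it first, by passing the function class directly to call(). The following minimal sketch could be appended inside the main() method above:
// Call the function inline, without prior registration
Table table3 = inputTable.select(call(SubstringFunction.class, $("stockId"), 6, 7));
table3.execute().print();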
Code file TableFunctionDemo.java
package cn.edu.xmu;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;
public class TableFunctionDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
env.setParallelism(1);
// Create an input table stockPriceTable
String sourceDDL =
"create table stockPriceTable (" +
"stockId STRING," +
"myTimeStamp BIGINT," +
"price DOUBLE" +
") with (" +
" 'connector' = 'filesystem', " +
" 'path' = 'file:///home/hadoop/stockprice.csv', " +
" 'format' = 'csv', " +
" 'csv.field-delimiter' = ',', " +
" 'csv.ignore-parse-errors' = 'true' " +
" )";
tableEnv.executeSql(sourceDDL);
Table inputTable = tableEnv.from("stockPriceTable");
tableEnv.createTemporarySystemFunction("SplitFunction", MySplitFunction.class);
Table result = inputTable
.leftOuterJoinLateral(call("SplitFunction", $("stockId")))
.select($("stockId"), $("word"), $("length"));
result.execute().print();
}
// Specify the return type via an annotation
@FunctionHint(output = @DataTypeHint("ROW<word STRING, length INT>"))
public static class MySplitFunction extends TableFunction<Row>{
public void eval(String str) {
for (String s : str.split("_")) {
// use collect(...) to emit a row
collect(Row.of(s, s.length()));
}
}
}
}
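Once SplitFunction has been registered, it can also be called from SQL with a lateral table join. The following minimal sketch could be appended inside the main() method above:
// Call the registered table function from SQL using LEFT JOIN LATERAL ... ON TRUE
Table sqlResult = tableEnv.sqlQuery(
        "SELECT stockId, word, length " +
        "FROM stockPriceTable " +
        "LEFT JOIN LATERAL TABLE(SplitFunction(stockId)) ON TRUE");
sqlResult.execute().print();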
Code file AggregateFunctionDemo2.java
package cn.edu.xmu;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.AggregateFunction;
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;
public class AggregateFunctionDemo2 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
env.setParallelism(1);
// Create an input table stockPriceTable
String sourceDDL =
"create table stockPriceTable (" +
"stockId STRING," +
"myTimeStamp BIGINT," +
"price DOUBLE" +
") with (" +
" 'connector' = 'filesystem', " +
" 'path' = 'file:///home/hadoop/stockprice.csv', " +
" 'format' = 'csv', " +
" 'csv.field-delimiter' = ',', " +
" 'csv.ignore-parse-errors' = 'true' " +
" )";
tableEnv.executeSql(sourceDDL);
Table inputTable = tableEnv.from("stockPriceTable");
// Register the function
tableEnv.createTemporarySystemFunction("MyCountFunction",MyCountFunction.class);
// Call the function in the Table API
Table table1 = inputTable
.groupBy($("stockId"))
.select($("stockId"), call("MyCountFunction"));
table1.execute().print();
}
public static class MyCountAccumulator {
public long count = 0L;
}
public static class MyCountFunction extends AggregateFunction<Long, MyCountAccumulator> {
// Get the result
@Override
public Long getValue(MyCountAccumulator myCountAccumulator) {
return myCountAccumulator.count;
}
// Initialize the accumulator
@Override
public MyCountAccumulator createAccumulator() {
return new MyCountAccumulator();
}
// Accumulate method: each time a new row arrives, count is incremented by 1
public void accumulate(MyCountAccumulator myCountAccumulator){
myCountAccumulator.count = myCountAccumulator.count + 1;
}
// Merge accumulation results
public void merge(MyCountAccumulator myCountAccumulator, Iterable<MyCountAccumulator> it) {
for (MyCountAccumulator a : it) {
myCountAccumulator.count += a.count;
}
}
}
}
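A registered aggregate function can likewise be called from SQL; a minimal sketch, assuming the zero-argument aggregate can be invoked as MyCountFunction() in a SQL query, could be appended inside the main() method above:
// Call the registered aggregate function from SQL
Table table2 = tableEnv.sqlQuery(
        "SELECT stockId, MyCountFunction() FROM stockPriceTable GROUP BY stockId");
table2.execute().print();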
Code file TableAggregateFunctionDemo.java
package cn.edu.xmu;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.util.Collector;
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;
public class TableAggregateFunctionDemo {
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
env.setParallelism(1);
DataStreamSource<Integer> numDS = env.fromElements(4, 7, 1, 3, 12, 9, 5);
Table numTable = tableEnv.fromDataStream(numDS).as("num");
// Register the function
tableEnv.createTemporarySystemFunction("Top2", Top2.class);
// Call the function in the Table API
// call() returns a 2-tuple; as() maps the first element of the tuple to the value column and the second to the rank column
Table result = numTable
.flatAggregate(
call("Top2",$("num")).as("value","rank")
)
.select($("value"),$("rank"));
result.execute().print();
}
// Extend the TableAggregateFunction class
// The first type parameter of TableAggregateFunction, Tuple2<Integer,Integer>, is the output data type
// Each output record is a 2-tuple, e.g. (12,1) means the number 12 ranks first
// The second type parameter, Tuple2<Integer,Integer>, is the accumulator that holds the two largest numbers
public static class Top2 extends TableAggregateFunction<Tuple2<Integer,Integer>,Tuple2<Integer,Integer>>{
@Override
public Tuple2<Integer, Integer> createAccumulator() {
return Tuple2.of(0,0);
}
// accumulate is called once for every incoming row; it compares values and keeps the two largest numbers in acc
// The first parameter type Tuple2<Integer,Integer> is the accumulator type; the second parameter type Integer is the type of the incoming data
public void accumulate(Tuple2<Integer,Integer> acc, Integer num){
if (num > acc.f0){
// the new value becomes rank 1 and the previous rank-1 value drops to rank 2
acc.f1 = acc.f0;
acc.f0 = num;
}else if (num > acc.f1){
// the new value becomes rank 2 and the previous rank-2 value is discarded
acc.f1 = num;
}
}
// Emit the results in the format (value, rank)
// The Collector type is Tuple2<Integer,Integer>, representing the output data type
public void emitValue(Tuple2<Integer,Integer> acc, Collector<Tuple2<Integer,Integer>> out){
if (acc.f0 != 0){
out.collect(Tuple2.of(acc.f0,1));
}
if (acc.f1 != 0){
out.collect(Tuple2.of(acc.f1,2));
}
}
}
}