Command lines and code from the textbook 《Flink编程基础(Java版)》 (Fundamentals of Flink Programming, Java Edition) by Lin Ziyu, Xiamen University (textbook official website)
This page provides the command lines and code for every chapter of the textbook, ready to be copied, pasted, and executed directly.
View all command lines and code from the textbook 《Flink编程基础(Java版)》
File name: FileSourceDemo.java
package cn.edu.xmu;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class FileSourceDemo {
public FileSourceDemo(){}
public static void main(String[] args) throws Exception {
//1. Build the streaming execution environment
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
//2. Create the data source; a word.txt file must be prepared in advance
DataStreamSource<String> source = env.readTextFile("file:///home/hadoop/word.txt");
//3. Specify the transformation logic applied to the received data
//3.1 Process the data with the flatMap operator
DataStream<String> flatMapStream = source.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String s, Collector<String> collector) throws Exception {
String[] words = s.split(" ");
for (String word : words) {
collector.collect(word);
}
}
});
//3.2 Transform the data with the map operator
SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = flatMapStream.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String value) throws Exception {
return Tuple2.of(value, 1);
}
});
//3.3 Group the words with the keyBy operator
KeyedStream<Tuple2<String, Integer>, String> keyedStream = mapStream.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> value) throws Exception {
return value.f0;
}
});
//3.4 Aggregate with the sum operator
SingleOutputStreamOperator<Tuple2<String, Integer>> sumStream = keyedStream.sum(1);
//4. Specify how the computation results are output
sumStream.print();
//5. Trigger program execution
env.execute();
}
}
Add the following dependency to the pom.xml file:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>${flink.version}</version>
</dependency>
File FileSourceDemo2.java
package cn.edu.xmu;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class FileSourceDemo2 {
public FileSourceDemo2(){}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
FileSource<String> fileSource = FileSource.forRecordStreamFormat(
new TextLineInputFormat(),
new Path("file:///home/hadoop/word.txt")
).build();
DataStreamSource<String> fileSourceStream = env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "filesource");
fileSourceStream.print();
env.execute();
}
}
https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop-3-uber/3.1.1.7.2.9.0-173-9.0
Add the following two dependencies to pom.xml:
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.3.5</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.3.5</version>
</dependency>
cd /usr/local/hadoop
./sbin/start-dfs.sh
cd /usr/local/flink
./bin/start-cluster.sh
cd /usr/local/flink
./bin/flink run \
> --class cn.edu.xmu.FileSourceDemo \
> ~/IdeaProjects/FlinkWordCount/target/wordcount-1.0.jar
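The jar file submitted above can be built from the IDEA project with Maven. A minimal sketch, assuming the project lives in ~/IdeaProjects/FlinkWordCount and Maven is installed:
cd ~/IdeaProjects/FlinkWordCount
mvn clean package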
nc -lk 9999
cd /usr/local/kafka
./bin/zookeeper-server-start.sh config/zookeeper.properties
cd /usr/local/kafka
./bin/kafka-server-start.sh config/server.properties
cd /usr/local/kafka
./bin/kafka-topics.sh --create --zookeeper localhost:2181 \
> --replication-factor 1 --partitions 1 --topic wordsendertest
#The topic is named wordsendertest; 2181 is ZooKeeper's default port; --partitions sets the number of partitions in the topic; --replication-factor sets the number of replicas, which is used in a Kafka cluster; since this is a single-node setup, no replication is needed
#Use list to show all created topics and verify that the topic created above exists
./bin/kafka-topics.sh --list --zookeeper localhost:2181
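To further check the topic's partition and replica settings, the same script's describe option can be used (a sketch, assuming the same single-node ZooKeeper setup as above):
./bin/kafka-topics.sh --describe --zookeeper localhost:2181 \
> --topic wordsendertest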
./bin/kafka-console-producer.sh --broker-list localhost:9092 \
> --topic wordsendertest
cd /usr/local/kafka
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
> --topic wordsendertest --from-beginning
File KafkaSourceDemo.java
package cn.edu.xmu;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class KafkaSourceDemo {
public KafkaSourceDemo(){}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
.setBootstrapServers("hadoop01:9092")
.setGroupId("dblab")
.setTopics("wordsendertest")
.setValueOnlyDeserializer(new SimpleStringSchema())
.setStartingOffsets(OffsetsInitializer.earliest())
.build();
DataStreamSource<String> kafkaSourceStream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafkasource");
kafkaSourceStream.print();
env.execute();
}
}
Edit the pom.xml file in IDEA; on top of the pom.xml from Section 4.2.3, add the following dependency:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
</dependency>
https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka/1.17.0
File DataGeneratorSourceDemo.java
package cn.edu.xmu;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.datagen.source.GeneratorFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class DataGeneratorSourceDemo {
public DataGeneratorSourceDemo(){}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(
new GeneratorFunction<Long, String>() {
@Override
public String map(Long value) throws Exception {
return "Current Number is:" + value;
}
},
5,
RateLimiterStrategy.perSecond(1),
Types.STRING
);
DataStreamSource<String> dataGeneratorSourceStream = env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "dataGeneratorSource");
dataGeneratorSourceStream.print();
env.execute();
}
}
Edit the pom.xml file in IDEA; on top of the pom.xml from Section 4.2.3, add the following dependency:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-datagen</artifactId>
<version>${flink.version}</version>
</dependency>
Code file StockPrice.java
package cn.edu.xmu;
public class StockPrice {
public String stockId;
public Long timeStamp;
public Double price;
// A public no-argument constructor must be provided (it is required for reflection)
public StockPrice() {
}
public StockPrice(String stockId, Long timeStamp, Double price) {
this.stockId = stockId;
this.timeStamp = timeStamp;
this.price = price;
}
public String getStockId() {
return stockId;
}
public void setStockId(String stockId) {
this.stockId = stockId;
}
public Long getTimeStamp() {
return timeStamp;
}
public void setTimeStamp(Long timeStamp) {
this.timeStamp = timeStamp;
}
public Double getPrice() {
return price;
}
public void setPrice(Double price) {
this.price = price;
}
@Override
public String toString() {
return "StockPrice{" +
"stockId=" + stockId +
", timeStamp='" + timeStamp + '\'' +
", price=" + price +
'}';
}
}
Code file MyGeneratorFunction.java
package cn.edu.xmu;
import org.apache.commons.math3.random.RandomDataGenerator;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.connector.datagen.source.GeneratorFunction;
public class MyGeneratorFunction implements GeneratorFunction<Long, StockPrice> {
// The random data generator
public RandomDataGenerator generator;
// Initialize the random data generator
@Override
public void open(SourceReaderContext readerContext) throws Exception {
generator = new RandomDataGenerator();
}
@Override
public StockPrice map(Long value) throws Exception {
// Use the random data generator to create a StockPrice instance
String stockId = "stock_"+value.toString();
StockPrice stockPrice = new StockPrice(stockId
, System.currentTimeMillis()
, generator.nextInt(1,100)*0.1
);
return stockPrice;
}
}
Code file ReadDataGeneratorSource.java
package cn.edu.xmu;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class ReadDataGeneratorSource {
public static void main(String[] args) throws Exception {
// 1. Get the execution environment
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 2. Custom data generator source
DataGeneratorSource<StockPrice> dataGeneratorSource = new DataGeneratorSource<>(
// Specify the GeneratorFunction implementation class
new MyGeneratorFunction(),
// Specify the total number of records to emit
5,
// Specify the number of records emitted per second
RateLimiterStrategy.perSecond(1),
// Specify the return type
TypeInformation.of(StockPrice.class) // Wrap the Java StockPrice class into a TypeInformation
);
// 3. Read the data from dataGeneratorSource
DataStreamSource<StockPrice> dataGeneratorSourceStream = env.fromSource(dataGeneratorSource
, WatermarkStrategy.noWatermarks() // Specify the watermark generation strategy
, "dataGeneratorSource");
dataGeneratorSourceStream.print();
env.execute();
}
}
Code file MapDemo1.java
package cn.edu.xmu;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class MapDemo1 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<StockPrice> stockPriceDS = env.fromElements(
new StockPrice("stock_1", 10001L, 9.7D),
new StockPrice("stock_2", 10002L, 4.5D),
new StockPrice("stock_3", 10003L, 8.2D)
);
SingleOutputStreamOperator<String> mapDS =
stockPriceDS.map(
new MapFunction<StockPrice, String>() {
@Override
public String map(StockPrice value) throws Exception {
return value.stockId;
}
}
);
mapDS.print();
env.execute();
}
}
Code file MapDemo2.java
package cn.edu.xmu;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class MapDemo2 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<StockPrice> stockPriceDS = env.fromElements(
new StockPrice("stock_1", 10001L, 9.7D),
new StockPrice("stock_2", 10002L, 4.5D),
new StockPrice("stock_3", 10003L, 8.2D)
);
SingleOutputStreamOperator<String> mapDS =
stockPriceDS.map(stockPrice->stockPrice.stockId);
mapDS.print();
env.execute();
}
}
Code file MapDemo3.java
package cn.edu.xmu;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class MapDemo3 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<StockPrice> stockPriceDS = env.fromElements(
new StockPrice("stock_1", 10001L, 9.7D),
new StockPrice("stock_2", 10002L, 4.5D),
new StockPrice("stock_3", 10003L, 8.2D)
);
SingleOutputStreamOperator<String> mapDS =
stockPriceDS.map(new MyMapFunction());
mapDS.print();
env.execute();
}
public static class MyMapFunction implements MapFunction<StockPrice,String>{
@Override
public String map(StockPrice value) throws Exception{
return value.stockId;
}
}
}
Code file FlatMapDemo1.java
package cn.edu.xmu;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class FlatMapDemo1 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<StockPrice> stockPriceDS = env.fromElements(
new StockPrice("stock_1", 10001L, 9.7D),
new StockPrice("stock_2", 10002L, 4.5D),
new StockPrice("stock_3", 10003L, 8.2D)
);
SingleOutputStreamOperator<String> flatMapDS =
stockPriceDS.flatMap(new FlatMapFunction<StockPrice, String>() {
@Override
public void flatMap(StockPrice value, Collector<String> out) throws Exception {
if("stock_1".equals(value.stockId)){
out.collect(value.stockId);
}else if("stock_2".equals(value.stockId)){
out.collect(value.stockId);
out.collect(value.price.toString());
}
}
});
flatMapDS.print();
env.execute();
}
}
Code file FlatMapDemo2.java
package cn.edu.xmu;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class FlatMapDemo2 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<StockPrice> stockPriceDS = env.fromElements(
new StockPrice("stock_1", 10001L, 9.7D),
new StockPrice("stock_2", 10002L, 4.5D),
new StockPrice("stock_3", 10003L, 8.2D)
);
SingleOutputStreamOperator<String> flatMapDS =
stockPriceDS.flatMap((StockPrice value,Collector<String> out) ->{
if("stock_1".equals(value.stockId)){
out.collect(value.stockId);
}else if("stock_2".equals(value.stockId)){
out.collect(value.stockId);
out.collect(value.price.toString());
}
}
);
flatMapDS.returns(Types.STRING);
flatMapDS.print();
env.execute();
}
}
Code file FlatMapDemo3.java
package cn.edu.xmu;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class FlatMapDemo3 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<StockPrice> stockPriceDS = env.fromElements(
new StockPrice("stock_1", 10001L, 9.7D),
new StockPrice("stock_2", 10002L, 4.5D),
new StockPrice("stock_3", 10003L, 8.2D)
);
SingleOutputStreamOperator<String> flatMapDS =
stockPriceDS.flatMap(new MyFlatMapFunction());
flatMapDS.print();
env.execute();
}
public static class MyFlatMapFunction implements FlatMapFunction<StockPrice,String>{
@Override
public void flatMap(StockPrice value,Collector<String> out) throws Exception{
if("stock_1".equals(value.stockId)){
out.collect(value.stockId);
}else if("stock_2".equals(value.stockId)){
out.collect(value.stockId);
out.collect(value.price.toString());
}
}
}
}
Code file FilterDemo1.java
package cn.edu.xmu;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class FilterDemo1 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<StockPrice> stockPriceDS = env.fromElements(
new StockPrice("stock_1", 10001L, 9.7D),
new StockPrice("stock_2", 10002L, 4.5D),
new StockPrice("stock_3", 10003L, 8.2D)
);
SingleOutputStreamOperator<StockPrice> filterDS =
stockPriceDS.filter(new FilterFunction<StockPrice>() {
@Override
public boolean filter(StockPrice value) throws Exception {
return "stock_1".equals(value.stockId);
}
}
);
filterDS.print();
env.execute();
}
}
Code file FilterDemo2.java
package cn.edu.xmu;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class FilterDemo2 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<StockPrice> stockPriceDS = env.fromElements(
new StockPrice("stock_1", 10001L, 9.7D),
new StockPrice("stock_2", 10002L, 4.5D),
new StockPrice("stock_3", 10003L, 8.2D)
);
SingleOutputStreamOperator<StockPrice> filterDS =
stockPriceDS.filter(stockPrice->"stock_1".equals(stockPrice.stockId));
filterDS.print();
env.execute();
}
}
Code file FilterDemo3.java
package cn.edu.xmu;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class FilterDemo3 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<StockPrice> stockPriceDS = env.fromElements(
new StockPrice("stock_1", 10001L, 9.7D),
new StockPrice("stock_2", 10002L, 4.5D),
new StockPrice("stock_3", 10003L, 8.2D)
);
SingleOutputStreamOperator<StockPrice> filterDS =
stockPriceDS.filter(new MyFilterFunction());
filterDS.print();
env.execute();
}
public static class MyFilterFunction implements FilterFunction<StockPrice>{
@Override
public boolean filter(StockPrice value) throws Exception{
return "stock_1".equals(value.stockId);
}
}
}
Code file KeyByDemo.java
package cn.edu.xmu;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class KeyByDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<StockPrice> stockPriceDS = env.fromElements(
new StockPrice("stock_1", 10001L, 9.7D),
new StockPrice("stock_2", 10002L, 4.5D),
new StockPrice("stock_3", 10003L, 8.2D),
new StockPrice("stock_1", 10004L, 5.2D)
);
KeyedStream<StockPrice, String> stockPriceKeyedStream = stockPriceDS.keyBy(new KeySelector<StockPrice, String>() {
@Override
public String getKey(StockPrice value) throws Exception {
return value.stockId;
}
});
stockPriceKeyedStream.print();
env.execute();
}
}
Code file SimpleAggregateDemo.java
package cn.edu.xmu;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class SimpleAggregateDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<StockPrice> stockPriceDS = env.fromElements(
new StockPrice("stock_1", 10001L, 9.7D),
new StockPrice("stock_2", 10002L, 4.5D),
new StockPrice("stock_3", 10003L, 8.2D),
new StockPrice("stock_1", 10004L, 5.2D)
);
KeyedStream<StockPrice, String> stockPriceKeyedStream =
stockPriceDS.keyBy(new KeySelector<StockPrice, String>() {
@Override
public String getKey(StockPrice value) throws Exception {
return value.stockId;
}
});
SingleOutputStreamOperator<StockPrice> result =
stockPriceKeyedStream.min("price");
result.print();
env.execute();
}
}
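A note on the design choice: min("price") only computes the minimum of the price field; the non-aggregated fields keep the values of the first record seen for each key. To obtain the complete record that actually contains the minimum price, the minBy operator can be used instead. A minimal alternative for the aggregation line above:
SingleOutputStreamOperator<StockPrice> result =
stockPriceKeyedStream.minBy("price");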
Code file ReduceDemo1.java
package cn.edu.xmu;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class ReduceDemo1 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<StockPrice> stockPriceDS = env.fromElements(
new StockPrice("stock_1", 10001L, 9.7D),
new StockPrice("stock_2", 10002L, 4.5D),
new StockPrice("stock_3", 10003L, 8.2D),
new StockPrice("stock_1", 10004L, 5.2D)
);
KeyedStream<StockPrice, String> stockPriceKeyedStream =
stockPriceDS.keyBy(new KeySelector<StockPrice, String>() {
@Override
public String getKey(StockPrice value) throws Exception {
return value.stockId;
}
});
SingleOutputStreamOperator<StockPrice> result =
stockPriceKeyedStream.reduce(new ReduceFunction<StockPrice>() {
@Override
public StockPrice reduce(StockPrice value1, StockPrice value2) throws Exception {
return new StockPrice(
value1.stockId,
value1.timeStamp,
value1.price+value2.price);
}
});
result.print();
env.execute();
}
}
Code file ReduceDemo2.java
package cn.edu.xmu;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class ReduceDemo2 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<StockPrice> stockPriceDS = env.fromElements(
new StockPrice("stock_1", 10001L, 9.7D),
new StockPrice("stock_2", 10002L, 4.5D),
new StockPrice("stock_3", 10003L, 8.2D),
new StockPrice("stock_1", 10004L, 5.2D)
);
KeyedStream<StockPrice, String> stockPriceKeyedStream =
stockPriceDS.keyBy(new KeySelector<StockPrice, String>() {
@Override
public String getKey(StockPrice value) throws Exception {
return value.stockId;
}
});
SingleOutputStreamOperator<StockPrice> result =
stockPriceKeyedStream.reduce(
(value1,value2)->new StockPrice(
value1.stockId,
value1.timeStamp,
value1.price+value2.price));
result.print();
env.execute();
}
}
Code file ReduceDemo3.java
package cn.edu.xmu;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class ReduceDemo3 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<StockPrice> stockPriceDS = env.fromElements(
new StockPrice("stock_1", 10001L, 9.7D),
new StockPrice("stock_2", 10002L, 4.5D),
new StockPrice("stock_3", 10003L, 8.2D),
new StockPrice("stock_1", 10004L, 5.2D)
);
KeyedStream<StockPrice, String> stockPriceKeyedStream =
stockPriceDS.keyBy(new KeySelector<StockPrice, String>() {
@Override
public String getKey(StockPrice value) throws Exception {
return value.stockId;
}
});
SingleOutputStreamOperator<StockPrice> result =
stockPriceKeyedStream.reduce(new MyReduceFunction());
result.print();
env.execute();
}
public static class MyReduceFunction implements ReduceFunction<StockPrice>{
@Override
public StockPrice reduce(StockPrice value1, StockPrice value2) throws Exception {
return new StockPrice(
value1.stockId,
value1.timeStamp,
value1.price+value2.price);
}
}
}
Code file PartitionDemo1.java
package cn.edu.xmu;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class PartitionDemo1 {
public PartitionDemo1(){}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// To see the effect of repartitioning, the parallelism must be set to a value greater than 1
env.setParallelism(2);
DataStreamSource<String> source = env.socketTextStream("hadoop01",9999);
source.shuffle().print();
env.execute();
}
}
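Besides shuffle(), which picks a downstream subtask at random, the DataStream API provides several other repartitioning operators that can be swapped into the source.shuffle().print() line above. A brief sketch of the alternatives (each line is an independent option):
// Round-robin across all downstream subtasks
source.rebalance().print();
// Round-robin within each local group of downstream subtasks
source.rescale().print();
// Send every record to all downstream subtasks
source.broadcast().print();
// Send all records to the first downstream subtask
source.global().print();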
Code file SplitByFilterDemo.java
package cn.edu.xmu;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class SplitByFilterDemo {
public SplitByFilterDemo(){}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<Integer> source = env.fromElements(1,2,3,4,5,6,7,8);
SingleOutputStreamOperator<Integer> evenDS = source.filter(value -> value % 2 == 0);
SingleOutputStreamOperator<Integer> oddDS = source.filter(value -> value % 2 != 0);
evenDS.print("even stream");
oddDS.print("odd stream");
env.execute();
}
}
Code file SplitBySideOutputDemo.java
package cn.edu.xmu;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SideOutputDataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
public class SplitBySideOutputDemo {
public SplitBySideOutputDemo(){}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<StockPrice> stockPriceDS = env.fromElements(
new StockPrice("stock_1", 10001L, 9.7D),
new StockPrice("stock_2", 10002L, 4.5D),
new StockPrice("stock_3", 10003L, 8.2D)
);
// Create the tag objects; the constructor takes two arguments: the first is the tag name, the second is the type of the data placed into the side output stream
OutputTag<StockPrice> stock1Tag =
new OutputTag<>("stock_1", Types.POJO(StockPrice.class));
OutputTag<StockPrice> stock2Tag =
new OutputTag<>("stock_2", Types.POJO(StockPrice.class));
SingleOutputStreamOperator<StockPrice> processResult
= stockPriceDS.process(new ProcessFunction<StockPrice, StockPrice>() {
@Override
public void processElement(StockPrice value, ProcessFunction<StockPrice, StockPrice>.Context ctx, Collector<StockPrice> out) throws Exception {
String stockId = value.getStockId();
if ("stock_1".equals(stockId)) {
// If the ID is stock_1, put the record into the side output stream stock_1
// The output method takes two arguments: the first is the tag, the second is the record itself
ctx.output(stock1Tag, value);
} else if ("stock_2".equals(stockId)) {
// If the ID is stock_2, put the record into the side output stream stock_2
ctx.output(stock2Tag, value);
} else {
// Records with any other ID stay in the main stream
out.collect(value);
}
}
});
// Retrieve the side output stream from the main stream by its tag
SideOutputDataStream<StockPrice> sideOutputStock1 =
processResult.getSideOutput(stock1Tag);
// Retrieve the side output stream from the main stream by its tag
SideOutputDataStream<StockPrice> sideOutputStock2 =
processResult.getSideOutput(stock2Tag);
// Print the main stream
processResult.print("main stream");
// Print the side output stream stock_1
sideOutputStock1.print("side output stock_1");
// Print the side output stream stock_2
sideOutputStock2.print("side output stock_2");
env.execute();
}
}
Code file UnionDataStreamDemo.java
package cn.edu.xmu;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class UnionDataStreamDemo {
public UnionDataStreamDemo(){}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<Integer> sourceDS1 = env.fromElements(1,3,5,7,9);
DataStreamSource<Integer> sourceDS2 = env.fromElements(2,4,6,8,10);
DataStreamSource<Integer> sourceDS3 = env.fromElements(11,12,13,14,15);
DataStream<Integer> unionResult = sourceDS1.union(sourceDS2).union(sourceDS3);
unionResult.print();
env.execute();
}
}
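Note that union requires all input streams to have the same element type. To combine two streams whose element types differ, use connect instead, as the next example shows.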
Code file ConnectDataStreamDemo.java
package cn.edu.xmu;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
public class ConnectDataStreamDemo {
public ConnectDataStreamDemo(){}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<Integer> sourceDS1 = env.fromElements(1,2,3);
DataStreamSource<String> sourceDS2 = env.fromElements("a","b","c");
ConnectedStreams<Integer, String> connectResult = sourceDS1.connect(sourceDS2);
// CoMapFunction has 3 type parameters: the first is the data type of the first stream,
// the second is the data type of the second stream, and the third is the output data type
SingleOutputStreamOperator<String> result =
connectResult.map(new CoMapFunction<Integer, String, String>() {
// Processing logic for the first stream
@Override
public String map1(Integer value) throws Exception {
return value.toString();
}
// Processing logic for the second stream
@Override
public String map2(String value) throws Exception {
return value;
}
});
result.print();
env.execute();
}
}
Code file ConnectDataStreamExample.java
package cn.edu.xmu;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class ConnectDataStreamExample {
public ConnectDataStreamExample(){}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<Tuple2<Integer, String>> sourceDS1 = env.fromElements(
Tuple2.of(1,"xiaoming"),
Tuple2.of(2,"xiaowang")
);
DataStreamSource<Tuple3<Integer, String, Integer>> sourceDS2 = env.fromElements(
Tuple3.of(1,"math",98),
Tuple3.of(1,"english",97),
Tuple3.of(2,"math",94),
Tuple3.of(2,"english",88)
);
ConnectedStreams<Tuple2<Integer, String>,Tuple3<Integer, String, Integer>> connectResult = sourceDS1.connect(sourceDS2);
/**
* Match records from the two streams by the student ID field
* For the two streams, which record arrives first is not deterministic
* For each stream, when a record arrives it is saved into a variable
* The variable is a HashMap whose key is the student ID and whose value is a List holding that stream's records
* Whenever a record arrives in one stream, it is saved into the variable, and the other stream's cache is searched for matching records
*/
SingleOutputStreamOperator<String> processResult = connectResult.process(
// CoProcessFunction has 3 type parameters: the first is the data type of the first stream, the second is the data type of the second stream, and the third is the output data type
new CoProcessFunction<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>, String>() {
// Define HashMaps to cache the stream records that have arrived
Map<Integer, List<Tuple2<Integer, String>>> ds1Cache = new HashMap<>();
Map<Integer, List<Tuple3<Integer, String, Integer>>> ds2Cache = new HashMap<>();
/**
* Processing logic for the first stream
* @param value a record from the first stream
* @param ctx the context
* @param out the collector
*/
@Override
public void processElement1(Tuple2<Integer, String> value, CoProcessFunction<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>, String>.Context ctx, Collector<String> out) throws Exception {
Integer sno = value.f0;
// A record from the first stream has arrived; save it into the cache
if (!ds1Cache.containsKey(sno)) {
// If the key does not exist, store this record as the first entry
List<Tuple2<Integer, String>> ds1Values = new ArrayList<>();
ds1Values.add(value);
ds1Cache.put(sno, ds1Values);
} else {
// If the key exists, this is not the first record; just append it to the List
ds1Cache.get(sno).add(value);
}
// Search ds2Cache for a matching student ID; output on a match, otherwise output nothing
if (ds2Cache.containsKey(sno)) {
for (Tuple3<Integer, String, Integer> ds2Element : ds2Cache.get(sno)) {
out.collect(Tuple4.of(value.f0,value.f1,ds2Element.f1,ds2Element.f2).toString());
}
}
}
/**
* Processing logic for the second stream
* @param value a record from the second stream
* @param ctx the context
* @param out the collector
*/
@Override
public void processElement2(Tuple3<Integer, String, Integer> value, CoProcessFunction<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>, String>.Context ctx, Collector<String> out) throws Exception {
Integer sno = value.f0;
// A record from the second stream has arrived; save it into the cache
if (!ds2Cache.containsKey(sno)) {
// If the key does not exist, store this record as the first entry
List<Tuple3<Integer, String, Integer>> ds2Values = new ArrayList<>();
ds2Values.add(value);
ds2Cache.put(sno, ds2Values);
} else {
// If the key exists, this is not the first record; just append it to the List
ds2Cache.get(sno).add(value);
}
// Search ds1Cache for a matching student ID; output on a match, otherwise output nothing
if (ds1Cache.containsKey(sno)) {
for (Tuple2<Integer, String> ds1Element : ds1Cache.get(sno)) {
out.collect(Tuple4.of(ds1Element.f0,ds1Element.f1,value.f1,value.f2).toString());
}
}
}
}
);
processResult.print();
env.execute();
}
}
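A note on the design choice: the two HashMap caches above live in plain operator memory, grow without bound, and are lost if the job fails. In a production job, this kind of two-stream matching is usually built on Flink's fault-tolerant keyed state, with the connected streams keyed by the student ID first.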
To use FileSink, add the following dependency on top of the pom.xml from Section 4.2.3:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>${flink.version}</version>
</dependency>
Code file FileSinkDemo1.java
package cn.edu.xmu;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class FileSinkDemo1 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<StockPrice> stockPriceDS = env.fromElements(
new StockPrice("stock_1", 10001L, 9.7D),
new StockPrice("stock_2", 10002L, 4.5D),
new StockPrice("stock_3", 10003L, 8.2D)
);
FileSink<StockPrice> fileSink =
FileSink.<StockPrice>forRowFormat(
new Path("file:///home/hadoop"),
new SimpleStringEncoder<>("UTF-8"))
.build();
stockPriceDS.sinkTo(fileSink);
env.execute();
}
}
To write to an HDFS file instead, simply change new Path("file:///home/hadoop") in the code above to new Path("hdfs://hadoop01:9000/"), and add the following two dependencies to pom.xml:
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.3.5</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.3.5</version>
</dependency>
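For reference, a minimal sketch of the modified sink, assuming the HDFS NameNode listens at hadoop01:9000 and that an output directory such as /flink-output is acceptable (the directory name here is only an example):
FileSink<StockPrice> fileSink =
FileSink.<StockPrice>forRowFormat(
new Path("hdfs://hadoop01:9000/flink-output"),
new SimpleStringEncoder<>("UTF-8"))
.build();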
Code file KafkaSinkDemo.java
package cn.edu.xmu;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class KafkaSinkDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<String> source = env.socketTextStream("hadoop01", 9999);
KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
.setBootstrapServers("hadoop01:9092")
.setRecordSerializer(
KafkaRecordSerializationSchema.<String>builder()
.setTopic("sinkKafka")
.setValueSerializationSchema(new SimpleStringSchema())
.build()
).build();
source.sinkTo(kafkaSink);
env.execute();
}
}
cd /usr/local/kafka
./bin/zookeeper-server-start.sh config/zookeeper.properties
cd /usr/local/kafka
./bin/kafka-server-start.sh config/server.properties
cd /usr/local/kafka
./bin/kafka-topics.sh --create --zookeeper hadoop01:2181 \
> --replication-factor 1 --partitions 1 --topic sinkKafka
nc -lk 9999
cd /usr/local/kafka
./bin/kafka-console-consumer.sh --bootstrap-server hadoop01:9092 \
> --topic sinkKafka --from-beginning
To have Flink write data to a MySQL database, add the following two dependencies to pom.xml:
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.33</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
<version>3.1.1-1.17</version>
</dependency>
Execute the following commands in the MySQL Shell to create the database and table:
mysql> create database flinkdb;
mysql> use flinkdb;
mysql> create table stockprice(stockId char(10),timeStamp bigint(20),price double(6,2));
Code file MySQLSinkDemo.java
package cn.edu.xmu;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import java.sql.PreparedStatement;
import java.sql.SQLException;
public class MySQLSinkDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<StockPrice> stockPriceDS = env.fromElements(
new StockPrice("stock_1", 10001L, 9.7),
new StockPrice("stock_2", 10002L, 4.5),
new StockPrice("stock_3", 10003L, 8.2)
);
SinkFunction<StockPrice> jdbcSink = JdbcSink.sink(
"insert into stockprice (stockId, timeStamp, price) values (?, ?, ?)",
new JdbcStatementBuilder<StockPrice>() {
@Override
public void accept(PreparedStatement preparedStatement, StockPrice stockPrice) throws SQLException {
preparedStatement.setString(1, stockPrice.stockId);
preparedStatement.setLong(2, stockPrice.timeStamp);
preparedStatement.setDouble(3, stockPrice.price);
}
}
,
JdbcExecutionOptions.builder()
.withBatchSize(1000)
.withBatchIntervalMs(200)
.withMaxRetries(5)
.build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:mysql://localhost:3306/flinkdb")
.withDriverName("com.mysql.cj.jdbc.Driver")
.withUsername("root")
.withPassword("123456")
.build()
);
stockPriceDS.addSink(jdbcSink);
env.execute();
}
}
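After the program runs, the inserted rows can be verified in the MySQL Shell:
mysql> use flinkdb;
mysql> select * from stockprice;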
Code file ReduceWindowFunctionDemo.java
package cn.edu.xmu;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
public class ReduceWindowFunctionDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataGeneratorSource<StockPrice> dataGeneratorSource = new DataGeneratorSource<>(
// Specify the GeneratorFunction implementation class
new MyGeneratorFunction(),
// Specify the total number of records to emit
10,
// Specify the number of records emitted per second
RateLimiterStrategy.perSecond(1),
// Specify the return type
TypeInformation.of(StockPrice.class) // Wrap the Java StockPrice class into a TypeInformation
);
// Read the data from dataGeneratorSource
DataStreamSource<StockPrice> dataGeneratorSourceStream = env.fromSource(dataGeneratorSource
, WatermarkStrategy.noWatermarks() // Specify the watermark generation strategy
, "dataGeneratorSource");
KeyedStream<StockPrice, String> stockPriceKS =
dataGeneratorSourceStream.keyBy(stockPrice->stockPrice.getStockId());
WindowedStream<StockPrice, String, TimeWindow> stockPriceWS =
stockPriceKS.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));
SingleOutputStreamOperator<StockPrice> reducedDS =
stockPriceWS.reduce((s1, s2) -> new StockPrice(s1.stockId, s1.timeStamp, s1.price + s2.price));
reducedDS.print();
env.execute();
}
}
Code file AggregateWindowFunctionDemo.java
package cn.edu.xmu;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
public class AggregateWindowFunctionDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataGeneratorSource<StockPrice> dataGeneratorSource = new DataGeneratorSource<>(
// Specify the GeneratorFunction implementation class
new MyGeneratorFunction(),
// Specify the total number of records to emit
10,
// Specify the number of records emitted per second
RateLimiterStrategy.perSecond(1),
// Specify the return type
TypeInformation.of(StockPrice.class) // Wrap the Java StockPrice class into a TypeInformation
);
// Read the data from dataGeneratorSource
DataStreamSource<StockPrice> dataGeneratorSourceStream = env.fromSource(dataGeneratorSource
, WatermarkStrategy.noWatermarks() // Specify the watermark generation strategy
, "dataGeneratorSource");
KeyedStream<StockPrice, String> stockPriceKS =
dataGeneratorSourceStream.keyBy(stockPrice->stockPrice.getStockId());
WindowedStream<StockPrice, String, TimeWindow> stockPriceWS =
stockPriceKS.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));
SingleOutputStreamOperator<Tuple2<String, Double>> aggregatedDS = stockPriceWS.aggregate(new MyAggregateFunction());
aggregatedDS.print();
env.execute();
}
public static class MyAggregateFunction implements AggregateFunction<StockPrice, Tuple3<String,Double,Long>, Tuple2<String,Double>>{
@Override
// Create the accumulator
public Tuple3<String, Double, Long> createAccumulator() {
return Tuple3.of("",0D,0L);
}
@Override
// Define how an input record is added to the accumulator
public Tuple3<String, Double, Long> add(StockPrice value, Tuple3<String, Double, Long> accumulator) {
return Tuple3.of(value.stockId,accumulator.f1+value.price,accumulator.f2+1L);
}
@Override
// Derive the result from the accumulator
public Tuple2<String, Double> getResult(Tuple3<String, Double, Long> accumulator) {
return Tuple2.of(accumulator.f0,accumulator.f1 / accumulator.f2);
}
@Override
// Define how two accumulators are merged
public Tuple3<String, Double, Long> merge(Tuple3<String, Double, Long> a, Tuple3<String, Double, Long> b) {
return Tuple3.of(a.f0,a.f1+b.f1,a.f2+b.f2);
}
}
}
Code file ProcessWindowFunctionDemo.java
package cn.edu.xmu;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
public class ProcessWindowFunctionDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataGeneratorSource<StockPrice> dataGeneratorSource = new DataGeneratorSource<>(
// Specify the GeneratorFunction implementation class
new MyGeneratorFunction(),
// Specify the total number of records to emit
10,
// Specify the number of records emitted per second
RateLimiterStrategy.perSecond(1),
// Specify the return type
TypeInformation.of(StockPrice.class) // Wrap the Java StockPrice class into a TypeInformation
);
// Read the data from dataGeneratorSource
DataStreamSource<StockPrice> dataGeneratorSourceStream = env.fromSource(dataGeneratorSource
, WatermarkStrategy.noWatermarks() // Specify the watermark generation strategy
, "dataGeneratorSource");
KeyedStream<StockPrice, String> stockPriceKS =
dataGeneratorSourceStream.keyBy(stockPrice->stockPrice.getStockId());
WindowedStream<StockPrice, String, TimeWindow> stockPriceWS =
stockPriceKS.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));
SingleOutputStreamOperator<String> processDS =
stockPriceWS.process(new MyProcessWindowFunction());
processDS.print();
env.execute();
}
public static class MyProcessWindowFunction extends ProcessWindowFunction<StockPrice, String, String, TimeWindow> {
@Override
// Called once when a window closes (once per key group). Not suitable for large volumes of data: the full window contents are kept in memory, which can cause an out-of-memory error
public void process(String s, ProcessWindowFunction<StockPrice, String, String, TimeWindow>.Context context, Iterable<StockPrice> elements, Collector<String> out) throws Exception {
// Aggregate. Note: the entire window's data is held in the Iterable, which may contain many records
AtomicInteger count= new AtomicInteger();
AtomicReference<Double> sumPrice= new AtomicReference<>(0.0);
elements.forEach(stock -> {
sumPrice.set(sumPrice.get() + stock.price);
count.set(count.get() + 1);
});
long startTS = context.window().getStart();
long endTS = context.window().getEnd();
String windowStart = DateFormatUtils.format(startTS,"yyyy-MM-dd HH:mm:ss.SSS");
String windowEnd = DateFormatUtils.format(endTS,"yyyy-MM-dd HH:mm:ss.SSS");
out.collect("Windows start:"+windowStart+"Windows end:"+windowEnd+"stockId:"+s+",price:"+String.valueOf(sumPrice.get()/count.get()));
}
}
}
Code file AggregateAndProcessWindowFunctionDemo.java
package cn.edu.xmu;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
public class AggregateAndProcessWindowFunctionDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataGeneratorSource<StockPrice> dataGeneratorSource = new DataGeneratorSource<>(
// Specify the GeneratorFunction implementation class
new MyGeneratorFunction(),
// Specify the total number of records to emit
10,
// Specify the number of records emitted per second
RateLimiterStrategy.perSecond(1),
// Specify the return type
TypeInformation.of(StockPrice.class) // Wrap the Java StockPrice class into a TypeInformation
);
// Read the data from dataGeneratorSource
DataStreamSource<StockPrice> dataGeneratorSourceStream = env.fromSource(dataGeneratorSource
, WatermarkStrategy.noWatermarks() // Specify the watermark generation strategy
, "dataGeneratorSource");
dataGeneratorSourceStream.print();
KeyedStream<StockPrice, String> stockPriceKS =
dataGeneratorSourceStream.keyBy(stockPrice->stockPrice.getStockId());
WindowedStream<StockPrice, String, TimeWindow> stockPriceWS =
stockPriceKS.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));
SingleOutputStreamOperator<String> aggregatedDS =
stockPriceWS.aggregate(new MyAggregateFunction(),new MyProcessWindowFunction());
aggregatedDS.print();
env.execute();
}
public static class MyAggregateFunction implements AggregateFunction<StockPrice, Tuple3<String,Double,Long>, Tuple2<String,Double>> {
@Override
// Create the accumulator
public Tuple3<String, Double, Long> createAccumulator() {
return Tuple3.of("",0D,0L);
}
@Override
// Define how an input record is added to the accumulator
public Tuple3<String, Double, Long> add(StockPrice value, Tuple3<String, Double, Long> accumulator) {
return Tuple3.of(value.stockId,accumulator.f1+value.price,accumulator.f2+1L);
}
@Override
// Derive the result from the accumulator
public Tuple2<String, Double> getResult(Tuple3<String, Double, Long> accumulator) {
return Tuple2.of(accumulator.f0,accumulator.f1 / accumulator.f2);
}
@Override
// Define how two accumulators are merged
public Tuple3<String, Double, Long> merge(Tuple3<String, Double, Long> a, Tuple3<String, Double, Long> b) {
return Tuple3.of(a.f0,a.f1+b.f1,a.f2+b.f2);
}
}
public static class MyProcessWindowFunction extends ProcessWindowFunction<Tuple2<String,Double>, String, String, TimeWindow> {
@Override
public void process(String s, ProcessWindowFunction<Tuple2<String, Double>, String, String, TimeWindow>.Context context, Iterable<Tuple2<String, Double>> elements, Collector<String> out) throws Exception {
// Aggregate. Note: the entire window's data is held in the Iterable, which may contain many rows
AtomicInteger count= new AtomicInteger();
AtomicReference<Double> sumPrice= new AtomicReference<>(0.0);
elements.forEach(tuple -> {
sumPrice.set(sumPrice.get() + tuple.f1);
count.set(count.get() + 1);
});
long startTS = context.window().getStart();
long endTS = context.window().getEnd();
String windowStart = DateFormatUtils.format(startTS,"yyyy-MM-dd HH:mm:ss.SSS");
String windowEnd = DateFormatUtils.format(endTS,"yyyy-MM-dd HH:mm:ss.SSS");
out.collect("Windows start:"+windowStart+"Windows end:"+windowEnd+"stockId:"+s+",price:"+String.valueOf(sumPrice.get()/count.get()));
}
}
}
Code file TriggerDemo.java
package cn.edu.xmu;
import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
public class TriggerDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<String> sourceDS = env.socketTextStream("hadoop01", 9999);
SingleOutputStreamOperator<Integer> mapDS = sourceDS.map(value -> Integer.parseInt(value));
// Define a tumbling count window of size 5
AllWindowedStream<Integer, GlobalWindow> allWindowedDS = mapDS.windowAll(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(5)));
// Compute the minimum value within the window
SingleOutputStreamOperator<Integer> minDS = allWindowedDS.min(0);
minDS.print();
env.execute();
}
}
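The combination of GlobalWindows, CountTrigger, and PurgingTrigger used above is exactly what Flink's built-in count-window shortcut assembles internally, so the windowing line can equivalently be written as:
AllWindowedStream<Integer, GlobalWindow> allWindowedDS = mapDS.countWindowAll(5);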
Code file MyTrigger.java
package cn.edu.xmu;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
public class MyTrigger extends Trigger<StockPrice, TimeWindow>{
private static final String DATA_COUNT_STATE_NAME = "dataCountState";
// Maximum number of records in the window
private int maxCount;
// State descriptor used to store the current number of records in the window
private ReducingStateDescriptor<Long> countStateDescriptor = new ReducingStateDescriptor<>(DATA_COUNT_STATE_NAME, new ReduceFunction<Long>() {
@Override
public Long reduce(Long value1, Long value2) throws Exception {
return value1 + value2;
}
}, LongSerializer.INSTANCE);
public MyTrigger(int maxCount) {
this.maxCount = maxCount;
}
// Fire the computation, then purge the elements in the window
private TriggerResult fireAndPurge(TimeWindow window, TriggerContext ctx) throws Exception {
clear(window, ctx);
return TriggerResult.FIRE_AND_PURGE;
}
// Called for every element that enters the window
@Override
public TriggerResult onElement(StockPrice element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
ReducingState<Long> countState = ctx.getPartitionedState(countStateDescriptor);
countState.add(1L);
if (countState.get() >= maxCount) {
return fireAndPurge(window, ctx);
}else{
return TriggerResult.CONTINUE;
}
}
@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
return fireAndPurge(window, ctx);
}
@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
// Event-time timers are not used by this trigger; do nothing
return TriggerResult.CONTINUE;
}
@Override
public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
ctx.getPartitionedState(countStateDescriptor).clear();
}
// Factory method that creates the trigger
public static MyTrigger create(int maxCount) {
return new MyTrigger(maxCount);
}
}
Code file TriggerDemo1.java
package cn.edu.xmu;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.time.Time;
public class TriggerDemo1 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<String> source = env.socketTextStream("hadoop01", 9999);
SingleOutputStreamOperator<StockPrice> stockPriceDS = source.map(s -> {
String[] element = s.split(",");
return new StockPrice(element[0], Long.valueOf(element[1]), Double.valueOf(element[2]));
});
KeyedStream<StockPrice, String> stockPriceKS =
stockPriceDS.keyBy(stockPrice->stockPrice.getStockId());
SingleOutputStreamOperator<StockPrice> resultDS = stockPriceKS
.window(TumblingProcessingTimeWindows.of(Time.seconds(30)))
.trigger(new MyTrigger(5))
.reduce((s1, s2) -> new StockPrice(s1.stockId, s1.timeStamp, s1.price + s2.price));
resultDS.print();
env.execute();
}
}
nc -lk 9999
s1,1,1
s1,2,2
s1,3,3
s1,4,4
s1,5,5
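All five records above arrive well inside a single 30-second processing-time window, so MyTrigger should fire as soon as the fifth record is counted: the ReduceFunction keeps the first record's stockId and timeStamp and sums the prices, giving 1+2+3+4+5 = 15.0 for key s1.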
Code file EvictorDemo.java
package cn.edu.xmu;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.evictors.Evictor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue;
import org.apache.flink.util.Collector;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
public class EvictorDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<String> source = env.socketTextStream("hadoop01", 9999);
SingleOutputStreamOperator<StockPrice> stockPriceDS = source.map(s -> {
String[] element = s.split(",");
return new StockPrice(element[0], Long.valueOf(element[1]), Double.valueOf(element[2]));
});
KeyedStream<StockPrice, String> stockPriceKS =
stockPriceDS.keyBy(stockPrice->stockPrice.getStockId());
WindowedStream<StockPrice, String, TimeWindow> stockPriceWS =
stockPriceKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
SingleOutputStreamOperator<String> processDS =
stockPriceWS
.evictor(new MyEvictor())
.process(new MyProcessWindowFunction());
processDS.print();
env.execute();
}
public static class MyProcessWindowFunction extends ProcessWindowFunction<StockPrice, String, String, TimeWindow> {
@Override
//Called once per key when the window closes; unsuitable for very large windows, since the full window contents are buffered in memory and may cause an OutOfMemoryError
public void process(String s, ProcessWindowFunction<StockPrice, String, String, TimeWindow>.Context context, Iterable<StockPrice> elements, Collector<String> out) throws Exception {
//Aggregate. Note: the entire window's data is held in the Iterable, which may contain many records
AtomicInteger count= new AtomicInteger();
AtomicReference<Double> sumPrice= new AtomicReference<>(0.0);
elements.forEach(stock -> {
sumPrice.set(sumPrice.get() + stock.price);
count.set(count.get() + 1);
});
long startTS = context.window().getStart();
long endTS = context.window().getEnd();
String windowStart = DateFormatUtils.format(startTS,"yyyy-MM-dd HH:mm:ss.SSS");
String windowEnd = DateFormatUtils.format(endTS,"yyyy-MM-dd HH:mm:ss.SSS");
out.collect("Window start: " + windowStart + ", window end: " + windowEnd + ", stockId: " + s + ", average price: " + (sumPrice.get() / count.get()));
}
}
public static class MyEvictor implements Evictor<StockPrice,TimeWindow>{
@Override
public void evictBefore(Iterable<TimestampedValue<StockPrice>> elements, int size, TimeWindow window, EvictorContext evictorContext) {
Iterator<TimestampedValue<StockPrice>> ite = elements.iterator();
while (ite.hasNext()) {
TimestampedValue<StockPrice> element = ite.next();
System.out.println("Evictor saw stock price: " + element.getValue().price);
//Simulate removing records with invalid values
if (element.getValue().price <= 0) {
System.out.println("Stock price is not positive; removing this record");
ite.remove();
}
}
}
@Override
public void evictAfter(Iterable<TimestampedValue<StockPrice>> elements, int size, TimeWindow window, EvictorContext evictorContext) {
//Do nothing
}
}
}
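MyEvictor filters records by value in evictBefore. For the common keep-only-the-last-N case, Flink already ships a CountEvictor, so no custom class is needed. A minimal sketch (a drop-in replacement on the same windowed stream, assuming the rest of EvictorDemo is unchanged; requires import org.apache.flink.streaming.api.windowing.evictors.CountEvictor):
stockPriceWS
    .evictor(CountEvictor.of(2)) // keep at most the last 2 elements of each window
    .process(new MyProcessWindowFunction());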
Start the system's NC program in a Linux terminal, then launch the EvictorDemo program; in the NC window, enter the following data (one line at a time, pressing Enter after each line):
stock_1,1602031567000,8
stock_1,1602031568000,-4
Code file WatermarkDemo1.java
package cn.edu.xmu;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
public class WatermarkDemo1 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<String> source = env.socketTextStream("hadoop01", 9999);
SingleOutputStreamOperator<StockPrice> stockPriceDS = source.map(s -> {
String[] element = s.split(",");
return new StockPrice(element[0], Long.valueOf(element[1]), Double.valueOf(element[2]));
});
WatermarkStrategy<StockPrice> watermarkStrategy = WatermarkStrategy
.<StockPrice>forMonotonousTimestamps()
.withTimestampAssigner(new SerializableTimestampAssigner<StockPrice>() {
@Override
public long extractTimestamp(StockPrice element, long recordTimestamp) {
//Extract the timestamp from the input record as the event time (the watermark is derived from it), converting it to milliseconds
return element.getTimeStamp() * 1000L;
}
});
SingleOutputStreamOperator<StockPrice> stockPriceDSWithWatermark = stockPriceDS.assignTimestampsAndWatermarks(watermarkStrategy);
KeyedStream<StockPrice, String> stockPriceKS =
stockPriceDSWithWatermark.keyBy(stockPrice->stockPrice.getStockId());
WindowedStream<StockPrice, String, TimeWindow> stockPriceWS =
// Use the event-time window assigner TumblingEventTimeWindows
stockPriceKS.window(TumblingEventTimeWindows.of(Time.seconds(10)));
SingleOutputStreamOperator<String> processDS =
stockPriceWS.process(new MyProcessWindowFunction());
processDS.print();
env.execute();
}
public static class MyProcessWindowFunction extends ProcessWindowFunction<StockPrice, String, String, TimeWindow> {
@Override
//Called once per key when the window closes; unsuitable for very large windows, since the full window contents are buffered in memory and may cause an OutOfMemoryError
public void process(String s, ProcessWindowFunction<StockPrice, String, String, TimeWindow>.Context context, Iterable<StockPrice> elements, Collector<String> out) throws Exception {
//Aggregate. Note: the entire window's data is held in the Iterable, which may contain many records
AtomicInteger count= new AtomicInteger();
AtomicReference<Double> sumPrice= new AtomicReference<>(0.0);
elements.forEach(stock -> {
sumPrice.set(sumPrice.get() + stock.price);
count.set(count.get() + 1);
});
long startTS = context.window().getStart();
long endTS = context.window().getEnd();
String windowStart = DateFormatUtils.format(startTS,"yyyy-MM-dd HH:mm:ss.SSS");
String windowEnd = DateFormatUtils.format(endTS,"yyyy-MM-dd HH:mm:ss.SSS");
out.collect("Window start: " + windowStart + ", window end: " + windowEnd + ", stockId: " + s + ", average price: " + (sumPrice.get() / count.get()));
}
}
}
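forMonotonousTimestamps() is simply the zero-delay special case of the bounded-out-of-orderness strategy: the watermark always trails the largest timestamp seen so far by 1 millisecond. A sketch of the equivalent declaration (requires import java.time.Duration):
WatermarkStrategy<StockPrice> equivalentStrategy = WatermarkStrategy
    .<StockPrice>forBoundedOutOfOrderness(Duration.ZERO)
    .withTimestampAssigner((element, recordTimestamp) -> element.getTimeStamp() * 1000L);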
Start an NC program in Linux, run the program above in IDEA, and then enter the following data in the NC window, line by line:
s1,1,1
s1,2,2
s1,3,3
s1,7,7
s1,9,9
s1,10,10
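With monotonously increasing timestamps the watermark is the largest event time minus 1 ms, so the [0 s, 10 s) window fires only when the watermark reaches 9999 ms, i.e. when s1,10 arrives. At that point the window should report stockId s1 with the average of the prices 1, 2, 3, 7 and 9, namely 22/5 = 4.4.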
Code file WatermarkDemo2.java
package cn.edu.xmu;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
public class WatermarkDemo2 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<String> source = env.socketTextStream("hadoop01", 9999);
SingleOutputStreamOperator<StockPrice> stockPriceDS = source.map(s -> {
String[] element = s.split(",");
return new StockPrice(element[0], Long.valueOf(element[1]), Double.valueOf(element[2]));
});
WatermarkStrategy<StockPrice> watermarkStrategy = WatermarkStrategy
.<StockPrice>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner(new SerializableTimestampAssigner<StockPrice>() {
@Override
public long extractTimestamp(StockPrice element, long recordTimestamp) {
return element.getTimeStamp() * 1000L; // convert to milliseconds
}
});
SingleOutputStreamOperator<StockPrice> stockPriceDSWithWatermark = stockPriceDS.assignTimestampsAndWatermarks(watermarkStrategy);
KeyedStream<StockPrice, String> stockPriceKS =
stockPriceDSWithWatermark.keyBy(stockPrice->stockPrice.getStockId());
WindowedStream<StockPrice, String, TimeWindow> stockPriceWS =
// use event-time windows
stockPriceKS.window(TumblingEventTimeWindows.of(Time.seconds(10)));
SingleOutputStreamOperator<String> processDS =
stockPriceWS.process(new MyProcessWindowFunction());
processDS.print();
env.execute();
}
public static class MyProcessWindowFunction extends ProcessWindowFunction<StockPrice, String, String, TimeWindow> {
@Override
//Called once per key when the window closes; unsuitable for very large windows, since the full window contents are buffered in memory and may cause an OutOfMemoryError
public void process(String s, ProcessWindowFunction<StockPrice, String, String, TimeWindow>.Context context, Iterable<StockPrice> elements, Collector<String> out) throws Exception {
//Aggregate. Note: the entire window's data is held in the Iterable, which may contain many records
AtomicInteger count= new AtomicInteger();
AtomicReference<Double> sumPrice= new AtomicReference<>(0.0);
elements.forEach(stock -> {
sumPrice.set(sumPrice.get() + stock.price);
count.set(count.get() + 1);
});
long startTS = context.window().getStart();
long endTS = context.window().getEnd();
String windowStart = DateFormatUtils.format(startTS,"yyyy-MM-dd HH:mm:ss.SSS");
String windowEnd = DateFormatUtils.format(endTS,"yyyy-MM-dd HH:mm:ss.SSS");
out.collect("Window start: " + windowStart + ", window end: " + windowEnd + ", stockId: " + s + ", average price: " + (sumPrice.get() / count.get()));
}
}
}
Start an NC program in Linux, run the program above in IDEA, and then enter the following data in the NC window, line by line:
s1,1,1
s1,2,2
s1,5,5
s1,3,3
s1,9,9
s1,2,2
s1,10,10
s1,3,3
s1,11,11
s1,13,13
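With a 3-second out-of-orderness bound the watermark is the largest event time minus 3 s minus 1 ms, so the [0 s, 10 s) window fires only when s1,13 arrives (13000 − 3000 − 1 = 9999, which reaches the window's last timestamp 9999 ms). That is why out-of-order records such as s1,2 and s1,3 that show up after s1,10 are still counted: when they arrive, the watermark has not yet passed the window end.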
Code file PeriodicWatermarkDemo.java
package cn.edu.xmu;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
public class PeriodicWatermarkDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.getConfig().setAutoWatermarkInterval(2000); // set the periodic watermark generation interval to 2000 ms
DataStreamSource<String> source = env.socketTextStream("hadoop01", 9999);
SingleOutputStreamOperator<StockPrice> stockPriceDS = source.map(s -> {
String[] element = s.split(",");
return new StockPrice(element[0], Long.valueOf(element[1]), Double.valueOf(element[2]));
});
WatermarkStrategy<StockPrice> watermarkStrategy =
WatermarkStrategy.<StockPrice>forGenerator(new WatermarkGeneratorSupplier<StockPrice>() {
@Override
public WatermarkGenerator<StockPrice> createWatermarkGenerator(Context context) {
return new MyPeriodicWatermarkGenerator<>(2000L); // 2000 ms out-of-orderness delay
}
}).withTimestampAssigner(
(element, recordTimestamp) -> element.getTimeStamp()*1000L
);
SingleOutputStreamOperator<StockPrice> stockPriceDSWithWatermark =
stockPriceDS.assignTimestampsAndWatermarks(watermarkStrategy);
KeyedStream<StockPrice, String> stockPriceKS =
stockPriceDSWithWatermark.keyBy(stockPrice->stockPrice.getStockId());
WindowedStream<StockPrice, String, TimeWindow> stockPriceWS =
stockPriceKS.window(TumblingEventTimeWindows.of(Time.seconds(3)));
SingleOutputStreamOperator<StockPrice> reducedDS = stockPriceWS.reduce((s1, s2) -> new StockPrice(s1.stockId, s1.timeStamp, s1.price + s2.price));
reducedDS.print();
env.execute();
}
public static class MyPeriodicWatermarkGenerator<T> implements WatermarkGenerator<T>{
// Out-of-orderness wait time
private long delayTime;
// Largest event timestamp seen so far
private long maxTimestamp;
public MyPeriodicWatermarkGenerator(long delayTs){
this.delayTime = delayTs;
this.maxTimestamp = Long.MIN_VALUE + this.delayTime + 1;
}
/**
 * onEvent is called once for every arriving record; it extracts the event time and keeps the maximum seen so far
 * @param event the record that just arrived
 * @param eventTimestamp the timestamp extracted from it
 * @param output
 */
@Override
public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
// Compare the stored timestamp with the newly extracted one and keep the larger value
maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
System.out.println("onEvent called; maximum timestamp so far = " + maxTimestamp);
}
/**
 * Called periodically: emits the watermark
 * @param output
 */
@Override
public void onPeriodicEmit(WatermarkOutput output) {
output.emitWatermark(new Watermark(maxTimestamp - delayTime - 1)); // the trailing 1 means 1 millisecond
System.out.println("onPeriodicEmit called; generated watermark = " + (maxTimestamp - delayTime - 1));
}
}
}
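This generator reproduces what the built-in bounded-out-of-orderness strategy does internally (it likewise emits maxTimestamp − delay − 1 on every periodic call), so the whole forGenerator block could be replaced by the built-in strategy. A sketch (requires import java.time.Duration):
WatermarkStrategy<StockPrice> builtinEquivalent = WatermarkStrategy
    .<StockPrice>forBoundedOutOfOrderness(Duration.ofMillis(2000))
    .withTimestampAssigner((element, recordTimestamp) -> element.getTimeStamp() * 1000L);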
Code file PunctuatedWatermarkDemo.java
package cn.edu.xmu;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import java.text.Format;
import java.text.SimpleDateFormat;
public class PunctuatedWatermarkDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<String> source = env.socketTextStream("hadoop01", 9999);
SingleOutputStreamOperator<StockPrice> stockPriceDS = source.map(s -> {
String[] element = s.split(",");
return new StockPrice(element[0], Long.valueOf(element[1]), Double.valueOf(element[2]));
});
SingleOutputStreamOperator<StockPrice> stockPriceDSWithWatermark = stockPriceDS.assignTimestampsAndWatermarks(new MyWatermarkStrategy());
KeyedStream<StockPrice, String> stockPriceKS =
stockPriceDSWithWatermark.keyBy(stockPrice->stockPrice.getStockId());
WindowedStream<StockPrice, String, TimeWindow> stockPriceWS =
stockPriceKS.window(TumblingEventTimeWindows.of(Time.seconds(3)));
SingleOutputStreamOperator<StockPrice> reducedDS = stockPriceWS.reduce((s1, s2) -> new StockPrice(s1.stockId, s1.timeStamp, s1.price + s2.price));
reducedDS.print();
env.execute();
}
public static class MyWatermarkStrategy implements WatermarkStrategy<StockPrice>{
@Override
public WatermarkGenerator<StockPrice> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
return new WatermarkGenerator<StockPrice>(){
long maxOutOfOrderness = 10000L; // maximum tolerated out-of-orderness: 10 seconds
long currentMaxTimestamp = 0L;
Watermark currentWatermark = null;
Format format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
@Override
public void onEvent(StockPrice event, long eventTimestamp, WatermarkOutput output) {
currentMaxTimestamp = Math.max(eventTimestamp, currentMaxTimestamp);
currentWatermark = new Watermark(currentMaxTimestamp - maxOutOfOrderness);
output.emitWatermark(currentWatermark);
System.out.println("timestamp:" + event.stockId + "," + event.timeStamp + "|" + format.format(event.timeStamp) + "," + currentMaxTimestamp + "|" + format.format(currentMaxTimestamp) + "," + currentWatermark.toString());
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
// Watermarks are emitted per record in onEvent rather than periodically, so nothing is done here
}
};
}
@Override
public TimestampAssigner<StockPrice> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
return new SerializableTimestampAssigner<StockPrice>() {
@Override
public long extractTimestamp(StockPrice element, long recordTimestamp) {
return element.timeStamp; // extract the timestamp from the arriving record (already in milliseconds here)
}
};
}
}
}
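Because onEvent emits a watermark for every record, out-of-order input produces watermarks that do not advance (Flink ignores them downstream, but emitting them is wasted work). A hedged refinement of the same onEvent body that emits only when the watermark actually moves forward (lastEmitted is an extra long field, initialized to Long.MIN_VALUE):
@Override
public void onEvent(StockPrice event, long eventTimestamp, WatermarkOutput output) {
    currentMaxTimestamp = Math.max(eventTimestamp, currentMaxTimestamp);
    long candidate = currentMaxTimestamp - maxOutOfOrderness;
    if (candidate > lastEmitted) { // emit only when the watermark advances
        output.emitWatermark(new Watermark(candidate));
        lastEmitted = candidate;
    }
}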
nc -lk 9999
Code file LateDataDemo.java
package cn.edu.xmu;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.OutputTag;
import java.text.Format;
import java.text.SimpleDateFormat;
public class LateDataDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<String> source = env.socketTextStream("hadoop01", 9999);
SingleOutputStreamOperator<StockPrice> stockPriceDS = source.map(s -> {
String[] element = s.split(",");
return new StockPrice(element[0], Long.valueOf(element[1]), Double.valueOf(element[2]));
});
SingleOutputStreamOperator<StockPrice> stockPriceDSWithWatermark =
stockPriceDS.assignTimestampsAndWatermarks(new MyWatermarkStrategy());
KeyedStream<StockPrice, String> stockPriceKS =
stockPriceDSWithWatermark.keyBy(stockPrice->stockPrice.getStockId());
OutputTag<StockPrice> lateData = new OutputTag<>("late", Types.POJO(StockPrice.class));
WindowedStream<StockPrice, String, TimeWindow> stockPriceWS =
stockPriceKS
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.allowedLateness(Time.seconds(2L))
.sideOutputLateData(lateData);
SingleOutputStreamOperator<StockPrice> reducedDS = stockPriceWS.reduce((s1, s2) -> new StockPrice(s1.stockId, s1.timeStamp, s1.price + s2.price));
reducedDS.print("window result:");
SideOutputDataStream<StockPrice> lateDS = reducedDS.getSideOutput(lateData);
lateDS.print("late data:");
env.execute();
}
public static class MyWatermarkStrategy implements WatermarkStrategy<StockPrice>{
@Override
public WatermarkGenerator<StockPrice> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
return new WatermarkGenerator<StockPrice>(){
long maxOutOfOrderness = 10000L; // maximum tolerated out-of-orderness: 10 seconds
long currentMaxTimestamp = 0L;
Watermark currentWatermark = null;
Format format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
@Override
public void onEvent(StockPrice event, long eventTimestamp, WatermarkOutput output) {
currentMaxTimestamp = Math.max(eventTimestamp, currentMaxTimestamp);
currentWatermark = new Watermark(currentMaxTimestamp - maxOutOfOrderness);
output.emitWatermark(currentWatermark);
System.out.println("timestamp:" + event.stockId + "," + event.timeStamp + "|" + format.format(event.timeStamp) + "," + currentMaxTimestamp + "|" + format.format(currentMaxTimestamp) + "," + currentWatermark.toString());
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
// Watermarks are emitted per record in onEvent rather than periodically, so nothing is done here
}
};
}
@Override
public TimestampAssigner<StockPrice> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
return new SerializableTimestampAssigner<StockPrice>() {
@Override
public long extractTimestamp(StockPrice element, long recordTimestamp) {
return element.timeStamp; // extract the timestamp from the arriving record (already in milliseconds here)
}
};
}
}
}
Start the NC program in a Linux terminal, then launch LateDataDemo.java in IDEA, and enter the following data in the NC terminal (line by line):
stock_1,1602031567000,8.14
stock_1,1602031571000,8.23
stock_1,1602031577000,8.24
stock_1,1602031578000,8.87
stock_1,1602031579000,8.55
stock_1,1602031577000,8.24
stock_1,1602031581000,8.43
stock_1,1602031582000,8.78
stock_1,1602031581000,8.76
stock_1,1602031579000,8.55
stock_1,1602031591000,8.13
stock_1,1602031581000,8.34
stock_1,1602031580000,8.45
stock_1,1602031579000,8.33
stock_1,1602031578000,8.56
stock_1,1602031577000,8.32
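A note on when a record ends up in the side output: the watermark here trails the largest event time by 10 s, and a record is diverted to lateData only once the watermark has passed its window end plus the 2 s allowedLateness. For example, the final stock_1,1602031577000 record belongs to the window ending at 1602031578000, whose late threshold is 1602031579999 ms; after stock_1,1602031591000 has pushed the watermark to 1602031581000 ms, that record can no longer be assigned to its window and should appear under the "late data:" prefix.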
Code file WindowJoinDemo.java
package cn.edu.xmu;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
public class WindowJoinDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<Tuple2<String, Integer>> sourceDS1 = env.fromElements(
Tuple2.of("b", 1), // the first field is the key, the second is the timestamp (in seconds)
Tuple2.of("b", 2),
Tuple2.of("c", 3),
Tuple2.of("d", 4)
);
SingleOutputStreamOperator<Tuple2<String, Integer>> sourceDS1WithWatermark = sourceDS1.assignTimestampsAndWatermarks(
WatermarkStrategy
.<Tuple2<String, Integer>>forMonotonousTimestamps()
.withTimestampAssigner((value, ts) -> value.f1 * 1000L)
);
DataStreamSource<Tuple3<String, Integer, Integer>> sourceDS2 = env.fromElements(
Tuple3.of("b", 1, 1), // the first field is the key, the second is the timestamp (in seconds)
Tuple3.of("b", 12, 1),
Tuple3.of("c", 2, 1),
Tuple3.of("d", 14, 1)
);
SingleOutputStreamOperator<Tuple3<String, Integer, Integer>> sourceDS2WithWatermark = sourceDS2.assignTimestampsAndWatermarks(
WatermarkStrategy
.<Tuple3<String, Integer, Integer>>forMonotonousTimestamps()
.withTimestampAssigner((value, ts) -> value.f1 * 1000L)
);
// Perform the window join
DataStream<String> joinResult = sourceDS1WithWatermark.join(sourceDS2WithWatermark)
.where(value1 -> value1.f0) // key selector for sourceDS1WithWatermark
.equalTo(value2 -> value2.f0) // key selector for sourceDS2WithWatermark
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.apply(new JoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() {
/**
 * join is invoked for every pair of records that match within the same window
 * @param first the record from sourceDS1WithWatermark
 * @param second the record from sourceDS2WithWatermark
 */
@Override
public String join(Tuple2<String, Integer> first, Tuple3<String, Integer, Integer> second) throws Exception {
return first + "<------>" + second;
}
});
joinResult.print();
env.execute();
}
}
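With 5-second tumbling event-time windows, only pairs that fall into the same window are joined. In the window [0 s, 5 s), (b,1) and (b,2) both pair with (b,1,1) and (c,3) pairs with (c,2,1), so three joined lines should be printed; (d,4) produces nothing because (d,14,1) lies in [10 s, 15 s), and a window join is an inner join that silently drops unmatched records.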
Code file IntervalJoinDemo.java
package cn.edu.xmu;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class IntervalJoinDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<Tuple2<String, Integer>> sourceDS1 = env.fromElements(
Tuple2.of("b", 1), // the first field is the key, the second is the timestamp (in seconds)
Tuple2.of("b", 2),
Tuple2.of("c", 3),
Tuple2.of("d", 4)
);
SingleOutputStreamOperator<Tuple2<String, Integer>> sourceDS1WithWatermark = sourceDS1.assignTimestampsAndWatermarks(
WatermarkStrategy
.<Tuple2<String, Integer>>forMonotonousTimestamps()
.withTimestampAssigner((value, ts) -> value.f1 * 1000L)
);
DataStreamSource<Tuple3<String, Integer, Integer>> sourceDS2 = env.fromElements(
Tuple3.of("b", 1, 1), // the first field is the key, the second is the timestamp (in seconds)
Tuple3.of("b", 12, 1),
Tuple3.of("c", 2, 1),
Tuple3.of("d", 14, 1)
);
SingleOutputStreamOperator<Tuple3<String, Integer, Integer>> sourceDS2WithWatermark = sourceDS2.assignTimestampsAndWatermarks(
WatermarkStrategy
.<Tuple3<String, Integer, Integer>>forMonotonousTimestamps()
.withTimestampAssigner((value, ts) -> value.f1 * 1000L)
);
// Key both streams
KeyedStream<Tuple2<String, Integer>, String> keyedStream1 = sourceDS1WithWatermark.keyBy(value1 -> value1.f0);
KeyedStream<Tuple3<String, Integer, Integer>, String> keyedStream2 = sourceDS2WithWatermark.keyBy(value2 -> value2.f0);
// Call interval join
SingleOutputStreamOperator<String> processResult = keyedStream1.intervalJoin(keyedStream2)
.between(Time.seconds(-2), Time.seconds(2)) // set the lower and upper bounds of the interval
.process(new ProcessJoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() {
/**
 * processElement is invoked only when records from the two streams match
 * @param left the record from the first stream, keyedStream1
 * @param right the record from the second stream, keyedStream2
 * @param ctx the context
 * @param out the collector
 */
@Override
public void processElement(Tuple2<String, Integer> left, Tuple3<String, Integer, Integer> right, ProcessJoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>.Context ctx, Collector<String> out) throws Exception {
out.collect(left + " joins with " + right);
}
});
processResult.print();
env.execute();
}
}
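between(-2, +2) pairs a left record with timestamp t against right records whose timestamps lie in [t − 2 s, t + 2 s]. Here (b,1) and (b,2) both match (b,1,1), (c,3) matches (c,2,1), and (d,4) does not match (d,14,1), so three joined lines should be printed.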
Code file IntervalJoinWithLateDataDemo.java
package cn.edu.xmu;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.time.Duration;
public class IntervalJoinWithLateDataDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<String> sourceDS1 = env.socketTextStream("hadoop01", 9999);
SingleOutputStreamOperator<Tuple2<String, Integer>> mapDS1 = sourceDS1.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String value) throws Exception {
String[] splitResult = value.split(",");
return Tuple2.of(splitResult[0], Integer.parseInt(splitResult[1]));
}
});
SingleOutputStreamOperator<Tuple2<String, Integer>> mapDS1WithWatermark = mapDS1.assignTimestampsAndWatermarks(
WatermarkStrategy
.<Tuple2<String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner((value, ts) -> value.f1 * 1000L)
);
DataStreamSource<String> sourceDS2 = env.socketTextStream("hadoop01", 8888);
SingleOutputStreamOperator<Tuple3<String, Integer, Integer>> mapDS2 = sourceDS2.map(new MapFunction<String, Tuple3<String, Integer, Integer>>() {
@Override
public Tuple3<String, Integer, Integer> map(String value) throws Exception {
String[] splitResult = value.split(",");
return Tuple3.of(splitResult[0], Integer.parseInt(splitResult[1]), Integer.parseInt(splitResult[2]));
}
});
SingleOutputStreamOperator<Tuple3<String, Integer, Integer>> mapDS2WithWatermark = mapDS2.assignTimestampsAndWatermarks(
WatermarkStrategy
.<Tuple3<String, Integer, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner((value, ts) -> value.f1 * 1000L)
);
// Key both streams
KeyedStream<Tuple2<String, Integer>, String> keyedStream1 = mapDS1WithWatermark.keyBy(value1 -> value1.f0);
KeyedStream<Tuple3<String, Integer, Integer>, String> keyedStream2 = mapDS2WithWatermark.keyBy(value2 -> value2.f0);
// Define side-output tags for late records, then call interval join
OutputTag<Tuple2<String, Integer>> keyedStream1LateTag = new OutputTag<>("keyedStream1-late", Types.TUPLE(Types.STRING, Types.INT));
OutputTag<Tuple3<String, Integer, Integer>> keyedStream2LateTag = new OutputTag<>("keyedStream2-late", Types.TUPLE(Types.STRING, Types.INT, Types.INT));
SingleOutputStreamOperator<String> processResult = keyedStream1.intervalJoin(keyedStream2)
.between(Time.seconds(-2), Time.seconds(2))
.sideOutputLeftLateData(keyedStream1LateTag)
.sideOutputRightLateData(keyedStream2LateTag)
.process(new ProcessJoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() {
/**
 * processElement is invoked only when records from the two streams match
 * @param left the record from the first stream, keyedStream1
 * @param right the record from the second stream, keyedStream2
 * @param ctx the context
 * @param out the collector
 */
@Override
public void processElement(Tuple2<String, Integer> left, Tuple3<String, Integer, Integer> right, ProcessJoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>.Context ctx, Collector<String> out) throws Exception {
out.collect(left + " joins with " + right);
}
});
processResult.print("main stream");
processResult.getSideOutput(keyedStream1LateTag).printToErr("late data from keyedStream1");
processResult.getSideOutput(keyedStream2LateTag).printToErr("late data from keyedStream2");
env.execute();
}
}
nc -lk 9999
nc -lk 8888
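In an interval join, a record counts as late when its timestamp is already below the operator's current watermark (the minimum of the two input watermarks, each trailing its stream's largest event time by 3 s here). Such records are routed to the corresponding side output instead of being joined, which is what the two printToErr streams above make visible.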
Program KeyedValueStateDemo.java
package cn.edu.xmu;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.time.Duration;
public class KeyedValueStateDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<String> source = env.socketTextStream("hadoop01", 9999);
SingleOutputStreamOperator<StockPrice> stockPriceDS = source.map(s -> {
String[] element = s.split(",");
return new StockPrice(element[0], Long.valueOf(element[1]), Double.valueOf(element[2]));
});
SingleOutputStreamOperator<StockPrice> stockPriceDSWithWatermark = stockPriceDS.assignTimestampsAndWatermarks(
WatermarkStrategy
.<StockPrice>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner((element, recordTimestamp) -> element.getTimeStamp() * 1000L)
);
SingleOutputStreamOperator<String> resultDS = stockPriceDSWithWatermark.keyBy(stockPrice -> stockPrice.getStockId())
.process(new KeyedProcessFunction<String, StockPrice, String>() {
// Declare the state
ValueState<Double> lastPriceState;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// Initialize the state
// ValueStateDescriptor takes two arguments: a name and the type of the stored value
lastPriceState = getRuntimeContext().getState(new ValueStateDescriptor<Double>("lastPriceState", Types.DOUBLE));
}
@Override
public void processElement(StockPrice value, KeyedProcessFunction<String, StockPrice, String>.Context ctx, Collector<String> out) throws Exception {
// Read the previous record's stock price for this key
double lastPrice =
lastPriceState.value()==null?0.0:lastPriceState.value();
double currentPrice = value.getPrice();
// If the difference between the two prices exceeds 5, emit an alert
if (Math.abs(currentPrice - lastPrice) > 5) {
out.collect("Stock ID " + value.getStockId() + ": current price = " + currentPrice + ", previous price = " + lastPrice + ", the difference exceeds 5");
}
// Update the state to the current record's stock price
lastPriceState.update(currentPrice);
}
});
resultDS.print();
env.execute();
}
}
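A hypothetical run: after starting NC, typing s1,1,1 and then s1,2,9 should produce exactly one alert. The first record finds an empty state (|1 − 0| = 1, no output) and stores 1.0; the second record then sees |9 − 1| = 8 > 5 and triggers the alert.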
Program KeyedListStateDemo.java
package cn.edu.xmu;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
public class KeyedListStateDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<String> source = env.socketTextStream("hadoop01", 9999);
SingleOutputStreamOperator<StockPrice> stockPriceDS = source.map(s -> {
String[] element = s.split(",");
return new StockPrice(element[0], Long.valueOf(element[1]), Double.valueOf(element[2]));
});
SingleOutputStreamOperator<StockPrice> stockPriceDSWithWatermark = stockPriceDS.assignTimestampsAndWatermarks(
WatermarkStrategy
.<StockPrice>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner((element, recordTimestamp) -> element.getTimeStamp() * 1000L)
);
SingleOutputStreamOperator<String> resultDS = stockPriceDSWithWatermark.keyBy(stockPrice -> stockPrice.getStockId())
.process(new KeyedProcessFunction<String, StockPrice, String>() {
ListState<Double> stockPriceListState;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// Initialize the state
// ListStateDescriptor takes two arguments: a name and the type of the stored elements
stockPriceListState = getRuntimeContext().getListState(new ListStateDescriptor<Double>("stockPriceListState", Types.DOUBLE));
}
@Override
public void processElement(StockPrice value, KeyedProcessFunction<String, StockPrice, String>.Context ctx, Collector<String> out) throws Exception {
// Append each arriving record's price to the ListState
stockPriceListState.add(value.getPrice());
// Read the ListState, sort the values, keep only the top 3 and write them back
Iterable<Double> stockPriceListIterable = stockPriceListState.get();
// Copy the ListState contents into a new list
List<Double> stockPriceList = new ArrayList<>();
for(Double stockPrice : stockPriceListIterable){
stockPriceList.add(stockPrice);
}
// Sort the list in descending order; Double.compare avoids the precision loss of casting a double difference to int
stockPriceList.sort((t1,t2)-> Double.compare(t2,t1));
// Keep only the 3 largest elements
if(stockPriceList.size()>3){
stockPriceList.remove(3); // drop the 4th element
}
out.collect("Stock ID " + value.getStockId() + ", top 3 prices: " + stockPriceList);
// Write the trimmed list back to the ListState
stockPriceListState.update(stockPriceList);
}
}
);
resultDS.print();
env.execute();
}
}
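A hypothetical run: entering s1,1,5 then s1,2,1 then s1,3,9 then s1,4,3 should print the top-3 lists [5.0], [5.0, 1.0], [9.0, 5.0, 1.0] and finally [9.0, 5.0, 3.0], since the fourth record pushes 1.0 out of the list.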
Program KeyedMapStateDemo.java
package cn.edu.xmu;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.time.Duration;
import java.util.Map;
public class KeyedMapStateDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<String> source = env.socketTextStream("localhost", 9999);
SingleOutputStreamOperator<StockPrice> stockPriceDS = source.map(s -> {
String[] element = s.split(",");
return new StockPrice(element[0], Long.valueOf(element[1]), Double.valueOf(element[2]));
});
SingleOutputStreamOperator<StockPrice> stockPriceDSWithWatermark = stockPriceDS.assignTimestampsAndWatermarks(
WatermarkStrategy
.<StockPrice>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner((element, recordTimestamp) -> element.getTimeStamp() * 1000L)
);
SingleOutputStreamOperator<String> resultDS = stockPriceDSWithWatermark.keyBy(stockPrice -> stockPrice.getStockId())
.process(new KeyedProcessFunction<String, StockPrice, String>() {
// Declare the state
MapState<Double,Integer> stockPriceCountMapState;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// Initialize the state
// MapStateDescriptor takes three arguments: a name, the key type of the map and the value type of the map
stockPriceCountMapState = getRuntimeContext().getMapState(new MapStateDescriptor<Double,Integer>("stockPriceCountMapState",Types.DOUBLE,Types.INT));
}
@Override
public void processElement(StockPrice value, KeyedProcessFunction<String, StockPrice, String>.Context ctx, Collector<String> out) throws Exception {
// For each arriving record, first check whether its price already exists as a key in the MapState
Double stockPrice = value.getPrice();
if(stockPriceCountMapState.contains(stockPrice)){
// The price is already a key: increment the corresponding value by 1
Integer count = stockPriceCountMapState.get(stockPrice);
count = count + 1;
stockPriceCountMapState.put(stockPrice,count);
}else{
// The price is not yet a key: create a new entry in the map
stockPriceCountMapState.put(stockPrice,1);
}
StringBuilder outString = new StringBuilder();
outString.append("Stock ID " + value.getStockId() + "\n");
// Iterate over the MapState and output every entry
for (Map.Entry<Double, Integer> stockPriceCount : stockPriceCountMapState.entries()) {
outString.append("Price: " + stockPriceCount.getKey() + ", occurrences: " + stockPriceCount.getValue() + "\n");
}
out.collect(outString.toString());
}
});
resultDS.print();
env.execute();
}
}
Start the NC program in a Linux terminal, then launch KeyedMapStateDemo.java in IDEA, and enter the following data in the NC window:
s1,1,1
s1,2,1
s1,3,1
s2,1,1
s2,2,1
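All five records above carry the price 1.0, so for key s1 the printed occurrence count should grow to 1, 2 and 3 across the three s1 records, while s2 starts again at 1 and reaches 2: MapState is scoped per key, so the two stocks are counted independently.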
Program KeyedReducingStateDemo.java
package cn.edu.xmu;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.time.Duration;
public class KeyedReducingStateDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<String> source = env.socketTextStream("localhost", 9999);
SingleOutputStreamOperator<StockPrice> stockPriceDS = source.map(s -> {
String[] element = s.split(",");
return new StockPrice(element[0], Long.valueOf(element[1]), Double.valueOf(element[2]));
});
SingleOutputStreamOperator<StockPrice> stockPriceDSWithWatermark = stockPriceDS.assignTimestampsAndWatermarks(
WatermarkStrategy
.<StockPrice>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner((element, recordTimestamp) -> element.getTimeStamp() * 1000L)
);
SingleOutputStreamOperator<String> resultDS = stockPriceDSWithWatermark.keyBy(stockPrice -> stockPrice.getStockId())
.process(new KeyedProcessFunction<String, StockPrice, String>() {
// Declare the state
ReducingState<Double> stockPriceSumReducingState;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// Initialize the state
stockPriceSumReducingState = getRuntimeContext().getReducingState(
new ReducingStateDescriptor<Double>(
"stockPriceSumReducingState", // name
(value1,value2)->value1+value2, // reduce function given as a lambda
Types.DOUBLE // type of the stored value
)
);
}
@Override
public void processElement(StockPrice value, KeyedProcessFunction<String, StockPrice, String>.Context ctx, Collector<String> out) throws Exception {
// Add each arriving record's price to the ReducingState
stockPriceSumReducingState.add(value.getPrice());
out.collect("Stock ID: " + value.getStockId() + ", cumulative price sum: " + stockPriceSumReducingState.get());
}
});
resultDS.print();
env.execute();
}
}
Start the NC program in a Linux terminal, then launch KeyedReducingStateDemo.java in IDEA, and enter the following data in the NC window:
s1,1,3
s1,2,5
s1,3,4
s2,1,7
s2,2,9
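Since the ReducingState adds up the prices per key, the expected outputs are cumulative sums of 3, 8 and 12 for s1 and of 7 and 16 for s2.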
Program KeyedAggregatingStateDemo.java
package cn.edu.xmu;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.time.Duration;
public class KeyedAggregatingStateDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<String> source = env.socketTextStream("localhost", 9999);
SingleOutputStreamOperator<StockPrice> stockPriceDS = source.map(s -> {
String[] element = s.split(",");
return new StockPrice(element[0], Long.valueOf(element[1]), Double.valueOf(element[2]));
});
SingleOutputStreamOperator<StockPrice> stockPriceDSWithWatermark = stockPriceDS.assignTimestampsAndWatermarks(
WatermarkStrategy
.<StockPrice>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner((element, recordTimestamp) -> element.getTimeStamp() * 1000L)
);
SingleOutputStreamOperator<String> resultDS = stockPriceDSWithWatermark.keyBy(stockPrice -> stockPrice.getStockId())
.process(new KeyedProcessFunction<String, StockPrice, String>() {
// Declare the state
//AggregatingState takes two type parameters: the first is the input type, the second the output type
AggregatingState<Double,Double> stockPriceAvgAggregatingState;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// Initialize the state
stockPriceAvgAggregatingState = getRuntimeContext()
// AggregatingStateDescriptor takes three type parameters: the input type, the accumulator type and the output type
.getAggregatingState(new AggregatingStateDescriptor<Double, Tuple2<Double,Integer>, Double>(
"stockPriceAvgAggregatingState", // name
new AggregateFunction<Double, Tuple2<Double, Integer>, Double>() {
@Override
public Tuple2<Double, Integer> createAccumulator() {
return Tuple2.of(0.0,0); // initialize the accumulator
}
@Override
public Tuple2<Double, Integer> add(Double value, Tuple2<Double, Integer> accumulator) {
return Tuple2.of(accumulator.f0+value,accumulator.f1+1);
}
@Override
public Double getResult(Tuple2<Double, Integer> accumulator) {
return accumulator.f0/accumulator.f1;
}
@Override
public Tuple2<Double, Integer> merge(Tuple2<Double, Integer> a, Tuple2<Double, Integer> b) {
return Tuple2.of(a.f0+b.f0,a.f1+b.f1);
}
},
Types.TUPLE(Types.DOUBLE, Types.INT) // the accumulator type
));
}
@Override
public void processElement(StockPrice value, KeyedProcessFunction<String, StockPrice, String>.Context ctx, Collector<String> out) throws Exception {
// When a record arrives, add its price to the AggregatingState
stockPriceAvgAggregatingState.add(value.getPrice());
// Read the aggregated result from the AggregatingState
Double stockPriceAvg = stockPriceAvgAggregatingState.get();
out.collect("Stock ID: " + value.getStockId() + ", average price of this stock: " + stockPriceAvg);
}
});
resultDS.print();
env.execute();
}
}
Start the NC program in a Linux terminal, then launch KeyedAggregatingStateDemo.java in IDEA, and enter the following data in the NC window:
s1,1,1
s1,2,2
s1,3,3
s2,4,4
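The accumulator tracks (sum, count) per key, so the expected averages are 1.0, 1.5 and 2.0 for the three s1 records and 4.0 for s2.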
Code file ProcessingTimeTimerDemo.java
package cn.edu.xmu;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.sql.Timestamp;
public class ProcessingTimeTimerDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// Processing-time semantics: no timestamps or watermarks need to be assigned
DataStreamSource<String> source = env.socketTextStream("hadoop01", 9999);
SingleOutputStreamOperator<StockPrice> stockPriceDS = source.map(s -> {
String[] element = s.split(",");
return new StockPrice(element[0], Long.valueOf(element[1]), Double.valueOf(element[2]));
});
// Timers can only be used on a KeyedStream
stockPriceDS.keyBy(value -> value.getStockId())
// KeyedProcessFunction's three type parameters are the key type, the input type and the output type
.process(new KeyedProcessFunction<String, StockPrice, String>() {
@Override
public void processElement(StockPrice value, Context ctx, Collector<String> out) throws Exception {
Long currTs = ctx.timerService().currentProcessingTime();
out.collect("Record arrived at: " + new Timestamp(currTs));
// Register a timer that fires 10 seconds from now (processing time)
ctx.timerService().registerProcessingTimeTimer(currTs + 10 * 1000L);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
out.collect("Timer fired at: " + new Timestamp(timestamp));
}
}).print();
env.execute();
}
}
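Processing-time timers are deduplicated per key and per timestamp: two registrations with exactly the same key and timestamp collapse into one firing; otherwise every record above schedules its own timer 10 seconds after arrival. A hedged sketch of cancelling a timer (the timestamp must match the one used at registration, so storing it, e.g. in a ValueState, is assumed):
ctx.timerService().deleteProcessingTimeTimer(previouslyRegisteredTs); // previouslyRegisteredTs is a hypothetical saved value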
Code file ClickEvent.java
package cn.edu.xmu;
public class ClickEvent {
public String name;
public String url;
public Long myTimeStamp;
// A no-argument constructor is required (Flink instantiates POJOs via reflection)
public ClickEvent() {
}
public ClickEvent(String name, String url, Long myTimeStamp) {
this.name = name;
this.url = url;
this.myTimeStamp = myTimeStamp;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getUrl() {
return url;
}
public void setUrl(String url) {
this.url = url;
}
public Long getMyTimeStamp() {
return myTimeStamp;
}
public void setMyTimeStamp(Long myTimeStamp) {
this.myTimeStamp = myTimeStamp;
}
@Override
public String toString() {
return "ClickEvent{" +
"name='" + name + '\'' +
", url='" + url + '\'' +
", myTimeStamp=" + myTimeStamp +
'}';
}
}
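For Flink to serialize ClickEvent as a POJO rather than falling back to the slower Kryo path, the class must be public, provide a public no-argument constructor, and expose every field either publicly or through getters and setters; ClickEvent satisfies all three conditions.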
Code file EventTimeTimerDemo.java
package cn.edu.xmu;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
public class EventTimeTimerDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<ClickEvent> stream = env.addSource(new CustomSource())
.assignTimestampsAndWatermarks(WatermarkStrategy.<ClickEvent>forMonotonousTimestamps()
.withTimestampAssigner(new SerializableTimestampAssigner<ClickEvent>() {
@Override
public long extractTimestamp(ClickEvent element, long recordTimestamp) {
return element.myTimeStamp;
}
}));
// Define an event-time timer on a KeyedStream
stream.keyBy(data -> true) // every record gets the key true, i.e. all records share one key and land in the same partition
.process(new KeyedProcessFunction<Boolean, ClickEvent, String>() {
@Override
public void processElement(ClickEvent value, Context ctx, Collector<String> out) throws Exception {
out.collect("Record arrived, timestamp: " + ctx.timestamp());
out.collect("Record arrived, current watermark: " + ctx.timerService().currentWatermark() + "\n ------- separator -------");
// Register a timer 10 seconds later in event time
ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 10 * 1000L);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
out.collect("Timer fired, firing time: " + timestamp);
}
})
.print();
env.execute();
}
// Custom test data source
public static class CustomSource implements SourceFunction<ClickEvent> {
@Override
public void run(SourceContext<ClickEvent> ctx) throws Exception {
// Emit the test data directly
ctx.collect(new ClickEvent("Mary", "./home", 1000L));
// Pause 5 seconds so the behaviour is easier to observe
Thread.sleep(5000L);
// Emit a record whose event time is 10 seconds later
ctx.collect(new ClickEvent("Mary", "./home", 11000L));
Thread.sleep(5000L);
// Emit a record whose event time is 10 seconds + 1 ms later
ctx.collect(new ClickEvent("Alice", "./cart", 11001L));
Thread.sleep(5000L);
}
@Override
public void cancel() {
}
}
}
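A walk-through of the expected behaviour: the record at 1000 ms registers a timer for 11000 ms. With forMonotonousTimestamps the watermark trails the largest timestamp by 1 ms, so the record at 11000 ms only raises the watermark to 10999 ms, which is not enough; the timer fires once the 11001 ms record pushes the watermark to 11000 ms. When the bounded source finishes, Flink advances the watermark to Long.MAX_VALUE, so the two remaining timers (21000 ms and 21001 ms) fire as the job shuts down.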
Code file ProcessAllWindowTopNDemo.java
package cn.edu.xmu;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
public class ProcessAllWindowTopNDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<ClickEvent> eventStream = env.addSource(new CustomSource())
.assignTimestampsAndWatermarks(WatermarkStrategy.<ClickEvent>forMonotonousTimestamps()
.withTimestampAssigner(new SerializableTimestampAssigner<ClickEvent>() {
@Override
public long extractTimestamp(ClickEvent element, long recordTimestamp) {
return element.getMyTimeStamp();
}
})
);
// Only the url is needed for counting, so map to String before opening the window
SingleOutputStreamOperator<String> result = eventStream
.map(new MapFunction<ClickEvent, String>() {
@Override
public String map(ClickEvent value) throws Exception {
return value.url;
}
})
.windowAll(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))) // sliding window: size 10 s, slide 5 s
.process(new ProcessAllWindowFunction<String, String, TimeWindow>() {
@Override
public void process(Context context, Iterable<String> elements, Collector<String> out) throws Exception {
HashMap<String, Long> urlCountMap = new HashMap<>();
// Iterate over the window's records, accumulating each url's view count in a HashMap
for (String url : elements) {
if (urlCountMap.containsKey(url)) {
long count = urlCountMap.get(url);
urlCountMap.put(url, count + 1L);
} else {
urlCountMap.put(url, 1L);
}
}
// Copy the view counts from the HashMap into an ArrayList so they can be sorted
ArrayList<Tuple2<String, Long>> mapList = new ArrayList<Tuple2<String, Long>>();
for (String key : urlCountMap.keySet()) {
mapList.add(Tuple2.of(key, urlCountMap.get(key)));
}
mapList.sort(new Comparator<Tuple2<String, Long>>() {
@Override
public int compare(Tuple2<String, Long> o1, Tuple2<String, Long> o2) {
return Long.compare(o2.f1, o1.f1); // comparing o2 against o1 yields descending order; Long.compare avoids int truncation
}
});
// Take the top two entries after sorting and build the output string
StringBuilder result = new StringBuilder();
result.append("========================================\n");
for (int i = 0; i < Math.min(2,mapList.size()); i++) {
Tuple2<String, Long> temp = mapList.get(i);
String info = "Views No." + (i + 1) +
" url:" + temp.f0 +
" views:" + temp.f1 +
" window start:" + new Timestamp(context.window().getStart()) +
" window end:" + new Timestamp(context.window().getEnd()) + "\n";
result.append(info);
}
result.append("========================================\n");
out.collect(result.toString());
}
});
result.print();
env.execute();
}
// Custom test data source
public static class CustomSource implements SourceFunction<ClickEvent> {
@Override
public void run(SourceContext<ClickEvent> ctx) throws Exception {
// Emit the test data directly
ctx.collect(new ClickEvent("Mary", "./home", 1000L));
ctx.collect(new ClickEvent("Mary", "./home", 2000L));
ctx.collect(new ClickEvent("Alice", "./cart", 3000L));
ctx.collect(new ClickEvent("Mary", "./cart", 4000L));
ctx.collect(new ClickEvent("Alice", "./play", 5000L));
ctx.collect(new ClickEvent("Alice", "./cart", 6000L));
}
@Override
public void cancel() {
}
}
}
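Note that windowAll funnels every record through a single task (it is a non-parallel operation), so this all-window Top-N cannot scale out. The following program, KeyedProcessTopNDemo, removes that bottleneck by first aggregating per url with keyBy and then ranking the per-window results keyed by the window end timestamp.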
Code file UrlViewCount.java
package cn.edu.xmu;
public class UrlViewCount {
public String url;
public Long startTs;
public Long endTs;
public Long count;
// A no-argument constructor is required (Flink instantiates POJOs via reflection)
public UrlViewCount() {
}
public UrlViewCount(String url, Long count, Long startTs, Long endTs) {
this.url = url;
this.count = count;
this.startTs = startTs;
this.endTs = endTs;
}
public String getUrl() {
return url;
}
public void setUrl(String url) {
this.url = url;
}
public Long getStartTs() {
return startTs;
}
public void setStartTs(Long startTs) {
this.startTs = startTs;
}
public Long getEndTs() {
return endTs;
}
public void setEndTs(Long endTs) {
this.endTs = endTs;
}
public Long getCount() {
return count;
}
public void setCount(Long count) {
this.count = count;
}
@Override
public String toString() {
return "UrlViewCount{" +
"url='" + url + '\'' +
", startTs=" + startTs +
", endTs=" + endTs +
", count=" + count +
'}';
}
}
Code file KeyedProcessTopNDemo.java
package cn.edu.xmu;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.sql.Timestamp;
import java.util.*;
public class KeyedProcessTopNDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// Read data from the custom source
SingleOutputStreamOperator<ClickEvent> eventStream = env.addSource(new ClickSource())
.assignTimestampsAndWatermarks(WatermarkStrategy.<ClickEvent>forMonotonousTimestamps()
.withTimestampAssigner(new SerializableTimestampAssigner<ClickEvent>() {
@Override
public long extractTimestamp(ClickEvent element, long recordTimestamp) {
return element.getMyTimeStamp();
}
}));
// Group by url and compute each url's view count
// After the window aggregation the result is an ordinary stream that carries no window information, so the window start and end times are stored explicitly in UrlViewCount
SingleOutputStreamOperator<UrlViewCount> urlCountStream = eventStream.keyBy(data -> data.url)
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.aggregate(new UrlViewCountAgg(), new UrlViewCountResult());
// Sort the statistics that belong to the same window
// Key by the window label (the window end time) so that all results of one window come together, then sort and take the Top N
SingleOutputStreamOperator<String> result = urlCountStream.keyBy(data -> data.endTs)
.process(new TopN(2));
result.print();
env.execute();
}
// Custom incremental aggregation
public static class UrlViewCountAgg implements AggregateFunction<ClickEvent, Long, Long> {
@Override
public Long createAccumulator() {
return 0L;
}
@Override
public Long add(ClickEvent value, Long accumulator) {
return accumulator + 1;
}
@Override
public Long getResult(Long accumulator) {
return accumulator;
}
@Override
public Long merge(Long a, Long b) {
// merge() is only called for merging (session) windows; return the sum defensively
return a + b;
}
}
// ProcessWindowFunction takes 4 type parameters. The 1st is the input type, which must match the output type of UrlViewCountAgg, because UrlViewCountAgg's output is this function's input
// The 2nd is the output type, UrlViewCount, which carries the window end time
// The 3rd is the key type; the key here is the url, so it is String. The 4th is the window type
public static class UrlViewCountResult extends ProcessWindowFunction<Long, UrlViewCount, String, TimeWindow> {
@Override
public void process(String s, ProcessWindowFunction<Long, UrlViewCount, String, TimeWindow>.Context context, Iterable<Long> elements, Collector<UrlViewCount> out) throws Exception {
// The iterator holds exactly one element (the pre-aggregated count), so a single next() is enough
Long count = elements.iterator().next();
long windowStart = context.window().getStart();
long windowEnd = context.window().getEnd();
out.collect(new UrlViewCount(s, windowStart, windowEnd, count));
}
}
// KeyedProcessFunction takes 3 type parameters. The 1st is the key type, i.e. the type of the window end time
// The 2nd is the input type and the 3rd is the output type
public static class TopN extends KeyedProcessFunction<Long, UrlViewCount, String> {
// Buffers the statistics of different windows: key = windowEnd, value = the list of results for that window
private Map<Long, List<UrlViewCount>> dataListMap;
private int threshold; // N, the number of top results to emit
public TopN(int threshold){
this.threshold = threshold;
this.dataListMap = new HashMap<>();
}
@Override
public void processElement(UrlViewCount value, KeyedProcessFunction<Long, UrlViewCount, String>.Context ctx, Collector<String> out) throws Exception {
// Each call receives a single record, but sorting needs all records of a window, so records are buffered first, kept separate per window
// They are stored in a HashMap keyed by window end time
Long windowEnd = value.endTs;
if (dataListMap.containsKey(windowEnd)) {
// Not the first record for this window: append it to the existing list
List<UrlViewCount> dataList = dataListMap.get(windowEnd);
dataList.add(value);
} else {
// First record for this window: create the list and add the record
List<UrlViewCount> dataList = new ArrayList<>();
dataList.add(value);
dataListMap.put(windowEnd, dataList);
}
// Register a timer at window end + 1 ms; when it fires, all data of the window has arrived and sorting can start
ctx.timerService().registerEventTimeTimer(windowEnd + 1);
}
@Override
public void onTimer(long timestamp, KeyedProcessFunction<Long, UrlViewCount, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
super.onTimer(timestamp, ctx, out);
// All results for this window have arrived: sort them and take the Top N
Long windowEnd = ctx.getCurrentKey();
// Sort in descending order of count
List<UrlViewCount> dataList = dataListMap.get(windowEnd);
dataList.sort(new Comparator<UrlViewCount>() {
@Override
public int compare(UrlViewCount o1, UrlViewCount o2) {
// Comparing o2 against o1 gives descending order; Long.compare avoids the int overflow of subtraction
return Long.compare(o2.count, o1.count);
}
});
// Take the Top N
StringBuilder result = new StringBuilder();
result.append("========================================\n");
result.append("Window end time: " + new Timestamp(timestamp - 1) + "\n");
for (int i = 0; i < Math.min(threshold, dataList.size()); i++) {
UrlViewCount urlViewCount = dataList.get(i);
String info = "No." + (i + 1) + " "
+ "url:" + urlViewCount.url + " "
+ "views:" + urlViewCount.count + "\n";
result.append(info);
}
result.append("========================================\n");
// Remove the used list from the map so the map does not grow without bound
dataListMap.remove(windowEnd);
out.collect(result.toString());
}
}
// Custom test data source
public static class ClickSource implements SourceFunction<ClickEvent> {
@Override
public void run(SourceContext<ClickEvent> ctx) throws Exception {
// Emit the test records directly
ctx.collect(new ClickEvent("Mary", "./home", 1000L));
ctx.collect(new ClickEvent("Mary", "./home", 2000L));
ctx.collect(new ClickEvent("Alice", "./cart", 3000L));
ctx.collect(new ClickEvent("Mary", "./cart", 4000L));
ctx.collect(new ClickEvent("Alice", "./play", 5000L));
ctx.collect(new ClickEvent("Alice", "./cart", 6000L));
}
@Override
public void cancel() {
// Nothing to cancel: run() emits a fixed batch of records and returns
}
}
}
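Note that the listing imports ListState and ListStateDescriptor but never uses them: TopN buffers its results in a plain HashMap, which lives outside Flink's managed state and is lost if the job fails and restarts. A minimal sketch, not from the textbook, of a fault-tolerant variant that keeps the per-window buffer in ListState instead (the class name StatefulTopN is made up; since the stream is keyed by the window end time, one list per key is enough):
// Sketch: a state-backed replacement for TopN with the same input and output types
public static class StatefulTopN extends KeyedProcessFunction<Long, UrlViewCount, String> {
private final int threshold;
private transient ListState<UrlViewCount> buffer;
public StatefulTopN(int threshold) {
this.threshold = threshold;
}
@Override
public void open(Configuration parameters) {
// Managed state is scoped to the current key (the window end time) and is checkpointed
buffer = getRuntimeContext().getListState(
new ListStateDescriptor<>("topn-buffer", UrlViewCount.class));
}
@Override
public void processElement(UrlViewCount value, Context ctx, Collector<String> out) throws Exception {
buffer.add(value);
ctx.timerService().registerEventTimeTimer(value.endTs + 1);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
// Copy the buffered results out of state, then release the state for this window
List<UrlViewCount> dataList = new ArrayList<>();
for (UrlViewCount uvc : buffer.get()) {
dataList.add(uvc);
}
buffer.clear();
dataList.sort((o1, o2) -> Long.compare(o2.count, o1.count));
StringBuilder result = new StringBuilder();
for (int i = 0; i < Math.min(threshold, dataList.size()); i++) {
result.append("No." + (i + 1) + " url:" + dataList.get(i).url
+ " views:" + dataList.get(i).count + "\n");
}
out.collect(result.toString());
}
}
To try it, replace .process(new TopN(2)) with .process(new StatefulTopN(2)) in main(); the output format is unchanged.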