The code in the textbook Flink编程基础(Scala版) (Fundamentals of Flink Programming, Scala Edition), written by Lin Ziyu and Tao Jiping (textbook website), may be hard to read in its printed form. To help readers understand the code correctly, and to let them copy it directly for hands-on experiments, all of the companion code for the book is provided here.
View the code for all chapters of the textbook
DataStream API
val env = StreamExecutionEnvironment.getExecutionEnvironment
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>1.11.2</version>
</dependency>
package cn.edu.xmu.dblab
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
object FileSource{
def main(args: Array[String]): Unit = {
//获取执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//加载或创建数据源
val dataStream = env.readTextFile("file:///usr/local/flink/README.txt")
//打印输出
dataStream.print()
//程序触发执行
env.execute()
}
}
val socketDataStream = env.socketTextStream("localhost", 9999)
val dataStream = env.fromElements(Tuple2(1L,3L),Tuple2(1L,5L))
val dataStream = env.fromCollection(List(1,2,3))
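The three source snippets above are fragments. As a minimal runnable sketch (the object name CollectionSource is ours, not from the textbook), the fromCollection source can be used like this:
package cn.edu.xmu.dblab
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
object CollectionSource {
  def main(args: Array[String]): Unit = {
    //obtain the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //create a data stream from a local Scala collection
    val dataStream = env.fromCollection(List(1, 2, 3))
    //print the stream to standard output
    dataStream.print()
    //trigger program execution
    env.execute("CollectionSource")
  }
}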
cd /usr/local/kafka
./bin/zookeeper-server-start.sh config/zookeeper.properties
cd /usr/local/kafka
./bin/kafka-server-start.sh config/server.properties
cd /usr/local/kafka
bin/kafka-server-start.sh config/server.properties &
cd /usr/local/kafka
./bin/kafka-topics.sh --create --zookeeper localhost:2181 \
> --replication-factor 1 --partitions 1 --topic wordsendertest
#The topic is named wordsendertest; 2181 is ZooKeeper's default port; --partitions sets the number of partitions in the topic; --replication-factor sets the number of replicas, which is useful in a Kafka cluster; since this is a single-node setup, no replication is needed
#Use --list to show all existing topics and check that the topic created above exists
./bin/kafka-topics.sh --list --zookeeper localhost:2181
./bin/kafka-console-producer.sh --broker-list localhost:9092 \
> --topic wordsendertest
hello hadoop
hello spark
cd /usr/local/kafka
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
> --topic wordsendertest --from-beginning
package cn.edu.xmu.dblab
import java.util.Properties
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.windowing.time.Time
object KafkaWordCount {
def main(args: Array[String]): Unit = {
val kafkaProps = new Properties()
//Kafka的一些属性
kafkaProps.setProperty("bootstrap.servers", "localhost:9092")
//所在的消费组
kafkaProps.setProperty("group.id", "group1")
//获取当前的执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//创建Kafka的消费者,wordsendertest是要消费的Topic
val kafkaSource = new FlinkKafkaConsumer[String]("wordsendertest",new SimpleStringSchema,kafkaProps)
//设置从最新的offset开始消费
kafkaSource.setStartFromLatest()
//自动提交offset
kafkaSource.setCommitOffsetsOnCheckpoints(true)
//绑定数据源
val stream = env.addSource(kafkaSource)
//设置转换操作逻辑
val text = stream.flatMap{ _.toLowerCase().split("\\W+").filter { _.nonEmpty } }
.map{(_,1)}
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1)
//打印输出
text.print()
//程序触发执行
env.execute("Kafka Word Count")
}
}
<project>
<groupId>cn.edu.xmu.dblab</groupId>
<artifactId>wordcount</artifactId>
<modelVersion>4.0.0</modelVersion>
<name>WordCount</name>
<packaging>jar</packaging>
<version>1.0</version>
<repositories>
<repository>
<id>alimaven</id>
<name>aliyun maven</name>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.12</artifactId>
<version>1.11.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>1.11.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>1.11.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>1.11.2</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.4.6</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
cd ~/flinkapp
/usr/local/flink/bin/flink run \
> --class cn.edu.xmu.dblab.KafkaWordCount \
> ./target/wordcount-1.0-jar-with-dependencies.jar
hello wuhan
hello china
cd /usr/local/flink/log
tail -f flink*.out
package cn.edu.xmu.dblab
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
object ReadHDFSFile{
def main(args: Array[String]): Unit = {
//获取执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//加载或创建数据源
val dataStream = env.readTextFile("hdfs://localhost:9000/user/hadoop/word.txt")
//打印输出
dataStream.print()
//程序触发执行
env.execute()
}
}
<project>
<groupId>cn.edu.xmu.dblab</groupId>
<artifactId>wordcount</artifactId>
<modelVersion>4.0.0</modelVersion>
<name>WordCount</name>
<packaging>jar</packaging>
<version>1.0</version>
<repositories>
<repository>
<id>alimaven</id>
<name>aliyun maven</name>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.12</artifactId>
<version>1.11.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>1.11.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>1.11.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>1.11.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.1.3</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.3</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.4.6</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
vim ~/.bashrc
export HADOOP_HOME=/usr/local/hadoop
export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop
export HADOOP_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath)
source ~/.bashrc
cd ~/flinkapp
/usr/local/flink/bin/flink run --class cn.edu.xmu.dblab.ReadHDFSFile ./target/wordcount-1.0-jar-with-dependencies.jar
cd /usr/local/flink/log
tail -f flink*.out
package cn.edu.xmu.dblab
import java.util.Calendar
import org.apache.flink.streaming.api.functions.source.RichSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import scala.util.Random
case class StockPrice(stockId:String,timeStamp:Long,price:Double)
object StockPriceStreaming {
def main(args: Array[String]) {
//设置执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//设置程序并行度
env.setParallelism(1)
//股票价格数据流
val stockPriceStream: DataStream[StockPrice] = env
//该数据流由StockPriceSource类随机生成
.addSource(new StockPriceSource)
//打印结果
stockPriceStream.print()
//程序触发执行
env.execute("stock price streaming")
}
class StockPriceSource extends RichSourceFunction[StockPrice]{
var isRunning: Boolean = true
val rand = new Random()
//初始化股票价格
var priceList: List[Double] = List(10.0d, 20.0d, 30.0d, 40.0d, 50.0d)
var stockId = 0
var curPrice = 0.0d
override def run(srcCtx: SourceContext[StockPrice]): Unit = {
while (isRunning) {
//每次从列表中随机选择一只股票
stockId = rand.nextInt(priceList.size)
val curPrice = priceList(stockId) + rand.nextGaussian() * 0.05
priceList = priceList.updated(stockId, curPrice)
val curTime = Calendar.getInstance.getTimeInMillis
//将数据源收集写入SourceContext
srcCtx.collect(StockPrice("stock_" + stockId.toString, curTime, curPrice))
Thread.sleep(rand.nextInt(10))
}
}
override def cancel(): Unit = {
isRunning = false
}
}
}
cd /usr/local/flink/log
tail -f flink*.out
val dataStream = env.fromElements(1,2,3,4,5)
val mapStream = dataStream.map(x=>x+10)
package cn.edu.xmu.dblab
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
case class StockPrice(stockId:String,timeStamp:Long,price:Double)
object MapFunctionTest {
def main(args: Array[String]): Unit = {
//设定执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//设定程序并行度
env.setParallelism(1)
//创建数据源
val dataStream: DataStream[Int] = env.fromElements(1, 2, 3, 4, 5, 6, 7)
//设置转换操作逻辑
val richFunctionDataStream = dataStream.map(new MyMapFunction())
//打印输出
richFunctionDataStream.print()
//程序触发执行
env.execute("MapFunctionTest")
}
//自定义函数,继承RichMapFunction
class MyMapFunction extends RichMapFunction[Int, String] {
override def map(input: Int): String =
("Input : " + input.toString + ", Output : " + (input * 3).toString)
}
}
val dataStream = env.fromElements("Hadoop is good","Flink is fast","Flink is better")
val flatMapStream = dataStream.flatMap(line => line.split(" "))
package cn.edu.xmu.dblab
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector
case class StockPrice(stockId:String,timeStamp:Long,price:Double)
object FlatMapFunctionTest {
def main(args: Array[String]): Unit = {
//设定执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//设定程序并行度
env.setParallelism(1)
//设置数据源
val dataStream: DataStream[String] = env.fromElements("Hello Spark", "Flink is excellent")
//针对数据集的转换操作逻辑
val result = dataStream.flatMap(new WordSplitFlatMap(15))
//打印输出
result.print()
//程序触发执行
env.execute("FlatMapFunctionTest")
}
//使用FlatMapFunction实现过滤逻辑,只对字符串长度大于threshold的内容进行切词
class WordSplitFlatMap(threshold: Int) extends FlatMapFunction[String, String] {
override def flatMap(value: String, out: Collector[String]): Unit = {
if (value.size > threshold) {
value.split(" ").foreach(out.collect)
}
}
}
}
val dataStream = env.fromElements("Hadoop is good","Flink is fast","Flink is better")
val filterStream = dataStream.filter(line => line.contains("Flink"))
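There is no complete filter program in this section, so here is a minimal runnable sketch in the same style as the map and flatMap examples (the object name FilterTest is ours, not from the textbook):
package cn.edu.xmu.dblab
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
object FilterTest {
  def main(args: Array[String]): Unit = {
    //obtain the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //set the parallelism to 1
    env.setParallelism(1)
    //create the data source
    val dataStream = env.fromElements("Hadoop is good", "Flink is fast", "Flink is better")
    //keep only the lines that contain "Flink"
    val filterStream = dataStream.filter(line => line.contains("Flink"))
    //print the result
    filterStream.print()
    //trigger program execution
    env.execute("FilterTest")
  }
}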
val dataStream: DataStream[(Int, Double)] = env.fromElements((1, 2.0), (2, 1.7), (1, 4.9), (3, 8.5), (3, 11.2))
//使用数字位置定义Key 按照第一个字段进行分组
val keyedStream = dataStream.keyBy(0)
package cn.edu.xmu.dblab
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
//声明一个样例类,包含三个字段:股票ID、交易时间、交易价格
case class StockPrice(stockId:String,timeStamp:Long,price:Double)
object KeyByTest{
def main(args: Array[String]): Unit = {
//获取执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//设置程序并行度
env.setParallelism(1)
//创建数据源
val stockList = List(
StockPrice("stock_4",1602031562148L,43.4D),
StockPrice("stock_1",1602031562148L,22.9D),
StockPrice("stock_0",1602031562153L,8.2D),
StockPrice("stock_3",1602031562153L,42.1D),
StockPrice("stock_2",1602031562153L,29.2D),
StockPrice("stock_0",1602031562159L,8.1D),
StockPrice("stock_4",1602031562159L,43.7D),
StockPrice("stock_4",1602031562169L,43.5D)
)
val dataStream = env.fromCollection(stockList)
//设定转换操作逻辑
val keyedStream = dataStream.keyBy("stockId")
//打印输出
keyedStream.print()
//程序触发执行
env.execute("KeyByTest")
}
}
package cn.edu.xmu.dblab
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
//声明一个样例类,包含三个字段:股票ID、交易时间、交易价格
case class StockPrice(stockId:String,timeStamp:Long,price:Double)
object ReduceTest{
def main(args: Array[String]): Unit = {
//获取执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//设置程序并行度
env.setParallelism(1)
//创建数据源
val stockList = List(
StockPrice("stock_4",1602031562148L,43.4D),
StockPrice("stock_1",1602031562148L,22.9D),
StockPrice("stock_0",1602031562153L,8.2D),
StockPrice("stock_3",1602031562153L,42.1D),
StockPrice("stock_2",1602031562153L,29.2D),
StockPrice("stock_0",1602031562159L,8.1D),
StockPrice("stock_4",1602031562159L,43.7D),
StockPrice("stock_4",1602031562169L,43.5D)
)
val dataStream = env.fromCollection(stockList)
//设定转换操作逻辑
val keyedStream = dataStream.keyBy("stockId")
val reduceStream = keyedStream
.reduce((t1,t2)=>StockPrice(t1.stockId,t1.timeStamp,t1.price+t2.price))
//打印输出
reduceStream.print()
//程序触发执行
env.execute("ReduceTest")
}
}
package cn.edu.xmu.dblab
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
//声明一个样例类,包含三个字段:股票ID,交易时间,交易价格
case class StockPrice(stockId:String,timeStamp:Long,price:Double)
object MyReduceFunctionTest{
def main(args: Array[String]): Unit = {
//获取执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//设置程序并行度
env.setParallelism(1)
//创建数据源
val stockList = List(
StockPrice("stock_4",1602031562148L,43.4D),
StockPrice("stock_1",1602031562148L,22.9D),
StockPrice("stock_0",1602031562153L,8.2D),
StockPrice("stock_3",1602031562153L,42.1D),
StockPrice("stock_2",1602031562153L,29.2D),
StockPrice("stock_0",1602031562159L,8.1D),
StockPrice("stock_4",1602031562159L,43.7D),
StockPrice("stock_4",1602031562169L,43.5D)
)
val dataStream = env.fromCollection(stockList)
//设定转换操作逻辑
val keyedStream = dataStream.keyBy("stockId")
val reduceStream = keyedStream.reduce(new MyReduceFunction)
//打印输出
reduceStream.print()
//程序触发执行
env.execute("MyReduceFunctionTest")
}
class MyReduceFunction extends ReduceFunction[StockPrice] {
override def reduce(t1: StockPrice,t2:StockPrice):StockPrice = {
StockPrice(t1.stockId,t1.timeStamp,t1.price+t2.price)
}
}
}
package cn.edu.xmu.dblab
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
//声明一个样例类,包含三个字段:股票ID、交易时间、交易价格
case class StockPrice(stockId:String,timeStamp:Long,price:Double)
object AggregationTest{
def main(args: Array[String]): Unit = {
//获取执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//设置程序并行度
env.setParallelism(1)
//创建数据源
val stockList = List(
StockPrice("stock_4",1602031562148L,43.4D),
StockPrice("stock_1",1602031562148L,22.9D),
StockPrice("stock_0",1602031562153L,8.2D),
StockPrice("stock_3",1602031562153L,42.1D),
StockPrice("stock_2",1602031562153L,29.2D),
StockPrice("stock_0",1602031562159L,8.1D),
StockPrice("stock_4",1602031562159L,43.7D),
StockPrice("stock_4",1602031562169L,43.5D)
)
val dataStream = env.fromCollection(stockList)
//设定转换操作逻辑
val keyedStream = dataStream.keyBy("stockId")
val aggregationStream = keyedStream.sum(2)
//打印输出
aggregationStream.print()
//执行操作
env.execute(" AggregationTest")
}
}
val dataStream = env.fromElements("hadoop","spark","flink")
dataStream.writeAsText("file:///home/hadoop/output.txt")
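The two lines above are a fragment; a minimal runnable sketch that wraps them into a complete job follows (the object name SinkFileTest and the explicit parallelism setting are our additions). Note that with parallelism 1, writeAsText produces a single output file, while with higher parallelism it creates a directory containing one part file per subtask.
package cn.edu.xmu.dblab
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
object SinkFileTest {
  def main(args: Array[String]): Unit = {
    //obtain the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //with parallelism 1, writeAsText writes a single file instead of a directory of part files
    env.setParallelism(1)
    //create the data source
    val dataStream = env.fromElements("hadoop", "spark", "flink")
    //write the stream to a local file
    dataStream.writeAsText("file:///home/hadoop/output.txt")
    //trigger program execution
    env.execute("SinkFileTest")
  }
}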
package cn.edu.xmu.dblab
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
object SinkKafkaTest{
def main(args: Array[String]): Unit = {
//获取执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//加载或创建数据源
val dataStream = env.fromElements("hadoop","spark","flink")
//把数据输出到Kafka
dataStream.addSink(new FlinkKafkaProducer[String]("localhost:9092", "sinkKafka", new SimpleStringSchema()))
//程序触发执行
env.execute()
}
}
cd /usr/local/kafka
./bin/zookeeper-server-start.sh config/zookeeper.properties
cd /usr/local/kafka
./bin/kafka-server-start.sh config/server.properties
cd /usr/local/kafka
./bin/kafka-topics.sh --create --zookeeper localhost:2181 \
> --replication-factor 1 --partitions 1 --topic sinkKafka
cd ~/flinkapp
/usr/local/flink/bin/flink run \
> --class cn.edu.xmu.dblab.SinkKafkaTest \
> ./target/wordcount-1.0-jar-with-dependencies.jar
cd /usr/local/kafka
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
> --topic sinkKafka --from-beginning
package cn.edu.xmu.dblab
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
object WriteHDFSFile{
def main(args: Array[String]): Unit = {
//获取执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//设置程序并行度为1
env.setParallelism(1)
//创建数据源
val dataStream = env.fromElements("hadoop","spark","flink")
//把数据写入HDFS
dataStream.writeAsText("hdfs://localhost:9000/output.txt")
//程序触发执行
env.execute()
}
}
cd ~/flinkapp
/usr/local/flink/bin/flink run --class cn.edu.xmu.dblab.WriteHDFSFile ./target/wordcount-1.0-jar-with-dependencies.jar
//设置执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//把时间特性设置为“事件时间”
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//或者,把时间特性设置为“处理时间”
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
val dataStream: DataStream[T] = ...
//基于事件时间的滚动窗口,窗口大小为5秒钟
dataStream
.keyBy(...)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.<window function>(...)
//基于处理时间的滚动窗口,窗口大小为5秒钟
dataStream
.keyBy(...)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.<window function>(...)
//基于事件时间的滚动窗口,窗口大小为1小时,偏移量为15分钟
dataStream
.keyBy(...)
.window(TumblingEventTimeWindows.of(Time.hours(1), Time.minutes(15)))
.<window function>(...)
dataStream
.keyBy(...)
.timeWindow(Time.seconds(1))
.<window function>(...)
val dataStream: DataStream[T] = ...
//基于事件时间的滑动窗口,窗口大小为10秒,滑动步长为5秒
dataStream
.keyBy(...)
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<window function>(...)
//基于处理时间的滑动窗口,窗口大小为10秒,滑动步长为5秒
dataStream
.keyBy(<...>)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<window function>(...)
//基于处理时间的滑动窗口,窗口大小为12小时,滑动步长为1小时,偏移量为8小时
dataStream
.keyBy(<...>)
.window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
.<window function>(...)
val input: DataStream[T] = ...
//基于事件时间的会话窗口,会话间隙为10分钟
input
.keyBy(...)
.window(EventTimeSessionWindows.withGap(Time.minutes(10)))
.<window function>(...)
//基于处理时间的会话窗口,会话间隙为10分钟
input
.keyBy(...)
.window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
.<window function>(...)
package cn.edu.xmu.dblab
import java.util.Calendar
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.RichSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import scala.util.Random
case class StockPrice(stockId:String,timeStamp:Long,price:Double)
object ReduceWindowFunctionTest {
def main(args: Array[String]) {
//设置执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//设置程序并行度
env.setParallelism(1)
//设置为处理时间
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
//创建数据源,股票价格数据流
val stockPriceStream: DataStream[StockPrice] = env
//该数据流由StockPriceSource类随机生成
.addSource(new StockPriceSource)
//确定针对数据集的转换操作逻辑
val sumStream = stockPriceStream
.keyBy(s => s.stockId)
.timeWindow(Time.seconds(1))
.reduce((s1, s2) => StockPrice(s1.stockId,s1.timeStamp, s1.price + s2.price))
//打印输出
sumStream.print()
//程序触发执行
env.execute("ReduceWindowFunctionTest")
}
class StockPriceSource extends RichSourceFunction[StockPrice]{
var isRunning: Boolean = true
val rand = new Random()
//初始化股票价格
var priceList: List[Double] = List(10.0d, 20.0d, 30.0d, 40.0d, 50.0d)
var stockId = 0
var curPrice = 0.0d
override def run(srcCtx: SourceContext[StockPrice]): Unit = {
while (isRunning) {
//每次从列表中随机选择一只股票
stockId = rand.nextInt(priceList.size)
val curPrice = priceList(stockId) + rand.nextGaussian() * 0.05
priceList = priceList.updated(stockId, curPrice)
val curTime = Calendar.getInstance.getTimeInMillis
//将数据源收集写入SourceContext
srcCtx.collect(StockPrice("stock_" + stockId.toString, curTime, curPrice))
Thread.sleep(rand.nextInt(1000))
}
}
override def cancel(): Unit = {
isRunning = false
}
}
}
package cn.edu.xmu.dblab
import java.util.Calendar
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.RichSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import scala.util.Random
case class StockPrice(stockId:String,timeStamp:Long,price:Double)
object AggregateWindowFunctionTest {
def main(args: Array[String]) {
// 设置执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//设置程序并行度
env.setParallelism(1)
//设置为处理时间
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
//创建数据源,股票价格数据流
val stockPriceStream: DataStream[StockPrice] = env
//该数据流由StockPriceSource类随机生成
.addSource(new StockPriceSource)
stockPriceStream.print("input")
//设定针对数据集的转换操作逻辑
val sumStream = stockPriceStream
.keyBy(s => s.stockId)
.timeWindow(Time.seconds(1))
.aggregate(new MyAggregateFunction)
//打印输出
sumStream.print("output")
//程序触发执行
env.execute("AggregateWindowFunctionTest")
}
class StockPriceSource extends RichSourceFunction[StockPrice]{
var isRunning: Boolean = true
val rand = new Random()
// 初始化股票价格
var priceList: List[Double] = List(10.0d, 20.0d, 30.0d, 40.0d, 50.0d)
var stockId = 0
var curPrice = 0.0d
override def run(srcCtx: SourceContext[StockPrice]): Unit = {
while (isRunning) {
// 每次从列表中随机选择一只股票
stockId = rand.nextInt(priceList.size)
val curPrice = priceList(stockId) + rand.nextGaussian() * 0.05
priceList = priceList.updated(stockId, curPrice)
val curTime = Calendar.getInstance.getTimeInMillis
// 将数据源收集写入SourceContext
srcCtx.collect(StockPrice("stock_" + stockId.toString, curTime, curPrice))
Thread.sleep(rand.nextInt(500))
}
}
override def cancel(): Unit = {
isRunning = false
}
}
//自定义函数
class MyAggregateFunction extends AggregateFunction[StockPrice,(String,Double,Long),(String,Double)] {
//创建累加器
override def createAccumulator(): (String,Double, Long) = ("",0D,0L)
//定义把输入数据累加到累加器的逻辑
override def add(input:StockPrice,acc:(String,Double,Long))={
(input.stockId,acc._2+input.price,acc._3+1L)
}
//根据累加器得出结果
override def getResult(acc:(String,Double,Long)) = (acc._1,acc._2 / acc._3)
//定义累加器合并的逻辑
override def merge(acc1:(String,Double,Long),acc2:(String,Double,Long)) = {
(acc1._1,acc1._2+acc2._2,acc1._3+acc2._3)
}
}
}
//前面的代码和ReduceWindowFunctionTest程序中的代码相同,因此省略
val sumStream = stockPriceStream
.keyBy(s => s.stockId)
.timeWindow(Time.seconds(1))
.fold("CHINA_"){ (acc, v) => acc + v.stockId }
package cn.edu.xmu.dblab
import java.time.Duration
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
case class StockPrice(stockId:String,timeStamp:Long,price:Double)
object ProcessWindowFunctionTest {
def main(args: Array[String]) {
//设置执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//设置程序并行度
env.setParallelism(1)
//set the time characteristic to event time
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//创建数据源,股票价格数据流
//输入数据样例:stock_4,1602031562148,43.4
val source = env.socketTextStream("localhost", 9999)
//指定针对数据流的转换操作逻辑
val stockPriceStream = source
.map(s => s.split(","))
.map(s=>StockPrice(s(0).toString,s(1).toLong,s(2).toDouble))
val sumStream = stockPriceStream
.assignTimestampsAndWatermarks(
WatermarkStrategy
//为了测试方便,这里把水位线设置为0
.forBoundedOutOfOrderness[StockPrice](Duration.ofSeconds(0))
.withTimestampAssigner(new SerializableTimestampAssigner[StockPrice] {
override def extractTimestamp(element: StockPrice, recordTimestamp: Long): Long = element.timeStamp
}
)
)
.keyBy(s => s.stockId)
.timeWindow(Time.seconds(3))
.process(new MyProcessWindowFunction())
//打印输出
sumStream.print()
//执行程序
env.execute("ProcessWindowFunction Test")
}
class MyProcessWindowFunction extends ProcessWindowFunction[StockPrice, (String, Double), String, TimeWindow] {
//一个窗口结束的时候调用一次(一个分组执行一次),不适合大量数据,全量数据保存在内存中,会造成内存溢出
override def process(key: String, context: Context, elements: Iterable[StockPrice], out: Collector[(String, Double)]): Unit = {
//聚合,注意:整个窗口的数据保存到Iterable,里面有很多行数据
var sumPrice = 0.0
elements.foreach(stock => {
sumPrice = sumPrice + stock.price
})
out.collect((key, sumPrice / elements.size))
}
}
}
package cn.edu.xmu.dblab
import java.util.Calendar
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.common.state.ReducingStateDescriptor
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.RichSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import scala.util.Random
case class StockPrice(stockId:String,timeStamp:Long,price:Double)
object TriggerTest {
def main(args: Array[String]) {
//创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//设置程序并行度
env.setParallelism(1)
//设置为处理时间
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
//创建数据源,股票价格数据流
//输入数据样例:stock_4,1602031562148,43.4
val source = env.socketTextStream("localhost", 9999)
//指定针对数据流的转换操作逻辑
val stockPriceStream = source
.map(s => s.split(","))
.map(s=>StockPrice(s(0).toString,s(1).toLong,s(2).toDouble))
val sumStream = stockPriceStream
.keyBy(s => s.stockId)
.timeWindow(Time.seconds(50))
.trigger(new MyTrigger(5))
.reduce((s1, s2) => StockPrice(s1.stockId,s1.timeStamp, s1.price + s2.price))
//打印输出
sumStream.print()
//程序触发执行
env.execute("Trigger Test")
}
class MyTrigger extends Trigger[StockPrice, TimeWindow] {
//触发计算的最大数量
private var maxCount: Long = _
//记录当前数量的状态
private lazy val countStateDescriptor: ReducingStateDescriptor[Long] = new ReducingStateDescriptor[Long]("counter", new Sum, classOf[Long])
def this(maxCount: Int) {
this()
this.maxCount = maxCount
}
override def onProcessingTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
TriggerResult.CONTINUE
}
override def onEventTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
TriggerResult.CONTINUE
}
override def onElement(element: StockPrice, timestamp: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
val countState = ctx.getPartitionedState(countStateDescriptor)
//计数状态加1
countState.add(1L)
if (countState.get() >= this.maxCount) {
//the specified count has been reached
//清空计数状态
countState.clear()
//触发计算
TriggerResult.FIRE
} else {
TriggerResult.CONTINUE
}
}
//窗口结束时清空状态
override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = {
println("窗口结束时清空状态")
ctx.getPartitionedState(countStateDescriptor).clear()
}
//更新状态为累加值
class Sum extends ReduceFunction[Long] {
override def reduce(value1: Long, value2: Long): Long = value1 + value2
}
}
}
package cn.edu.xmu.dblab
import java.time.Duration
import java.util
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.evictors.Evictor
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue
import org.apache.flink.util.Collector
case class StockPrice(stockId:String,timeStamp:Long,price:Double)
object EvictorTest {
def main(args: Array[String]) {
//设置执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//设置程序并行度
env.setParallelism(1)
//set the time characteristic to event time
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//创建数据源,股票价格数据流
val source = env.socketTextStream("localhost", 9999)
//指定针对数据流的转换操作逻辑
val stockPriceStream = source
.map(s => s.split(","))
.map(s=>StockPrice(s(0).toString,s(1).toLong,s(2).toDouble))
val sumStream = stockPriceStream
.assignTimestampsAndWatermarks(
WatermarkStrategy
//为了测试方便,这里把水位线设置为0
.forBoundedOutOfOrderness[StockPrice](Duration.ofSeconds(0))
.withTimestampAssigner(new SerializableTimestampAssigner[StockPrice] {
override def extractTimestamp(element: StockPrice, recordTimestamp: Long): Long = element.timeStamp
}
)
)
.keyBy(s => s.stockId)
.timeWindow(Time.seconds(3))
.evictor(new MyEvictor()) //自定义驱逐器
.process(new MyProcessWindowFunction()) //自定义窗口计算函数
//打印输出
sumStream.print()
//程序触发执行
env.execute("Evictor Test")
}
class MyEvictor() extends Evictor[StockPrice, TimeWindow] {
override def evictBefore(iterable: java.lang.Iterable[TimestampedValue[StockPrice]], i: Int, w: TimeWindow, evictorContext: Evictor.EvictorContext): Unit = {
val ite: util.Iterator[TimestampedValue[StockPrice]] = iterable.iterator()
while (ite.hasNext) {
val element: TimestampedValue[StockPrice] = ite.next()
println("Stock price seen by the evictor: " + element.getValue().price)
//simulate removing invalid records
if (element.getValue().price <= 0) {
println("Stock price is less than or equal to 0; removing this record")
ite.remove()
}
}
}
override def evictAfter(iterable: java.lang.Iterable[TimestampedValue[StockPrice]], i: Int, w: TimeWindow, evictorContext: Evictor.EvictorContext): Unit = {
//不做任何操作
}
}
class MyProcessWindowFunction extends ProcessWindowFunction[StockPrice, (String, Double), String, TimeWindow] {
// 一个窗口结束的时候调用一次(一个分组执行一次),不适合大量数据,全量数据保存在内存中,会造成内存溢出
override def process(key: String, context: Context, elements: Iterable[StockPrice], out: Collector[(String, Double)]): Unit = {
// 聚合,注意:整个窗口的数据保存到Iterable,里面有很多行数据
var sumPrice = 0.0
elements.foreach(stock => {
sumPrice = sumPrice + stock.price
})
out.collect((key, sumPrice / elements.size))
}
}
}
stock_1,1602031567000,8
stock_1,1602031568000,-4
stock_1,1602031569000,3
stock_1,1602031570000,-8
stock_1,1602031571000,9
stock_1,1602031572000,10
val dataStream = ......
dataStream.assignTimestampsAndWatermarks(
WatermarkStrategy
.forBoundedOutOfOrderness[StockPrice](Duration.ofSeconds(3))
.withTimestampAssigner(new SerializableTimestampAssigner[StockPrice] {
override def extractTimestamp(element: StockPrice, recordTimestamp: Long): Long = element.timeStamp
}
)
)
val dataStream = ......
dataStream.assignTimestampsAndWatermarks(
WatermarkStrategy
.forMonotonousTimestamps[StockPrice]()
.withTimestampAssigner(new SerializableTimestampAssigner[StockPrice] {
override def extractTimestamp(element: StockPrice, recordTimestamp: Long): Long = element.timeStamp
}
)
)
package cn.edu.xmu.dblab
import java.text.SimpleDateFormat
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, TimestampAssigner, TimestampAssignerSupplier, Watermark, WatermarkGenerator, WatermarkGeneratorSupplier, WatermarkOutput, WatermarkStrategy}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
case class StockPrice(stockId:String,timeStamp:Long,price:Double)
object WatermarkTest {
def main(args: Array[String]): Unit = {
//设定执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//设定时间特性为事件时间
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//设定程序并行度
env.setParallelism(1)
//创建数据源
val source = env.socketTextStream("localhost", 9999)
//指定针对数据流的转换操作逻辑
val stockDataStream = source
.map(s => s.split(","))
.map(s=>StockPrice(s(0).toString,s(1).toLong,s(2).toDouble))
//为数据流分配时间戳和水位线
val watermarkDataStream = stockDataStream.assignTimestampsAndWatermarks(new MyWatermarkStrategy)
//执行窗口计算
val sumStream = watermarkDataStream
.keyBy("stockId")
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.reduce((s1, s2) => StockPrice(s1.stockId,s1.timeStamp, s1.price + s2.price))
//打印输出
sumStream.print("output")
//指定名称并触发流计算
env.execute("WatermarkTest")
}
//指定水位线生成策略
class MyWatermarkStrategy extends WatermarkStrategy[StockPrice] {
override def createTimestampAssigner(context:TimestampAssignerSupplier.Context):TimestampAssigner[StockPrice]={
new SerializableTimestampAssigner[StockPrice] {
override def extractTimestamp(element: StockPrice, recordTimestamp: Long): Long = {
element.timeStamp //从到达消息中提取时间戳
}
}
}
override def createWatermarkGenerator(context:WatermarkGeneratorSupplier.Context): WatermarkGenerator[StockPrice] ={
new WatermarkGenerator[StockPrice](){
val maxOutOfOrderness = 10000L //设定最大延迟为10秒
var currentMaxTimestamp: Long = 0L
var a: Watermark = null
val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
override def onEvent(element: StockPrice, eventTimestamp: Long, output:WatermarkOutput): Unit = {
currentMaxTimestamp = Math.max(eventTimestamp, currentMaxTimestamp)
a = new Watermark(currentMaxTimestamp - maxOutOfOrderness)
output.emitWatermark(a)
println("timestamp:" + element.stockId + "," + element.timeStamp + "|" + format.format(element.timeStamp) + "," + currentMaxTimestamp + "|" + format.format(currentMaxTimestamp) + "," + a.toString)
}
override def onPeriodicEmit(output:WatermarkOutput): Unit = {
// 没有使用周期性发送水印,因此这里没有执行任何操作
}
}
}
}
}
nc -lk 9999
cd /usr/local/flink/log
tail -f flink*.out
timestamp:stock_1,1602031567000|2020-10-07 08:46:07.000,1602031567000|2020-10-07 08:46:07.000,Watermark @ 1602031557000 (2020-10-07 08:45:57.000)
timestamp:stock_1,1602031571000|2020-10-07 08:46:11.000,1602031571000|2020-10-07 08:46:11.000,Watermark @ 1602031561000 (2020-10-07 08:46:01.000)
timestamp:stock_1,1602031577000|2020-10-07 08:46:17.000,1602031577000|2020-10-07 08:46:17.000,Watermark @ 1602031567000 (2020-10-07 08:46:07.000)
timestamp:stock_1,1602031578000|2020-10-07 08:46:18.000,1602031578000|2020-10-07 08:46:18.000,Watermark @ 1602031568000 (2020-10-07 08:46:08.000)
timestamp:stock_1,1602031579000|2020-10-07 08:46:19.000,1602031579000|2020-10-07 08:46:19.000,Watermark @ 1602031569000 (2020-10-07 08:46:09.000)
output> StockPrice(stock_1,1602031567000,8.14)
timestamp:stock_1,1602031581000|2020-10-07 08:46:21.000,1602031581000|2020-10-07 08:46:21.000,Watermark @ 1602031571000 (2020-10-07 08:46:11.000)
timestamp:stock_1,1602031582000|2020-10-07 08:46:22.000,1602031582000|2020-10-07 08:46:22.000,Watermark @ 1602031572000 (2020-10-07 08:46:12.000)
output> StockPrice(stock_1,1602031571000,8.23)
package cn.edu.xmu.dblab
import java.text.SimpleDateFormat
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, TimestampAssigner, TimestampAssignerSupplier, Watermark, WatermarkGenerator, WatermarkGeneratorSupplier, WatermarkOutput, WatermarkStrategy}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
case class StockPrice(stockId:String,timeStamp:Long,price:Double)
object AllowedLatenessTest {
def main(args: Array[String]): Unit = {
//设定执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//设定时间特性为事件时间
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//设定程序并行度
env.setParallelism(1)
//创建数据源
val source = env.socketTextStream("localhost", 9999)
//指定针对数据流的转换操作逻辑
val stockDataStream = source
.map(s => s.split(","))
.map(s=>StockPrice(s(0).toString,s(1).toLong,s(2).toDouble))
//为数据流分配时间戳和水位线
val watermarkDataStream = stockDataStream.assignTimestampsAndWatermarks(new MyWatermarkStrategy)
//执行窗口计算
val lateData = new OutputTag[StockPrice]("late")
val sumStream = watermarkDataStream
.keyBy("stockId")
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.allowedLateness(Time.seconds(2L))
.sideOutputLateData(lateData)
.reduce((s1, s2) => StockPrice(s1.stockId,s1.timeStamp, s1.price + s2.price))
//打印输出
sumStream.print("window计算结果:")
val late = sumStream.getSideOutput(lateData)
late.print("迟到的数据:")
//指定名称并触发流计算
env.execute("AllowedLatenessTest")
}
//指定水位线生成策略
class MyWatermarkStrategy extends WatermarkStrategy[StockPrice] {
override def createTimestampAssigner(context:TimestampAssignerSupplier.Context):TimestampAssigner[StockPrice]={
new SerializableTimestampAssigner[StockPrice] {
override def extractTimestamp(element: StockPrice, recordTimestamp: Long): Long = {
element.timeStamp //从到达消息中提取时间戳
}
}
}
override def createWatermarkGenerator(context:WatermarkGeneratorSupplier.Context): WatermarkGenerator[StockPrice] ={
new WatermarkGenerator[StockPrice](){
val maxOutOfOrderness = 10000L //设定最大延迟为10秒
var currentMaxTimestamp: Long = 0L
var a: Watermark = null
val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
override def onEvent(element: StockPrice, eventTimestamp: Long, output:WatermarkOutput): Unit = {
currentMaxTimestamp = Math.max(eventTimestamp, currentMaxTimestamp)
a = new Watermark(currentMaxTimestamp - maxOutOfOrderness)
output.emitWatermark(a)
println("timestamp:" + element.stockId + "," + element.timeStamp + "|" + format.format(element.timeStamp) + "," + currentMaxTimestamp + "|" + format.format(currentMaxTimestamp) + "," + a.toString)
}
override def onPeriodicEmit(output:WatermarkOutput): Unit = {
// 没有使用周期性发送水印,因此这里没有执行任何操作
}
}
}
}
}
stock_1,1602031567000,8.14
stock_1,1602031571000,8.23
stock_1,1602031577000,8.24
stock_1,1602031578000,8.87
stock_1,1602031579000,8.55
stock_1,1602031577000,8.24
stock_1,1602031581000,8.43
stock_1,1602031582000,8.78
stock_1,1602031581000,8.76
stock_1,1602031579000,8.55
stock_1,1602031591000,8.13
stock_1,1602031581000,8.34
stock_1,1602031580000,8.45
stock_1,1602031579000,8.33
stock_1,1602031578000,8.56
stock_1,1602031577000,8.32
timestamp:stock_1,1602031567000|2020-10-07 08:46:07.000,1602031567000|2020-10-07 08:46:07.000,Watermark @ 1602031557000 (2020-10-07 08:45:57.000)
timestamp:stock_1,1602031571000|2020-10-07 08:46:11.000,1602031571000|2020-10-07 08:46:11.000,Watermark @ 1602031561000 (2020-10-07 08:46:01.000)
timestamp:stock_1,1602031577000|2020-10-07 08:46:17.000,1602031577000|2020-10-07 08:46:17.000,Watermark @ 1602031567000 (2020-10-07 08:46:07.000)
timestamp:stock_1,1602031578000|2020-10-07 08:46:18.000,1602031578000|2020-10-07 08:46:18.000,Watermark @ 1602031568000 (2020-10-07 08:46:08.000)
timestamp:stock_1,1602031579000|2020-10-07 08:46:19.000,1602031579000|2020-10-07 08:46:19.000,Watermark @ 1602031569000 (2020-10-07 08:46:09.000)
window计算结果:> StockPrice(stock_1,1602031567000,8.14)
timestamp:stock_1,1602031577000|2020-10-07 08:46:17.000,1602031579000|2020-10-07 08:46:19.000,Watermark @ 1602031569000 (2020-10-07 08:46:09.000)
timestamp:stock_1,1602031581000|2020-10-07 08:46:21.000,1602031581000|2020-10-07 08:46:21.000,Watermark @ 1602031571000 (2020-10-07 08:46:11.000)
timestamp:stock_1,1602031582000|2020-10-07 08:46:22.000,1602031582000|2020-10-07 08:46:22.000,Watermark @ 1602031572000 (2020-10-07 08:46:12.000)
window计算结果:> StockPrice(stock_1,1602031571000,8.23)
timestamp:stock_1,1602031581000|2020-10-07 08:46:21.000,1602031582000|2020-10-07 08:46:22.000,Watermark @ 1602031572000 (2020-10-07 08:46:12.000)
timestamp:stock_1,1602031579000|2020-10-07 08:46:19.000,1602031582000|2020-10-07 08:46:22.000,Watermark @ 1602031572000 (2020-10-07 08:46:12.000)
timestamp:stock_1,1602031591000|2020-10-07 08:46:31.000,1602031591000|2020-10-07 08:46:31.000,Watermark @ 1602031581000 (2020-10-07 08:46:21.000)
window计算结果:> StockPrice(stock_1,1602031577000,16.48)
window计算结果:> StockPrice(stock_1,1602031578000,25.970000000000002)
timestamp:stock_1,1602031581000|2020-10-07 08:46:21.000,1602031591000|2020-10-07 08:46:31.000,Watermark @ 1602031581000 (2020-10-07 08:46:21.000)
timestamp:stock_1,1602031580000|2020-10-07 08:46:20.000,1602031591000|2020-10-07 08:46:31.000,Watermark @ 1602031581000 (2020-10-07 08:46:21.000)
window计算结果:> StockPrice(stock_1,1602031578000,34.400000000000006)
timestamp:stock_1,1602031579000|2020-10-07 08:46:19.000,1602031591000|2020-10-07 08:46:31.000,Watermark @ 1602031581000 (2020-10-07 08:46:21.000)
window计算结果:> StockPrice(stock_1,1602031578000,42.830000000000005)
timestamp:stock_1,1602031578000|2020-10-07 08:46:18.000,1602031591000|2020-10-07 08:46:31.000,Watermark @ 1602031581000 (2020-10-07 08:46:21.000)
window计算结果:> StockPrice(stock_1,1602031578000,51.260000000000005)
timestamp:stock_1,1602031577000|2020-10-07 08:46:17.000,1602031591000|2020-10-07 08:46:31.000,Watermark @ 1602031581000 (2020-10-07 08:46:21.000)
迟到的数据:> StockPrice(stock_1,1602031577000,8.43)
package cn.edu.xmu.dblab
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector
case class StockPrice(stockId:String,timeStamp:Long,price:Double)
object StateTest {
def main(args: Array[String]): Unit = {
//设定执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//设定程序并行度
env.setParallelism(1)
//创建数据源
val source = env.socketTextStream("localhost", 9999)
//指定针对数据流的转换操作逻辑
val stockDataStream = source
.map(s => s.split(","))
.map(s => StockPrice(s(0).toString, s(1).toLong, s(2).toDouble))
val alertStream = stockDataStream
.keyBy(_.stockId)
.flatMap(new PriceChangeAlert(10))
// 打印输出
alertStream.print()
//触发程序执行
env.execute("state test")
}
class PriceChangeAlert(threshold: Double) extends RichFlatMapFunction[StockPrice,(String, Double, Double)]{
//定义状态保存上一次的价格
lazy val lastPriceState: ValueState[Double] = getRuntimeContext
.getState(new ValueStateDescriptor[Double]("last-price",classOf[Double]))
override def flatMap(value: StockPrice, out: Collector[(String, Double, Double)]): Unit = {
// 获取上次的价格
val lastPrice = lastPriceState.value()
//跟最新的价格求差值做比较
val diff = (value.price-lastPrice).abs
if( diff > threshold)
out.collect((value.stockId,lastPrice,value.price))
//更新状态
lastPriceState.update(value.price)
}
}
}
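To try this last program, start a socket source with nc -lk 9999 (as in the watermark examples above) and type records in the same stock_1,timestamp,price format. Whenever the price of a stock differs from the previously stored price for that stock by more than the threshold (10 here), the program prints a (stockId, lastPrice, currentPrice) tuple. For example, the two illustrative lines below (the 20.2 value is made up for illustration) should produce one alert for the jump from 8.14 to 20.2:
stock_1,1602031567000,8.14
stock_1,1602031571000,20.2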