Chapter 5 Code - 《Flink编程基础(Scala版)》 (Flink Programming Fundamentals, Scala Edition), edited by Lin Ziyu


The printed rendering of the code in the textbook 《Flink编程基础(Scala版)》 (Flink Programming Fundamentals, Scala Edition), edited by Lin Ziyu and Tao Jiping (textbook website), may make the code hard to read in the paper edition. To help readers understand the code correctly, or copy it directly for hands-on exercises, all of the book's companion code is provided here.

DataStream API

val env = StreamExecutionEnvironment.getExecutionEnvironment
<dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>1.11.2</version>
</dependency>
package cn.edu.xmu.dblab

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object FileSource{
  def main(args: Array[String]): Unit = {

    //Get the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //Load or create the data source
    val dataStream = env.readTextFile("file:///usr/local/flink/README.txt")

    //Print the output
    dataStream.print()

    //Trigger program execution
    env.execute()
  }
}
val socketDataStream = env.socketTextStream("localhost",9999)
val dataStream = env.fromElements(Tuple2(1L,3L),Tuple2(1L,5L))
val dataStream = env.fromCollection(List(1,2,3))
cd  /usr/local/kafka
./bin/zookeeper-server-start.sh  config/zookeeper.properties
cd  /usr/local/kafka
./bin/kafka-server-start.sh  config/server.properties
cd  /usr/local/kafka
bin/kafka-server-start.sh  config/server.properties  &
cd  /usr/local/kafka
./bin/kafka-topics.sh  --create  --zookeeper  localhost:2181  \
> --replication-factor  1  --partitions  1  --topic  wordsendertest
#This topic is named wordsendertest; 2181 is ZooKeeper's default port; --partitions is the number of partitions in the topic; --replication-factor is the number of replicas, used in a Kafka cluster. Since this is a single-node setup, no replication is needed
#Use list to show all created topics and verify that the topic created above exists
./bin/kafka-topics.sh  --list  --zookeeper  localhost:2181
./bin/kafka-console-producer.sh  --broker-list  localhost:9092 \
>  --topic  wordsendertest
hello hadoop
hello spark
cd /usr/local/kafka
./bin/kafka-console-consumer.sh  --bootstrap-server  localhost:9092  \
> --topic  wordsendertest  --from-beginning
package cn.edu.xmu.dblab

import java.util.Properties
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.windowing.time.Time

object KafkaWordCount {
  def main(args: Array[String]): Unit = {

    val kafkaProps = new Properties()
    //Kafka connection properties
    kafkaProps.setProperty("bootstrap.servers", "localhost:9092")
    //The consumer group this consumer belongs to
    kafkaProps.setProperty("group.id", "group1")

    //Get the current execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //Create the Kafka consumer; wordsendertest is the topic to consume
    val kafkaSource = new FlinkKafkaConsumer[String]("wordsendertest", new SimpleStringSchema, kafkaProps)
    //Start consuming from the latest offset
    kafkaSource.setStartFromLatest()
    //Commit offsets back to Kafka on checkpoints
    kafkaSource.setCommitOffsetsOnCheckpoints(true)

    //Bind the data source
    val stream = env.addSource(kafkaSource)

    //Define the transformation logic
    val text = stream.flatMap { _.toLowerCase().split("\\W+").filter { _.nonEmpty } }
      .map { (_, 1) }
      .keyBy(0)
      .timeWindow(Time.seconds(5))
      .sum(1)

    //Print the output
    text.print()

    //Trigger program execution
    env.execute("Kafka Word Count")
  }
}
<project>
    <groupId>cn.edu.xmu.dblab</groupId>
    <artifactId>wordcount</artifactId>
    <modelVersion>4.0.0</modelVersion>
    <name>WordCount</name>
    <packaging>jar</packaging>
    <version>1.0</version>
    <repositories>
        <repository>
            <id>alimaven</id>
            <name>aliyun maven</name>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
    </repositories>
   <dependencies>
     <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-scala_2.12</artifactId>
          <version>1.11.2</version>
      </dependency>
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-streaming-scala_2.12</artifactId>
          <version>1.11.2</version>
      </dependency>
      <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.11.2</version>
        </dependency>
       <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.12</artifactId>
            <version>1.11.2</version>
        </dependency>
</dependencies>  
<build>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.4.6</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
cd ~/flinkapp
/usr/local/flink/bin/flink run \
> --class cn.edu.xmu.dblab.KafkaWordCount \
> ./target/wordcount-1.0-jar-with-dependencies.jar
hello wuhan
hello china
cd /usr/local/flink/log
tail -f flink*.out
package cn.edu.xmu.dblab

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object ReadHDFSFile{
  def main(args: Array[String]): Unit = {

    //Get the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //Load or create the data source
    val dataStream = env.readTextFile("hdfs://localhost:9000/user/hadoop/word.txt")

    //Print the output
    dataStream.print()

    //Trigger program execution
    env.execute()
  }
}
<project>
    <groupId>cn.edu.xmu.dblab</groupId>
    <artifactId>wordcount</artifactId>
    <modelVersion>4.0.0</modelVersion>
    <name>WordCount</name>
    <packaging>jar</packaging>
    <version>1.0</version>
    <repositories>
        <repository>
            <id>alimaven</id>
            <name>aliyun maven</name>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
    </repositories>
   <dependencies>
     <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-scala_2.12</artifactId>
          <version>1.11.2</version>
      </dependency>
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-streaming-scala_2.12</artifactId>
          <version>1.11.2</version>
      </dependency>
      <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.11.2</version>
        </dependency>
       <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.12</artifactId>
            <version>1.11.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.1.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.1.3</version>
        </dependency>
</dependencies>  
<build>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.4.6</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
vim ~/.bashrc
export HADOOP_HOME=/usr/local/hadoop
export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop
export HADOOP_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath)
source ~/.bashrc
cd ~/flinkapp
/usr/local/flink/bin/flink run --class cn.edu.xmu.dblab.ReadHDFSFile ./target/wordcount-1.0-jar-with-dependencies.jar
cd /usr/local/flink/log
tail -f flink*.out
package cn.edu.xmu.dblab

import java.util.Calendar
import org.apache.flink.streaming.api.functions.source.RichSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import scala.util.Random

case class StockPrice(stockId:String,timeStamp:Long,price:Double)

object StockPriceStreaming {
  def main(args: Array[String]) {

    //Set up the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //Set the program parallelism
    env.setParallelism(1)

    //Stock price data stream
    val stockPriceStream: DataStream[StockPrice] = env
      //The stream is randomly generated by the StockPriceSource class
      .addSource(new StockPriceSource)

    //Print the result
    stockPriceStream.print()

    //Trigger program execution
    env.execute("stock price streaming")
  }

  class StockPriceSource extends RichSourceFunction[StockPrice]{

    var isRunning: Boolean = true
    val rand = new Random()
    //Initialize the stock prices
    var priceList: List[Double] = List(10.0d, 20.0d, 30.0d, 40.0d, 50.0d)
    var stockId = 0
    var curPrice = 0.0d

    override def run(srcCtx: SourceContext[StockPrice]): Unit = {
      while (isRunning) {
        //Pick one stock at random from the list on each iteration
        stockId = rand.nextInt(priceList.size)
        val curPrice =  priceList(stockId) + rand.nextGaussian() * 0.05
        priceList = priceList.updated(stockId, curPrice)
        val curTime = Calendar.getInstance.getTimeInMillis
        //Emit the record into the SourceContext
        srcCtx.collect(StockPrice("stock_" + stockId.toString, curTime, curPrice))
        Thread.sleep(rand.nextInt(10))
      }
    }

    override def cancel(): Unit = {
      isRunning = false
    }
  }
}
cd /usr/local/flink/log
tail -f flink*.out
val dataStream = env.fromElements(1,2,3,4,5)
val mapStream = dataStream.map(x=>x+10)
package cn.edu.xmu.dblab

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

case class StockPrice(stockId:String,timeStamp:Long,price:Double)

object MapFunctionTest {
  def main(args: Array[String]): Unit = {

    //Set up the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //Set the program parallelism
    env.setParallelism(1)

    //Create the data source
    val dataStream: DataStream[Int] = env.fromElements(1, 2, 3, 4, 5, 6, 7)

    //Define the transformation logic
    val richFunctionDataStream = dataStream.map {new MyMapFunction()}

    //Print the output
    richFunctionDataStream.print()

    //Trigger program execution
    env.execute("MapFunctionTest")
  }

  //Custom function that extends RichMapFunction
  class MyMapFunction extends RichMapFunction[Int, String] {
    override def map(input: Int): String =
      ("Input : " + input.toString + ", Output : " + (input * 3).toString)
  }
}
val dataStream = env.fromElements("Hadoop is good","Flink is fast","Flink is better")
val flatMapStream = dataStream.flatMap(line => line.split(" "))
package cn.edu.xmu.dblab

import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector

case class StockPrice(stockId:String,timeStamp:Long,price:Double)

object FlatMapFunctionTest {
  def main(args: Array[String]): Unit = {

    //Set up the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //Set the program parallelism
    env.setParallelism(1)

    //Set up the data source
    val dataStream: DataStream[String] = env.fromElements("Hello Spark", "Flink is excellent")

    //Transformation logic applied to the data set
    val result = dataStream.flatMap(new WordSplitFlatMap(15))

    //Print the output
    result.print()

    //Trigger program execution
    env.execute("FlatMapFunctionTest")
  }

  //Use FlatMapFunction to implement filtering logic: only strings longer than threshold are split into words
  class WordSplitFlatMap(threshold: Int) extends FlatMapFunction[String, String] {
    override def flatMap(value: String, out: Collector[String]): Unit = {
      if (value.size > threshold) {
        value.split(" ").foreach(out.collect)
      }
    }
  }
}
val dataStream = env.fromElements("Hadoop is good","Flink is fast","Flink is better")
val filterStream = dataStream.filter(line => line.contains("Flink"))   
val dataStream: DataStream[(Int, Double)] = env.fromElements((1, 2.0), (2, 1.7), (1, 4.9), (3, 8.5), (3, 11.2))
//Define the key by numeric position: group by the first field
val keyedStream = dataStream.keyBy(0)
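Note: specifying keys by tuple position (as above) or by field name is deprecated in more recent Flink releases; a key selector function is the recommended alternative. A minimal sketch under that assumption, reusing the same dataStream:
//Group by the first tuple field using a key selector function
val keyedStreamBySelector = dataStream.keyBy(t => t._1)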
package cn.edu.xmu.dblab

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

//Declare a case class with three fields: stock ID, trade time, trade price
case class StockPrice(stockId:String,timeStamp:Long,price:Double)

object KeyByTest{
  def main(args: Array[String]): Unit = {

    //Get the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //Set the program parallelism
    env.setParallelism(1)

    //Create the data source
    val stockList = List(
      StockPrice("stock_4",1602031562148L,43.4D),
      StockPrice("stock_1",1602031562148L,22.9D),
      StockPrice("stock_0",1602031562153L,8.2D),
      StockPrice("stock_3",1602031562153L,42.1D),
      StockPrice("stock_2",1602031562153L,29.2D),
      StockPrice("stock_0",1602031562159L,8.1D),
      StockPrice("stock_4",1602031562159L,43.7D),
      StockPrice("stock_4",1602031562169L,43.5D)
    )
    val dataStream = env.fromCollection(stockList)

    //Define the transformation logic
    val keyedStream = dataStream.keyBy("stockId")

    //Print the output
    keyedStream.print()

    //Trigger program execution
    env.execute("KeyByTest")
  }
}
package cn.edu.xmu.dblab

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

//Declare a case class with three fields: stock ID, trade time, trade price
case class StockPrice(stockId:String,timeStamp:Long,price:Double)

object ReduceTest{
  def main(args: Array[String]): Unit = {

    //Get the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //Set the program parallelism
    env.setParallelism(1)

    //Create the data source
    val stockList = List(
      StockPrice("stock_4",1602031562148L,43.4D),
      StockPrice("stock_1",1602031562148L,22.9D),
      StockPrice("stock_0",1602031562153L,8.2D),
      StockPrice("stock_3",1602031562153L,42.1D),
      StockPrice("stock_2",1602031562153L,29.2D),
      StockPrice("stock_0",1602031562159L,8.1D),
      StockPrice("stock_4",1602031562159L,43.7D),
      StockPrice("stock_4",1602031562169L,43.5D)
    )
    val dataStream = env.fromCollection(stockList)

    //Define the transformation logic
    val keyedStream = dataStream.keyBy("stockId")
    val reduceStream = keyedStream
      .reduce((t1,t2)=>StockPrice(t1.stockId,t1.timeStamp,t1.price+t2.price))

    //Print the output
    reduceStream.print()

    //Trigger program execution
    env.execute("ReduceTest")
  }
}
package cn.edu.xmu.dblab

import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

//Declare a case class with three fields: stock ID, trade time, trade price
case class StockPrice(stockId:String,timeStamp:Long,price:Double)

object MyReduceFunctionTest{
  def main(args: Array[String]): Unit = {

    //Get the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //Set the program parallelism
    env.setParallelism(1)

    //Create the data source
    val stockList = List(
      StockPrice("stock_4",1602031562148L,43.4D),
      StockPrice("stock_1",1602031562148L,22.9D),
      StockPrice("stock_0",1602031562153L,8.2D),
      StockPrice("stock_3",1602031562153L,42.1D),
      StockPrice("stock_2",1602031562153L,29.2D),
      StockPrice("stock_0",1602031562159L,8.1D),
      StockPrice("stock_4",1602031562159L,43.7D),
      StockPrice("stock_4",1602031562169L,43.5D)
    )
    val dataStream = env.fromCollection(stockList)

    //Define the transformation logic
    val keyedStream = dataStream.keyBy("stockId")
    val reduceStream = keyedStream.reduce(new MyReduceFunction)

    //Print the output
    reduceStream.print()

    //Trigger program execution
    env.execute("MyReduceFunctionTest")
  }
  class MyReduceFunction extends ReduceFunction[StockPrice] {
    override def reduce(t1: StockPrice,t2:StockPrice):StockPrice = {
      StockPrice(t1.stockId,t1.timeStamp,t1.price+t2.price)
    }
  }
}
package cn.edu.xmu.dblab

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

//Declare a case class with three fields: stock ID, trade time, trade price
case class StockPrice(stockId:String,timeStamp:Long,price:Double)

object AggregationTest{
  def main(args: Array[String]): Unit = {

    //Get the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //Set the program parallelism
    env.setParallelism(1)

    //Create the data source
    val stockList = List(
      StockPrice("stock_4",1602031562148L,43.4D),
      StockPrice("stock_1",1602031562148L,22.9D),
      StockPrice("stock_0",1602031562153L,8.2D),
      StockPrice("stock_3",1602031562153L,42.1D),
      StockPrice("stock_2",1602031562153L,29.2D),
      StockPrice("stock_0",1602031562159L,8.1D),
      StockPrice("stock_4",1602031562159L,43.7D),
      StockPrice("stock_4",1602031562169L,43.5D)
    )
    val dataStream = env.fromCollection(stockList)

    //Define the transformation logic
    val keyedStream = dataStream.keyBy("stockId")
    val aggregationStream = keyedStream.sum(2)

    //Print the output
    aggregationStream.print()

    //Trigger program execution
    env.execute("AggregationTest")
  }
}
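Besides positional indexes, the aggregation operators on a keyed case-class stream can also reference a field by name. A brief sketch, assuming the keyedStream defined in AggregationTest above:
//Aggregate by field name instead of position: running maximum of the price field per stock
val maxPriceStream = keyedStream.max("price")
maxPriceStream.print()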
val dataStream = env.fromElements("hadoop","spark","flink")
dataStream.writeAsText("file:///home/hadoop/output.txt")
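Note that writeAsText produces a single output file only when the sink runs with parallelism 1; with a higher parallelism the given path is created as a directory containing one file per parallel subtask. A minimal sketch of pinning the sink parallelism, reusing the dataStream above:
//Write to one file by forcing the sink parallelism to 1
dataStream.writeAsText("file:///home/hadoop/output.txt").setParallelism(1)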
package cn.edu.xmu.dblab

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer

object SinkKafkaTest{
  def main(args: Array[String]): Unit = {

    //Get the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //Load or create the data source
    val dataStream = env.fromElements("hadoop","spark","flink")

    //Write the data to Kafka
    dataStream.addSink(new FlinkKafkaProducer[String]("localhost:9092", "sinkKafka", new SimpleStringSchema()))

    //Trigger program execution
    env.execute()
  }
}
cd  /usr/local/kafka
./bin/zookeeper-server-start.sh  config/zookeeper.properties
cd  /usr/local/kafka
./bin/kafka-server-start.sh  config/server.properties
cd  /usr/local/kafka
./bin/kafka-topics.sh  --create  --zookeeper  localhost:2181  \
> --replication-factor  1  --partitions  1  --topic  sinkKafka
cd ~/flinkapp
/usr/local/flink/bin/flink run \
> --class cn.edu.xmu.dblab.SinkKafkaTest \
> ./target/wordcount-1.0-jar-with-dependencies.jar
cd /usr/local/kafka
./bin/kafka-console-consumer.sh  --bootstrap-server  localhost:9092  \
> --topic  sinkKafka --from-beginning
package cn.edu.xmu.dblab

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object WriteHDFSFile{
  def main(args: Array[String]): Unit = {

    //Get the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //Set the program parallelism to 1
    env.setParallelism(1)

    //Create the data source
    val dataStream = env.fromElements("hadoop","spark","flink")

    //Write the data to HDFS
    dataStream.writeAsText("hdfs://localhost:9000/output.txt")

    //Trigger program execution
    env.execute()
  }
}
cd ~/flinkapp
/usr/local/flink/bin/flink run --class cn.edu.xmu.dblab.WriteHDFSFile ./target/wordcount-1.0-jar-with-dependencies.jar
//Set up the execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment

//Set the time characteristic to "event time"
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

//Or, set the time characteristic to "processing time"
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
val dataStream: DataStream[T] = ...

//Event-time tumbling window with a window size of 5 seconds
dataStream
    .keyBy(...)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .<window function>(...)

//Processing-time tumbling window with a window size of 5 seconds
dataStream
    .keyBy(...)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .<window function>(...)

//Event-time tumbling window with a window size of 1 hour and an offset of 15 minutes
dataStream
    .keyBy(...)
    .window(TumblingEventTimeWindows.of(Time.hours(1), Time.minutes(15)))
    .<window function>(...)
dataStream
    .keyBy(...)
    .timeWindow(Time.seconds(1))
    .<window function>(...)
val dataStream: DataStream[T] = ...

//Event-time sliding window with a window size of 10 seconds and a slide of 5 seconds
dataStream
    .keyBy(...)
    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<window function>(...)

//Processing-time sliding window with a window size of 10 seconds and a slide of 5 seconds
dataStream
    .keyBy(<...>)
    .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<window function>(...)

//Processing-time sliding window with a window size of 12 hours, a slide of 1 hour, and an offset of -8 hours
dataStream
    .keyBy(<...>)
    .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
    .<window function>(...)
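
As with tumbling windows, the Scala API also offers a timeWindow shorthand for sliding windows that follows the environment's time characteristic (in later Flink releases this shorthand is deprecated in favor of the explicit window(...) form shown above). A brief sketch in the same style:
//Shorthand for a sliding window: window size 10 seconds, slide 5 seconds
dataStream
    .keyBy(...)
    .timeWindow(Time.seconds(10), Time.seconds(5))
    .<window function>(...)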
val input: DataStream[T] = ...

//Event-time session window with a session gap of 10 minutes
input
    .keyBy(...)
    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
    .<window function>(...)

//Processing-time session window with a session gap of 10 minutes
input
    .keyBy(...)
    .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
    .<window function>(...)
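
The session gap does not have to be a constant; a SessionWindowTimeGapExtractor (from org.apache.flink.streaming.api.windowing.assigners) can derive it from each element. A minimal sketch, assuming input is a DataStream[StockPrice] as used elsewhere in this chapter; the fixed 10-minute gap here is only a placeholder:
//Session window whose gap (in milliseconds) is computed per element
input
    .keyBy(s => s.stockId)
    .window(EventTimeSessionWindows.withDynamicGap(
      new SessionWindowTimeGapExtractor[StockPrice] {
        override def extract(element: StockPrice): Long = 10 * 60 * 1000L
      }))
    .reduce((s1, s2) => StockPrice(s1.stockId, s1.timeStamp, s1.price + s2.price))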
package cn.edu.xmu.dblab

import java.util.Calendar
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.RichSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import scala.util.Random

case class StockPrice(stockId:String,timeStamp:Long,price:Double)

object ReduceWindowFunctionTest {
  def main(args: Array[String]) {

    //Set up the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //Set the program parallelism
    env.setParallelism(1)

    //Use processing time
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

    //Create the data source: the stock price stream
    val stockPriceStream: DataStream[StockPrice] = env
      //The stream is randomly generated by the StockPriceSource class
      .addSource(new StockPriceSource)

    //Define the transformation logic applied to the data set
    val sumStream = stockPriceStream
      .keyBy(s => s.stockId)
      .timeWindow(Time.seconds(1))
      .reduce((s1, s2) => StockPrice(s1.stockId,s1.timeStamp, s1.price + s2.price))

    //Print the output
    sumStream.print()

    //Trigger program execution
    env.execute("ReduceWindowFunctionTest")
  }

  class StockPriceSource extends RichSourceFunction[StockPrice]{
    var isRunning: Boolean = true

    val rand = new Random()
    //Initialize the stock prices
    var priceList: List[Double] = List(10.0d, 20.0d, 30.0d, 40.0d, 50.0d)
    var stockId = 0
    var curPrice = 0.0d

    override def run(srcCtx: SourceContext[StockPrice]): Unit = {
      while (isRunning) {
        //Pick one stock at random from the list on each iteration
        stockId = rand.nextInt(priceList.size)
        val curPrice =  priceList(stockId) + rand.nextGaussian() * 0.05
        priceList = priceList.updated(stockId, curPrice)
        val curTime = Calendar.getInstance.getTimeInMillis

        //Emit the record into the SourceContext
        srcCtx.collect(StockPrice("stock_" + stockId.toString, curTime, curPrice))
        Thread.sleep(rand.nextInt(1000))
      }
    }
    override def cancel(): Unit = {
      isRunning = false
    }
  }
}
package cn.edu.xmu.dblab

import java.util.Calendar
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.RichSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import scala.util.Random

case class StockPrice(stockId:String,timeStamp:Long,price:Double)

object AggregateWindowFunctionTest {
  def main(args: Array[String]) {

    //Set up the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //Set the program parallelism
    env.setParallelism(1)

    //Use processing time
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

    //Create the data source: the stock price stream
    val stockPriceStream: DataStream[StockPrice] = env
      //The stream is randomly generated by the StockPriceSource class
      .addSource(new StockPriceSource)

    stockPriceStream.print("input")

    //Define the transformation logic applied to the data set
    val sumStream = stockPriceStream
      .keyBy(s => s.stockId)
      .timeWindow(Time.seconds(1))
      .aggregate(new MyAggregateFunction)

    //Print the output
    sumStream.print("output")

    //Trigger program execution
    env.execute("AggregateWindowFunctionTest")
  }

  class StockPriceSource extends RichSourceFunction[StockPrice]{
    var isRunning: Boolean = true
    val rand = new Random()
    //Initialize the stock prices
    var priceList: List[Double] = List(10.0d, 20.0d, 30.0d, 40.0d, 50.0d)
    var stockId = 0
    var curPrice = 0.0d

    override def run(srcCtx: SourceContext[StockPrice]): Unit = {
      while (isRunning) {
        //Pick one stock at random from the list on each iteration
        stockId = rand.nextInt(priceList.size)
        val curPrice =  priceList(stockId) + rand.nextGaussian() * 0.05
        priceList = priceList.updated(stockId, curPrice)
        val curTime = Calendar.getInstance.getTimeInMillis

        //Emit the record into the SourceContext
        srcCtx.collect(StockPrice("stock_" + stockId.toString, curTime, curPrice))
        Thread.sleep(rand.nextInt(500))
      }
    }
    override def cancel(): Unit = {
      isRunning = false
    }
  }

  //Custom aggregate function
  class MyAggregateFunction extends AggregateFunction[StockPrice,(String,Double,Long),(String,Double)] {
    //Create the accumulator
    override def createAccumulator(): (String,Double, Long) = ("",0D,0L)
    //Define how an input record is added to the accumulator
    override def add(input:StockPrice,acc:(String,Double,Long))={
      (input.stockId,acc._2+input.price,acc._3+1L)
    }
    //Derive the result from the accumulator
    override def getResult(acc:(String,Double,Long)) = (acc._1,acc._2 / acc._3)
    //Define how two accumulators are merged
    override def merge(acc1:(String,Double,Long),acc2:(String,Double,Long)) = {
      (acc1._1,acc1._2+acc2._2,acc1._3+acc2._3)
    }
  }
}
//The preceding code is the same as in the ReduceWindowFunctionTest program and is omitted here
val sumStream = stockPriceStream
      .keyBy(s => s.stockId)
      .timeWindow(Time.seconds(1))
      .fold("CHINA_"){ (acc, v) => acc + v.stockId }
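
fold is deprecated in recent Flink versions; aggregate is the recommended replacement and can express the same concatenation. A minimal sketch, assuming the same stockPriceStream and an extra import of org.apache.flink.api.common.functions.AggregateFunction:
val aggStream = stockPriceStream
      .keyBy(s => s.stockId)
      .timeWindow(Time.seconds(1))
      .aggregate(new AggregateFunction[StockPrice, String, String] {
        //Start from the prefix and append each stock ID seen in the window
        override def createAccumulator(): String = "CHINA_"
        override def add(value: StockPrice, acc: String): String = acc + value.stockId
        override def getResult(acc: String): String = acc
        //merge is only invoked for merging (session) windows
        override def merge(a: String, b: String): String = a + b
      })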
package cn.edu.xmu.dblab

import java.time.Duration
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

case class StockPrice(stockId:String,timeStamp:Long,price:Double)

object ProcessWindowFunctionTest {
  def main(args: Array[String]) {

    //Set up the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //Set the program parallelism
    env.setParallelism(1)

    //Use event time
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    //Create the data source: the stock price stream
    //Sample input record: stock_4,1602031562148,43.4
    val source = env.socketTextStream("localhost", 9999)

    //Define the transformation logic applied to the data stream
    val stockPriceStream = source
      .map(s => s.split(","))
      .map(s=>StockPrice(s(0).toString,s(1).toLong,s(2).toDouble))

    val sumStream = stockPriceStream
      .assignTimestampsAndWatermarks(
        WatermarkStrategy
          //For easier testing, the bounded out-of-orderness is set to 0 here
          .forBoundedOutOfOrderness[StockPrice](Duration.ofSeconds(0))
          .withTimestampAssigner(new SerializableTimestampAssigner[StockPrice] {
            override def extractTimestamp(element: StockPrice, recordTimestamp: Long): Long = element.timeStamp
          }
          )
      )
      .keyBy(s => s.stockId)
      .timeWindow(Time.seconds(3))
      .process(new MyProcessWindowFunction())

    //Print the output
    sumStream.print()

    //Trigger program execution
    env.execute("ProcessWindowFunction Test")
  }

  class MyProcessWindowFunction extends ProcessWindowFunction[StockPrice, (String, Double), String, TimeWindow] {
    //Called once when a window ends (once per key group); not suitable for large windows, because all window data is kept in memory and can cause an out-of-memory error
    override def process(key: String, context: Context, elements: Iterable[StockPrice], out: Collector[(String, Double)]): Unit = {
      //Aggregate; note that all records of the window are held in the Iterable, which may contain many rows
      var sumPrice = 0.0
      elements.foreach(stock => {
        sumPrice = sumPrice + stock.price
      })
      out.collect((key, sumPrice / elements.size))
    }
  }
}
package cn.edu.xmu.dblab

import java.util.Calendar
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.common.state.ReducingStateDescriptor
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.RichSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import scala.util.Random

case class StockPrice(stockId:String,timeStamp:Long,price:Double)

object TriggerTest {
  def main(args: Array[String]) {

    //Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //Set the program parallelism
    env.setParallelism(1)

    //Use processing time
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

    //Create the data source: the stock price stream
    //Sample input record: stock_4,1602031562148,43.4
    val source = env.socketTextStream("localhost", 9999)

    //Define the transformation logic applied to the data stream
    val stockPriceStream = source
      .map(s => s.split(","))
      .map(s=>StockPrice(s(0).toString,s(1).toLong,s(2).toDouble))
    val sumStream = stockPriceStream
      .keyBy(s => s.stockId)
      .timeWindow(Time.seconds(50))
      .trigger(new MyTrigger(5))
      .reduce((s1, s2) => StockPrice(s1.stockId,s1.timeStamp, s1.price + s2.price))

    //Print the output
    sumStream.print()

    //Trigger program execution
    env.execute("Trigger Test")
  }

  class MyTrigger extends Trigger[StockPrice, TimeWindow] {
    //Maximum number of elements before the window fires
    private var maxCount: Long = _

    //State descriptor that keeps the current element count
    private lazy val countStateDescriptor: ReducingStateDescriptor[Long] = new ReducingStateDescriptor[Long]("counter", new Sum, classOf[Long])

    def this(maxCount: Int) {
      this()
      this.maxCount = maxCount
    }

    override def onProcessingTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
      TriggerResult.CONTINUE
    }

    override def onEventTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
      TriggerResult.CONTINUE
    }

    override def onElement(element: StockPrice, timestamp: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
      val countState = ctx.getPartitionedState(countStateDescriptor)
      //Increment the count state by 1
      countState.add(1L)
      if (countState.get() >= this.maxCount) {
        //The specified count has been reached
        //Clear the count state
        countState.clear()
        //Fire the window computation
        TriggerResult.FIRE
      } else {
        TriggerResult.CONTINUE
      }
    }

    //Clear the state when the window is purged
    override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = {
      println("Clearing the count state because the window is being purged")
      ctx.getPartitionedState(countStateDescriptor).clear()
    }

    //ReduceFunction that sums up the count state
    class Sum extends ReduceFunction[Long] {
      override def reduce(value1: Long, value2: Long): Long = value1 + value2
    } 
  }
}
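For a purely count-based firing policy, Flink also ships a built-in CountTrigger, so a custom Trigger such as the one above is mainly needed when additional logic is required. A brief sketch that behaves similarly, assuming the same stockPriceStream and an extra import of org.apache.flink.streaming.api.windowing.triggers.CountTrigger:
//Fire the window function after every 5 elements using the built-in trigger
val countTriggeredStream = stockPriceStream
      .keyBy(s => s.stockId)
      .timeWindow(Time.seconds(50))
      .trigger(CountTrigger.of[TimeWindow](5))
      .reduce((s1, s2) => StockPrice(s1.stockId, s1.timeStamp, s1.price + s2.price))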
package cn.edu.xmu.dblab

import java.time.Duration
import java.util
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.evictors.Evictor
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue
import org.apache.flink.util.Collector

case class StockPrice(stockId:String,timeStamp:Long,price:Double)

object EvictorTest {

  def main(args: Array[String]) {

    //Set up the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //Set the program parallelism
    env.setParallelism(1)

    //Use event time
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    //Create the data source: the stock price stream
    val source = env.socketTextStream("localhost", 9999)

    //Define the transformation logic applied to the data stream
    val stockPriceStream = source
      .map(s => s.split(","))
      .map(s=>StockPrice(s(0).toString,s(1).toLong,s(2).toDouble))

    val sumStream = stockPriceStream
         .assignTimestampsAndWatermarks(
            WatermarkStrategy
              //For easier testing, the bounded out-of-orderness is set to 0 here
              .forBoundedOutOfOrderness[StockPrice](Duration.ofSeconds(0))
              .withTimestampAssigner(new SerializableTimestampAssigner[StockPrice] {
                override def extractTimestamp(element: StockPrice, recordTimestamp: Long): Long = element.timeStamp
              }
        )
    )
      .keyBy(s => s.stockId)
      .timeWindow(Time.seconds(3))
      .evictor(new MyEvictor()) //Custom evictor
      .process(new MyProcessWindowFunction())  //Custom window function

    //Print the output
    sumStream.print()

    //Trigger program execution
    env.execute("Evictor Test")
  }
  class MyEvictor() extends Evictor[StockPrice, TimeWindow] {
    override def evictBefore(iterable: java.lang.Iterable[TimestampedValue[StockPrice]], i: Int, w: TimeWindow, evictorContext: Evictor.EvictorContext): Unit = {
      val ite: util.Iterator[TimestampedValue[StockPrice]] = iterable.iterator()
      while (ite.hasNext) {
        val element: TimestampedValue[StockPrice] = ite.next()
        println("Stock price seen by the evictor: " + element.getValue().price)
        //Simulate removing invalid records
        if (element.getValue().price <= 0) {
          println("Stock price is less than or equal to 0; removing this record")
          ite.remove()
        }
      }
    }

    override def evictAfter(iterable: java.lang.Iterable[TimestampedValue[StockPrice]], i: Int, w: TimeWindow, evictorContext: Evictor.EvictorContext): Unit = {
      //Do nothing
    }
  }

  class MyProcessWindowFunction extends ProcessWindowFunction[StockPrice, (String, Double), String, TimeWindow] {
    //Called once when a window ends (once per key group); not suitable for large windows, because all window data is kept in memory and can cause an out-of-memory error
    override def process(key: String, context: Context, elements: Iterable[StockPrice], out: Collector[(String, Double)]): Unit = {
      //Aggregate; note that all records of the window are held in the Iterable, which may contain many rows
      var sumPrice = 0.0
      elements.foreach(stock => {
        sumPrice = sumPrice + stock.price
      })
      out.collect((key, sumPrice / elements.size))
    }
  }
}
stock_1,1602031567000,8
stock_1,1602031568000,-4
stock_1,1602031569000,3
stock_1,1602031570000,-8
stock_1,1602031571000,9
stock_1,1602031572000,10
val dataStream = ......
dataStream.assignTimestampsAndWatermarks(
WatermarkStrategy
.forBoundedOutOfOrderness[StockPrice](Duration.ofSeconds(3))
.withTimestampAssigner(new SerializableTimestampAssigner[StockPrice] {
    override def extractTimestamp(element: StockPrice, recordTimestamp: Long): Long = element.timeStamp
  }
)
)
val dataStream = ......
dataStream.assignTimestampsAndWatermarks(
WatermarkStrategy
.forMonotonousTimestamps()
.withTimestampAssigner(new SerializableTimestampAssigner[StockPrice] {
    override def extractTimestamp(element: StockPrice, recordTimestamp: Long): Long = element.timeStamp
  }
)
)
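If a source or partition may temporarily stop producing records, the strategy can additionally be marked idle so that downstream watermarks keep advancing. A minimal sketch in the same style (withIdleness is available starting with Flink 1.11):
val dataStream = ......
dataStream.assignTimestampsAndWatermarks(
WatermarkStrategy
.forBoundedOutOfOrderness[StockPrice](Duration.ofSeconds(3))
.withIdleness(Duration.ofMinutes(1))
.withTimestampAssigner(new SerializableTimestampAssigner[StockPrice] {
    override def extractTimestamp(element: StockPrice, recordTimestamp: Long): Long = element.timeStamp
  }
)
)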
package cn.edu.xmu.dblab

import java.text.SimpleDateFormat
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, TimestampAssigner, TimestampAssignerSupplier, Watermark, WatermarkGenerator, WatermarkGeneratorSupplier, WatermarkOutput, WatermarkStrategy}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time


case class StockPrice(stockId:String,timeStamp:Long,price:Double)

object WatermarkTest {

  def main(args: Array[String]): Unit = {
    //Set up the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //Set the time characteristic to event time
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    //Set the program parallelism
    env.setParallelism(1)

    //Create the data source
    val source = env.socketTextStream("localhost", 9999)

    //Define the transformation logic applied to the data stream
    val stockDataStream = source
      .map(s => s.split(","))
      .map(s=>StockPrice(s(0).toString,s(1).toLong,s(2).toDouble))

    //Assign timestamps and watermarks to the data stream
    val watermarkDataStream = stockDataStream.assignTimestampsAndWatermarks(new MyWatermarkStrategy)

    //Perform the window computation
    val sumStream = watermarkDataStream
      .keyBy("stockId")
      .window(TumblingEventTimeWindows.of(Time.seconds(3)))
      .reduce((s1, s2) => StockPrice(s1.stockId,s1.timeStamp, s1.price + s2.price))

    //Print the output
    sumStream.print("output")

    //Name the job and trigger execution
    env.execute("WatermarkTest")
  }

  //Define the watermark generation strategy
  class MyWatermarkStrategy extends WatermarkStrategy[StockPrice] {

    override def createTimestampAssigner(context:TimestampAssignerSupplier.Context):TimestampAssigner[StockPrice]={
      new SerializableTimestampAssigner[StockPrice] {
        override def extractTimestamp(element: StockPrice, recordTimestamp: Long): Long = {
          element.timeStamp //Extract the timestamp from the incoming record
        }
      }
    }

    override def createWatermarkGenerator(context:WatermarkGeneratorSupplier.Context): WatermarkGenerator[StockPrice] ={
      new WatermarkGenerator[StockPrice](){
        val maxOutOfOrderness = 10000L //Set the maximum out-of-orderness to 10 seconds
        var currentMaxTimestamp: Long = 0L
        var a: Watermark = null
        val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")

        override def onEvent(element: StockPrice, eventTimestamp: Long, output:WatermarkOutput): Unit = {          
          currentMaxTimestamp = Math.max(eventTimestamp, currentMaxTimestamp)
          a = new Watermark(currentMaxTimestamp - maxOutOfOrderness)
          output.emitWatermark(a)
          println("timestamp:" + element.stockId + "," + element.timeStamp + "|" + format.format(element.timeStamp) + "," + currentMaxTimestamp + "|" + format.format(currentMaxTimestamp) + "," + a.toString)
        }

        override def onPeriodicEmit(output:WatermarkOutput): Unit = {
          //Periodic watermark emission is not used, so nothing is done here
        }
      }
    }
  }
}
nc  -lk  9999
cd /usr/local/flink/log
tail -f flink*.out
timestamp:stock_1,1602031567000|2020-10-07 08:46:07.000,1602031567000|2020-10-07 08:46:07.000,Watermark @ 1602031557000 (2020-10-07 08:45:57.000)
timestamp:stock_1,1602031571000|2020-10-07 08:46:11.000,1602031571000|2020-10-07 08:46:11.000,Watermark @ 1602031561000 (2020-10-07 08:46:01.000)
timestamp:stock_1,1602031577000|2020-10-07 08:46:17.000,1602031577000|2020-10-07 08:46:17.000,Watermark @ 1602031567000 (2020-10-07 08:46:07.000)
timestamp:stock_1,1602031578000|2020-10-07 08:46:18.000,1602031578000|2020-10-07 08:46:18.000,Watermark @ 1602031568000 (2020-10-07 08:46:08.000)
timestamp:stock_1,1602031579000|2020-10-07 08:46:19.000,1602031579000|2020-10-07 08:46:19.000,Watermark @ 1602031569000 (2020-10-07 08:46:09.000)
output> StockPrice(stock_1,1602031567000,8.14)
timestamp:stock_1,1602031581000|2020-10-07 08:46:21.000,1602031581000|2020-10-07 08:46:21.000,Watermark @ 1602031571000 (2020-10-07 08:46:11.000)
timestamp:stock_1,1602031582000|2020-10-07 08:46:22.000,1602031582000|2020-10-07 08:46:22.000,Watermark @ 1602031572000 (2020-10-07 08:46:12.000)
output> StockPrice(stock_1,1602031571000,8.23)
package cn.edu.xmu.dblab

import java.text.SimpleDateFormat
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, TimestampAssigner, TimestampAssignerSupplier, Watermark, WatermarkGenerator, WatermarkGeneratorSupplier, WatermarkOutput, WatermarkStrategy}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time


case class StockPrice(stockId:String,timeStamp:Long,price:Double)

object AllowedLatenessTest {
  def main(args: Array[String]): Unit = {

    //Set up the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //Set the time characteristic to event time
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    //Set the program parallelism
    env.setParallelism(1)

    //Create the data source
    val source = env.socketTextStream("localhost", 9999)

    //Define the transformation logic applied to the data stream
    val stockDataStream = source
      .map(s => s.split(","))
      .map(s=>StockPrice(s(0).toString,s(1).toLong,s(2).toDouble))

    //Assign timestamps and watermarks to the data stream
    val watermarkDataStream = stockDataStream.assignTimestampsAndWatermarks(new MyWatermarkStrategy)

    //Perform the window computation
    val lateData = new OutputTag[StockPrice]("late")
    val sumStream = watermarkDataStream
      .keyBy("stockId")
      .window(TumblingEventTimeWindows.of(Time.seconds(3)))
      .allowedLateness(Time.seconds(2L))
      .sideOutputLateData(lateData)
      .reduce((s1, s2) => StockPrice(s1.stockId,s1.timeStamp, s1.price + s2.price))

    //Print the output
    sumStream.print("window result:")
    val late = sumStream.getSideOutput(lateData)
    late.print("late data:")

    //Name the job and trigger execution
    env.execute("AllowedLatenessTest")
  }

  //Define the watermark generation strategy
  class MyWatermarkStrategy extends WatermarkStrategy[StockPrice] {

    override def createTimestampAssigner(context:TimestampAssignerSupplier.Context):TimestampAssigner[StockPrice]={
      new SerializableTimestampAssigner[StockPrice] {
        override def extractTimestamp(element: StockPrice, recordTimestamp: Long): Long = {
          element.timeStamp //Extract the timestamp from the incoming record
        }
      }
    }

    override def createWatermarkGenerator(context:WatermarkGeneratorSupplier.Context): WatermarkGenerator[StockPrice] ={
      new WatermarkGenerator[StockPrice](){
        val maxOutOfOrderness = 10000L //Set the maximum out-of-orderness to 10 seconds
        var currentMaxTimestamp: Long = 0L
        var a: Watermark = null
        val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")

        override def onEvent(element: StockPrice, eventTimestamp: Long, output:WatermarkOutput): Unit = {
          currentMaxTimestamp = Math.max(eventTimestamp, currentMaxTimestamp)
          a = new Watermark(currentMaxTimestamp - maxOutOfOrderness)
          output.emitWatermark(a)
          println("timestamp:" + element.stockId + "," + element.timeStamp + "|" + format.format(element.timeStamp) + "," + currentMaxTimestamp + "|" + format.format(currentMaxTimestamp) + "," + a.toString)
        }

        override def onPeriodicEmit(output:WatermarkOutput): Unit = {
          //Periodic watermark emission is not used, so nothing is done here
        }
      }
    }
  }
}
stock_1,1602031567000,8.14
stock_1,1602031571000,8.23
stock_1,1602031577000,8.24
stock_1,1602031578000,8.87
stock_1,1602031579000,8.55
stock_1,1602031577000,8.24
stock_1,1602031581000,8.43
stock_1,1602031582000,8.78
stock_1,1602031581000,8.76
stock_1,1602031579000,8.55
stock_1,1602031591000,8.13
stock_1,1602031581000,8.34
stock_1,1602031580000,8.45
stock_1,1602031579000,8.33
stock_1,1602031578000,8.56
stock_1,1602031577000,8.32
timestamp:stock_1,1602031567000|2020-10-07 08:46:07.000,1602031567000|2020-10-07 08:46:07.000,Watermark @ 1602031557000 (2020-10-07 08:45:57.000)
timestamp:stock_1,1602031571000|2020-10-07 08:46:11.000,1602031571000|2020-10-07 08:46:11.000,Watermark @ 1602031561000 (2020-10-07 08:46:01.000)
timestamp:stock_1,1602031577000|2020-10-07 08:46:17.000,1602031577000|2020-10-07 08:46:17.000,Watermark @ 1602031567000 (2020-10-07 08:46:07.000)
timestamp:stock_1,1602031578000|2020-10-07 08:46:18.000,1602031578000|2020-10-07 08:46:18.000,Watermark @ 1602031568000 (2020-10-07 08:46:08.000)
timestamp:stock_1,1602031579000|2020-10-07 08:46:19.000,1602031579000|2020-10-07 08:46:19.000,Watermark @ 1602031569000 (2020-10-07 08:46:09.000)
window result:> StockPrice(stock_1,1602031567000,8.14)
timestamp:stock_1,1602031577000|2020-10-07 08:46:17.000,1602031579000|2020-10-07 08:46:19.000,Watermark @ 1602031569000 (2020-10-07 08:46:09.000)
timestamp:stock_1,1602031581000|2020-10-07 08:46:21.000,1602031581000|2020-10-07 08:46:21.000,Watermark @ 1602031571000 (2020-10-07 08:46:11.000)
timestamp:stock_1,1602031582000|2020-10-07 08:46:22.000,1602031582000|2020-10-07 08:46:22.000,Watermark @ 1602031572000 (2020-10-07 08:46:12.000)
window result:> StockPrice(stock_1,1602031571000,8.23)
timestamp:stock_1,1602031581000|2020-10-07 08:46:21.000,1602031582000|2020-10-07 08:46:22.000,Watermark @ 1602031572000 (2020-10-07 08:46:12.000)
timestamp:stock_1,1602031579000|2020-10-07 08:46:19.000,1602031582000|2020-10-07 08:46:22.000,Watermark @ 1602031572000 (2020-10-07 08:46:12.000)
timestamp:stock_1,1602031591000|2020-10-07 08:46:31.000,1602031591000|2020-10-07 08:46:31.000,Watermark @ 1602031581000 (2020-10-07 08:46:21.000)
window result:> StockPrice(stock_1,1602031577000,16.48)
window result:> StockPrice(stock_1,1602031578000,25.970000000000002)
timestamp:stock_1,1602031581000|2020-10-07 08:46:21.000,1602031591000|2020-10-07 08:46:31.000,Watermark @ 1602031581000 (2020-10-07 08:46:21.000)
timestamp:stock_1,1602031580000|2020-10-07 08:46:20.000,1602031591000|2020-10-07 08:46:31.000,Watermark @ 1602031581000 (2020-10-07 08:46:21.000)
window result:> StockPrice(stock_1,1602031578000,34.400000000000006)
timestamp:stock_1,1602031579000|2020-10-07 08:46:19.000,1602031591000|2020-10-07 08:46:31.000,Watermark @ 1602031581000 (2020-10-07 08:46:21.000)
window result:> StockPrice(stock_1,1602031578000,42.830000000000005)
timestamp:stock_1,1602031578000|2020-10-07 08:46:18.000,1602031591000|2020-10-07 08:46:31.000,Watermark @ 1602031581000 (2020-10-07 08:46:21.000)
window result:> StockPrice(stock_1,1602031578000,51.260000000000005)
timestamp:stock_1,1602031577000|2020-10-07 08:46:17.000,1602031591000|2020-10-07 08:46:31.000,Watermark @ 1602031581000 (2020-10-07 08:46:21.000)
late data:> StockPrice(stock_1,1602031577000,8.43)
package cn.edu.xmu.dblab

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector

case class StockPrice(stockId:String,timeStamp:Long,price:Double)

object StateTest {
  def main(args: Array[String]): Unit = {
    //Set up the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //Set the program parallelism
    env.setParallelism(1)

    //Create the data source
    val source = env.socketTextStream("localhost", 9999)

    //Define the transformation logic applied to the data stream
    val stockDataStream = source
      .map(s => s.split(","))
      .map(s => StockPrice(s(0).toString, s(1).toLong, s(2).toDouble))
    val alertStream = stockDataStream
      .keyBy(_.stockId)
      .flatMap(new PriceChangeAlert(10))

    //Print the output
    alertStream.print()

    //Trigger program execution
    env.execute("state test")
  }
  class PriceChangeAlert(threshold: Double) extends RichFlatMapFunction[StockPrice,(String, Double, Double)]{
    //State that stores the previous price
    lazy val lastPriceState: ValueState[Double] = getRuntimeContext
      .getState(new ValueStateDescriptor[Double]("last-price",classOf[Double]))
    override def flatMap(value: StockPrice, out: Collector[(String, Double, Double)]): Unit = {
      //Get the previous price
      val lastPrice = lastPriceState.value()
      //Compare the difference between the latest price and the previous price
      val diff = (value.price - lastPrice).abs
      if (diff > threshold)
        out.collect((value.stockId, lastPrice, value.price))
      //Update the state
      lastPriceState.update(value.price)
    }
  }
}