Chapter 5 Code: DataStream API, from 《Flink编程基础(Java版)》 (Flink Programming Fundamentals, Java Edition) by Lin Ziyu


Command lines and code from the textbook 《Flink编程基础(Java版)》 by Lin Ziyu, Xiamen University. The textbook's official website provides the command lines and code for all chapters, which can be copied and pasted directly for execution.

Code file FileSourceDemo.java

package cn.edu.xmu;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class FileSourceDemo {
    public FileSourceDemo(){}
    public static void main(String[] args) throws Exception {
        // 1. Create the stream execution environment
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. Create the data source; a word.txt file must be prepared in advance
        DataStreamSource<String> source = env.readTextFile("file:///home/hadoop/word.txt");
        // 3. Define the transformation logic for the incoming data
        // 3.1 Process the data with the flatMap operator
        DataStream<String> flatMapStream = source.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String s, Collector<String> collector) throws Exception {
                String[] words = s.split(" ");
                for (String word : words) {
                    collector.collect(word);
                }
            }
        });
        // 3.2 Transform the data with the map operator
        SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = flatMapStream.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                return Tuple2.of(value, 1);
            }
        });
        // 3.3 Group the words with the keyBy operator
        KeyedStream<Tuple2<String, Integer>, String> keyedStream = mapStream.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> value) throws Exception {
                return value.f0;
            }
        });
        // 3.4 Sum up the counts with the sum operator
        SingleOutputStreamOperator<Tuple2<String, Integer>> sumStream = keyedStream.sum(1);
        // 4. Specify how the computed results are output
        sumStream.print();
        // 5. Trigger program execution
        env.execute();
    }
}

Add the following dependency to the pom.xml file:

<dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-files</artifactId>
      <version>${flink.version}</version>
</dependency>

Code file FileSourceDemo2.java

package cn.edu.xmu;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FileSourceDemo2 {
    public FileSourceDemo2(){}
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        FileSource<String> fileSource = FileSource.forRecordStreamFormat(
                new TextLineInputFormat(),
                new Path("file:///home/hadoop/word.txt")
        ).build();
        DataStreamSource<String> fileSourceStream = env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "filesource");
        fileSourceStream.print();
        env.execute();
    }
}
To read from HDFS, Flink also needs the Hadoop classes on its classpath. One option is the flink-shaded-hadoop-3-uber JAR, available at
https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop-3-uber/3.1.1.7.2.9.0-173-9.0

Alternatively, add the following two dependencies to pom.xml:

<dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>3.3.5</version>
</dependency>
<dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>3.3.5</version>
</dependency>
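
With these dependencies in place, FileSourceDemo2 can read from HDFS simply by pointing the Path at an hdfs:// URI. A minimal sketch, assuming HDFS runs at hadoop01:9000 and word.txt has been uploaded to the HDFS root directory:

        FileSource<String> fileSource = FileSource.forRecordStreamFormat(
                new TextLineInputFormat(),
                new Path("hdfs://hadoop01:9000/word.txt")  // HDFS path instead of a local file
        ).build();
        DataStreamSource<String> fileSourceStream =
                env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "filesource");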
cd /usr/local/hadoop
./sbin/start-dfs.sh
cd /usr/local/flink
./bin/start-cluster.sh
cd /usr/local/flink
./bin/flink run \
> --class cn.edu.xmu.FileSourceDemo \
> ~/IdeaProjects/FlinkWordCount/target/wordcount-1.0.jar
nc -lk 9999
cd  /usr/local/kafka
./bin/zookeeper-server-start.sh  config/zookeeper.properties
cd  /usr/local/kafka
./bin/kafka-server-start.sh  config/server.properties
cd  /usr/local/kafka
./bin/kafka-topics.sh  --create  --zookeeper  localhost:2181  \
> --replication-factor  1  --partitions  1  --topic  wordsendertest
# The topic is named wordsendertest; 2181 is ZooKeeper's default port. --partitions sets the number of partitions in the topic, and --replication-factor sets the number of replicas; replication matters on a Kafka cluster, but since this is a single-node setup, no replication is needed
# Use --list to display all existing topics and check that the topic created above exists
./bin/kafka-topics.sh  --list  --zookeeper  localhost:2181
./bin/kafka-console-producer.sh  --broker-list  localhost:9092 \
>  --topic  wordsendertest
cd /usr/local/kafka
./bin/kafka-console-consumer.sh  --bootstrap-server  localhost:9092  \
> --topic  wordsendertest  --from-beginning

Code file KafkaSourceDemo.java

package cn.edu.xmu;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaSourceDemo {
    public KafkaSourceDemo(){}
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("hadoop01:9092")
                .setGroupId("dblab")
                .setTopics("wordsendertest")
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setStartingOffsets(OffsetsInitializer.earliest())
                .build();
        DataStreamSource<String> kafkaSourceStream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafkasource");
        kafkaSourceStream.print();
        env.execute();
    }
}

Edit pom.xml in IDEA and, on top of the pom.xml from Section 4.2.3, add the following dependency:

<dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka</artifactId>
      <version>${flink.version}</version>
</dependency>
(The flink-connector-kafka artifact is available at https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka/1.17.0)

Code file DataGeneratorSourceDemo.java

package cn.edu.xmu;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.datagen.source.GeneratorFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DataGeneratorSourceDemo {
    public DataGeneratorSourceDemo(){}
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(
                new GeneratorFunction<Long, String>() {
                    @Override
                    public String map(Long value) throws Exception {
                        return "Current Number is:" + value;
                    }
                },
                5,
                RateLimiterStrategy.perSecond(1),
                Types.STRING
        );
        DataStreamSource<String> dataGeneratorSourceStream = env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "dataGeneratorSource");
        dataGeneratorSourceStream.print();
        env.execute();
    }
}
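
Under these settings the source emits one record per second and stops after five records. The Long index passed to map starts at 0, so the job should print the lines Current Number is:0 through Current Number is:4, one per second.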

Edit pom.xml in IDEA and, on top of the pom.xml from Section 4.2.3, add the following dependency:

<dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-datagen</artifactId>
      <version>${flink.version}</version>
</dependency>

Code file StockPrice.java

package cn.edu.xmu;

public class StockPrice {
    public String stockId;
    public Long timeStamp;
    public Double price;

    // A no-argument constructor must be provided (it is needed for reflection)
    public StockPrice() {
    }

    public StockPrice(String stockId, Long timeStamp, Double price) {
        this.stockId = stockId;
        this.timeStamp = timeStamp;
        this.price = price;
    }

    public String getStockId() {
        return stockId;
    }

    public void setStockId(String stockId) {
        this.stockId = stockId;
    }

    public Long getTimeStamp() {
        return timeStamp;
    }

    public void setTimeStamp(Long timeStamp) {
        this.timeStamp = timeStamp;
    }

    public Double getPrice() {
        return price;
    }

    public void setPrice(Double price) {
        this.price = price;
    }

    @Override
    public String toString() {
        return "StockPrice{" +
                "stockId=" + stockId +
                ", timeStamp='" + timeStamp + '\'' +
                ", price=" + price +
                '}';
    }
}

Code file MyGeneratorFunction.java

package cn.edu.xmu;

import org.apache.commons.math3.random.RandomDataGenerator;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.connector.datagen.source.GeneratorFunction;

public class MyGeneratorFunction implements GeneratorFunction<Long, StockPrice> {

    // Define the random data generator
    public RandomDataGenerator generator;

    // Initialize the random data generator
    @Override
    public void open(SourceReaderContext readerContext) throws Exception {
        generator = new RandomDataGenerator();
    }

    @Override
    public StockPrice map(Long value) throws Exception {
        // Use the random data generator to create a StockPrice instance
        String stockId = "stock_"+value.toString();
        StockPrice stockPrice = new StockPrice(stockId
                , System.currentTimeMillis()
                , generator.nextInt(1,100)*0.1
        );
        return stockPrice;
    }

}

Code file ReadDataGeneratorSource.java

package cn.edu.xmu;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ReadDataGeneratorSource {
    public static void main(String[] args) throws Exception {
        // 1. Get the execution environment
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2. Define a custom data-generator Source
        DataGeneratorSource<StockPrice> dataGeneratorSource = new DataGeneratorSource<>(
                // specify the GeneratorFunction implementation class
                new MyGeneratorFunction(),
                // specify the total number of records to emit
                5,
                // specify the number of records emitted per second
                RateLimiterStrategy.perSecond(1),
                // specify the return type
                TypeInformation.of(StockPrice.class) // wrap the Java StockPrice class in a TypeInformation
        );

        // 3. Read the data from dataGeneratorSource
        DataStreamSource<StockPrice> dataGeneratorSourceStream = env.fromSource(dataGeneratorSource
                , WatermarkStrategy.noWatermarks()  // specify the watermark strategy
                , "dataGeneratorSource");
        dataGeneratorSourceStream.print();
        env.execute();
    }
}

Code file MapDemo1.java

package cn.edu.xmu;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MapDemo1 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<StockPrice> stockPriceDS = env.fromElements(
                new StockPrice("stock_1", 10001L, 9.7D),
                new StockPrice("stock_2", 10002L, 4.5D),
                new StockPrice("stock_3", 10003L, 8.2D)
                );
        SingleOutputStreamOperator<String> mapDS =
                stockPriceDS.map(
                        new MapFunction<StockPrice, String>() {
                            @Override
                            public String map(StockPrice value) throws Exception {
                                return value.stockId;
                            }
                        }
        );
        mapDS.print();
        env.execute();
    }
}

Code file MapDemo2.java

package cn.edu.xmu;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MapDemo2 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<StockPrice> stockPriceDS = env.fromElements(
                new StockPrice("stock_1", 10001L, 9.7D),
                new StockPrice("stock_2", 10002L, 4.5D),
                new StockPrice("stock_3", 10003L, 8.2D)
                );
        SingleOutputStreamOperator<String> mapDS =
                stockPriceDS.map(stockPrice->stockPrice.stockId);
        mapDS.print();
        env.execute();
    }
}

Code file MapDemo3.java

package cn.edu.xmu;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MapDemo3 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<StockPrice> stockPriceDS = env.fromElements(
                new StockPrice("stock_1", 10001L, 9.7D),
                new StockPrice("stock_2", 10002L, 4.5D),
                new StockPrice("stock_3", 10003L, 8.2D)
                );
        SingleOutputStreamOperator<String> mapDS =
                stockPriceDS.map(new MyMapFunction());
        mapDS.print();
        env.execute();
    }
    public static class MyMapFunction implements MapFunction<StockPrice,String>{
        @Override
        public String map(StockPrice value) throws Exception{
            return value.stockId;
        }
    }
}

Code file FlatMapDemo1.java

package cn.edu.xmu;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class FlatMapDemo1 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<StockPrice> stockPriceDS = env.fromElements(
                new StockPrice("stock_1", 10001L, 9.7D),
                new StockPrice("stock_2", 10002L, 4.5D),
                new StockPrice("stock_3", 10003L, 8.2D)
                );
        SingleOutputStreamOperator<String> flatMapDS =
                stockPriceDS.flatMap(new FlatMapFunction<StockPrice, String>() {
                    @Override
                    public void flatMap(StockPrice value, Collector<String> out) throws Exception {
                        if("stock_1".equals(value.stockId)){
                            out.collect(value.stockId);
                        }else if("stock_2".equals(value.stockId)){
                            out.collect(value.stockId);
                            out.collect(value.price.toString());
                        }
                    }
                });
        flatMapDS.print();
        env.execute();
    }
}

Code file FlatMapDemo2.java

package cn.edu.xmu;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class FlatMapDemo2 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<StockPrice> stockPriceDS = env.fromElements(
                new StockPrice("stock_1", 10001L, 9.7D),
                new StockPrice("stock_2", 10002L, 4.5D),
                new StockPrice("stock_3", 10003L, 8.2D)
                );
        SingleOutputStreamOperator<String> flatMapDS =
                stockPriceDS.flatMap((StockPrice value,Collector<String> out) ->{
                    if("stock_1".equals(value.stockId)){
                        out.collect(value.stockId);
                    }else if("stock_2".equals(value.stockId)){
                        out.collect(value.stockId);
                        out.collect(value.price.toString());
                    }
                }
                );
        flatMapDS.returns(Types.STRING);
        flatMapDS.print();
        env.execute();
    }
}

Code file FlatMapDemo3.java

package cn.edu.xmu;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class FlatMapDemo3 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<StockPrice> stockPriceDS = env.fromElements(
                new StockPrice("stock_1", 10001L, 9.7D),
                new StockPrice("stock_2", 10002L, 4.5D),
                new StockPrice("stock_3", 10003L, 8.2D)
                );
        SingleOutputStreamOperator<String> flatMapDS =
                stockPriceDS.flatMap(new MyFlatMapFunction());
        flatMapDS.print();
        env.execute();
    }
    public static class MyFlatMapFunction implements FlatMapFunction<StockPrice,String>{
        @Override
        public void flatMap(StockPrice value,Collector<String> out) throws Exception{
            if("stock_1".equals(value.stockId)){
                out.collect(value.stockId);
            }else if("stock_2".equals(value.stockId)){
                out.collect(value.stockId);
                out.collect(value.price.toString());
            }
        }
    }
}

Code file FilterDemo1.java

package cn.edu.xmu;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FilterDemo1 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<StockPrice> stockPriceDS = env.fromElements(
                new StockPrice("stock_1", 10001L, 9.7D),
                new StockPrice("stock_2", 10002L, 4.5D),
                new StockPrice("stock_3", 10003L, 8.2D)
                );
        SingleOutputStreamOperator<StockPrice> filterDS =
                stockPriceDS.filter(new FilterFunction<StockPrice>() {
                    @Override
                    public boolean filter(StockPrice value) throws Exception {
                        return "stock_1".equals(value.stockId);
                    }
                }
                );
        filterDS.print();
        env.execute();
    }
}

Code file FilterDemo2.java

package cn.edu.xmu;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FilterDemo2 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<StockPrice> stockPriceDS = env.fromElements(
                new StockPrice("stock_1", 10001L, 9.7D),
                new StockPrice("stock_2", 10002L, 4.5D),
                new StockPrice("stock_3", 10003L, 8.2D)
                );
        SingleOutputStreamOperator<StockPrice> filterDS =
                stockPriceDS.filter(stockPrice->"stock_1".equals(stockPrice.stockId));
        filterDS.print();
        env.execute();
    }
}

Code file FilterDemo3.java

package cn.edu.xmu;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FilterDemo3 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<StockPrice> stockPriceDS = env.fromElements(
                new StockPrice("stock_1", 10001L, 9.7D),
                new StockPrice("stock_2", 10002L, 4.5D),
                new StockPrice("stock_3", 10003L, 8.2D)
                );
        SingleOutputStreamOperator<StockPrice> filterDS =
                stockPriceDS.filter(new MyFilterFunction());
        filterDS.print();
        env.execute();
    }
    public static class MyFilterFunction implements FilterFunction<StockPrice>{
        @Override
        public boolean filter(StockPrice value) throws Exception{
            return "stock_1".equals(value.stockId);
        }
    }
}

Code file KeyByDemo.java

package cn.edu.xmu;

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KeyByDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<StockPrice> stockPriceDS = env.fromElements(
                new StockPrice("stock_1", 10001L, 9.7D),
                new StockPrice("stock_2", 10002L, 4.5D),
                new StockPrice("stock_3", 10003L, 8.2D),
                new StockPrice("stock_1", 10004L, 5.2D)
                );
        KeyedStream<StockPrice, String> stockPriceKeyedStream = stockPriceDS.keyBy(new KeySelector<StockPrice, String>() {
            @Override
            public String getKey(StockPrice value) throws Exception {
                return value.stockId;
            }
        });
        stockPriceKeyedStream.print();
        env.execute();
    }
}

Code file SimpleAggregateDemo.java

package cn.edu.xmu;

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SimpleAggregateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<StockPrice> stockPriceDS = env.fromElements(
                new StockPrice("stock_1", 10001L, 9.7D),
                new StockPrice("stock_2", 10002L, 4.5D),
                new StockPrice("stock_3", 10003L, 8.2D),
                new StockPrice("stock_1", 10004L, 5.2D)
                );
        KeyedStream<StockPrice, String> stockPriceKeyedStream =
                stockPriceDS.keyBy(new KeySelector<StockPrice, String>() {
                    @Override
                    public String getKey(StockPrice value) throws Exception {
                        return value.stockId;
                    }
                });
        SingleOutputStreamOperator<StockPrice> result =
                stockPriceKeyedStream.min("price");
        result.print();
        env.execute();
    }
}
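
Note that min() tracks only the minimum of the specified field; the non-aggregated fields of the emitted records keep the values of the first record seen for each key. To emit the complete record that actually holds the minimum price, Flink's minBy() variant can be used instead, for example:

        SingleOutputStreamOperator<StockPrice> result =
                stockPriceKeyedStream.minBy("price");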

Code file ReduceDemo1.java

package cn.edu.xmu;

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ReduceDemo1 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<StockPrice> stockPriceDS = env.fromElements(
                new StockPrice("stock_1", 10001L, 9.7D),
                new StockPrice("stock_2", 10002L, 4.5D),
                new StockPrice("stock_3", 10003L, 8.2D),
                new StockPrice("stock_1", 10004L, 5.2D)
                );
        KeyedStream<StockPrice, String> stockPriceKeyedStream =
                stockPriceDS.keyBy(new KeySelector<StockPrice, String>() {
                    @Override
                    public String getKey(StockPrice value) throws Exception {
                        return value.stockId;
                    }
                });
        SingleOutputStreamOperator<StockPrice> result =
                stockPriceKeyedStream.reduce(new ReduceFunction<StockPrice>() {
                    @Override
                    public StockPrice reduce(StockPrice value1, StockPrice value2) throws Exception {
                        return new StockPrice(
                                value1.stockId,
                                value1.timeStamp,
                                value1.price+value2.price);
                    }
                });
        result.print();
        env.execute();
    }
}

Code file ReduceDemo2.java

package cn.edu.xmu;

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ReduceDemo2 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<StockPrice> stockPriceDS = env.fromElements(
                new StockPrice("stock_1", 10001L, 9.7D),
                new StockPrice("stock_2", 10002L, 4.5D),
                new StockPrice("stock_3", 10003L, 8.2D),
                new StockPrice("stock_1", 10004L, 5.2D)
                );
        KeyedStream<StockPrice, String> stockPriceKeyedStream =
                stockPriceDS.keyBy(new KeySelector<StockPrice, String>() {
                    @Override
                    public String getKey(StockPrice value) throws Exception {
                        return value.stockId;
                    }
                });
        SingleOutputStreamOperator<StockPrice> result =
                stockPriceKeyedStream.reduce(
                        (value1,value2)->new StockPrice(
                        value1.stockId,
                        value1.timeStamp,
                        value1.price+value2.price));
        result.print();
        env.execute();
    }
}

Code file ReduceDemo3.java

package cn.edu.xmu;

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ReduceDemo3 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<StockPrice> stockPriceDS = env.fromElements(
                new StockPrice("stock_1", 10001L, 9.7D),
                new StockPrice("stock_2", 10002L, 4.5D),
                new StockPrice("stock_3", 10003L, 8.2D),
                new StockPrice("stock_1", 10004L, 5.2D)
                );
        KeyedStream<StockPrice, String> stockPriceKeyedStream =
                stockPriceDS.keyBy(new KeySelector<StockPrice, String>() {
                    @Override
                    public String getKey(StockPrice value) throws Exception {
                        return value.stockId;
                    }
                });
        SingleOutputStreamOperator<StockPrice> result =
                stockPriceKeyedStream.reduce(new MyReduceFunction());
        result.print();
        env.execute();
    }
    public static class MyReduceFunction implements ReduceFunction<StockPrice>{
        @Override
        public StockPrice reduce(StockPrice value1, StockPrice value2) throws Exception {
            return new StockPrice(
                    value1.stockId,
                    value1.timeStamp,
                    value1.price+value2.price);
        }
    }
}

Code file PartitionDemo1.java

package cn.edu.xmu;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PartitionDemo1 {
    public PartitionDemo1(){}
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // To see the repartitioning effect, the parallelism must be set to a value greater than 1
        env.setParallelism(2);
        DataStreamSource<String> source = env.socketTextStream("hadoop01", 9999);
        source.shuffle().print();
        env.execute();
    }
}
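
shuffle() distributes records to downstream subtasks at random; it is only one of the physical partitioning operators that the DataStream API offers. Under the same setup, any of the following could be substituted for the shuffle() call above, a brief sketch:

        source.rebalance().print();   // round-robin across all downstream subtasks
        source.rescale().print();     // round-robin within each local group of subtasks
        source.global().print();      // send every record to the first subtask
        source.broadcast().print();   // replicate every record to all subtasks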

Code file SplitByFilterDemo.java

package cn.edu.xmu;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SplitByFilterDemo {
    public SplitByFilterDemo(){}
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Integer> source = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8);
        SingleOutputStreamOperator<Integer> evenDS = source.filter(value -> value % 2 == 0);
        SingleOutputStreamOperator<Integer> oddDS = source.filter(value -> value % 2 != 0);
        evenDS.print("even stream");
        oddDS.print("odd stream");
        env.execute();
    }
}

Code file SplitBySideOutputDemo.java

package cn.edu.xmu;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SideOutputDataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SplitBySideOutputDemo {
    public SplitBySideOutputDemo(){}
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<StockPrice> stockPriceDS = env.fromElements(
                new StockPrice("stock_1", 10001L, 9.7D),
                new StockPrice("stock_2", 10002L, 4.5D),
                new StockPrice("stock_3", 10003L, 8.2D)
        );
        // Create the tag objects. The constructor takes two arguments: the tag name
        // and the type of the data to be placed into that side output
        OutputTag<StockPrice> stock1Tag =
                new OutputTag<>("stock_1", Types.POJO(StockPrice.class));
        OutputTag<StockPrice> stock2Tag =
                new OutputTag<>("stock_2", Types.POJO(StockPrice.class));
        SingleOutputStreamOperator<StockPrice> processResult
                = stockPriceDS.process(new ProcessFunction<StockPrice, StockPrice>() {
            @Override
            public void processElement(StockPrice value, ProcessFunction<StockPrice, StockPrice>.Context ctx, Collector<StockPrice> out) throws Exception {
                String stockId = value.getStockId();
                if ("stock_1".equals(stockId)) {
                    // If the ID is stock_1, send the record to side output stock_1.
                    // The output method takes two arguments: the tag and the record itself
                    ctx.output(stock1Tag, value);
                } else if ("stock_2".equals(stockId)) {
                    // If the ID is stock_2, send the record to side output stock_2
                    ctx.output(stock2Tag, value);
                } else {
                    // Records with any other ID stay in the main stream
                    out.collect(value);
                }
            }
        });
        // Retrieve the side outputs from the main stream by their tags
        SideOutputDataStream<StockPrice> sideOutputStock1 =
                processResult.getSideOutput(stock1Tag);
        SideOutputDataStream<StockPrice> sideOutputStock2 =
                processResult.getSideOutput(stock2Tag);
        // Print the main stream
        processResult.print("main stream");
        // Print side output stock_1
        sideOutputStock1.print("side output stock_1");
        // Print side output stock_2
        sideOutputStock2.print("side output stock_2");
        env.execute();
    }
}

Code file UnionDataStreamDemo.java

package cn.edu.xmu;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UnionDataStreamDemo {
    public UnionDataStreamDemo(){}
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Integer> sourceDS1 = env.fromElements(1,3,5,7,9);
        DataStreamSource<Integer> sourceDS2 = env.fromElements(2,4,6,8,10);
        DataStreamSource<Integer> sourceDS3 = env.fromElements(11,12,13,14,15);
        DataStream<Integer> unionResult = sourceDS1.union(sourceDS2).union(sourceDS3);
        unionResult.print();
        env.execute();
    }
}

Code file ConnectDataStreamDemo.java

package cn.edu.xmu;

import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

public class ConnectDataStreamDemo {
    public ConnectDataStreamDemo(){}
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Integer> sourceDS1 = env.fromElements(1,2,3);
        DataStreamSource<String> sourceDS2 = env.fromElements("a","b","c");
        ConnectedStreams<Integer, String> connectResult = sourceDS1.connect(sourceDS2);
        // CoMapFunction takes three type parameters: the first is the data type of the first stream,
        // the second is the data type of the second stream, and the third is the output data type
        SingleOutputStreamOperator<String> result =
                connectResult.map(new CoMapFunction<Integer, String, String>() {
                    // Processing logic for the first stream
                    @Override
                    public String map1(Integer value) throws Exception {
                        return value.toString();
                    }

                    // Processing logic for the second stream
                    @Override
                    public String map2(String value) throws Exception {
                        return value;
                    }
                });
        result.print();
        env.execute();
    }
}

Code file ConnectDataStreamExample.java

package cn.edu.xmu;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ConnectDataStreamExample {
    public ConnectDataStreamExample(){}
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Tuple2<Integer, String>> sourceDS1 = env.fromElements(
                Tuple2.of(1,"xiaoming"),
                Tuple2.of(2,"xiaowang")
        );
        DataStreamSource<Tuple3<Integer, String, Integer>> sourceDS2 = env.fromElements(
                Tuple3.of(1,"math",98),
                Tuple3.of(1,"english",97),
                Tuple3.of(2,"math",94),
                Tuple3.of(2,"english",88)
        );
        ConnectedStreams<Tuple2<Integer, String>,Tuple3<Integer, String, Integer>> connectResult = sourceDS1.connect(sourceDS2);
        /**
         * Match the two streams on the student-number field.
         * Which stream's data arrives first is nondeterministic.
         * Whenever a record arrives on either stream, it is saved into a cache variable.
         * Each cache is a HashMap whose key is the student number and whose value is a
         * List holding that stream's records.
         * When a record arrives, it is saved into its own cache, and the other stream's
         * cache is searched for matching records.
         */
        SingleOutputStreamOperator<String> processResult = connectResult.process(
                // CoProcessFunction takes three type parameters: the data type of the first stream,
                // the data type of the second stream, and the output data type
                new CoProcessFunction<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>, String>() {
                    // HashMaps that cache the records that have arrived so far
                    Map<Integer, List<Tuple2<Integer, String>>> ds1Cache = new HashMap<>();
                    Map<Integer, List<Tuple3<Integer, String, Integer>>> ds2Cache = new HashMap<>();

                    /**
                     * Processing logic for the first stream
                     * @param value a record from the first stream
                     * @param ctx the context
                     * @param out the collector
                     */
                    @Override
                    public void processElement1(Tuple2<Integer, String> value, CoProcessFunction<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>, String>.Context ctx, Collector<String> out) throws Exception {
                        Integer sno = value.f0;
                        // A record from the first stream has arrived; save it into the cache
                        if (!ds1Cache.containsKey(sno)) {
                            // If the key is absent, store this record as the first element
                            List<Tuple2<Integer, String>> ds1Values = new ArrayList<>();
                            ds1Values.add(value);
                            ds1Cache.put(sno, ds1Values);
                        } else {
                            // If the key exists, this is not the first record; just append it to the List
                            ds1Cache.get(sno).add(value);
                        }
                        // Search ds2Cache for the same student number; emit a match if found, otherwise emit nothing
                        if (ds2Cache.containsKey(sno)) {
                            for (Tuple3<Integer, String, Integer> ds2Element : ds2Cache.get(sno)) {                                
                                out.collect(Tuple4.of(value.f0,value.f1,ds2Element.f1,ds2Element.f2).toString());
                            }
                        }
                    }

                    /**
                     * Processing logic for the second stream
                     * @param value a record from the second stream
                     * @param ctx the context
                     * @param out the collector
                     */
                    @Override
                    public void processElement2(Tuple3<Integer, String, Integer> value, CoProcessFunction<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>, String>.Context ctx, Collector<String> out) throws Exception {
                        Integer sno = value.f0;
                        // A record from the second stream has arrived; save it into the cache
                        if (!ds2Cache.containsKey(sno)) {
                            // If the key is absent, store this record as the first element
                            List<Tuple3<Integer, String, Integer>> ds2Values = new ArrayList<>();
                            ds2Values.add(value);
                            ds2Cache.put(sno, ds2Values);
                        } else {
                            // If the key exists, this is not the first record; just append it to the List
                            ds2Cache.get(sno).add(value);
                        }
                        // Search ds1Cache for the same student number; emit a match if found, otherwise emit nothing
                        if (ds1Cache.containsKey(sno)) {
                            for (Tuple2<Integer, String> ds1Element : ds1Cache.get(sno)) {                                
                                out.collect(Tuple4.of(ds1Element.f0,ds1Element.f1,value.f1,value.f2).toString());
                            }
                        }
                    }
                }
        );
        processResult.print();
        env.execute();
    }
}
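
One caveat: the two HashMap caches above live in plain operator memory, so they are neither checkpointed nor scoped to a key, and they grow without bound. In a fault-tolerant production job they would normally be replaced with Flink's keyed state (for example, MapState).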

To use FileSink, add the following dependency on top of the pom.xml file from Section 4.2.3:

<dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-files</artifactId>
      <version>${flink.version}</version>
</dependency>

Code file FileSinkDemo1.java

package cn.edu.xmu;

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FileSinkDemo1 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<StockPrice> stockPriceDS = env.fromElements(
                new StockPrice("stock_1", 10001L, 9.7D),
                new StockPrice("stock_2", 10002L, 4.5D),
                new StockPrice("stock_3", 10003L, 8.2D)
        );
        FileSink<StockPrice> fileSink =
                FileSink.<StockPrice>forRowFormat(
                        new Path("file:///home/hadoop"),
                        new SimpleStringEncoder<>("UTF-8"))
                        .build();
        stockPriceDS.sinkTo(fileSink);
        env.execute();
    }
}

To write to an HDFS file instead, simply change new Path("file:///home/hadoop") in the code above to new Path("hdfs://hadoop01:9000/"), and add the following two dependencies to pom.xml:

<dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>3.3.5</version>
</dependency>
<dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>3.3.5</version>
</dependency>
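
For clarity, the modified sink definition in FileSinkDemo1 would then read as follows (hdfs://hadoop01:9000 is the NameNode address assumed above):

        FileSink<StockPrice> fileSink =
                FileSink.<StockPrice>forRowFormat(
                        new Path("hdfs://hadoop01:9000/"),  // write to HDFS instead of the local file system
                        new SimpleStringEncoder<>("UTF-8"))
                        .build();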

Code file KafkaSinkDemo.java

package cn.edu.xmu;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaSinkDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> source = env.socketTextStream("hadoop01", 9999);
        KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
                .setBootstrapServers("hadoop01:9092")
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.<String>builder()
                            .setTopic("sinkKafka")
                            .setValueSerializationSchema(new SimpleStringSchema())
                            .build()
                ).build();
        source.sinkTo(kafkaSink);
        env.execute();
    }
}
cd  /usr/local/kafka
./bin/zookeeper-server-start.sh  config/zookeeper.properties
cd  /usr/local/kafka
./bin/kafka-server-start.sh  config/server.properties
cd  /usr/local/kafka
./bin/kafka-topics.sh  --create  --zookeeper  hadoop01:2181  \
> --replication-factor  1  --partitions  1  --topic  sinkKafka
nc -lk 9999
cd /usr/local/kafka
./bin/kafka-console-consumer.sh  --bootstrap-server  hadoop01:9092  \
> --topic  sinkKafka --from-beginning

To let Flink write data to a MySQL database, add the following two dependencies to pom.xml:

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.33</version>
</dependency>
<dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-jdbc</artifactId>
      <version>3.1.1-1.17</version>      
</dependency>

Execute the following commands in the MySQL Shell to create the database and table:

mysql> create database flinkdb;
mysql> use flinkdb;
mysql> create table stockprice(stockId char(10),timeStamp bigint(20),price double(6,2));

Code file MySQLSinkDemo.java

package cn.edu.xmu;

import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class MySQLSinkDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<StockPrice> stockPriceDS = env.fromElements(
                new StockPrice("stock_1", 10001L, 9.7),
                new StockPrice("stock_2", 10002L, 4.5),
                new StockPrice("stock_3", 10003L, 8.2)
        );
        SinkFunction<StockPrice> jdbcSink = JdbcSink.sink(
                "insert into stockprice (stockId, timeStamp, price) values (?, ?, ?)",
                new JdbcStatementBuilder<StockPrice>() {
                    @Override
                    public void accept(PreparedStatement preparedStatement, StockPrice stockPrice) throws SQLException {
                        preparedStatement.setString(1, stockPrice.stockId);
                        preparedStatement.setLong(2, stockPrice.timeStamp);
                        preparedStatement.setDouble(3, stockPrice.price);
                    }
                }
                ,
                JdbcExecutionOptions.builder()
                        .withBatchSize(1000)
                        .withBatchIntervalMs(200)
                        .withMaxRetries(5)
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://localhost:3306/flinkdb")
                        .withDriverName("com.mysql.cj.jdbc.Driver")
                        .withUsername("root")
                        .withPassword("123456")
                        .build()
        );
        stockPriceDS.addSink(jdbcSink);
        env.execute();
    }
}
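
After the job finishes, the inserted rows can be checked in the MySQL Shell:

mysql> select * from stockprice;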

Code file ReduceWindowFunctionDemo.java

package cn.edu.xmu;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;

public class ReduceWindowFunctionDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataGeneratorSource<StockPrice> dataGeneratorSource = new DataGeneratorSource<>(
                // specify the GeneratorFunction implementation class
                new MyGeneratorFunction(),
                // specify the total number of records to emit
                10,
                // specify the number of records emitted per second
                RateLimiterStrategy.perSecond(1),
                // specify the return type
                TypeInformation.of(StockPrice.class) // wrap the Java StockPrice class in a TypeInformation
        );

        // Read the data from dataGeneratorSource
        DataStreamSource<StockPrice> dataGeneratorSourceStream = env.fromSource(dataGeneratorSource
                , WatermarkStrategy.noWatermarks()  // specify the watermark strategy
                , "dataGeneratorSource");
        KeyedStream<StockPrice, String> stockPriceKS =
                dataGeneratorSourceStream.keyBy(stockPrice->stockPrice.getStockId());
        WindowedStream<StockPrice, String, TimeWindow> stockPriceWS =
                stockPriceKS.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));
        SingleOutputStreamOperator<StockPrice> reducedDS =
                stockPriceWS.reduce((s1, s2) -> new StockPrice(s1.stockId, s1.timeStamp, s1.price + s2.price));
        reducedDS.print();
        env.execute();
    }
}

Code file AggregateWindowFunctionDemo.java

package cn.edu.xmu;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class AggregateWindowFunctionDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataGeneratorSource<StockPrice> dataGeneratorSource = new DataGeneratorSource<>(
                // specify the GeneratorFunction implementation class
                new MyGeneratorFunction(),
                // specify the total number of records to emit
                10,
                // specify the number of records emitted per second
                RateLimiterStrategy.perSecond(1),
                // specify the return type
                TypeInformation.of(StockPrice.class) // wrap the Java StockPrice class in a TypeInformation
        );

        // Read the data from dataGeneratorSource
        DataStreamSource<StockPrice> dataGeneratorSourceStream = env.fromSource(dataGeneratorSource
                , WatermarkStrategy.noWatermarks()  // specify the watermark strategy
                , "dataGeneratorSource");
        KeyedStream<StockPrice, String> stockPriceKS =
                dataGeneratorSourceStream.keyBy(stockPrice->stockPrice.getStockId());
        WindowedStream<StockPrice, String, TimeWindow> stockPriceWS =
                stockPriceKS.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));
        SingleOutputStreamOperator<Tuple2<String, Double>> aggregatedDS = stockPriceWS.aggregate(new MyAggregateFunction());
        aggregatedDS.print();
        env.execute();
    }
    public static class MyAggregateFunction implements AggregateFunction<StockPrice, Tuple3<String,Double,Long>, Tuple2<String,Double>>{
        // Create the accumulator
        @Override
        public Tuple3<String, Double, Long> createAccumulator() {
            return Tuple3.of("", 0D, 0L);
        }

        // Define how an input record is added to the accumulator
        @Override
        public Tuple3<String, Double, Long> add(StockPrice value, Tuple3<String, Double, Long> accumulator) {
            return Tuple3.of(value.stockId, accumulator.f1 + value.price, accumulator.f2 + 1L);
        }

        // Derive the result from the accumulator
        @Override
        public Tuple2<String, Double> getResult(Tuple3<String, Double, Long> accumulator) {
            return Tuple2.of(accumulator.f0, accumulator.f1 / accumulator.f2);
        }

        // Define how two accumulators are merged
        @Override
        public Tuple3<String, Double, Long> merge(Tuple3<String, Double, Long> a, Tuple3<String, Double, Long> b) {
            return Tuple3.of(a.f0, a.f1 + b.f1, a.f2 + b.f2);
        }
    }
}
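
Unlike reduce(), whose output type must equal its input type, aggregate() lets the accumulator and the result use their own types; that is what allows this example to turn StockPrice records into (stockId, average price) pairs.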

Code file ProcessWindowFunctionDemo.java

package cn.edu.xmu;

import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

public class ProcessWindowFunctionDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataGeneratorSource<StockPrice> dataGeneratorSource = new DataGeneratorSource<>(
                // the GeneratorFunction implementation class
                new MyGeneratorFunction(),
                // total number of records to emit
                10,
                // number of records emitted per second
                RateLimiterStrategy.perSecond(1),
                // the return type: wrap the Java class StockPrice in a TypeInformation
                TypeInformation.of(StockPrice.class)
        );

        // read the data produced by dataGeneratorSource
        DataStreamSource<StockPrice> dataGeneratorSourceStream = env.fromSource(dataGeneratorSource
                , WatermarkStrategy.noWatermarks()  // watermark generation strategy
                , "dataGeneratorSource");
        KeyedStream<StockPrice, String> stockPriceKS =
                dataGeneratorSourceStream.keyBy(stockPrice->stockPrice.getStockId());
        WindowedStream<StockPrice, String, TimeWindow> stockPriceWS =
                stockPriceKS.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));
        SingleOutputStreamOperator<String> processDS =
                stockPriceWS.process(new MyProcessWindowFunction());
        processDS.print();
        env.execute();
    }
    public static class MyProcessWindowFunction extends ProcessWindowFunction<StockPrice, String, String, TimeWindow> {
        // called once when a window ends (once per key group); unsuitable for large volumes of data,
        // because the full window contents are kept in memory and may cause an OutOfMemoryError
        @Override
        public void process(String s, ProcessWindowFunction<StockPrice, String, String, TimeWindow>.Context context, Iterable<StockPrice> elements, Collector<String> out) throws Exception {
            // aggregate; note: the whole window's data is held in the Iterable, which may contain many records
            AtomicInteger count= new AtomicInteger();
            AtomicReference<Double> sumPrice= new AtomicReference<>(0.0);
            elements.forEach(stock -> {
                    sumPrice.set(sumPrice.get() + stock.price);
                    count.set(count.get() + 1);
            });
            long startTS = context.window().getStart();
            long endTS = context.window().getEnd();
            String windowStart = DateFormatUtils.format(startTS,"yyyy-MM-dd HH:mm:ss.SSS");
            String windowEnd = DateFormatUtils.format(endTS,"yyyy-MM-dd HH:mm:ss.SSS");
            out.collect("Window start:" + windowStart + ", window end:" + windowEnd + ", stockId:" + s + ", price:" + (sumPrice.get() / count.get()));
        }
    }
}
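
The DataGeneratorSource used in these demos ships in a separate connector module. If your pom.xml does not already declare it, the dependency looks roughly like the following (assuming the same ${flink.version} property as the rest of the project, and Flink 1.17 or later, where the datagen connector was split out):

<dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-datagen</artifactId>
      <version>${flink.version}</version>
</dependency>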

Code file AggregateAndProcessWindowFunctionDemo.java

package cn.edu.xmu;

import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

public class AggregateAndProcessWindowFunctionDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataGeneratorSource<StockPrice> dataGeneratorSource = new DataGeneratorSource<>(
                // the GeneratorFunction implementation class
                new MyGeneratorFunction(),
                // total number of records to emit
                10,
                // number of records emitted per second
                RateLimiterStrategy.perSecond(1),
                // the return type: wrap the Java class StockPrice in a TypeInformation
                TypeInformation.of(StockPrice.class)
        );

        // read the data produced by dataGeneratorSource
        DataStreamSource<StockPrice> dataGeneratorSourceStream = env.fromSource(dataGeneratorSource
                , WatermarkStrategy.noWatermarks()  // watermark generation strategy
                , "dataGeneratorSource");
        dataGeneratorSourceStream.print();
        KeyedStream<StockPrice, String> stockPriceKS =
                dataGeneratorSourceStream.keyBy(stockPrice->stockPrice.getStockId());
        WindowedStream<StockPrice, String, TimeWindow> stockPriceWS =
                stockPriceKS.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));
        SingleOutputStreamOperator<String> aggregatedDS =
                stockPriceWS.aggregate(new MyAggregateFunction(),new MyProcessWindowFunction());
        aggregatedDS.print();
        env.execute();
    }
    public static class MyAggregateFunction implements AggregateFunction<StockPrice, Tuple3<String,Double,Long>, Tuple2<String,Double>> {
        // create the accumulator
        @Override
        public Tuple3<String, Double, Long> createAccumulator() {
            return Tuple3.of("",0D,0L);
        }

        // add an input record to the accumulator
        @Override
        public Tuple3<String, Double, Long> add(StockPrice value, Tuple3<String, Double, Long> accumulator) {
            return Tuple3.of(value.stockId,accumulator.f1+value.price,accumulator.f2+1L);
        }

        // derive the result (the average price) from the accumulator
        @Override
        public Tuple2<String, Double> getResult(Tuple3<String, Double, Long> accumulator) {
            return Tuple2.of(accumulator.f0,accumulator.f1 / accumulator.f2);
        }

        // merge two accumulators
        @Override
        public Tuple3<String, Double, Long> merge(Tuple3<String, Double, Long> a, Tuple3<String, Double, Long> b) {
            return Tuple3.of(a.f0,a.f1+b.f1,a.f2+b.f2);
        }
    }
    public static class MyProcessWindowFunction extends ProcessWindowFunction<Tuple2<String,Double>, String, String, TimeWindow> {

        @Override
        public void process(String s, ProcessWindowFunction<Tuple2<String, Double>, String, String, TimeWindow>.Context context, Iterable<Tuple2<String, Double>> elements, Collector<String> out) throws Exception {
            // note: because aggregate() pre-aggregates, the Iterable here normally holds a single element, the result of MyAggregateFunction
            AtomicInteger count= new AtomicInteger();
            AtomicReference<Double> sumPrice= new AtomicReference<>(0.0);
            elements.forEach(tuple -> {
                sumPrice.set(sumPrice.get() + tuple.f1);
                count.set(count.get() + 1);
            });
            long startTS = context.window().getStart();
            long endTS = context.window().getEnd();
            String windowStart = DateFormatUtils.format(startTS,"yyyy-MM-dd HH:mm:ss.SSS");
            String windowEnd = DateFormatUtils.format(endTS,"yyyy-MM-dd HH:mm:ss.SSS");
            out.collect("Windows start:"+windowStart+"Windows end:"+windowEnd+"stockId:"+s+",price:"+String.valueOf(sumPrice.get()/count.get()));
        }
    }
}

Code file TriggerDemo.java

package cn.edu.xmu;

import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

public class TriggerDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> sourceDS = env.socketTextStream("hadoop01", 9999);
        SingleOutputStreamOperator<Integer> mapDS = sourceDS.map(value -> Integer.parseInt(value));
        //define a tumbling count window of size 5
        AllWindowedStream<Integer, GlobalWindow> allWindowedDS = mapDS.windowAll(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(5)));
        //compute the minimum value within the window
        SingleOutputStreamOperator<Integer> minDS = allWindowedDS.min(0);
        minDS.print();
        env.execute();
    }
}
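
Incidentally, the GlobalWindows-plus-PurgingTrigger combination above is exactly what the countWindowAll shortcut builds internally, so the window definition could equivalently be written as the following sketch:

// equivalent shorthand: a tumbling count window of 5 elements
AllWindowedStream<Integer, GlobalWindow> allWindowedDS = mapDS.countWindowAll(5);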

Code file MyTrigger.java

package cn.edu.xmu;

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class MyTrigger extends Trigger<StockPrice, TimeWindow>{
    private static final String DATA_COUNT_STATE_NAME = "dataCountState";
    // maximum number of records per window
    private int maxCount;
    // state descriptor for the window's current record count
    private ReducingStateDescriptor<Long> countStateDescriptor = new ReducingStateDescriptor<>(DATA_COUNT_STATE_NAME, new ReduceFunction<Long>() {
        @Override
        public Long reduce(Long value1, Long value2) throws Exception {
            return value1 + value2;
        }
    }, LongSerializer.INSTANCE);
    public MyTrigger(int maxCount) {
        this.maxCount = maxCount;
    }
    // fire the computation, then clear the elements in the window
    private TriggerResult fireAndPurge(TimeWindow window, TriggerContext ctx) throws Exception {
        clear(window, ctx);
        return TriggerResult.FIRE_AND_PURGE;
    }
    // called for every element entering the window
    @Override
    public TriggerResult onElement(StockPrice element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        ReducingState<Long> countState = ctx.getPartitionedState(countStateDescriptor);
        countState.add(1L);
        if (countState.get() >= maxCount) {
            return fireAndPurge(window, ctx);
        }else{
            return TriggerResult.CONTINUE;
        }
    }
    // called when the window's processing-time timer fires (i.e., at the end of the window)
    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return fireAndPurge(window, ctx);
    }
    // this trigger does not register event-time timers, so simply continue
    // (returning null here, as the original did, would risk a NullPointerException)
    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }
    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.getPartitionedState(countStateDescriptor).clear();
    }
    // factory method for creating the trigger
    public static MyTrigger create(int maxCount) {
        return new MyTrigger(maxCount);
    }
    }
}

Code file TriggerDemo1.java

package cn.edu.xmu;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class TriggerDemo1 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> source = env.socketTextStream("hadoop01", 9999);
        SingleOutputStreamOperator<StockPrice> stockPriceDS = source.map(s -> {
            String[] element = s.split(",");
            return new StockPrice(element[0], Long.valueOf(element[1]), Double.valueOf(element[2]));
        });
        KeyedStream<StockPrice, String> stockPriceKS =
                stockPriceDS.keyBy(stockPrice->stockPrice.getStockId());

        SingleOutputStreamOperator<StockPrice> resultDS = stockPriceKS
                .window(TumblingProcessingTimeWindows.of(Time.seconds(30)))
                .trigger(new MyTrigger(5))
                .reduce((s1, s2) -> new StockPrice(s1.stockId, s1.timeStamp, s1.price + s2.price));
        resultDS.print();
        env.execute();
    }
}
nc -lk 9999
s1,1,1
s1,2,2
s1,3,3
s1,4,4
s1,5,5
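
With maxCount set to 5, MyTrigger fires as soon as the fifth s1 record arrives, without waiting for the 30-second processing-time window to close. Because the reduce function sums the prices, the printed record should carry price 15.0 (1+2+3+4+5); the exact output format depends on StockPrice's toString method.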

Code file EvictorDemo.java

package cn.edu.xmu;

import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.evictors.Evictor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue;
import org.apache.flink.util.Collector;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

public class EvictorDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> source = env.socketTextStream("hadoop01", 9999);
        SingleOutputStreamOperator<StockPrice> stockPriceDS = source.map(s -> {
            String[] element = s.split(",");
            return new StockPrice(element[0], Long.valueOf(element[1]), Double.valueOf(element[2]));
        });
        KeyedStream<StockPrice, String> stockPriceKS =
                stockPriceDS.keyBy(stockPrice->stockPrice.getStockId());
        WindowedStream<StockPrice, String, TimeWindow> stockPriceWS =
                stockPriceKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
        SingleOutputStreamOperator<String> processDS =
                stockPriceWS
                        .evictor(new MyEvictor())
                        .process(new MyProcessWindowFunction());
        processDS.print();
        env.execute();
    }
    public static class MyProcessWindowFunction extends ProcessWindowFunction<StockPrice, String, String, TimeWindow> {
        // called once when a window ends (once per key group); unsuitable for large volumes of data,
        // because the full window contents are kept in memory and may cause an OutOfMemoryError
        @Override
        public void process(String s, ProcessWindowFunction<StockPrice, String, String, TimeWindow>.Context context, Iterable<StockPrice> elements, Collector<String> out) throws Exception {
            // aggregate; note: the whole window's data is held in the Iterable, which may contain many records
            AtomicInteger count= new AtomicInteger();
            AtomicReference<Double> sumPrice= new AtomicReference<>(0.0);
            elements.forEach(stock -> {
                sumPrice.set(sumPrice.get() + stock.price);
                count.set(count.get() + 1);
            });
            long startTS = context.window().getStart();
            long endTS = context.window().getEnd();
            String windowStart = DateFormatUtils.format(startTS,"yyyy-MM-dd HH:mm:ss.SSS");
            String windowEnd = DateFormatUtils.format(endTS,"yyyy-MM-dd HH:mm:ss.SSS");
            out.collect("Window start:" + windowStart + ", window end:" + windowEnd + ", stockId:" + s + ", price:" + (sumPrice.get() / count.get()));
        }
    }
    public static class MyEvictor implements Evictor<StockPrice,TimeWindow>{
        @Override
        public void evictBefore(Iterable<TimestampedValue<StockPrice>> elements, int size, TimeWindow window, EvictorContext evictorContext) {
            Iterator<TimestampedValue<StockPrice>> ite = elements.iterator();
            while (ite.hasNext()) {
                TimestampedValue<StockPrice> element = ite.next();
                System.out.println("Stock price seen by the evictor: " + element.getValue().price);
                // simulate removing invalid records
                if (element.getValue().price <= 0) {
                    System.out.println("Stock price is not positive; removing this record");
                    ite.remove();
                }
            }
        }
        @Override
        public void evictAfter(Iterable<TimestampedValue<StockPrice>> elements, int size, TimeWindow window, EvictorContext evictorContext) {
            // do nothing
        }
    }
}

In a Linux terminal, start the system's built-in NC program, then start the EvictorDemo program, and enter the following data in the NC window (one line at a time, pressing Enter after each line):

stock_1,1602031567000,8
stock_1,1602031568000,-4
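
Assuming both records are entered within the same 10-second processing-time window, the evictor removes the -4 record before the window function runs, so the window average should be computed over the single remaining record and print a price of 8.0.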

Code file WatermarkDemo1.java

package cn.edu.xmu;

import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

public class WatermarkDemo1 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> source = env.socketTextStream("hadoop01", 9999);
        SingleOutputStreamOperator<StockPrice> stockPriceDS = source.map(s -> {
            String[] element = s.split(",");
            return new StockPrice(element[0], Long.valueOf(element[1]), Double.valueOf(element[2]));
        });
        WatermarkStrategy<StockPrice> watermarkStrategy = WatermarkStrategy
                .<StockPrice>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<StockPrice>() {
                    @Override
                    public long extractTimestamp(StockPrice element, long recordTimestamp) {
                        // extract the event timestamp from the input record and convert it to milliseconds
                        return element.getTimeStamp() * 1000L;
                    }
                });

        SingleOutputStreamOperator<StockPrice> stockPriceDSWithWatermark = stockPriceDS.assignTimestampsAndWatermarks(watermarkStrategy);
        KeyedStream<StockPrice, String> stockPriceKS =
                stockPriceDSWithWatermark.keyBy(stockPrice->stockPrice.getStockId());
        WindowedStream<StockPrice, String, TimeWindow> stockPriceWS =
                // use the event-time window assigner TumblingEventTimeWindows
                stockPriceKS.window(TumblingEventTimeWindows.of(Time.seconds(10)));
        SingleOutputStreamOperator<String> processDS =
                stockPriceWS.process(new MyProcessWindowFunction());
        processDS.print();
        env.execute();
    }
    public static class MyProcessWindowFunction extends ProcessWindowFunction<StockPrice, String, String, TimeWindow> {
        // called once when a window ends (once per key group); unsuitable for large volumes of data,
        // because the full window contents are kept in memory and may cause an OutOfMemoryError
        @Override
        public void process(String s, ProcessWindowFunction<StockPrice, String, String, TimeWindow>.Context context, Iterable<StockPrice> elements, Collector<String> out) throws Exception {
            // aggregate; note: the whole window's data is held in the Iterable, which may contain many records
            AtomicInteger count= new AtomicInteger();
            AtomicReference<Double> sumPrice= new AtomicReference<>(0.0);
            elements.forEach(stock -> {
                sumPrice.set(sumPrice.get() + stock.price);
                count.set(count.get() + 1);
            });
            long startTS = context.window().getStart();
            long endTS = context.window().getEnd();
            String windowStart = DateFormatUtils.format(startTS,"yyyy-MM-dd HH:mm:ss.SSS");
            String windowEnd = DateFormatUtils.format(endTS,"yyyy-MM-dd HH:mm:ss.SSS");
            out.collect("Window start:" + windowStart + ", window end:" + windowEnd + ", stockId:" + s + ", price:" + (sumPrice.get() / count.get()));
        }
    }
}

Start an NC program in Linux, run the above program in IDEA, and then enter the following data into the NC window, line by line:

s1,1,1
s1,2,2
s1,3,3
s1,7,7
s1,9,9
s1,10,10
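
With forMonotonousTimestamps, the watermark tracks the largest timestamp seen so far (minus 1 millisecond), so the 10-second event-time window [0s, 10s) should fire shortly after s1,10,10 arrives. It then contains the records with timestamps 1, 2, 3, 7 and 9, giving an average price of 22/5 = 4.4.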

Code file WatermarkDemo2.java

package cn.edu.xmu;

import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

public class WatermarkDemo2 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> source = env.socketTextStream("hadoop01", 9999);
        SingleOutputStreamOperator<StockPrice> stockPriceDS = source.map(s -> {
            String[] element = s.split(",");
            return new StockPrice(element[0], Long.valueOf(element[1]), Double.valueOf(element[2]));
        });
        WatermarkStrategy<StockPrice> watermarkStrategy = WatermarkStrategy
                .<StockPrice>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                .withTimestampAssigner(new SerializableTimestampAssigner<StockPrice>() {
                    @Override
                    public long extractTimestamp(StockPrice element, long recordTimestamp) {
                        return element.getTimeStamp() * 1000L; // convert to milliseconds
                    }
                });
        SingleOutputStreamOperator<StockPrice> stockPriceDSWithWatermark = stockPriceDS.assignTimestampsAndWatermarks(watermarkStrategy);
        KeyedStream<StockPrice, String> stockPriceKS =
                stockPriceDSWithWatermark.keyBy(stockPrice->stockPrice.getStockId());
        WindowedStream<StockPrice, String, TimeWindow> stockPriceWS =
                //use event time
                stockPriceKS.window(TumblingEventTimeWindows.of(Time.seconds(10)));
        SingleOutputStreamOperator<String> processDS =
                stockPriceWS.process(new MyProcessWindowFunction());
        processDS.print();
        env.execute();
    }
    public static class MyProcessWindowFunction extends ProcessWindowFunction<StockPrice, String, String, TimeWindow> {
        // called once when a window ends (once per key group); unsuitable for large volumes of data,
        // because the full window contents are kept in memory and may cause an OutOfMemoryError
        @Override
        public void process(String s, ProcessWindowFunction<StockPrice, String, String, TimeWindow>.Context context, Iterable<StockPrice> elements, Collector<String> out) throws Exception {
            // aggregate; note: the whole window's data is held in the Iterable, which may contain many records
            AtomicInteger count= new AtomicInteger();
            AtomicReference<Double> sumPrice= new AtomicReference<>(0.0);
            elements.forEach(stock -> {
                sumPrice.set(sumPrice.get() + stock.price);
                count.set(count.get() + 1);
            });
            long startTS = context.window().getStart();
            long endTS = context.window().getEnd();
            String windowStart = DateFormatUtils.format(startTS,"yyyy-MM-dd HH:mm:ss.SSS");
            String windowEnd = DateFormatUtils.format(endTS,"yyyy-MM-dd HH:mm:ss.SSS");
            out.collect("Window start:" + windowStart + ", window end:" + windowEnd + ", stockId:" + s + ", price:" + (sumPrice.get() / count.get()));
        }
    }
}

Start an NC program in Linux, run the above program in IDEA, and then enter the following data into the NC window, line by line:

s1,1,1
s1,2,2
s1,5,5
s1,3,3
s1,9,9
s1,2,2
s1,10,10
s1,3,3
s1,11,11
s1,13,13
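
With a 3-second bounded out-of-orderness, the watermark lags the largest timestamp by 3 seconds (plus 1 millisecond), so the window [0s, 10s) should fire only when s1,13,13 arrives (13 - 3 >= 10). Out-of-order records such as the s1,3,3 entered after s1,10,10 are still assigned to that window as long as it has not fired; it ends up holding the records with timestamps 1, 2, 5, 3, 9, 2 and 3, for an average price of 25/7 ≈ 3.57.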

Code file PeriodicWatermarkDemo.java

package cn.edu.xmu;

import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class PeriodicWatermarkDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.getConfig().setAutoWatermarkInterval(2000); // generate periodic watermarks every 2000 milliseconds
        DataStreamSource<String> source = env.socketTextStream("hadoop01", 9999);
        SingleOutputStreamOperator<StockPrice> stockPriceDS = source.map(s -> {
            String[] element = s.split(",");
            return new StockPrice(element[0], Long.valueOf(element[1]), Double.valueOf(element[2]));
        });
        WatermarkStrategy<StockPrice> watermarkStrategy =
                WatermarkStrategy.<StockPrice>forGenerator(new WatermarkGeneratorSupplier<StockPrice>() {
                    @Override
                    public WatermarkGenerator<StockPrice> createWatermarkGenerator(Context context) {
                        return new MyPeriodicWatermarkGenerator<>(2000L); // allow 2000 milliseconds of out-of-orderness
                    }
                }).withTimestampAssigner(
                        (element, recordTimestamp) -> element.getTimeStamp()*1000L
                );
        SingleOutputStreamOperator<StockPrice> stockPriceDSWithWatermark =
                stockPriceDS.assignTimestampsAndWatermarks(watermarkStrategy);
        KeyedStream<StockPrice, String> stockPriceKS =
                stockPriceDSWithWatermark.keyBy(stockPrice->stockPrice.getStockId());
        WindowedStream<StockPrice, String, TimeWindow> stockPriceWS =
                stockPriceKS.window(TumblingEventTimeWindows.of(Time.seconds(3)));
        SingleOutputStreamOperator<StockPrice> reducedDS = stockPriceWS.reduce((s1, s2) -> new StockPrice(s1.stockId, s1.timeStamp, s1.price + s2.price));
        reducedDS.print();
        env.execute();
    }
    // the type parameter is named T to avoid shadowing the StockPrice class
    public static class MyPeriodicWatermarkGenerator<T> implements WatermarkGenerator<T>{
        // how long to wait for out-of-order records
        private long delayTime;
        // the largest event timestamp seen so far
        private long maxTimestamp;

        public MyPeriodicWatermarkGenerator(long delayTs){
            this.delayTime = delayTs;
            this.maxTimestamp = Long.MIN_VALUE + this.delayTime + 1;
        }

        /**
         * Called once for every arriving record; extracts and remembers the largest event timestamp
         * @param event the record that just arrived
         * @param eventTimestamp the timestamp extracted from the record
         * @param output
         */
        @Override
        public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
            // compare the stored timestamp with the newly extracted one and keep the larger of the two
            maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
            System.out.println("onEvent called; largest timestamp so far = " + maxTimestamp);
        }

        /**
         * Called periodically: emits the watermark
         * @param output
         */
        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            output.emitWatermark(new Watermark(maxTimestamp - delayTime - 1)); // the 1 means 1 millisecond
            System.out.println("onPeriodicEmit called; generated watermark = " + (maxTimestamp - delayTime - 1));
        }
    }
}

Code file PunctuatedWatermarkDemo.java

package cn.edu.xmu;

import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import java.text.Format;
import java.text.SimpleDateFormat;

public class PunctuatedWatermarkDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> source = env.socketTextStream("hadoop01", 9999);
        SingleOutputStreamOperator<StockPrice> stockPriceDS = source.map(s -> {
            String[] element = s.split(",");
            return new StockPrice(element[0], Long.valueOf(element[1]), Double.valueOf(element[2]));
        });
        SingleOutputStreamOperator<StockPrice> stockPriceDSWithWatermark = stockPriceDS.assignTimestampsAndWatermarks(new MyWatermarkStrategy());

        KeyedStream<StockPrice, String> stockPriceKS =
                stockPriceDSWithWatermark.keyBy(stockPrice->stockPrice.getStockId());
        WindowedStream<StockPrice, String, TimeWindow> stockPriceWS =
                stockPriceKS.window(TumblingEventTimeWindows.of(Time.seconds(3)));
        SingleOutputStreamOperator<StockPrice> reducedDS = stockPriceWS.reduce((s1, s2) -> new StockPrice(s1.stockId, s1.timeStamp, s1.price + s2.price));
        reducedDS.print();
        env.execute();
    }
    public static class MyWatermarkStrategy implements WatermarkStrategy<StockPrice>{

        @Override
        public WatermarkGenerator<StockPrice> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new WatermarkGenerator<StockPrice>(){
                long maxOutOfOrderness = 10000L; // maximum allowed out-of-orderness: 10 seconds
                long currentMaxTimestamp = 0L;
                Watermark a = null;
                Format format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
                @Override
                public void onEvent(StockPrice event, long eventTimestamp, WatermarkOutput output) {
                    currentMaxTimestamp = Math.max(eventTimestamp, currentMaxTimestamp);
                    a = new Watermark(currentMaxTimestamp - maxOutOfOrderness);
                    output.emitWatermark(a);
                    System.out.println("timestamp:" + event.stockId + "," + event.timeStamp + "|" + format.format(event.timeStamp) + "," + currentMaxTimestamp + "|" + format.format(currentMaxTimestamp) + "," + a.toString());
                }

                @Override
                public void onPeriodicEmit(WatermarkOutput output) {
                    // watermarks are emitted per event, so nothing is done here
                }
            };
        }

        @Override
        public TimestampAssigner<StockPrice> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
            return new SerializableTimestampAssigner<StockPrice>() {
                @Override
                public long extractTimestamp(StockPrice element, long recordTimestamp) {
                    return element.timeStamp; // extract the timestamp from the arriving record
                }
            };
        }
    }
}
nc -lk 9999

Code file LateDataDemo.java

package cn.edu.xmu;

import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.OutputTag;
import java.text.Format;
import java.text.SimpleDateFormat;

public class LateDataDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> source = env.socketTextStream("hadoop01", 9999);
        SingleOutputStreamOperator<StockPrice> stockPriceDS = source.map(s -> {
            String[] element = s.split(",");
            return new StockPrice(element[0], Long.valueOf(element[1]), Double.valueOf(element[2]));
        });
        SingleOutputStreamOperator<StockPrice> stockPriceDSWithWatermark =
                stockPriceDS.assignTimestampsAndWatermarks(new MyWatermarkStrategy());
        KeyedStream<StockPrice, String> stockPriceKS =
                stockPriceDSWithWatermark.keyBy(stockPrice->stockPrice.getStockId());
        OutputTag<StockPrice> lateData = new OutputTag<>("late", Types.POJO(StockPrice.class));
        WindowedStream<StockPrice, String, TimeWindow> stockPriceWS =
                stockPriceKS
                        .window(TumblingEventTimeWindows.of(Time.seconds(3)))
                        .allowedLateness(Time.seconds(2L))
                        .sideOutputLateData(lateData);
        SingleOutputStreamOperator<StockPrice> reducedDS = stockPriceWS.reduce((s1, s2) -> new StockPrice(s1.stockId, s1.timeStamp, s1.price + s2.price));
        reducedDS.print("window计算结果:");
        SideOutputDataStream<StockPrice> lateDS = reducedDS.getSideOutput(lateData);
        lateDS.print("迟到的数据:");
        env.execute();
    }
    public static class MyWatermarkStrategy implements WatermarkStrategy<StockPrice>{

        @Override
        public WatermarkGenerator<StockPrice> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new WatermarkGenerator<StockPrice>(){
                long maxOutOfOrderness = 10000L; // maximum allowed out-of-orderness: 10 seconds
                long currentMaxTimestamp = 0L;
                Watermark a = null;
                Format format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
                @Override
                public void onEvent(StockPrice event, long eventTimestamp, WatermarkOutput output) {
                    currentMaxTimestamp = Math.max(eventTimestamp, currentMaxTimestamp);
                    a = new Watermark(currentMaxTimestamp - maxOutOfOrderness);
                    output.emitWatermark(a);
                    System.out.println("timestamp:" + event.stockId + "," + event.timeStamp + "|" + format.format(event.timeStamp) + "," + currentMaxTimestamp + "|" + format.format(currentMaxTimestamp) + "," + a.toString());
                }

                @Override
                public void onPeriodicEmit(WatermarkOutput output) {
                    // watermarks are emitted per event, so nothing is done here
                }
            };
        }

        @Override
        public TimestampAssigner<StockPrice> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
            return new SerializableTimestampAssigner<StockPrice>() {
                @Override
                public long extractTimestamp(StockPrice element, long recordTimestamp) {
                    return element.timeStamp; // extract the timestamp from the arriving record
                }
            };
        }
    }
}

Start the NC program in a Linux terminal, then launch the LateDataDemo.java program in IDEA, and enter the following data in the NC terminal (line by line):

stock_1,1602031567000,8.14
stock_1,1602031571000,8.23
stock_1,1602031577000,8.24
stock_1,1602031578000,8.87
stock_1,1602031579000,8.55
stock_1,1602031577000,8.24
stock_1,1602031581000,8.43
stock_1,1602031582000,8.78
stock_1,1602031581000,8.76
stock_1,1602031579000,8.55
stock_1,1602031591000,8.13
stock_1,1602031581000,8.34
stock_1,1602031580000,8.45
stock_1,1602031579000,8.33
stock_1,1602031578000,8.56
stock_1,1602031577000,8.32
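
Note how this demo layers three defenses against late data: the punctuated watermark already lags the largest timestamp by 10 seconds; allowedLateness keeps each 3-second window alive for a further 2 seconds of watermark progress, so a late record re-triggers the window and prints an updated result; and anything arriving later still is routed to the "late" side output instead of being silently dropped.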

Code file WindowJoinDemo.java

package cn.edu.xmu;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WindowJoinDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Tuple2<String, Integer>> sourceDS1 = env.fromElements(
                Tuple2.of("b", 1), // 第1个元素是key,第2个元素是时间戳(秒数)
                Tuple2.of("b", 2),
                Tuple2.of("c", 3),
                Tuple2.of("d", 4)
        );
        SingleOutputStreamOperator<Tuple2<String, Integer>> sourceDS1WithWatermark = sourceDS1.assignTimestampsAndWatermarks(
                WatermarkStrategy
                        .<Tuple2<String, Integer>>forMonotonousTimestamps()
                        .withTimestampAssigner((value, ts) -> value.f1 * 1000L)
        );

        DataStreamSource<Tuple3<String, Integer, Integer>> sourceDS2 = env.fromElements(
                Tuple3.of("b", 1, 1), // 第1个元素是key,第2个元素是时间戳(秒数)
                Tuple3.of("b", 12, 1),
                Tuple3.of("c", 2, 1),
                Tuple3.of("d", 14, 1)
        );
        SingleOutputStreamOperator<Tuple3<String, Integer, Integer>> sourceDS2WithWatermark = sourceDS2.assignTimestampsAndWatermarks(
                WatermarkStrategy
                        .<Tuple3<String, Integer, Integer>>forMonotonousTimestamps()
                        .withTimestampAssigner((value, ts) -> value.f1 * 1000L)
        );
        // perform the window join
        DataStream<String> joinResult = sourceDS1WithWatermark.join(sourceDS2WithWatermark)
                .where(value1 -> value1.f0) // key selector for sourceDS1WithWatermark
                .equalTo(value2 -> value2.f0) // key selector for sourceDS2WithWatermark
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply(new JoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() {
                    /**
                     * join is called for every pair of records that match within the same window
                     * @param first a record from sourceDS1WithWatermark
                     * @param second a record from sourceDS2WithWatermark
                     */
                    @Override
                    public String join(Tuple2<String, Integer> first, Tuple3<String, Integer, Integer> second) throws Exception {
                        return first + "<------>" + second;
                    }
                });
        joinResult.print();
        env.execute();
    }
}
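
With 5-second tumbling event-time windows, (b,1), (b,2) and (c,3) from the first stream share the window [0s, 5s) with (b,1,1) and (c,2,1) from the second stream, so three joined pairs should be printed: (b,1) and (b,2) each with (b,1,1), plus (c,3) with (c,2,1). The d records fall into different windows ([0s, 5s) versus [10s, 15s)) and never match, since a window join only pairs records that land in the same window.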

Code file IntervalJoinDemo.java

package cn.edu.xmu;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class IntervalJoinDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Tuple2<String, Integer>> sourceDS1 = env.fromElements(
                Tuple2.of("b", 1), // 第1个元素是key,第2个元素是时间戳(秒数)
                Tuple2.of("b", 2),
                Tuple2.of("c", 3),
                Tuple2.of("d", 4)
        );
        SingleOutputStreamOperator<Tuple2<String, Integer>> sourceDS1WithWatermark = sourceDS1.assignTimestampsAndWatermarks(
                WatermarkStrategy
                        .<Tuple2<String, Integer>>forMonotonousTimestamps()
                        .withTimestampAssigner((value, ts) -> value.f1 * 1000L)
        );

        DataStreamSource<Tuple3<String, Integer, Integer>> sourceDS2 = env.fromElements(
                Tuple3.of("b", 1, 1), // 第1个元素是key,第2个元素是时间戳(秒数)
                Tuple3.of("b", 12, 1),
                Tuple3.of("c", 2, 1),
                Tuple3.of("d", 14, 1)
        );
        SingleOutputStreamOperator<Tuple3<String, Integer, Integer>> sourceDS2WithWatermark = sourceDS2.assignTimestampsAndWatermarks(
                WatermarkStrategy
                        .<Tuple3<String, Integer, Integer>>forMonotonousTimestamps()
                        .withTimestampAssigner((value, ts) -> value.f1 * 1000L)
        );
        // key both streams
        KeyedStream<Tuple2<String, Integer>, String> keyedStream1 = sourceDS1WithWatermark.keyBy(value1 -> value1.f0);
        KeyedStream<Tuple3<String, Integer, Integer>, String> keyedStream2 = sourceDS2WithWatermark.keyBy(value2 -> value2.f0);
        // perform the interval join
        SingleOutputStreamOperator<String> processResult = keyedStream1.intervalJoin(keyedStream2)
                .between(Time.seconds(-2), Time.seconds(2)) // set the lower and upper bounds
                .process(new ProcessJoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() {
                    /**
                     * processElement is called only when records from the two streams match
                     * @param left a record from keyedStream1
                     * @param right a record from keyedStream2
                     * @param ctx the context
                     * @param out the collector
                     */
                    @Override
                    public void processElement(Tuple2<String, Integer> left, Tuple3<String, Integer, Integer> right, ProcessJoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>.Context ctx, Collector<String> out) throws Exception {
                        out.collect(left + " joins with " + right);
                    }
                });

        processResult.print();
        env.execute();
    }
}
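
With bounds of [-2s, +2s], a record from keyedStream1 at time t matches records of the same key from keyedStream2 with timestamps in [t-2, t+2], so three pairs should be printed: (b,1) with (b,1,1), (b,2) with (b,1,1), and (c,3) with (c,2,1). The d records are 10 seconds apart and never connect.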

Code file IntervalJoinWithLateDataDemo.java

package cn.edu.xmu;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.time.Duration;

public class IntervalJoinWithLateDataDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> sourceDS1 = env.socketTextStream("hadoop01", 9999);
        SingleOutputStreamOperator<Tuple2<String, Integer>> mapDS1 = sourceDS1.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                String[] splitResult = value.split(",");
                return Tuple2.of(splitResult[0], Integer.parseInt(splitResult[1]));
            }
        });
        SingleOutputStreamOperator<Tuple2<String, Integer>> mapDS1WithWatermark = mapDS1.assignTimestampsAndWatermarks(
                WatermarkStrategy
                        .<Tuple2<String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                        .withTimestampAssigner((value, ts) -> value.f1 * 1000L)
        );
        DataStreamSource<String> sourceDS2 = env.socketTextStream("hadoop01", 8888);
        SingleOutputStreamOperator<Tuple3<String, Integer, Integer>> mapDS2 = sourceDS2.map(new MapFunction<String, Tuple3<String, Integer, Integer>>() {
            @Override
            public Tuple3<String, Integer, Integer> map(String value) throws Exception {
                String[] splitResult = value.split(",");
                return Tuple3.of(splitResult[0], Integer.parseInt(splitResult[1]), Integer.parseInt(splitResult[2]));
            }
        });
        SingleOutputStreamOperator<Tuple3<String, Integer, Integer>> mapDS2WithWatermark = mapDS2.assignTimestampsAndWatermarks(
                WatermarkStrategy
                        .<Tuple3<String, Integer, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                        .withTimestampAssigner((value, ts) -> value.f1 * 1000L)
        );
        // key both streams
        KeyedStream<Tuple2<String, Integer>, String> keyedStream1 = mapDS1WithWatermark.keyBy(value1 -> value1.f0);
        KeyedStream<Tuple3<String, Integer, Integer>, String> keyedStream2 = mapDS2WithWatermark.keyBy(value2 -> value2.f0);
        // perform the interval join, routing late records to side outputs
        OutputTag<Tuple2<String, Integer>> keyedStream1LateTag = new OutputTag<>("keyedStream1-late", Types.TUPLE(Types.STRING, Types.INT));
        OutputTag<Tuple3<String, Integer, Integer>> keyedStream2LateTag = new OutputTag<>("keyedStream2-late", Types.TUPLE(Types.STRING, Types.INT, Types.INT));

        SingleOutputStreamOperator<String> processResult = keyedStream1.intervalJoin(keyedStream2)
                .between(Time.seconds(-2), Time.seconds(2))
                .sideOutputLeftLateData(keyedStream1LateTag)
                .sideOutputRightLateData(keyedStream2LateTag)
                .process(new ProcessJoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() {
                    /**
                     * processElement is called only when records from the two streams match
                     * @param left a record from keyedStream1
                     * @param right a record from keyedStream2
                     * @param ctx the context
                     * @param out the collector
                     */
                    @Override
                    public void processElement(Tuple2<String, Integer> left, Tuple3<String, Integer, Integer> right, ProcessJoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>.Context ctx, Collector<String> out) throws Exception {
                        out.collect(left + " joins with " + right);
                    }
                });
        processResult.print("主流");
        processResult.getSideOutput(keyedStream1LateTag).printToErr("keyedStream1迟到的数据");
        processResult.getSideOutput(keyedStream2LateTag).printToErr("keyedStream2迟到的数据");
        env.execute();
    }
}
nc -lk 9999
nc -lk 8888

Program KeyedValueStateDemo.java

package cn.edu.xmu;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.time.Duration;

public class KeyedValueStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> source = env.socketTextStream("hadoop01", 9999);
        SingleOutputStreamOperator<StockPrice> stockPriceDS = source.map(s -> {
            String[] element = s.split(",");
            return new StockPrice(element[0], Long.valueOf(element[1]), Double.valueOf(element[2]));
        });
        SingleOutputStreamOperator<StockPrice> stockPriceDSWithWatermark = stockPriceDS.assignTimestampsAndWatermarks(
                WatermarkStrategy
                        .<StockPrice>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                        .withTimestampAssigner((element, recordTimestamp) -> element.getTimeStamp() * 1000L)
        );
        SingleOutputStreamOperator<String> resultDS = stockPriceDSWithWatermark.keyBy(stockPrice -> stockPrice.getStockId())
                .process(new KeyedProcessFunction<String, StockPrice, String>() {
                    // declare the state
                    ValueState<Double> lastPriceState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        // initialize the state
                        // ValueStateDescriptor takes two arguments: the state name and the type of the stored value
                        lastPriceState = getRuntimeContext().getState(new ValueStateDescriptor<Double>("lastPriceState", Types.DOUBLE));
                    }

                    @Override
                    public void processElement(StockPrice value, KeyedProcessFunction<String, StockPrice, String>.Context ctx, Collector<String> out) throws Exception {
                        // read the previous record's price from the state (0.0 if this is the first record for the key)
                        double lastPrice = lastPriceState.value() == null ? 0.0 : lastPriceState.value();
                        double currentPrice = value.getPrice();
                        // check whether the two prices differ by more than 5
                        if (Math.abs(currentPrice - lastPrice) > 5) {
                            out.collect("Stock ID: " + value.getStockId() + ", current price = " + currentPrice + ", previous price = " + lastPrice + ", difference exceeds 5");
                        }
                        // update the state with the current record's price
                        lastPriceState.update(currentPrice);
                    }
                });
        resultDS.print();
        env.execute();
    }
}
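
For example (hypothetical input), entering s1,1,1 and then s1,2,8 in the NC window should produce an alert only for the second record: the first time, the state is empty, so lastPrice defaults to 0.0 and |1 - 0| does not exceed 5; the second time, |8 - 1| = 7 triggers the output, after which the state holds 8.0.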

Program KeyedListStateDemo.java

package cn.edu.xmu;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

public class KeyedListStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> source = env.socketTextStream("hadoop01", 9999);
        SingleOutputStreamOperator<StockPrice> stockPriceDS = source.map(s -> {
            String[] element = s.split(",");
            return new StockPrice(element[0], Long.valueOf(element[1]), Double.valueOf(element[2]));
        });
        SingleOutputStreamOperator<StockPrice> stockPriceDSWithWatermark = stockPriceDS.assignTimestampsAndWatermarks(
                WatermarkStrategy
                        .<StockPrice>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                        .withTimestampAssigner((element, recordTimestamp) -> element.getTimeStamp() * 1000L)
        );
        SingleOutputStreamOperator<String> resultDS = stockPriceDSWithWatermark.keyBy(stockPrice -> stockPrice.getStockId())
                .process(new KeyedProcessFunction<String, StockPrice, String>() {
                             ListState<Double> stockPriceListState;
                             @Override
                             public void open(Configuration parameters) throws Exception {
                                 super.open(parameters);
                                 // initialize the state
                                 // ListStateDescriptor takes two arguments: the state name and the type of the stored elements
                                 stockPriceListState = getRuntimeContext().getListState(new ListStateDescriptor<Double>("stockPriceListState", Types.DOUBLE));
                             }
                             @Override
                             public void processElement(StockPrice value, KeyedProcessFunction<String, StockPrice, String>.Context ctx, Collector<String> out) throws Exception {
                                 // save each arriving record into the ListState
                                 stockPriceListState.add(value.getPrice());
                                 // take the data out of the ListState, sort it, keep only the top 3, and write it back
                                 Iterable<Double> stockPriceListIterable = stockPriceListState.get();
                                 // copy the ListState contents into a new list
                                 List<Double> stockPriceList = new ArrayList<>();
                                 for(Double stockPrice : stockPriceListIterable){
                                     stockPriceList.add(stockPrice);
                                 }
                                 // sort the list in descending order; use Double.compare instead of
                                 // casting the difference to int, which would treat prices differing
                                 // by less than 1 as equal
                                 stockPriceList.sort((t1, t2) -> Double.compare(t2, t1));
                                 // keep only the 3 largest elements
                                 if(stockPriceList.size()>3){
                                     stockPriceList.remove(3); // remove the 4th element
                                 }
                                 out.collect("Stock ID: " + value.getStockId() + ", the 3 largest prices are: " + stockPriceList.toString());
                                 // write the trimmed list back into the ListState
                                 stockPriceListState.update(stockPriceList);
                             }
                         }
                );
        resultDS.print();
        env.execute();
    }
}

Program KeyedMapStateDemo.java

package cn.edu.xmu;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.time.Duration;
import java.util.Map;

public class KeyedMapStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> source = env.socketTextStream("localhost", 9999);
        SingleOutputStreamOperator<StockPrice> stockPriceDS = source.map(s -> {
            String[] element = s.split(",");
            return new StockPrice(element[0], Long.valueOf(element[1]), Double.valueOf(element[2]));
        });
        SingleOutputStreamOperator<StockPrice> stockPriceDSWithWatermark = stockPriceDS.assignTimestampsAndWatermarks(
                WatermarkStrategy
                        .<StockPrice>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                        .withTimestampAssigner((element, recordTimestamp) -> element.getTimeStamp() * 1000L)
        );
        SingleOutputStreamOperator<String> resultDS = stockPriceDSWithWatermark.keyBy(stockPrice -> stockPrice.getStockId())
                .process(new KeyedProcessFunction<String, StockPrice, String>() {
                    // Declare the state
                    MapState<Double, Integer> stockPriceCountMapState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        // Initialize the state
                        // MapStateDescriptor takes three arguments: the state name, the type of the map keys, and the type of the map values
                        stockPriceCountMapState = getRuntimeContext().getMapState(new MapStateDescriptor<Double,Integer>("stockPriceCountMapState",Types.DOUBLE,Types.INT));
                    }

                    @Override
                    public void processElement(StockPrice value, KeyedProcessFunction<String, StockPrice, String>.Context ctx, Collector<String> out) throws Exception {
                        // When a record arrives, first check whether a key for this stock price already exists
                        Double stockPrice = value.getPrice();
                        if(stockPriceCountMapState.contains(stockPrice)){
                            // If the key exists, increment the corresponding map value by 1
                            Integer count = stockPriceCountMapState.get(stockPrice);
                            count = count + 1;
                            stockPriceCountMapState.put(stockPrice,count);
                        }else{
                            // If the key does not exist, create a new key-value pair in the map
                            stockPriceCountMapState.put(stockPrice,1);
                        }
                        StringBuilder outString = new StringBuilder();
                        outString.append("股票ID是"+value.getStockId()+"\n");
                        // 遍历MapState,输出Map中每个键值对
                        for (Map.Entry<Double, Integer> stockPriceCount : stockPriceCountMapState.entries()) {
                            outString.append("价格是:"+stockPriceCount.getKey()+",该价格出现次数是:"+stockPriceCount.getValue()+"\n");
                        }
                        out.collect(outString.toString());
                    }
                });
        resultDS.print();
        env.execute();
    }
}

Start the NC (netcat) program in a Linux terminal, then launch KeyedMapStateDemo.java in IDEA, and enter the following data in the NC window:

s1,1,1
s1,2,1
s1,3,1
s2,1,1
s2,2,1
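
If the five lines are entered in order, each record prints one block: the stock ID followed by every price seen so far for that key and its count (prices are parsed as Double, so 1 prints as 1.0). With the output strings translated as above, the three s1 records count the price 1.0 up from 1 to 3; the two s2 records then count from 1 to 2 for their own key, so the final block should read:

Stock ID is s2
Price: 1.0, occurrences of this price: 2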

Program KeyedReducingStateDemo.java

package cn.edu.xmu;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.time.Duration;

public class KeyedReducingStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> source = env.socketTextStream("localhost", 9999);
        SingleOutputStreamOperator<StockPrice> stockPriceDS = source.map(s -> {
            String[] element = s.split(",");
            return new StockPrice(element[0], Long.valueOf(element[1]), Double.valueOf(element[2]));
        });
        SingleOutputStreamOperator<StockPrice> stockPriceDSWithWatermark = stockPriceDS.assignTimestampsAndWatermarks(
                WatermarkStrategy
                        .<StockPrice>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                        .withTimestampAssigner((element, recordTimestamp) -> element.getTimeStamp() * 1000L)
        );
        SingleOutputStreamOperator<String> resultDS = stockPriceDSWithWatermark.keyBy(stockPrice -> stockPrice.getStockId())
                .process(new KeyedProcessFunction<String, StockPrice, String>() {
                    // Declare the state
                    ReducingState<Double> stockPriceSumReducingState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        // Initialize the state
                        stockPriceSumReducingState = getRuntimeContext().getReducingState(
                                new ReducingStateDescriptor<Double>(
                                        "stockPriceSumReducingState", // state name
                                        (value1,value2)->value1+value2, // reduce function as a lambda
                                        Types.DOUBLE // type of the stored value
                                )
                        );
                    }

                    @Override
                    public void processElement(StockPrice value, KeyedProcessFunction<String, StockPrice, String>.Context ctx, Collector<String> out) throws Exception {
                        // Add each incoming price to the ReducingState
                        stockPriceSumReducingState.add(value.getPrice());
                        out.collect("Stock ID: " + value.getStockId() + ", cumulative sum of stock prices: " + stockPriceSumReducingState.get());
                    }
                });
        resultDS.print();
        env.execute();
    }
}

Start the NC (netcat) program in a Linux terminal, then launch KeyedReducingStateDemo.java in IDEA, and enter the following data in the NC window:

s1,1,3
s1,2,5
s1,3,4
s2,1,7
s2,2,9
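
With the output strings translated as above, the expected prints are the running sums per stock ID:

Stock ID: s1, cumulative sum of stock prices: 3.0
Stock ID: s1, cumulative sum of stock prices: 8.0
Stock ID: s1, cumulative sum of stock prices: 12.0
Stock ID: s2, cumulative sum of stock prices: 7.0
Stock ID: s2, cumulative sum of stock prices: 16.0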

Program KeyedAggregatingStateDemo.java

package cn.edu.xmu;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.time.Duration;

public class KeyedAggregatingStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> source = env.socketTextStream("localhost", 9999);
        SingleOutputStreamOperator<StockPrice> stockPriceDS = source.map(s -> {
            String[] element = s.split(",");
            return new StockPrice(element[0], Long.valueOf(element[1]), Double.valueOf(element[2]));
        });
        SingleOutputStreamOperator<StockPrice> stockPriceDSWithWatermark = stockPriceDS.assignTimestampsAndWatermarks(
                WatermarkStrategy
                        .<StockPrice>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                        .withTimestampAssigner((element, recordTimestamp) -> element.getTimeStamp() * 1000L)
        );
        SingleOutputStreamOperator<String> resultDS = stockPriceDSWithWatermark.keyBy(stockPrice -> stockPrice.getStockId())
                .process(new KeyedProcessFunction<String, StockPrice, String>() {
                    // Declare the state
                    // AggregatingState takes two type parameters: the input type and the output type
                    AggregatingState<Double,Double> stockPriceAvgAggregatingState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        // Initialize the state
                        stockPriceAvgAggregatingState = getRuntimeContext()
                                // AggregatingStateDescriptor takes three type parameters: the input type, the accumulator type, and the output type
                                .getAggregatingState(new AggregatingStateDescriptor<Double, Tuple2<Double,Integer>, Double>(
                                        "stockPriceAvgAggregatingState", // state name
                                        new AggregateFunction<Double, Tuple2<Double, Integer>, Double>() {
                                            @Override
                                            public Tuple2<Double, Integer> createAccumulator() {
                                                return Tuple2.of(0.0,0); // initialize the accumulator
                                            }

                                            @Override
                                            public Tuple2<Double, Integer> add(Double value, Tuple2<Double, Integer> accumulator) {
                                                return Tuple2.of(accumulator.f0+value,accumulator.f1+1);
                                            }

                                            @Override
                                            public Double getResult(Tuple2<Double, Integer> accumulator) {
                                                return accumulator.f0/accumulator.f1;
                                            }

                                            @Override
                                            public Tuple2<Double, Integer> merge(Tuple2<Double, Integer> a, Tuple2<Double, Integer> b) {
                                                return Tuple2.of(a.f0+b.f0,a.f1+b.f1);
                                            }
                                        },
                                        Types.TUPLE(Types.DOUBLE, Types.INT)  // accumulator type
                                ));
                    }

                    @Override
                    public void processElement(StockPrice value, KeyedProcessFunction<String, StockPrice, String>.Context ctx, Collector<String> out) throws Exception {
                        // When a record arrives, add its price to the AggregatingState
                        stockPriceAvgAggregatingState.add(value.getPrice());
                        // Read the aggregated result from the AggregatingState
                        Double stockPriceAvg = stockPriceAvgAggregatingState.get();
                        out.collect("Stock ID: " + value.getStockId() + ", average price of this stock: " + stockPriceAvg);
                    }
                });
        resultDS.print();
        env.execute();
    }
}

Start the NC (netcat) program in a Linux terminal, then launch KeyedAggregatingStateDemo.java in IDEA, and enter the following data in the NC window:

s1,1,1
s1,2,2
s1,3,3 
s2,4,4
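
With the output strings translated as above, the expected prints are the running averages per stock ID:

Stock ID: s1, average price of this stock: 1.0
Stock ID: s1, average price of this stock: 1.5
Stock ID: s1, average price of this stock: 2.0
Stock ID: s2, average price of this stock: 4.0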

Code file ProcessingTimeTimerDemo.java

package cn.edu.xmu;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.sql.Timestamp;

public class ProcessingTimeTimerDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Processing-time semantics: no timestamp or watermark assignment is needed
        DataStreamSource<String> source = env.socketTextStream("hadoop01", 9999);
        SingleOutputStreamOperator<StockPrice> stockPriceDS = source.map(s -> {
            String[] element = s.split(",");
            return new StockPrice(element[0], Long.valueOf(element[1]), Double.valueOf(element[2]));
        });
        // Timers require a KeyedStream
        stockPriceDS.keyBy(value -> value.getStockId())
                // The three type parameters of KeyedProcessFunction are the key type, the input type, and the output type
                .process(new KeyedProcessFunction<String, StockPrice, String>() {
                    @Override
                    public void processElement(StockPrice value, Context ctx, Collector<String> out) throws Exception {
                        Long currTs = ctx.timerService().currentProcessingTime();
                        out.collect("数据到达,到达时间:" + new Timestamp(currTs));
                        // 注册一个 10 秒后的定时器
                        ctx.timerService().registerProcessingTimeTimer(currTs + 10 * 1000L);
                    }
                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
                        out.collect("定时器触发,触发时间:" + new Timestamp(timestamp));
                    }
                }).print();
        env.execute();
    }
}
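
A hypothetical run (the input line is illustrative): start nc on hadoop01 port 9999, launch the program, and type s1,1,1. The program should immediately print "Record arrived, arrival time: ..." with the current wall-clock time, and about 10 seconds later print "Timer fired, firing time: ...". Processing-time timers fire by wall clock, so the timestamp field inside the record plays no role here; several records for the same key register one timer per distinct target time.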

Code file ClickEvent.java

package cn.edu.xmu;

public class ClickEvent {
    public String name;
    public String url;
    public Long myTimeStamp;

    // A no-argument constructor must be provided (Flink uses reflection to instantiate the POJO)
    public ClickEvent() {
    }
    public ClickEvent(String name, String url, Long myTimeStamp) {
        this.name = name;
        this.url = url;
        this.myTimeStamp = myTimeStamp;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public String getUrl() {
        return url;
    }
    public void setUrl(String url) {
        this.url = url;
    }
    public Long getMyTimeStamp() {
        return myTimeStamp;
    }
    public void setMyTimeStamp(Long myTimeStamp) {
        this.myTimeStamp = myTimeStamp;
    }
    @Override
    public String toString() {
        return "ClickEvent{" +
                "name=" + name +
                ", url='" + url + '\'' +
                ", myTimeStamp=" + myTimeStamp +
                '}';
    }
}

Code file EventTimeTimerDemo.java

package cn.edu.xmu;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;

public class EventTimeTimerDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<ClickEvent> stream = env.addSource(new CustomSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<ClickEvent>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<ClickEvent>() {
                            @Override
                            public long extractTimestamp(ClickEvent element, long recordTimestamp) {
                                return element.myTimeStamp;
                            }
                        }));
        // Define an event-time timer on a KeyedStream
        stream.keyBy(data -> true) // give every record the key true, so all records share the same key and go to the same partition
                .process(new KeyedProcessFunction<Boolean, ClickEvent, String>() {
                    @Override
                    public void processElement(ClickEvent value, Context ctx, Collector<String> out) throws Exception {
                        out.collect("数据到达,时间戳为:" + ctx.timestamp());
                        out.collect("数据到达,水位线为: " + ctx.timerService().currentWatermark() + "\n -------分割线-------");
                        // 注册一个 10 秒后的定时器
                        ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 10 * 1000L);
                    }

                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
                        out.collect("定时器触发,触发时间:" + timestamp);
                    }
                })
                .print();
        env.execute();
    }

    // Custom test data source
    public static class CustomSource implements SourceFunction<ClickEvent> {
        @Override
        public void run(SourceContext<ClickEvent> ctx) throws Exception {
            // Emit the test data directly
            ctx.collect(new ClickEvent("Mary", "./home", 1000L));
            // Pause for 5 seconds so the effect is easier to observe
            Thread.sleep(5000L);
            // Emit a record whose timestamp is 10 seconds later
            ctx.collect(new ClickEvent("Mary", "./home", 11000L));
            Thread.sleep(5000L);
            // Emit a record whose timestamp is 10 seconds + 1 ms later
            ctx.collect(new ClickEvent("Alice", "./cart", 11001L));
            Thread.sleep(5000L);
        }
        @Override
        public void cancel() {
        }
    }
}
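
For reference, the behavior to expect from this bounded source (periodic watermarks are emitted every 200 ms by default and trail the largest timestamp seen by 1 ms): the first record (timestamp 1000) is processed before any watermark has been emitted, so it sees the initial watermark Long.MIN_VALUE (-9223372036854775808); after the 5-second pause the second record (11000) sees a watermark of 999, and the third (11001) sees 10999. The timer registered for 11000 fires once the watermark reaches 11000, i.e. shortly after the third record advances it, and the timers for 21000 and 21001 fire when the source finishes and the watermark jumps to Long.MAX_VALUE.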

Code file ProcessAllWindowTopNDemo.java

package cn.edu.xmu;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;

public class ProcessAllWindowTopNDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<ClickEvent> eventStream = env.addSource(new CustomSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<ClickEvent>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<ClickEvent>() {
                            @Override
                            public long extractTimestamp(ClickEvent element, long recordTimestamp) {
                                return element.getMyTimeStamp();
                            }
                        })
                );
        // Only the url is needed for counting, so map to String and apply a non-keyed window over the whole stream
        SingleOutputStreamOperator<String> result = eventStream
                .map(new MapFunction<ClickEvent, String>() {
                    @Override
                    public String map(ClickEvent value) throws Exception {
                        return value.url;
                    }
                })
                .windowAll(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))) // sliding window: size 10 s, slide 5 s
                .process(new ProcessAllWindowFunction<String, String, TimeWindow>() {
                    @Override
                    public void process(Context context, Iterable<String> elements, Collector<String> out) throws Exception {
                        HashMap<String, Long> urlCountMap = new HashMap<>();
                        // Iterate over the records in the window and accumulate the view count of each url in a HashMap
                        for (String url : elements) {
                            if (urlCountMap.containsKey(url)) {
                                long count = urlCountMap.get(url);
                                urlCountMap.put(url, count + 1L);
                            } else {
                                urlCountMap.put(url, 1L);
                            }
                        }
                        // Copy the HashMap entries into an ArrayList so they can be sorted
                        ArrayList<Tuple2<String, Long>> mapList = new ArrayList<Tuple2<String, Long>>();
                        // Put the view counts into the ArrayList for sorting
                        for (String key : urlCountMap.keySet()) {
                            mapList.add(Tuple2.of(key, urlCountMap.get(key)));
                        }
                        mapList.sort(new Comparator<Tuple2<String, Long>>() {
                            @Override
                            public int compare(Tuple2<String, Long> o1, Tuple2<String, Long> o2) {
                                return o2.f1.intValue() - o1.f1.intValue(); // comparing o2 against o1 yields descending order
                            }
                        });
                        // Take the top two entries after sorting and build the output string
                        StringBuilder result = new StringBuilder();
                        result.append("========================================\n");
                        for (int i = 0; i < Math.min(2,mapList.size()); i++) {
                            Tuple2<String, Long> temp = mapList.get(i);
                            String info = "Views No." + (i + 1) +
                                    " url: " + temp.f0 +
                                    " views: " + temp.f1 +
                                    " window start: " + new Timestamp(context.window().getStart()) +
                                    " window end: " + new Timestamp(context.window().getEnd()) + "\n";
                            result.append(info);
                        }
                        result.append("========================================\n");
                        out.collect(result.toString());
                    }
                });
        result.print();
        env.execute();
    }
    // Custom test data source
    public static class CustomSource implements SourceFunction<ClickEvent> {
        @Override
        public void run(SourceContext<ClickEvent> ctx) throws Exception {
            // Emit the test data directly
            ctx.collect(new ClickEvent("Mary", "./home", 1000L));
            ctx.collect(new ClickEvent("Mary", "./home", 2000L));
            ctx.collect(new ClickEvent("Alice", "./cart", 3000L));
            ctx.collect(new ClickEvent("Mary", "./cart", 4000L));
            ctx.collect(new ClickEvent("Alice", "./play", 5000L));
            ctx.collect(new ClickEvent("Alice", "./cart", 6000L));
        }

        @Override
        public void cancel() {
        }
    }
}
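
With a 10-second window sliding every 5 seconds, the six events fall into three windows, so (using the output strings translated above) the printed Top-2 lists should be, in window order: [-5 s, 5 s) contains ./home with 2 views and ./cart with 2 views (tie order is not guaranteed, since it follows the HashMap iteration order); [0 s, 10 s) contains ./cart with 3 views, then ./home with 2 views; [5 s, 15 s) contains ./play and ./cart with 1 view each, again in unspecified tie order.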

Code file UrlViewCount.java

package cn.edu.xmu;

public class UrlViewCount {
    public String url;
    public Long startTs;
    public Long endTs;
    public Long count;
    // A no-argument constructor must be provided (Flink uses reflection to instantiate the POJO)
    public UrlViewCount() {
    }
    public UrlViewCount(String url, Long count, Long startTs, Long endTs) {
        this.url = url;
        this.count = count;
        this.startTs = startTs;
        this.endTs = endTs;
    }
    public String getUrl() {
        return url;
    }
    public void setUrl(String url) {
        this.url = url;
    }
    public Long getStartTs() {
        return startTs;
    }
    public void setStartTs(Long startTs) {
        this.startTs = startTs;
    }
    public Long getEndTs() {
        return endTs;
    }
    public void setEndTs(Long endTs) {
        this.endTs = endTs;
    }
    public Long getCount() {
        return count;
    }
    public void setCount(Long count) {
        this.count = count;
    }
    @Override
    public String toString() {
        return "UrlViewCount{" +
                "url=" + url +
                ", startTs='" + startTs + '\'' +
                ", endTs='" + endTs + '\'' +
                ", count=" + count +
                '}';
    }
}

Code file KeyedProcessTopNDemo.java

package cn.edu.xmu;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.sql.Timestamp;
import java.util.*;

public class KeyedProcessTopNDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Read data from the custom source
        SingleOutputStreamOperator<ClickEvent> eventStream = env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<ClickEvent>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<ClickEvent>() {
                            @Override
                            public long extractTimestamp(ClickEvent element, long recordTimestamp) {
                                return element.getMyTimeStamp();
                            }
                        }));
        // Key by url and compute the number of visits for each url
        // After windowed aggregation the result is an ordinary stream without window information, so the window must be tagged manually: UrlViewCount carries the window start and end times
        SingleOutputStreamOperator<UrlViewCount> urlCountStream = eventStream.keyBy(data -> data.url)
                .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                .aggregate(new UrlViewCountAgg(), new UrlViewCountResult());
        // Sort the statistics that belong to the same window
        // Key by the window tag (the window end time) so that results of the same window range come together, then sort and take the Top N
        SingleOutputStreamOperator<String> result = urlCountStream.keyBy(data -> data.endTs)
                .process(new TopN(2));
        result.print();
        env.execute();
    }
    // Custom incremental aggregate function
    public static class UrlViewCountAgg implements AggregateFunction<ClickEvent, Long, Long> {
        @Override
        public Long createAccumulator() {
            return 0L;
        }
        @Override
        public Long add(ClickEvent value, Long accumulator) {
            return accumulator + 1;
        }
        @Override
        public Long getResult(Long accumulator) {
            return accumulator;
        }
        @Override
        public Long merge(Long a, Long b) {
            // merge is only invoked for session windows, so it is not needed here
            return null;
        }
    }
    // ProcessWindowFunction has four type parameters. The first is the input type, which must match the output type of UrlViewCountAgg, because UrlViewCountAgg's output is this function's input.
    // The second is the output type, UrlViewCount, which carries the window end time.
    // The third is the key type (the url, hence String), and the fourth is the window type.
    public static class UrlViewCountResult extends ProcessWindowFunction<Long, UrlViewCount, String, TimeWindow>{
        @Override
        public void process(String s, ProcessWindowFunction<Long, UrlViewCount, String, TimeWindow>.Context context, Iterable<Long> elements, Collector<UrlViewCount> out) throws Exception {
            // The iterator holds exactly one element, so a single next() call is enough
            Long count = elements.iterator().next();
            long windowStart = context.window().getStart();
            long windowEnd = context.window().getEnd();
            // Note the constructor's parameter order: (url, count, startTs, endTs)
            out.collect(new UrlViewCount(s, count, windowStart, windowEnd));
        }
    }
    // KeyedProcessFunction has three type parameters. The first is the key type, i.e. the type of the window end time;
    // the second is the input type and the third is the output type.
    public static class TopN extends KeyedProcessFunction<Long, UrlViewCount, String>{
        // Holds the statistics of each window: key = windowEnd, value = list of per-url results
        private Map<Long, List<UrlViewCount>> dataListMap;
        private int threshold;  // the value of N
        public TopN(int threshold){
            this.threshold = threshold;
            this.dataListMap = new HashMap<>();
        }
        @Override
        public void processElement(UrlViewCount value, KeyedProcessFunction<Long, UrlViewCount, String>.Context ctx, Collector<String> out) throws Exception {
            // Each call receives a single record; sorting requires all records of a window, so buffer them first, keeping records of different windows apart
            // Store them in the HashMap
            Long windowEnd = value.endTs;
            if (dataListMap.containsKey(windowEnd)) {
                // Not the first record for this window: append it to the existing list
                List<UrlViewCount> dataList = dataListMap.get(windowEnd);
                dataList.add(value);
            }else{
                // First record for this window: create the list and add the record
                List<UrlViewCount> dataList = new ArrayList<>();
                dataList.add(value);
                dataListMap.put(windowEnd,dataList);
            }
            // Register a timer for window end + 1 ms; by then all results of this window have arrived and sorting can start
            ctx.timerService().registerEventTimeTimer(windowEnd + 1);
        }
        @Override
        public void onTimer(long timestamp, KeyedProcessFunction<Long, UrlViewCount, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
            super.onTimer(timestamp, ctx, out);
            // All results of this window range have arrived; sort them and take the Top N
            Long windowEnd = ctx.getCurrentKey();
            // Sort
            List<UrlViewCount> dataList = dataListMap.get(windowEnd);
            dataList.sort(new Comparator<UrlViewCount>() {
                @Override
                public int compare(UrlViewCount o1, UrlViewCount o2) {
                    return Long.compare(o2.count, o1.count); // comparing o2 against o1 yields descending order, without int overflow
                }
            });
            // Take the Top N
            StringBuilder result = new StringBuilder();
            result.append("========================================\n");
            result.append("Window end time: " + new Timestamp(timestamp - 1) + "\n");
            for (int i = 0; i < Math.min(threshold, dataList.size()); i++) {
                UrlViewCount urlViewCount = dataList.get(i);
                String info = "No." + (i + 1) + " "
                        + "url: " + urlViewCount.url + " "
                        + "views: " + urlViewCount.count + "\n";
                result.append(info);
            }
            result.append("========================================\n");
            // Clear the list once it has been used
            dataList.clear();
            out.collect(result.toString());
        }
    }
    // Custom test data source
    public static class ClickSource implements SourceFunction<ClickEvent> {
        @Override
        public void run(SourceContext<ClickEvent> ctx) throws Exception {
            // Emit the test data directly
            ctx.collect(new ClickEvent("Mary", "./home", 1000L));
            ctx.collect(new ClickEvent("Mary", "./home", 2000L));
            ctx.collect(new ClickEvent("Alice", "./cart", 3000L));
            ctx.collect(new ClickEvent("Mary", "./cart", 4000L));
            ctx.collect(new ClickEvent("Alice", "./play", 5000L));
            ctx.collect(new ClickEvent("Alice", "./cart", 6000L));
        }
        @Override
        public void cancel() {
        }
    }
}
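
The window assignment is the same as in ProcessAllWindowTopNDemo, so (with the output strings translated above) the expected Top-2 per window end time is: end 5000 ms: ./home (2) and ./cart (2), tie order not guaranteed; end 10000 ms: ./cart (3), then ./home (2); end 15000 ms: ./play (1) and ./cart (1), again a tie. Note that TopN keys by the window end time, so each printed block gathers the per-url counts of exactly one window.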