Chapter 6 Code - Lin Ziyu (ed.) - Flink Programming Fundamentals (Scala Edition)


The printed rendering of the code in the textbook Flink Programming Fundamentals (Scala Edition) (textbook website), edited by Lin Ziyu and Tao Jiping, may make the code hard to read. To help readers understand the code correctly, or copy it directly for hands-on experiments, this page provides all of the code that accompanies the book.
See the code for all chapters of the textbook

Chapter 6 DataSet API

val env = ExecutionEnvironment.getExecutionEnvironment
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_2.12</artifactId>
    <version>1.11.2</version>
</dependency>
val dataSet : DataSet[String] = env.readTextFile("file:///home/hadoop/word.txt")
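The readTextFile line above is only a fragment. A minimal, self-contained sketch that wraps it in a complete program (the object name ReadTextFile is ours, and it assumes the local file /home/hadoop/word.txt exists) could look like this:

package cn.edu.xmu.dblab

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

object ReadTextFile {
  def main(args: Array[String]): Unit = {
    // Create the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    // Read the local text file as a DataSet[String]
    val dataSet: DataSet[String] = env.readTextFile("file:///home/hadoop/word.txt")
    // Print every line to the console
    dataSet.print()
  }
}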
transactionId,customerId,itemId,amountPaid
111,1,1,100.0
112,2,2,505.0
113,1,3,510.0
114,2,4,600.0
115,3,2,500.0
package cn.edu.xmu.dblab

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

object ReadCSVFile{
  def main(args: Array[String]): Unit = {
    // Create the batch execution environment
    val bEnv = ExecutionEnvironment.getExecutionEnvironment
    // Path of the local CSV file
    val filePath = "file:///home/hadoop/sales.csv"
    // Read the CSV file into a DataSet of SalesLog, skipping the header line
    val csv = bEnv.readCsvFile[SalesLog](filePath, ignoreFirstLine = true)
    csv.print()
  }
  case class SalesLog(transactionId:String,customerId:String,itemId:String,amountPaid:Double)
}
val myArray = Array("hello world","hadoop spark flink")
val collectionSet = env.fromCollection(myArray)
val dataSet = env.fromElements("hadoop","spark","flink")
val numSet = env.generateSequence(1,10)
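The three source-creation fragments above all assume an ExecutionEnvironment named env. A minimal runnable sketch that exercises them together (the object name CollectionSources is ours) might be:

package cn.edu.xmu.dblab

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

object CollectionSources {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // Build a DataSet from an existing Scala collection
    val myArray = Array("hello world", "hadoop spark flink")
    val collectionSet = env.fromCollection(myArray)
    // Build a DataSet directly from a list of elements
    val dataSet = env.fromElements("hadoop", "spark", "flink")
    // Generate the sequence 1,2,...,10 as a DataSet[Long]
    val numSet = env.generateSequence(1, 10)
    collectionSet.print()
    dataSet.print()
    numSet.print()
  }
}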
mysql -u root -p
mysql> create database flink;
mysql> use flink;
mysql> create table student(sno char(8),cno char(2),grade int);
mysql> insert into student values('95001','1',96);
mysql> insert into student values('95002','1',94);
package cn.edu.xmu.dblab

import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.api.scala._

object InputFromMySQL{
  def main(args: Array[String]): Unit = {

    // Create the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Read data from the relational database using the JDBC input format
    val inputMySQL = env.createInput(JDBCInputFormat.buildJDBCInputFormat()
      // JDBC driver class name
      .setDrivername("com.mysql.jdbc.Driver")
      // JDBC connection URL
      .setDBUrl("jdbc:mysql://localhost:3306/flink")
      // Database user name
      .setUsername("root")
      // Database password
      .setPassword("123456")
      // SQL query to execute
      .setQuery("select sno,cno,grade from student")
      // Field types, order and count must match the SQL query
      .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO))
      .finish()
    )
    inputMySQL.print()
  }
}
<project>
    <groupId>cn.edu.xmu.dblab</groupId>
    <artifactId>simple-project</artifactId>
    <modelVersion>4.0.0</modelVersion>
    <name>Simple Project</name>
    <packaging>jar</packaging>
    <version>1.0</version>
    <repositories>
        <repository>
            <id>alimaven</id>
            <name>aliyun maven</name>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
    </repositories>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.12</artifactId>
            <version>1.11.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>1.11.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.11.2</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.40</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_2.12</artifactId>
            <version>1.11.2</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.4.6</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
package cn.edu.xmu.dblab

import org.apache.flink.api.scala.ExecutionEnvironment

object ReadHDFS{
  def main(args: Array[String]): Unit = {

    // Get the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Create the data source from a file on HDFS
    val inputHDFS = env.readTextFile("hdfs://localhost:9000/word.txt")

    // Print the result
    inputHDFS.print()
  }
}
<project>
    <groupId>cn.edu.xmu.dblab</groupId>
    <artifactId>simple-project</artifactId>
    <modelVersion>4.0.0</modelVersion>
    <name>Simple Project</name>
    <packaging>jar</packaging>
    <version>1.0</version>
    <repositories>
        <repository>
            <id>alimaven</id>
            <name>aliyun maven</name>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
    </repositories>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.12</artifactId>
            <version>1.11.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>1.11.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.11.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.1.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.1.3</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.4.6</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
vim ~/.bashrc
export HADOOP_HOME=/usr/local/hadoop
export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop
export HADOOP_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath)
source ~/.bashrc
val dataSet: DataSet[Int] = env.fromElements(1,2,3,4,5)
val mapDS: DataSet[Int] = dataSet.map(x=>x+10)
val dataSet: DataSet[String] = env.fromElements("Hadoop is good","Flink is fast","Flink is better")
val flatMapDS: DataSet[String] = dataSet.flatMap(line => line.split(" "))
val dataSet: DataSet[String] = env.fromElements("hadoop","spark","flink")
val mapPartitionDS: DataSet[(String,Int)] = dataSet.mapPartition(in => in.map( word=> (word,1)) )
val dataSet: DataSet[String] = env.fromElements("Hadoop is good","Flink is fast","Flink is better")
val filterDS: DataSet[String] = dataSet.filter(line => line.contains("Flink"))
val dataSet: DataSet[Int] = env.fromElements(1,2,3,4,5)
val reduceDS: DataSet[Int] = dataSet.reduce(_+_)
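The map, flatMap, mapPartition, filter and reduce snippets above are fragments. One way to run them end to end, as a hedged sketch (the object name BasicTransformations is ours), is:

package cn.edu.xmu.dblab

import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.api.scala._

object BasicTransformations {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // map: add 10 to every element
    env.fromElements(1, 2, 3, 4, 5).map(x => x + 10).print()

    // flatMap: split each line into words
    val lines: DataSet[String] = env.fromElements("Hadoop is good", "Flink is fast", "Flink is better")
    lines.flatMap(line => line.split(" ")).print()

    // mapPartition: turn every word of a partition into a (word, 1) pair
    env.fromElements("hadoop", "spark", "flink")
      .mapPartition(in => in.map(word => (word, 1)))
      .print()

    // filter: keep only the lines that contain "Flink"
    lines.filter(line => line.contains("Flink")).print()

    // reduce: sum all elements
    env.fromElements(1, 2, 3, 4, 5).reduce(_ + _).print()
  }
}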
package cn.edu.xmu.dblab

import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.api.scala._

case class WordCount(word:String, count:Int)

object ReduceOperator{
  def main(args: Array[String]): Unit = {

    // Get the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Create the data source
    val wordCountDS: DataSet[WordCount] =
      env.fromElements(
        WordCount("spark",1),
        WordCount("spark",2),
        WordCount("flink",1),
        WordCount("flink",1))

    // Specify the transformation: group by word and sum the counts
    val resultDS: DataSet[WordCount] = wordCountDS
      .groupBy("word")
      .reduce((w1,w2) => WordCount(w1.word, w1.count + w2.count))
    resultDS.print()
  }
}
package cn.edu.xmu.dblab

import org.apache.flink.api.java.aggregation.Aggregations
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.api.scala._

object AggregationOperator{
  def main(args: Array[String]): Unit = {

    // Get the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Create the data source
    val input: DataSet[(Int,String,Double)] = env.fromElements(
      (1,"spark",3.0),
      (1,"spark",4.0),
      (1,"spark",4.0),
      (1,"flink",5.0),
      (1,"flink",6.0))

    // Specify the transformation: sum field 0 and take the minimum of field 2
    val output: DataSet[(Int,String,Double)] = input
      .aggregate(Aggregations.SUM,0)
      .and(Aggregations.MIN,2)

    // Print the result
    output.print()
  }
}
package cn.edu.xmu.dblab

import org.apache.flink.api.java.aggregation.Aggregations
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.api.scala._

object AggregationOperator{
  def main(args: Array[String]): Unit = {

    // Get the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Create the data source
    val input: DataSet[(Int,String,Double)] = env.fromElements(
      (1,"spark",3.0),
      (1,"spark",4.0),
      (1,"spark",4.0),
      (1,"flink",5.0),
      (1,"flink",6.0))

    // Specify the transformation: within each group on field 1,
    // sum field 0 and take the minimum of field 2
    val output: DataSet[(Int,String,Double)] = input
      .groupBy(1)
      .aggregate(Aggregations.SUM,0)
      .and(Aggregations.MIN,2)

    // Print the result
    output.print()
  }
}
val dataSet: DataSet[String] = env.fromElements("hadoop","hadoop","spark","flink")
val distinctDS: DataSet[String] = dataSet.distinct()
val input1: DataSet[(Int,String)] = env.fromElements((1,"spark"),(2,"flink"))
val input2: DataSet[(String,Int)] = env.fromElements(("spark",1),("flink",2))
val result = input1.join(input2).where(0).equalTo(1)
result.print()
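As with the other fragments, distinct and the default join above need an enclosing program. A small sketch (the object name DistinctAndJoin is ours; the default join emits pairs of the two matched tuples) could be:

package cn.edu.xmu.dblab

import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.api.scala._

object DistinctAndJoin {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // distinct: remove duplicate elements
    val dataSet: DataSet[String] = env.fromElements("hadoop","hadoop","spark","flink")
    dataSet.distinct().print()

    // default join: match field 0 of input1 against field 1 of input2;
    // every result element is a pair of the two joined tuples
    val input1: DataSet[(Int,String)] = env.fromElements((1,"spark"),(2,"flink"))
    val input2: DataSet[(String,Int)] = env.fromElements(("spark",1),("flink",2))
    input1.join(input2).where(0).equalTo(1).print()
  }
}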
package cn.edu.xmu.dblab

import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.api.scala._

case class Student(name: String, lesson: String, score: Int)

object JoinFunctionTest{
  def main(args: Array[String]): Unit = {

    // Get the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Set the parallelism of the program
    env.setParallelism(1)

    // Create the data sources
    val students: DataSet[Student] = env
      .fromElements(Student("xiaoming","computer",90),Student("zhangmei","english",94))
    val weights: DataSet[(String,Double)] = env.fromElements(("computer",0.7),("english",0.4))

    // Join the two data sets and apply a join function that weights each score
    val weightedScores = students.join(weights).where("lesson").equalTo(0){
      (left, right) => (left.name, left.lesson, left.score * right._2)
    }

    // Print the result
    weightedScores.print()
  }
}
package cn.edu.xmu.dblab

import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.api.scala._
import org.apache.flink.util.Collector

case class Student(name: String, lesson: String, score: Int)

object FlatJoinFunctionTest{
  def main(args: Array[String]): Unit = {

    // Get the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Set the parallelism of the program
    env.setParallelism(1)

    // Create the data sources
    val students: DataSet[Student] = env
      .fromElements(Student("xiaoming","computer",90),Student("zhangmei","english",94))
    val weights: DataSet[(String,Double)] = env.fromElements(("computer",0.7),("english",0.4))

    // Join with a flat join function: only emit courses whose weight is greater than 0.5
    val weightedScores = students.join(weights).where("lesson").equalTo(0){
      (left, right, out: Collector[(String,String,Double)]) =>
        if (right._2 > 0.5) out.collect((left.name, left.lesson, left.score * right._2))
    }

    // Print the result
    weightedScores.print()
  }
}
package cn.edu.xmu.dblab

import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.api.scala._
import org.apache.flink.util.Collector

case class Coord(id:Int,x:Int,y:Int)

object CrossOperator {
  def main(args: Array[String]): Unit = {

    // Get the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Set the parallelism of the program
    env.setParallelism(1)

    // Create the data sources
    val coords1: DataSet[Coord] = env.fromElements(Coord(1,4,5),Coord(2,6,7))
    val coords2: DataSet[Coord] = env.fromElements(Coord(3,8,9),Coord(4,10,11))

    // Build the cross product and compute the Euclidean distance for every pair
    val distances = coords1.cross(coords2){
      (c1, c2) =>
        val dist = math.sqrt(math.pow(c1.x - c2.x, 2) + math.pow(c1.y - c2.y, 2))
        (c1.id, c2.id, dist)
    }

    // Print the result
    distances.print()
  }
}
val dataSet1: DataSet[(Int,String)] = env.fromElements((1,"spark"),(2,"flink"))
val dataSet2: DataSet[(Int,String)] = env.fromElements((3,"hadoop"),(4,"storm"))
val result: DataSet[(Int,String)] = dataSet1.union(dataSet2)
val dataSet: DataSet[String] = ...
val result = dataSet.rebalance().map{...}
val dataSet: DataSet[(String,Int)] = ...
val result = dataSet.partitionByHash(0).mapPartition{...}
val dataSet: DataSet[(String,Int)] = ...
val result = dataSet.partitionByRange(0).mapPartition{...}
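The rebalance and partition fragments above elide the follow-up function bodies, which stay as written. As one possible illustration (the data, the object name PartitionDemo and the mapPartition body are ours, not from the book), hash partitioning by the first tuple field can be exercised like this:

package cn.edu.xmu.dblab

import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.api.scala._

object PartitionDemo {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(2)

    val dataSet: DataSet[(String,Int)] =
      env.fromElements(("hadoop",1),("spark",1),("flink",1),("spark",2),("flink",2))

    // Hash-partition by field 0, then report how many elements each partition holds
    dataSet.partitionByHash(0)
      .mapPartition(in => Seq(("elements in this partition", in.size)))
      .print()
  }
}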
val dataSet: DataSet[(Int,String)] = env.fromElements((1,"spark"),(2,"flink"))
dataSet.writeAsText("file:///home/hadoop/output")
env.execute()
val dataSet: DataSet[(Int,String)] = env.fromElements((1,"spark"),(2,"flink"))
dataSet.writeAsText("hdfs://localhost:9000/output")
env.execute()
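A hedged note on the two writeAsText fragments: in the default configuration, a sink running with parallelism greater than 1 turns the output path into a directory with one file per parallel task. Forcing a single output file can be done by setting the sink parallelism to 1, as in this sketch (the object name WriteSingleFile is ours):

package cn.edu.xmu.dblab

import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem.WriteMode

object WriteSingleFile {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet: DataSet[(Int,String)] = env.fromElements((1,"spark"),(2,"flink"))
    // Overwrite existing output and write one file by using a single parallel sink task
    dataSet.writeAsText("file:///home/hadoop/output", WriteMode.OVERWRITE).setParallelism(1)
    env.execute()
  }
}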
package cn.edu.xmu.dblab

import org.apache.flink.api.java.io.jdbc.{JDBCInputFormat, JDBCOutputFormat}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.types.Row
import org.apache.flink.api.scala.DataSet
import scala.collection.mutable.ArrayBuffer

object WriteMySQL {
  def main(args: Array[String]): Unit = {
    // Create the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Build the rows that will be inserted into the student table
    val arr = new ArrayBuffer[Row]()

    val row1 = new Row(3)
    row1.setField(0, "95001")
    row1.setField(1, "2")
    row1.setField(2, 94)

    val row2 = new Row(3)
    row2.setField(0, "95002")
    row2.setField(1, "2")
    row2.setField(2, 88)

    arr.+=(row1)
    arr.+=(row2)

    val data: DataSet[Row] = env.fromCollection(arr)
    data.output(JDBCOutputFormat.buildJDBCOutputFormat()
      // JDBC driver class name
      .setDrivername("com.mysql.jdbc.Driver")
      // JDBC connection URL
      .setDBUrl("jdbc:mysql://localhost:3306/flink")
      // Database user name
      .setUsername("root")
      // Database password
      .setPassword("123456")
      // SQL insert statement
      .setQuery("insert into student (sno,cno,grade) values(?,?,?)")
      .finish())

    env.execute("insert data to mysql")
    println("Data has been written to MySQL successfully!")

  }
}
package cn.edu.xmu.dblab
import org.apache.flink.api.scala._
object ComputePi{
  def main(args: Array[String]): Unit = {

    // Create the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Create the initial data set
    val initial = env.fromElements(0)

    // Run the bulk iteration: in each of the 10000 iterations, sample one
    // random point and count it if it falls inside the unit circle
    val count = initial.iterate(10000) { iterationInput: DataSet[Int] =>
      val result = iterationInput.map { i =>
        val x = Math.random()
        val y = Math.random()
        i + (if (x * x + y * y < 1) 1 else 0)
      }
      result
    }

    // Estimate pi from the fraction of points inside the circle
    val result = count map { c => c / 10000.0 * 4 }

    // Print the result
    result.print()
  }
}
// Read the initial solution set and the initial workset
val initialSolutionSet: DataSet[(Long, Double)] = // [...]
val initialWorkset: DataSet[(Long, Double)] = // [...]

// Set the maximum number of iterations and the key position
val maxIterations = 100
val keyPosition = 0

// Apply the delta iteration; ComputeCandidateChanges, CompareChangesToCurrent and
// FilterByThreshold stand for user-defined functions that are not listed here
val result = initialSolutionSet.iterateDelta(initialWorkset, maxIterations, Array(keyPosition)) {
  (solution, workset) =>
    val candidateUpdates = workset.groupBy(1).reduceGroup(new ComputeCandidateChanges())
    val deltas = candidateUpdates.join(solution).where(0).equalTo(0)(new CompareChangesToCurrent())

    val nextWorkset = deltas.filter(new FilterByThreshold())

    (deltas, nextWorkset)
}

// Write out the result of the iteration
result.writeAsCsv(outputPath)

env.execute()
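The delta-iteration skeleton above leaves its user-defined functions unspecified. As a separate, self-contained sketch that is not from the book (the data, the object name DeltaIterateDemo and the update rule are ours), a delta iteration that applies an update only when it lowers the stored value could look like this:

package cn.edu.xmu.dblab

import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.api.scala._
import org.apache.flink.util.Collector

object DeltaIterateDemo {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Solution set: (key, value) pairs; workset: proposed new values for the keys
    val initialSolutionSet: DataSet[(Long, Double)] =
      env.fromElements((1L, 10.0), (2L, 20.0), (3L, 30.0))
    val initialWorkset: DataSet[(Long, Double)] =
      env.fromElements((1L, 8.0), (2L, 25.0), (3L, 12.0))

    // Keep an update only if it lowers the current value; the iteration stops
    // when the workset becomes empty or after 10 iterations
    val result = initialSolutionSet.iterateDelta(initialWorkset, 10, Array(0)) {
      (solution, workset) =>
        val deltas = workset.join(solution).where(0).equalTo(0) {
          (update, current, out: Collector[(Long, Double)]) =>
            if (update._2 < current._2) out.collect(update)
        }
        (deltas, deltas)
    }

    // The final solution set should contain (1,8.0), (2,20.0) and (3,12.0)
    result.print()
  }
}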
package cn.edu.xmu.dblab

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration
import scala.collection.mutable.ListBuffer
import org.apache.flink.api.scala._

object BroadcastDemo {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Build a collection of (name, amount) tuples and turn it into a DataSet
    val rawData = ListBuffer[Tuple2[String,Int]]()
    rawData.append(("hadoop",48))
    rawData.append(("spark",42))
    rawData.append(("flink",46))
    val tupleData = env.fromCollection(rawData)

    // Create the data set that will be broadcast
    val broadcastData = tupleData.map(x => {
      Map(x._1 -> x._2)
    })

    val books = env.fromElements("hadoop","spark","flink")

    val result = books.map(new RichMapFunction[String,String] {

      var listData: java.util.List[Map[String,Int]] = null
      var allMap  = Map[String,Int]()

      override def open(parameters: Configuration): Unit = {
        super.open(parameters)
        // Retrieve the broadcast variable registered under "broadcastMapName"
        this.listData = getRuntimeContext.getBroadcastVariable[Map[String,Int]]("broadcastMapName")
        val it = listData.iterator()
        while (it.hasNext){
          val next = it.next()
          allMap = allMap.++(next)
        }
      }

      override def map(value: String) = {
        val amount = allMap.get(value).get
        "The amount of "+value+" is:"+amount
      }
    }).withBroadcastSet(broadcastData,"broadcastMapName")

    result.print()
  }
}