The code in 《Flink编程基础(Scala版)》 (Flink Programming Fundamentals, Scala Edition), written by Lin Ziyu and Tao Jiping (textbook website), may be hard to read in its printed form. To help readers understand the code correctly, or copy it directly for hands-on exercises, the complete companion code for the book is provided here.
View the code for all chapters of the textbook
Chapter 6 DataSet API
val env = ExecutionEnvironment.getExecutionEnvironment
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.12</artifactId>
<version>1.11.2</version>
</dependency>
val dataSet : DataSet[String] = env.readTextFile("file:///home/hadoop/word.txt")
transactionId,customerId,itemId,amountPaid
111,1,1,100.0
112,2,2,505.0
113,1,3,510.0
114,2,4,600.0
115,3,2,500.0
package cn.edu.xmu.dblab
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
object ReadCSVFile{
def main(args: Array[String]): Unit = {
val bEnv = ExecutionEnvironment.getExecutionEnvironment
val filePath="file:///home/hadoop/sales.csv"
val csv = bEnv.readCsvFile[SalesLog](filePath,ignoreFirstLine = true)
csv.print()
}
case class SalesLog(transactionId:String,customerId:String,itemId:String,amountPaid:Double)
}
val myArray = Array("hello world","hadoop spark flink")
val collectionSet = env.fromCollection(myArray)
val dataSet = env.fromElements("hadoop","spark","flink")
val numSet = env.generateSequence(1,10)
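The three source-creation snippets above assume an existing execution environment named env. As a minimal runnable sketch that wraps them into one program (the object name DataSourceDemo is assumed here, not taken from the textbook; the package name reuses the book's cn.edu.xmu.dblab):
package cn.edu.xmu.dblab
import org.apache.flink.api.scala._
object DataSourceDemo {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // create a DataSet from a Scala collection
    val collectionSet = env.fromCollection(Array("hello world", "hadoop spark flink"))
    // create a DataSet from individual elements
    val dataSet = env.fromElements("hadoop", "spark", "flink")
    // create a DataSet holding the sequence 1..10
    val numSet = env.generateSequence(1, 10)
    // each print() triggers a local execution and shows the elements
    collectionSet.print()
    dataSet.print()
    numSet.print()
  }
}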
mysql -u root -p
mysql> create database flink;
mysql> use flink;
mysql> create table student(sno char(8),cno char(2),grade int);
mysql> insert into student values('95001','1',96);
mysql> insert into student values('95002','1',94);
package cn.edu.xmu.dblab
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.api.scala._
object InputFromMySQL{
def main(args: Array[String]): Unit = {
//create the execution environment
val env = ExecutionEnvironment.getExecutionEnvironment
//read data from the relational database using the JDBC input format
val inputMySQL = env.createInput(JDBCInputFormat.buildJDBCInputFormat()
//JDBC driver class name
.setDrivername("com.mysql.jdbc.Driver")
//database connection URL
.setDBUrl("jdbc:mysql://localhost:3306/flink")
//database user name
.setUsername("root")
//database password
.setPassword("123456")
//SQL query to execute
.setQuery("select sno,cno,grade from student")
//the field types, order, and count must match the SQL query
.setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO))
.finish()
)
inputMySQL.print()
}
}
<project>
<groupId>cn.edu.xmu.dblab</groupId>
<artifactId>simple-project</artifactId>
<modelVersion>4.0.0</modelVersion>
<name>Simple Project</name>
<packaging>jar</packaging>
<version>1.0</version>
<repositories>
<repository>
<id>alimaven</id>
<name>aliyun maven</name>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.12</artifactId>
<version>1.11.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>1.11.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>1.11.2</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.40</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.12</artifactId>
<version>1.11.2</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.4.6</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
package cn.edu.xmu.dblab
import org.apache.flink.api.scala.ExecutionEnvironment
object ReadHDFS{
def main(args: Array[String]): Unit = {
//get the execution environment
val env = ExecutionEnvironment.getExecutionEnvironment
//create the data source
val inputHDFS = env.readTextFile("hdfs://localhost:9000/word.txt")
//print the result
inputHDFS.print()
}
}
<project>
<groupId>cn.edu.xmu.dblab</groupId>
<artifactId>simple-project</artifactId>
<modelVersion>4.0.0</modelVersion>
<name>Simple Project</name>
<packaging>jar</packaging>
<version>1.0</version>
<repositories>
<repository>
<id>alimaven</id>
<name>aliyun maven</name>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.12</artifactId>
<version>1.11.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>1.11.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>1.11.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.1.3</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.3</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.4.6</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
vim ~/.bashrc
export HADOOP_HOME=/usr/local/hadoop
export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop
export HADOOP_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath)
source ~/.bashrc
val dataSet: DataSet[Int] = env.fromElements(1,2,3,4,5)
val mapDS: DataSet[Int] = dataSet.map(x=>x+10)
val dataSet: DataSet[String] = env.fromElements("Hadoop is good","Flink is fast","Flink is better")
val flatMapDS: DataSet[String] = dataSet.flatMap(line => line.split(" "))
val dataSet: DataSet[String] = env.fromElements("hadoop","spark","flink")
val mapPartitionDS: DataSet[(String,Int)] = dataSet.mapPartition(in => in.map( word=> (word,1)) )
val dataSet: DataSet[String] = env.fromElements("Hadoop is good","Flink is fast","Flink is better")
val filterDS: DataSet[String] = dataSet.filter(line => line.contains("Flink"))
val dataSet: DataSet[Int] = env.fromElements(1,2,3,4,5)
val reduceDS: DataSet[Int] = dataSet.reduce(_+_)
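The one-line transformation snippets above all assume an existing execution environment named env. As a minimal runnable sketch that chains flatMap, filter, map and reduce into one program (the object name TransformationDemo is assumed here, not taken from the textbook):
package cn.edu.xmu.dblab
import org.apache.flink.api.scala._
object TransformationDemo {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val lines: DataSet[String] =
      env.fromElements("Hadoop is good", "Flink is fast", "Flink is better")
    // flatMap: split each line into words
    val words: DataSet[String] = lines.flatMap(_.split(" "))
    // filter: keep only the word "Flink"
    val flinkWords: DataSet[String] = words.filter(_ == "Flink")
    // map: convert each word into a (word, 1) pair
    val pairs: DataSet[(String, Int)] = flinkWords.map(w => (w, 1))
    // reduce: sum up the counts
    val total: DataSet[(String, Int)] = pairs.reduce((a, b) => (a._1, a._2 + b._2))
    total.print()
  }
}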
package cn.edu.xmu.dblab
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.api.scala._
case class WordCount(word:String, count:Int)
object ReduceOperator{
def main(args: Array[String]): Unit = {
//get the execution environment
val env = ExecutionEnvironment.getExecutionEnvironment
//create the data source
val wordCountDS: DataSet[WordCount] =
env.fromElements(
WordCount("spark",1),
WordCount("spark",2),
WordCount("flink",1),
WordCount("flink",1))
//define the transformation
val resultDS: DataSet[WordCount] = wordCountDS
.groupBy("word")
.reduce((w1,w2)=>new WordCount(w1.word,w1.count+w2.count))
resultDS.print()
}
}
package cn.edu.xmu.dblab
import org.apache.flink.api.java.aggregation.Aggregations
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.api.scala._
object AggregationOperator{
def main(args: Array[String]): Unit = {
//get the execution environment
val env = ExecutionEnvironment.getExecutionEnvironment
//create the data source
val input: DataSet[(Int,String,Double)] = env.fromElements(
(1,"spark",3.0),
(1,"spark",4.0),
(1,"spark",4.0),
(1,"flink",5.0),
(1,"flink",6.0),
)
//specify the transformation on the data set
val output: DataSet[(Int,String,Double)] = input
.aggregate(Aggregations.SUM,0)
.and(Aggregations.MIN,2)
//print the result
output.print()
}
}
package cn.edu.xmu.dblab
import org.apache.flink.api.java.aggregation.Aggregations
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.api.scala._
object AggregationOperator{
def main(args: Array[String]): Unit = {
//get the execution environment
val env = ExecutionEnvironment.getExecutionEnvironment
//create the data source
val input: DataSet[(Int,String,Double)] = env.fromElements(
(1,"spark",3.0),
(1,"spark",4.0),
(1,"spark",4.0),
(1,"flink",5.0),
(1,"flink",6.0),
)
//specify the transformation on the data set
val output: DataSet[(Int,String,Double)] = input
.groupBy(1)
.aggregate(Aggregations.SUM,0)
.and(Aggregations.MIN,2)
//print the result
output.print()
}
}
val dataSet: DataSet[String] = env.fromElements("hadoop","hadoop","spark","flink")
val distinctDS: DataSet[String] = dataSet.distinct()
val input1: DataSet[(Int,String)] = env.fromElements((1,"spark"),(2,"flink"))
val input2: DataSet[(String,Int)] = env.fromElements(("spark",1),("flink",2))
val result = input1.join(input2).where(0).equalTo(1)
result.print()
package cn.edu.xmu.dblab
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.api.scala._
case class Student(name: String, lesson: String, score: Int)
object JoinFunctionTest{
def main(args: Array[String]): Unit = {
//get the execution environment
val env = ExecutionEnvironment.getExecutionEnvironment
//set the program parallelism
env.setParallelism(1)
//create the data source
val students: DataSet[Student] = env
.fromElements(Student("xiaoming","computer",90),Student("zhangmei","english",94))
//specify the transformation on the data sets
val weights: DataSet[(String,Double)] = env.fromElements(("computer",0.7),("english",0.4))
val weightedScores = students.join(weights).where("lesson").equalTo(0){
(left,right) => (left.name,left.lesson,left.score*right._2)
}
//print the result
weightedScores.print()
}
}
package cn.edu.xmu.dblab
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.api.scala._
import org.apache.flink.util.Collector
case class Student(name: String, lesson: String, score: Int)
object FlatJoinFunctionTest{
def main(args: Array[String]): Unit = {
//get the execution environment
val env = ExecutionEnvironment.getExecutionEnvironment
//set the program parallelism
env.setParallelism(1)
//create the data source
val students: DataSet[Student] = env
.fromElements(Student("xiaoming","computer",90),Student("zhangmei","english",94))
//specify the transformation on the data sets
val weights: DataSet[(String,Double)] = env.fromElements(("computer",0.7),("english",0.4))
val weightedScores = students.join(weights).where("lesson").equalTo(0){
(left,right,out:Collector[(String,String,Double)]) =>
if (right._2>0.5) out.collect((left.name,left.lesson,left.score*right._2))
}
//print the result
weightedScores.print()
}
}
package cn.edu.xmu.dblab
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.api.scala._
import org.apache.flink.util.Collector
case class Coord(id:Int,x:Int,y:Int)
object CrossOperator {
def main(args: Array[String]): Unit = {
//get the execution environment
val env = ExecutionEnvironment.getExecutionEnvironment
//set the program parallelism
env.setParallelism(1)
//create the data sources
val coords1: DataSet[Coord] = env.fromElements(Coord(1,4,5),Coord(2,6,7))
val coords2:DataSet[Coord] = env.fromElements(Coord(3,8,9),Coord(4,10,11))
//specify the transformation on the data sets
val distances = coords1.cross(coords2){
(c1,c2) =>
val dist = math.sqrt(math.pow(c1.x-c2.x,2)+math.pow(c1.y-c2.y,2))
(c1.id,c2.id,dist)
}
//print the result
distances.print()
}
}
val dataSet1: DataSet[(Int,String)] = env.fromElements((1,"spark"),(2,"flink"))
val dataSet2: DataSet[(Int,String)] = env.fromElements((3,"hadoop"),(4,"storm"))
val result: DataSet[(Int,String)] = dataSet1.union(dataSet2)
val dataSet: DataSet[String] = ...
val result = dataSet.rebalance().map{...}
val dataSet: DataSet[(String,Int)] = ...
val result = dataSet.partitionByHash(0).mapPartition{...}
val dataSet: DataSet[(String,Int)] = ...
val result = dataSet.partitionByRange(0).mapPartition{...}
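The rebalance/partitionByHash/partitionByRange snippets above elide their input data and the follow-up function bodies. As a rough runnable sketch with assumed sample data (the object name PartitionDemo is not from the textbook):
package cn.edu.xmu.dblab
import org.apache.flink.api.scala._
object PartitionDemo {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // assumed sample data
    val dataSet: DataSet[(String, Int)] =
      env.fromElements(("hadoop", 1), ("spark", 2), ("flink", 3), ("spark", 4))
    // rebalance: redistribute the data evenly before the map
    val rebalanced = dataSet.rebalance().map(t => (t._1, t._2 * 10))
    // partitionByHash: hash-partition on field 0, then process each partition
    val hashed = dataSet.partitionByHash(0).mapPartition(it => it.map(t => (t._1, t._2 + 1)))
    // partitionByRange: range-partition on field 0, then process each partition
    val ranged = dataSet.partitionByRange(0).mapPartition(it => it.map(t => t._1))
    rebalanced.print()
    hashed.print()
    ranged.print()
  }
}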
val dataSet: DataSet[(Int,String)] = env.fromElements((1,"spark"),(2,"flink"))
dataSet.writeAsText("file:///home/hadoop/output")
env.execute()
val dataSet: DataSet[(Int,String)] = env.fromElements((1,"spark"),(2,"flink"))
dataSet.writeAsText("hdfs://localhost:9000/output")
env.execute()
package cn.edu.xmu.dblab
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.types.Row
import org.apache.flink.api.scala.DataSet
import scala.collection.mutable.ArrayBuffer
object WriteMySQL {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val arr = new ArrayBuffer[Row]()
val row1 = new Row(3)
row1.setField(0, "95001")
row1.setField(1, "2")
row1.setField(2, 94)
val row2 = new Row(3)
row2.setField(0, "95002")
row2.setField(1, "2")
row2.setField(2, 88)
arr.+=(row1)
arr.+=(row2)
val data: DataSet[Row] = env.fromCollection(arr)
data.output(JDBCOutputFormat.buildJDBCOutputFormat()
// JDBC driver class name
.setDrivername("com.mysql.jdbc.Driver")
// database connection URL
.setDBUrl("jdbc:mysql://localhost:3306/flink")
// database user name
.setUsername("root")
// database password
.setPassword("123456")
// SQL insert statement
.setQuery("insert into student (sno,cno,grade) values(?,?,?)")
.finish())
env.execute("insert data to mysql")
System.out.println("Data has been written to MySQL successfully!")
}
}
package cn.edu.xmu.dblab
import org.apache.flink.api.scala._
object ComputePi{
def main(args: Array[String]): Unit = {
//create the execution environment
val env = ExecutionEnvironment.getExecutionEnvironment
// create the initial data set
val initial = env.fromElements(0)
//run the iterative computation
val count = initial.iterate(10000) { iterationInput: DataSet[Int] =>
val result = iterationInput.map { i =>
val x = Math.random()
val y = Math.random()
i + (if (x * x + y * y < 1) 1 else 0)
}
result
}
//compute pi
val result = count map { c => c / 10000.0 * 4 }
//print the result
result.print()
}
}
//read the initial data sets
val initialSolutionSet: DataSet[(Long, Double)] = // [...]
val initialWorkset: DataSet[(Long, Double)] = // [...]
//set the maximum number of iterations
val maxIterations = 100
val keyPosition = 0
//apply the delta iteration
val result = initialSolutionSet.iterateDelta(initialWorkset, maxIterations, Array(keyPosition)) {
(solution, workset) =>
val candidateUpdates = workset.groupBy(1).reduceGroup(new ComputeCandidateChanges())
val deltas = candidateUpdates.join(solution).where(0).equalTo(0)(new CompareChangesToCurrent())
val nextWorkset = deltas.filter(new FilterByThreshold())
(deltas, nextWorkset)
}
//write out the result of the iteration
result.writeAsCsv(outputPath)
env.execute()
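The delta-iteration fragment above elides its input data and the user functions ComputeCandidateChanges, CompareChangesToCurrent and FilterByThreshold. As a rough, self-contained sketch of the same iterateDelta pattern, with assumed data and a simple rule (keep the smaller value per key); the object name DeltaIterateSketch is not from the textbook:
package cn.edu.xmu.dblab
import org.apache.flink.api.scala._
import org.apache.flink.util.Collector
object DeltaIterateSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // assumed data: current solution (key, value) and candidate updates (key, value)
    val initialSolutionSet: DataSet[(Long, Double)] =
      env.fromElements((1L, 10.0), (2L, 20.0), (3L, 30.0))
    val initialWorkset: DataSet[(Long, Double)] =
      env.fromElements((1L, 8.0), (2L, 25.0), (3L, 5.0))
    val maxIterations = 10
    val result = initialSolutionSet.iterateDelta(initialWorkset, maxIterations, Array(0)) {
      (solution, workset) =>
        // keep only candidates that improve on the current solution value
        val deltas = workset.join(solution).where(0).equalTo(0) {
          (w, s, out: Collector[(Long, Double)]) =>
            if (w._2 < s._2) out.collect((w._1, w._2))
        }
        // the deltas update the solution set and also form the next workset
        (deltas, deltas)
    }
    result.print()
  }
}
The solution set is updated with the deltas on the key field, and the iteration stops as soon as the next workset becomes empty or maxIterations is reached.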
package cn.edu.xmu.dblab
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration
import scala.collection.mutable.ListBuffer
import org.apache.flink.api.scala._
object BroadcastDemo {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val rawData = ListBuffer[Tuple2[String,Int]]()
rawData.append(("hadoop",48))
rawData.append(("spark",42))
rawData.append(("flink",46))
val tupleData = env.fromCollection(rawData)
//create the data set to broadcast
val broadcastData = tupleData.map(x=>{
Map(x._1->x._2)
})
val books = env.fromElements("hadoop","spark","flink")
val result = books.map(new RichMapFunction[String,String] {
var listData: java.util.List[Map[String,Int]] = null
var allMap = Map[String,Int]()
override def open(parameters: Configuration): Unit = {
super.open(parameters)
//get the broadcast variable data set
this.listData = getRuntimeContext.getBroadcastVariable[Map[String,Int]]("broadcastMapName")
val it = listData.iterator()
while (it.hasNext){
val next = it.next()
allMap = allMap.++(next)
}
}
override def map(value: String) = {
val amount = allMap.get(value).get
"The amount of "+value+" is:"+amount
}
}).withBroadcastSet(broadcastData,"broadcastMapName")
result.print()
}
}