【版权声明】版权所有,严禁转载,严禁用于商业用途,侵权必究。
作者:厦门大学信息学院2020级研究生 刘官山
指导老师:厦门大学数据库实验室 林子雨 博士/副教授
相关教材:林子雨、陶继平编著《Flink编程基础(Scala版)》(官网)
相关案例:基于Scala语言的Flink数据处理分析案例集锦
本实验用Scala语言编写了Flink程序,对Pokemon数据集进行了数据处理和分析,之后对分析结果使用Echarts编写了可视化HTML页面
一、实验环境
本次作业使用的环境和软件如下:
(1)Windows10
(2)Jdk 1.8.0
(3)Scala 2.12.7
(4)Flink-scala_2.12-1.11.2
(5)Flink-clients_2.12-1.11.2
(6)IDEA 2021
(7)Echarts
搭建方法:
- 打开IDEA,新建一个Maven项目
- File->Settings->Plugin 在Markplace中搜索Scala 点击Installed
- File->Project Structure->Libraries ->点击“+”按钮->Scala SDK->Download->选择2.12版本进行下载
- 在pom文件中导入flink相关的包,配置如下:
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.12</artifactId>
<version>1.11.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>1.11.2</version>
</dependency>
</dependencies>
至此环境搭建完成
注:未安装Java环境还需下载和配置JAVA环境
二、数据集
1. 数据集介绍
数据集使用了Pokemon数据集,可以在百度云中下载处理过的数据集和源代码文件:
链接:https://pan.baidu.com/s/1LAj8aajUlB40AzMsvAh4FQ
提取码:ziyu
也可以通过https://public.tableau.com/s/sites/default/files/media/Resources/pokemon.xlsx下载原数据集
数据集分为四个部分:
1.Pokemon (#,Name,Type,HP,Attack,Defense,Special Attack,Special Defense,Speed)
记录了宝可梦的编号,名称,种族,生命值,攻击,防御,特攻,特防,速度等属性
2.Moves (Name,Type,Cat.,Power,Acc.,PP,TM,Effect,Prob.)
记录了宝可梦的各种技能信息,本实验中不曾使用此项记录
3.Evolution (Evolving from,Evolving to,level,Condition,Evolution Type)
记录了宝可梦进化的进化源,进化方向,进化登记,进化条件,进化种类等信息
4.TypeChart (Attack,Defense,Effectiveness,Multiplier)
记录了各个种族之间的克制关系:包含以下属性:攻击的种族、防御的种族,攻击效果描述、攻击倍率
2. 数据集处理
原数据集为含有多个表的xlsx文件,为便于Flink进行处理将每个表转为CSV文件单独存储起来,变为了Evolution.csv,Pokemon.csv,以及Type.csv三个文件
将文件复制到工程文件的resources文件夹
三、数据处理
1. 环境创建和数据导入
先创建执行环境,同时为了输出的数据清晰,设置并行度为1
val env=ExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
之后把数据集导入,按照列属性封装成相应的DataSet,同时将第一行属性行忽视
val inputpath="src/main/resources/Pokemon/Pokemon.csv"
val InputData1=env.readCsvFile[(Double,String,String,Int,Int,Int,Int,Int,Int,Int)](inputpath,ignoreFirstLine = true)
val InputData2=env.readCsvFile[(String,String,String,Double)]("src/main/resources/Pokemon/Type.csv",ignoreFirstLine = true)
val InputData3=env.readCsvFile[(String,String,Int,String,String)]("src/main/resources/Pokemon/Evolution.csv",ignoreFirstLine = true)
2. 对所有Pokemon的种族进行统计,并按种族数量从大到小进行排序
val resultData1=InputData1.map(line=>(line._3,1)).groupBy(0).sum(1).sortPartition(1,Order.ASCENDING)
resultData1.print("种族及数量")
3. 统计所有Pokemon的种族值(所有单项属性值的和),选出数值前五的Pokemon
val resultData2=InputData1.map(line=>(line._2,line._4,line._5,line._6,line._7,line._8,line._9,line._10)).distinct().sortPartition(1,Order.DESCENDING).first(5)
resultData2.print("TOP5宝可梦")
4. 将所有Pokemon按照种族值分类,算出每个种族各项数据的平均值
为计算出平均值,对每行数据进行了处理:在每一行后增加了一列数值为1的属性,求和时将其同步相加便得到了各个种族的总数量,进而求出各项的平均值
val resultData3=InputData1.map(line=>(line._3,line._4,line._5,line._6,line._7,line._8,line._9,line._10,1)).groupBy(0).reduce((x,y)=>(x._1,x._2+y._2,x._3+y._3,x._4+y._4,x._5+y._5,x._6+y._6,x._7+y._7,x._8+y._8,x._9+y._9)).map(line=>("种族:",line._1,"总值平均",line._2/line._9,"HP平均",line._3/line._9,"Attack均值",line._4/line._9,"Defense均值",line._5/line._9,"特攻均值",line._6/line._9,"特防均值",line._7/line._9,"速度均值",line._8/line._9)).sortPartition(1,Order.DESCENDING)
resultData3.print("各种族值均值")
5. 对种族之间的克制情况进行统计,找出完全无效(No Effect)的种族克制
val resultData4=InputData2.filter(line=>(line._3.equals("No Effect"))).map(line=>(line._2,line._1,1)).groupBy(0).reduce((x,y)=>(x._1,x._2+y._2,x._3+y._3))
resultData4.print("免疫情况")
6. 对Evolution数据集进行自连接操作,找出所有Pockmon的进化路径并返回输入的Pockmon的进化路径
用函数以及统一写到main之中两种方法进行了实现
val name="Bulbasaur"
val resultData5=InputData3.join(InputData3).where(1).equalTo(0).apply((a,b)=>(a._1,a._2,b._2)).filter(x=>(x._1.equals(name)||x._2.equals(name)||x._3.equals(name)))
resultData5.print("进化过程")
def selectEvolution(name:String): DataSet[(String, String, String)] ={
val env=ExecutionEnvironment.getExecutionEnvironment
val InputData3=env.readCsvFile[(String,String,Int,String,String)]("src/main/resources/Pokemon/Evolution.csv",ignoreFirstLine = true)
val resultData1=InputData3.join(InputData3).where(1).equalTo(0).apply((a,b)=>(a._1,a._2,b._2)).filter(x=>(x._1.equals(name)||x._2.equals(name)||x._3.equals(name)))
return resultData1
}
def main(args: Array[String]): Unit = {
selectEvolution("Bulbasaur").print()
}
7. 执行任务
env.execute()
8.执行结果(部分)
(Bulbasaur,Ivysaur,Venusaur)
进化过程> (Bulbasaur,Ivysaur,Venusaur)
免疫情况> (DARK,PSYCHIC,1)
免疫情况> (FAIRY,DRAGON,1)
免疫情况> (FLYING,GROUND,1)
免疫情况> (GHOST,NORMALFIGHTING,2)
免疫情况> (GROUND,ELECTRIC,1)
免疫情况> (NORMAL,GHOST,1)
免疫情况> (STEEL,POISON,1)
Type and Num> (FAIRY,34)
Type and Num> (ICE,37)
Type and Num> (DRAGON,44)
Type and Num> (GHOST,45)
......
各种族值均值> (种族:,WATER,总值平均,422,HP平均,70,Attack均值,71,Defense均值,73,特攻均值,72,特防均值,69,速度均值,64)
各种族值均值> (种族:,STEEL,总值平均,484,HP平均,64,Attack均值,92,Defense均值,116,特攻均值,71,特防均值,82,速度均值,57)
各种族值均值> (种族:,ROCK,总值平均,441,HP平均,66,Attack均值,89,Defense均值,106,特攻均值,57,特防均值,71,速度均值,50)
......
TOP5宝可梦> (Mega Mewtwo X,780,106,190,100,154,100,130)
TOP5宝可梦> (Mega Mewtwo Y,780,106,150,70,194,120,140)
TOP5宝可梦> (Arceus,720,120,120,120,120,120,120)
TOP5宝可梦> (White Kyurem,700,125,120,90,170,100,95)
TOP5宝可梦> (Mega Garchomp,700,108,170,115,120,95,92)进程已结束,退出代码为 0
四、数据图形化
数据图形化我使用了Echarts:一个使用 JavaScript 实现的开源可视化库
利用script标签导入了Echarts库之后编写了HTML文件作为数据图形化
利用其功能结合Flink处理好的数据构建了如下图表(图表都可以动态操作)