基于Flink的对豆瓣电影数据的分析与处理

大数据学习路线图

【版权声明】版权所有,严禁转载,严禁用于商业用途,侵权必究。
作者:厦门大学信息学院2020级研究生 寿铁祺
指导老师:厦门大学数据库实验室 林子雨 博士/副教授
相关教材:林子雨、陶继平编著《Flink编程基础(Scala版)》(官网
相关案例:基于Scala语言的Flink数据处理分析案例集锦
本实验爬取了豆瓣电影的31441条电影记录,使用scala语言进行flink编程处理了数据,最后结果使用python进行了可视化

一、 实验环境:

1 系统:Ubuntu 18.04

2 编程语言:Scala 2.12.13,Python 3.7.7

3 Java环境:JDK 1.8.0_162

4 框架:Flink-1.11.2,Hadoop-3.1.3,Maven-3.6.3

5 Python包:Pandas,Plotly

6 开发工具:Idea-2020.2.3,Jupyter Notebook

其中的绝大部分环境都可按照课堂PPT和官网资料安装配置,额外的Python包可通过以下命令安装:

Pandas:pip install pandas

Plotly:pip install plotly

本实验涉及到的所有数据集和代码,可以从百度网盘下载。

链接:https://pan.baidu.com/s/1NjznWRlJw5HIywo3JEdbig 
提取码:ziyu

二、 实验数据集:

1. 数据集说明:

本次实验采用的数据于2019年3月从豆瓣电影(https://movie.douban.com/)上爬取,包含31441条电影记录。每条记录包含以下字段:

1) id:一串整数,标识网页链接的后缀。

2) name:电影名。

3) year:上映年份。

4) ratingsum:评分人数。

5) genre:电影类型。

6) country:制片国家/地区。

数据集预览:

图2.1 数据集预览

2.将数据集存放在分布式文件系统HDFS中:

A. 启动Hadoop中的HDFS组件,在命令行运行下面命令:

  1. /usr/local/hadoop/sbin/start-dfs.sh
Shell

B. 在hadoop上登录用户创建目录,在命令行运行下面命令:

  1. hdfs dfs -mkdir -p /user/hadoop
Shell

C. 把本地文件系统中的数据集albums.csv上传到分布式文件系统HDFS中:

  1. hdfs dfs -put douban_2.csv
Shell

三、 使用 Flink进行数据分析:

主要完成以下几个分析任务:

1 每个年份的电影数

2 每个评分区间的电影数

3 每个制片地区的电影数

4 每个制片地区电影的平均评分

5 每种类型的电影数

6 每种类型电影的平均评分

1. 创建idea工程:

(1) 打开Linux终端,输入以下命令启动idea:

/usr/local/idea/bin/idea.sh

选择新建项目,并创建Maven项目,如下:

图3.1 新建Maven项目

图3.2 填写项目信息

(2)初次使用请先下载Scala插件并启用:

图3.3 安装启用Scala插件

(3)然后为项目添加框架支持,在项目名称“MovieAnalysis”上点击鼠标右键,在弹出的子菜单中选择“Add Framework Support”:

图3.4 添加框架支持

(4)修改pom.xml如下:

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"
  3.  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4.  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5. <modelVersion>4.0.0</modelVersion>
  6.  
  7. <groupId>dblab</groupId>
  8. <artifactId>MovieAnalysis</artifactId>
  9. <version>1.0-SNAPSHOT</version>
  10.  
  11.  
  12. <repositories>
  13. <repository>
  14. <id>alimaven</id>
  15. <name>aliyun maven</name>
  16. <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
  17. </repository>
  18. </repositories>
  19.  
  20. <dependencies>
  21. <dependency>
  22. <groupId>org.apache.flink</groupId>
  23. <artifactId>flink-scala_2.12</artifactId>
  24. <version>1.11.2</version>
  25. </dependency>
  26. <dependency>
  27. <groupId>org.apache.flink</groupId>
  28. <artifactId>flink-streaming-scala_2.12</artifactId>
  29. <version>1.11.2</version>
  30. </dependency>
  31. <dependency>
  32. <groupId>org.apache.flink</groupId>
  33. <artifactId>flink-clients_2.12</artifactId>
  34. <version>1.11.2</version>
  35. </dependency>
  36.  
  37.  
  38. <dependency>
  39. <groupId>org.apache.hadoop</groupId>
  40. <artifactId>hadoop-common</artifactId>
  41. <version>3.1.3</version>
  42. </dependency>
  43. <dependency>
  44. <groupId>org.apache.hadoop</groupId>
  45. <artifactId>hadoop-client</artifactId>
  46. <version>3.1.3</version>
  47. </dependency>
  48.  
  49. </dependencies>
  50.  
  51.  
  52. <build>
  53. <plugins>
  54. <plugin>
  55. <groupId>net.alchim31.maven</groupId>
  56. <artifactId>scala-maven-plugin</artifactId>
  57. <version>3.4.6</version>
  58. <executions>
  59. <execution>
  60. <goals>
  61. <goal>compile</goal>
  62. </goals>
  63. </execution>
  64. </executions>
  65. </plugin>
  66. <plugin>
  67. <groupId>org.apache.maven.plugins</groupId>
  68. <artifactId>maven-assembly-plugin</artifactId>
  69. <version>3.0.0</version>
  70. <configuration>
  71. <descriptorRefs>
  72. <descriptorRef>jar-with-dependencies</descriptorRef>
  73.    </descriptorRefs>
  74. </configuration>
  75. <executions>
  76. <execution>
  77. <id>make-assembly</id>
  78. <phase>package</phase>
  79. <goals>
  80. <goal>single</goal>
  81. </goals>
  82. </execution>
  83. </executions>
  84. </plugin>
  85. </plugins>
  86. </build>
  87.  
  88. </project>
XML

(5)在项目DoubanMovie/src/main/java目录上单击鼠标右键,在弹出的菜单中选择“New”,再在弹出的菜单中选择“Scala Class”,然后,在弹出的界面中,输入类的名称“movieAnalyze”,类型选择“Object”,然后回车,就可以创建一个空的代码文件movieAnalyze.scala。

图3.5 新建代码文件

图3.6 创建Scala单例对象

2. 进行数据统计分析:

首先读取保存到HDFS上的文件:

  1.  /*读取HDFS上的数据集*/
  2. val inputHDFS: DataSet[movie] = env.readCsvFile[movie]("hdfs://localhost:9000/user/hadoop/douban_2.csv", ignoreFirstLine = true)
scala

接下来逐个完成数据统计:

(1) 每个年份的电影数

  1. /*统计每个年份的电影数*/
  2. val yearL = inputHDFS.map(=> (x.year, 1) ) //抽取每条电影记录的年份
  3. val yearNum = yearL.groupBy(0).sum(1) //按年份分组并计数
  4. yearNum.writeAsCsv("file:///home/hadoop/Output/yearNum.csv") //将结果写至本地文件
scala

(2) 每个评分区间的电影数

以1分为区间步长分别统计[1,10]分每个区间段的电影数。

  1. /*统计每个评分区间的电影数*/
  2. val ratingNum_L = new ArrayBuffer[(String, Int)]()  //先建立数组用于保存结果
  3. for(<- 1 to 9){   //对1-9分区间的电影数目进行循环统计
  4.   val tmp = inputHDFS.filter(_.rating.toString.startsWith(i.toString + '.')) //筛选出[i, i+1)分区间的电影
  5.   ratingNum_L.append((i.toString, tmp.collect().size)) //将本区间段的电影数目保存到数组
  6. }
  7. val ratingNum = env.fromCollection(ratingNum_L)  //将数组类型转换成DataSet类型
  8. ratingNum.writeAsCsv("file:///home/hadoop/Output/ratingNum.csv")  //将结果写至本地文件
scala

(3) 每个制片地区的电影数

  1. /*统计每个制片地区的电影数*/
  2. val filmCountry_L = inputHDFS.map(=> (x.country.split('/')(0), 1)) //将首要制片地区提取出来
  3. val filmCountryNum = filmCountry_L.groupBy(0).sum(1)  //对制片地区分组计数
  4. val filmCountryNum_Sort = filmCountryNum.sortPartition(1, Order.DESCENDING)  //对统计结果降序排序
  5. filmCountryNum_Sort.writeAsCsv("file:///home/hadoop/Output/filmCountryNum_Sort.csv") //将结果写至本地文件
scala

(4) 每个制片地区电影的平均评分

  1. /*统计每个制片地区电影的平均评分*/
  2. val filmCountry_rating_c = inputHDFS.map(=> (x.country.split('/')(0), x.rating)) //提取制片地区和评分
  3. val filmCountry_rating_sum = filmCountry_rating_c.groupBy(0).sum(1) //统计制片地区的电影总评分
  4. val filmCountry_ratingAverage = filmCountry_rating_sum.join(filmCountryNum).where(0).equalTo(0){ //将制片地区的总评分表和总电影数表合并
  5.   (left, right) => (left._1, left._2 / right._2)  //用总评分/总电影数计算平均得分
  6. }
  7. val filmCountry_ratingAverage_Sort = filmCountry_ratingAverage.sortPartition(1, Order.DESCENDING)  //降序排序
  8. filmCountry_ratingAverage_Sort.writeAsCsv("file:///home/hadoop/Output/filmCountry_ratingAverage_Sort.csv") //将结果写至本地文件
scala

(5) 每种类型的电影数

  1. /*统计每种类型的电影数*/
  2. val filmGenres = inputHDFS.flatMap(=> x.genre.split('/'))   //将每部电影的所属的多个类型展开
  3. val filmGenres_L = filmGenres.collect().toList  //所属类型由DataSet转List,方便后续操作
  4.  
  5. val genres_L = filmGenres.map(=> (x, 1))  
  6. val genres_num = genres_L.groupBy(0).sum(1)  //对所属类型计数得到结果
  7. val genres_num_Sort = genres_num.sortPartition(1, Order.DESCENDING)  //降序排序
  8. genres_num_Sort.writeAsCsv("file:///home/hadoop/Output/genres_num_Sort.csv")  //将结果写至本地文件 
scala

(6) 每种类型电影的平均评分

  1.    /*统计每种类型电影的平均评分*/
  2. val filmGenre_num = inputHDFS.map(=> x.genre.split('/').size)  //记录每部电影的所属类型有多少个
  3. val filmGenre_num_L =  filmGenre_num.collect().toList  //所属类型数目由DataSet转List,方便后续操作
  4.  
  5. val filmRating = inputHDFS.map(=> x.rating)  //记录每部电影的评分
  6. val filmRating_L = filmRating.collect().toList  //评分由DataSet转List,方便后续操作
  7.  
  8. val genresRatings = new ArrayBuffer[(String, Double)]()  //先建立数组用以记录展开后每个类型的评分
  9. var idx = 0  //指针
  10. for(<- 0 until filmRating_L.size){  //对每一部电影,按照所属类型的个数将评分分配若干次至展开的类型数组
  11.   val the_genre_num = filmGenre_num_L(i)  //提取第i部电影的类型数目
  12.   val the_rating = filmRating_L(i)  //提取第i部电影的评分
  13.   for(<- 0 until the_genre_num){  //循环分配评分给展开的类型数组
  14. genresRatings.append((filmGenres_L(idx + j), the_rating))
  15.   }
  16.   idx = idx + the_genre_num  //更新指针
  17. }
  18.  
  19. val genresRatings_Dataset = env.fromCollection(genresRatings)  //将得到的数组转DataSet
  20. val genreRating_sum = genresRatings_Dataset.groupBy(0).sum(1)  //分组计算每种类型电影的总评分
  21.  
  22. val genres_ratingAverage = genreRating_sum.join(genres_num).where(0).equalTo(0){  //将每种类型的总评分表和总电影数表合并
  23.   (left, right) => (left._1, left._2 / right._2)  //用总评分/总电影数计算平均评分
  24. }
  25. val genres_ratingAverage_Sort = genres_ratingAverage.sortPartition(1, Order.DESCENDING)  //降序排序
  26. genres_ratingAverage_Sort.writeAsCsv("file:///home/hadoop/Output/genres_ratingAverage_Sort.csv") //将结果写至本地文件 
scala

汇总以上代码得到movieAnalyze.scala:

  1. import org.apache.flink.api.common.operators.Order
  2. import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, _}
  3.  
  4. import scala.collection.mutable.ArrayBuffer
  5.  
  6. object movieAnalyze {
  7.   def main(args: Array[String]): Unit = {
  8.     val env = ExecutionEnvironment.getExecutionEnvironment //建立执行环境
  9.     env.setParallelism(1)
  10.  
  11.  
  12.     /*读取HDFS上的数据集*/
  13.     val inputHDFS: DataSet[movie] = env.readCsvFile[movie]("hdfs://localhost:9000/user/hadoop/douban_2.csv", ignoreFirstLine = true)
  14.    
  15.    /*统计每个年份的电影数*/
  16.     val yearL = inputHDFS.map(=> (x.year, 1) ) //抽取每条电影记录的年份
  17.     val yearNum = yearL.groupBy(0).sum(1) //按年份分组并计数
  18.     yearNum.writeAsCsv("file:///home/hadoop/Output/yearNum.csv") //将结果写至本地文件
  19.  
  20.     /*统计每个评分区间的电影数*/
  21.     val ratingNum_L = new ArrayBuffer[(String, Int)]()  //先建立数组用于保存结果
  22.     for(<- 1 to 9){   //对1-9分区间的电影数目进行循环统计
  23.       val tmp = inputHDFS.filter(_.rating.toString.startsWith(i.toString + '.')) //筛选出[i, i+1)分区间的电影
  24.       ratingNum_L.append((i.toString, tmp.collect().size)) //将本区间段的电影数目保存到数组
  25.     }
  26.     val ratingNum = env.fromCollection(ratingNum_L)  //将数组类型转换成DataSet类型
  27.     ratingNum.writeAsCsv("file:///home/hadoop/Output/ratingNum.csv")  //将结果写至本地文件
  28.  
  29.     /*统计每个制片地区的电影数*/
  30.     val filmCountry_L = inputHDFS.map(=> (x.country.split('/')(0), 1)) //将首要制片地区提取出来
  31.     val filmCountryNum = filmCountry_L.groupBy(0).sum(1)  //对制片地区分组计数
  32.     val filmCountryNum_Sort = filmCountryNum.sortPartition(1, Order.DESCENDING)  //对统计结果降序排序
  33.     filmCountryNum_Sort.writeAsCsv("file:///home/hadoop/Output/filmCountryNum_Sort.csv") //将结果写至本地文件
  34.  
  35.     /*统计每个制片地区电影的平均评分*/
  36.     val filmCountry_rating_c = inputHDFS.map(=> (x.country.split('/')(0), x.rating)) //提取制片地区和评分
  37.     val filmCountry_rating_sum = filmCountry_rating_c.groupBy(0).sum(1) //统计制片地区的电影总评分
  38.     val filmCountry_ratingAverage = filmCountry_rating_sum.join(filmCountryNum).where(0).equalTo(0){ //将制片地区的总评分表和总电影数表合并
  39.       (left, right) => (left._1, left._2 / right._2)  //用总评分/总电影数计算平均得分
  40.     }
  41.     val filmCountry_ratingAverage_Sort = filmCountry_ratingAverage.sortPartition(1, Order.DESCENDING)  //降序排序
  42.     filmCountry_ratingAverage_Sort.writeAsCsv("file:///home/hadoop/Output/filmCountry_ratingAverage_Sort.csv") //将结果写至本地文件
  43.  
  44.  
  45.     /*统计每种类型的电影数*/
  46.     val filmGenres = inputHDFS.flatMap(=> x.genre.split('/'))   //将每部电影的所属的多个类型展开
  47.     val filmGenres_L = filmGenres.collect().toList  //所属类型由DataSet转List,方便后续操作
  48.  
  49.     val genres_L = filmGenres.map(=> (x, 1))  
  50.     val genres_num = genres_L.groupBy(0).sum(1)  //对所属类型计数得到结果
  51.     val genres_num_Sort = genres_num.sortPartition(1, Order.DESCENDING)  //降序排序
  52.     genres_num_Sort.writeAsCsv("file:///home/hadoop/Output/genres_num_Sort.csv")  //将结果写至本地文件 
  53.  
  54.  
  55.     /*统计每种类型电影的平均评分*/
  56.     val filmGenre_num = inputHDFS.map(=> x.genre.split('/').size)  //记录每部电影的所属类型有多少个
  57.     val filmGenre_num_L =  filmGenre_num.collect().toList  //所属类型数目由DataSet转List,方便后续操作
  58.  
  59.     val filmRating = inputHDFS.map(=> x.rating)  //记录每部电影的评分
  60.     val filmRating_L = filmRating.collect().toList  //评分由DataSet转List,方便后续操作
  61.  
  62.     val genresRatings = new ArrayBuffer[(String, Double)]()  //先建立数组用以记录展开后每个类型的评分
  63.     var idx = 0  //指针
  64.     for(<- 0 until filmRating_L.size){  //对每一部电影,按照所属类型的个数将评分分配若干次至展开的类型数组
  65.       val the_genre_num = filmGenre_num_L(i)  //提取第i部电影的类型数目
  66.       val the_rating = filmRating_L(i)  //提取第i部电影的评分
  67.       for(<- 0 until the_genre_num){  //循环分配评分给展开的类型数组
  68.         genresRatings.append((filmGenres_L(idx + j), the_rating))
  69.       }
  70.       idx = idx + the_genre_num  //更新指针
  71.     }
  72.  
  73.     val genresRatings_Dataset = env.fromCollection(genresRatings)  //将得到的数组转DataSet
  74.     val genreRating_sum = genresRatings_Dataset.groupBy(0).sum(1)  //分组计算每种类型电影的总评分
  75.  
  76.     val genres_ratingAverage = genreRating_sum.join(genres_num).where(0).equalTo(0){  //将每种类型的总评分表和总电影数表合并
  77.       (left, right) => (left._1, left._2 / right._2)  //用总评分/总电影数计算平均评分
  78.     }
  79.     val genres_ratingAverage_Sort = genres_ratingAverage.sortPartition(1, Order.DESCENDING)  //降序排序
  80.     genres_ratingAverage_Sort.writeAsCsv("file:///home/hadoop/Output/genres_ratingAverage_Sort.csv") //将结果写至本地文件 
  81.     
  82.     env.execute("Write Results to local files!") //执行写入操作
  83.  
  84.   }
  85. }
  86.  
  87.  
  88. case class movie(id:Long, name:String, year:Int, rating:Double, ratingsum:Long, genre:String, country:String)  //建立movie类来接收来自文件的记录
scala

运行以上代码可得到实验结果,存放在/home/Hadoop/Output目录下:

图3.7 查看统计结果目录

四、 实验结果可视化:

本实验的可视化工具采用plotly,它支持绘制可交互的图表,并完美兼容Jupyter Notebook(查看Jupyter Notebook可以体验图表的交互效果)。

1 每个年份的电影数

先导入用到的两个包,然后读取之前生成的结果,由于2019年的数据不完整,所以暂时不考虑,调用plotly的接口绘制图表。

  1. import pandas as pd
  2. import plotly.express as px
  3.  
  4. yearNum = pd.read_csv("../Output/yearNum.csv", header=None)
  5. yearNum.columns = ["year", "Num"]
  6. yearNum = yearNum[:-1]         #2019年只有前两月多的数据,不考虑
  7.  
  8. fig1 = px.bar(yearNum,
  9.               x = "year",
  10.               y = "Num", 
  11.               title = "1900至2018年的电影数",
  12.               color='Num'
  13.              )
  14. fig1.show()
Python

图4.1 1900至2018年地电影数目变化的条形图

从该数据集中可以观察到,自上世纪初起,生产的电影数基本呈现逐年增加的趋势,到本世纪初,达到一个高峰并且呈现稳定趋势。

2 每个评分区间的电影数

  1. ratingNum = pd.read_csv("../Output/ratingNum.csv", header=None)
  2. ratingNum.columns = ["rating_from", "Num"]
  3.  
  4. fig2 = px.pie(ratingNum,
  5.               values = "Num",
  6.               names = "rating_from", 
  7.               title = "各个评分区间的电影数"
  8.              )
  9. fig2.show()
Python

图4.2 各个评分区间电影数目占比的饼图

图例中的“1”表示[1,2)评分区间段,其余类似。从饼图中可以看出电影评分占比7分区间最多,其次是6分段,再次是8分和5分段。高于9分的佳作和低于5分的烂片占比均较少。

3 每个制片地区的电影数和平均评分

先把两张表合并,然后绘制电影数目前25位散点图。

  1. filmCountryNum_Sort = pd.read_csv("../Output/filmCountryNum_Sort.csv", header=None)
  2. filmCountryNum_Sort.columns = ["area", "Num"]
  3.  
  4. filmCountry_ratingAverage_Sort = pd.read_csv("../Output/filmCountry_ratingAverage_Sort.csv", header=None)
  5. filmCountry_ratingAverage_Sort.columns = ["area", "Average_Rating"]
  6.  
  7. countryNum_avergeRating = pd.merge(filmCountryNum_Sort, filmCountry_ratingAverage_Sort, on='area') #合并两表
  8.  
  9. fig3 = px.scatter(countryNum_avergeRating[:25], x="Num", y="Average_Rating",  #展示电影数前25位的制片地区的电影数目和均分的散点图
  10.                  color="area",
  11.                  hover_name="area", 
  12.                  title="各制片地区电影数和平均评分关系")
  13. fig3.show()
Python

图4.3 合并制片地区的电影数目和平均评分表

图4.4 电影数前25位的制片地区的电影数和平均评分的关系的散点图

为了增加展示维度,再绘制电影数前15位的情况的气泡图,以地区作为x轴,电影数量“Num”表示气泡大小。

  1. fig4 = px.scatter(countryNum_avergeRating[:15], x="area", y="Average_Rating", 
  2.                   size="Num", color = "area",
  3.                   hover_name="area", title="各制片地区电影数和平均评分")
  4. fig4.show()
Python

图4.5 电影数前15位的制片地区的电影数和平均评分情况的气泡图

从以上图表中可以看出,电影数目是美国、日本、中国大陆、中国香港、法国居于Top 5,从均分上来看,日本和法国的电影评价较好,较受豆瓣观众的喜爱,国产的电影总体均分较低。

4 每种类型的电影数

  1. genres_num_Sort = pd.read_csv("../Output/genres_num_Sort.csv", header=None)
  2. genres_num_Sort.columns = ["genre", "Num"]
  3.  
  4. fig5 = px.bar(genres_num_Sort,
  5.               x = "genre",
  6.               y = "Num", 
  7.               title = "各类型电影数",
  8.               color='Num'
  9.              )
  10. fig5.show()
Python

图4.6 各类型电影数目的条形图

从图上可以看出,电影数量位居前10的电影类型分别是剧情、喜剧、爱情、动作、惊悚、犯罪、恐怖、冒险、悬疑、动画。其中,“剧情”类型标签占比相当大。

5 每种类型电影的平均评分和电影数

  1. genres_ratingAverage_Sort = pd.read_csv("../Output/genres_ratingAverage_Sort.csv", header=None)
  2. genres_ratingAverage_Sort.columns = ["genre", "Average_Rating"]
  3.  
  4. genreNum_avergeRating = pd.merge(genres_num_Sort, genres_ratingAverage_Sort, on='genre') #两表合并
  5.  
  6. fig6 = px.scatter(genreNum_avergeRating, x="Num", y="Average_Rating",
  7.                  color="genre",
  8.                  hover_name="genre",
  9.                  title = "各类型电影数和平均评分关系"
  10.                  )
  11. fig6.show()
Python

图4.7 各类型电影数和平均评分关系的散点图

可以看出电影数目最多的几个类型均分都在7分左右,而数量少的类型电影的均分分布有高有低。

为了从不同的角度加以展示,再绘制以类型作为x轴,电影数量“Num”表示气泡大小的气泡图。

  1. fig7 = px.scatter(genreNum_avergeRating, x="genre", y="Average_Rating",
  2.                  size="Num", color = "genre",
  3.                  hover_name="genre", 
  4.                  title = "各类型电影数和平均评分")
  5. fig7.show()
Python

图4.8 各类型电影数和平均评分的气泡图

从图中可以更直观地看出类型的均分情况。

提示:想要更完整地体验可视化效果,请自行查阅Jupyter Notebook。