基于Flink的奥运会数据分析

大数据学习路线图

【版权声明】版权所有,严禁转载,严禁用于商业用途,侵权必究。
作者:厦门大学信息学院2020级研究生 杨志鹏
指导老师:厦门大学数据库实验室 林子雨 博士/副教授
相关教材:林子雨、陶继平编著《Flink编程基础(Scala版)》(官网
相关案例:基于Scala语言的Flink数据处理分析案例集锦
本实验针对奥运会数据进行分析,使用scala语言进行flink编程来处理数据,最后结果使用python进行可视化

一、实验环境

(1)Windows10
(2)Java:1.8
(3)Scala:2.11.12
(4)Python:3.x
(5)Python包:Matplotlib、Pandas
(6)所需工具:Maven、IntelloJ IDEA、Jupyter Notebook

二、项目创建说明

1.安装maven,在终端输入如下命令创建scala版flink项目

mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-scala -DarchetypeVersion=1.13.0 -DgroupId=org.apache.flink.quickstart -DartifactId=flink-scala-project -Dversion=0.1 -Dpackage=org.apache.flink.quickstart -DinteractiveMode=false 

2.打开IntelliJ IDEA配置项目的环境

(1)添加相关依赖包

打开第1步所创建的pom.xml文件

选择open as project,然后在pom,xml中添加如下内容(table API和csv格式相关的依赖包)并更新

<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
   <version>1.13.0</version>
   <scope>provided</scope>
</dependency>
<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-table-planner-blink_2.11</artifactId>
   <version>1.13.0</version>
   <scope>provided</scope>
</dependency>
<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-streaming-scala_2.11</artifactId>
   <version>1.13.0</version>
   <scope>provided</scope>
</dependency>
<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-csv</artifactId>
   <version>1.13.0</version>
</dependency>

(2)设置好与pom.xml中相对应的java版本和scala版本

点击file,然后点击project structure

在poject SDK中选择JDK1.8

在module SDK中选择JDK1.8

在SDKs中选择JDK1.8

在global libraries中选择scala-sdk-2.11.12

(3)创建scala文件,并配置运行环境

新建一个名为”Homework”的scala文件,在这个scala文件内编写代码,最后运行代码,注意在运行代码时需要进行下面所示的一些运行配置

在右上角中点击edit configurations

在modify options中勾选include dependencies with “provided” scope并保存,接下来就可以直接运行所写的scala代码了

三、数据集说明

本次作业所使用的数据集为olympic-athletes.csv文件(从百度网盘下载数据集,提取码:ziyu),数据集中包含了2000年到2012年一部分(一共153名)选手的信息。
文件内每行的内容及类型如下:
第1列为运动员名字,格式为String;
第2列为国家,格式为String
第3列为年份,格式为Int
第4列为运动项目,格式为String
第5列为获得的金牌数,格式为Int
第6列为获得的银牌数,格式为Int
第7列为获得的铜牌数,格式为Int
第8列为总共获得的奖牌数,格式为Int
部分数据如下图所示

四、步骤概述

首先在IDEA中使用scala利用flink对数据集进行处理,统计每个国家的总人数,每个运动项目的总人数,每个国家所获得的的总奖牌数,并分别输出数据到3个csv文件people_per_country.csv,people_per_Game.csv,total_per_country.csv。然后在jupyter notebook中使用python读取这3个csv文件并进行可视化。

五、代码详解

实验完整工程文件从百度云盘获取(提取码:ziyu),下面分块介绍具体代码作用。

1、scala代码数据处理(位于文件夹内的flink-scala-project\src\main\scala\org\apache\flink\quickstart\Homework.scala中)

导入相关的包

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema}
import org.apache.flink.util.Collector
import java.util.Calendar

在数据处理过程中使用了flink提供的datastream API和tableAPI,因为需要进行批处理但是使用了datastream API,因此需要使用窗口window的概念将流处理转化为批处理(如果不使用window,则每接收到一个数据就需要进行处理并得到一个输出;当使用了window,则可以对这个窗口内的所有数据进行处理后再得到一个输出)。Window可以使用时间窗口和计数窗口,这里采用时间窗口进行数据处理,所用的时间又分为processing时间和event时间,这里采用了event时间进行处理。当使用event时间时,还需要用到watermark的概念。因此创建steam环境和table环境的代码如下(在steam环境中设置event时间,并每1秒发送一个watermark):

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.getConfig.setAutoWatermarkInterval(1000L)
val tEnv = StreamTableEnvironment.create(env)

接下来使用table API来读取olympic-athletes.csv文件,并设置每列数据的名称以便于后续进行数据处理,读取文件得到的表数据取名为”my_table”,注意运行代码时文件路径需要进行修改:

val schema = new Schema()
  .field("player", DataTypes.STRING())
  .field("Country", DataTypes.STRING())
  .field("Year", DataTypes.INT())
  .field("Game", DataTypes.STRING())
  .field("Gold", DataTypes.INT())
  .field("Silver", DataTypes.INT())
  .field("Bronze", DataTypes.INT())
  .field("Total", DataTypes.INT())
tEnv.connect(new FileSystem().path("C:\\Users\\60549\\Desktop\\杨志鹏-31520201153900\\olympic-athletes.csv"))
  .withFormat(new Csv().deriveSchema())
  .withSchema(schema)
  .createTemporaryTable("my_table")
val table = tEnv.from("my_table")

在得到表数据table后,需要将table转换为DataStream类型,然后就可以使用flink提供的datastream API进行数据处理,为了方便处理,首先定义一个case class

case class Athlete(player:String, Country:String, Years:Int, Game:String, Gold:Int, Silver:Int, Bronze:Int, Total:Int)

然后将table数据转换为DataStream类型数据

val athletes = tEnv.toAppendStream[Athlete](table)

上面得到的athletes的类型为DataStream[Athlete],由于后面时间窗口使用的event时间需要数据源提供时间戳,为了方便再建立一个时间戳属性的case class:

case class AthleteWithTime(player:String, Country:String, Years:Int, Game:String, Gold:Int, Silver:Int, Bronze:Int, Total:Int, Time:Long)

接下来是构建一个带有时间戳的数据流:

val athletesWithTime= athletes.map(player=>new AthleteWithTime(player.player, player.Country,player.Years,player.Game,player.Gold,player.Silver,player.Bronze,player.Total,Calendar.getInstance.getTimeInMillis)).assignTimestampsAndWatermarks(new TimeAssigner)

在上面的代码中,最后给带有时间戳的数据流定义了(定义在TimeAssigner里)如何从数据流中提取时间戳,以及watermark的生成方式, TimeAssigner的具体代码如下:

class TimeAssigner
  extends BoundedOutOfOrdernessTimestampExtractor[AthleteWithTime](Time.seconds(5)) {
  override def extractTimestamp(r: AthleteWithTime): Long = r.Time
}

上面代码中使用了flink提供的BoundedOutOfOrdernessTimestampExtractor,这可以使得乱序的数据和稍微延迟(5秒)的数据得到处理。在得到数据源后,就可以对数据进行处理了,首先统计每个国家的总人数:

val peoplePerCountry = athletesWithTime
  .map(player => (player.Country,1))
  .keyBy(0)
  .timeWindow(Time.seconds(5))
  .sum(1)

上面代码中使用了时间窗口的概念,将流处理转为批处理,时间窗口的大小决定了所能处理的批的大小,为了将整个数据集都得到批处理,这里将时间窗口的大小设置得足够大(5秒)。然后是统计每个运动项目的总人数,这里采用reduce函数实现sum的功能(reduce函数的输入为两个相同类型的数据,输出为一个相同类型的数据,因此利用reduce可以实现一些较简单的数据滚动处理比如sum):

val peoplePerGame = athletesWithTime
  .map(player => (player.Game,1))
  .keyBy(0)
  .timeWindow(Time.seconds(5))
  .reduce((a,b)=>(a._1, a._2 + b._2))

最后是统计每个国家获得的总奖牌数,这里采用aggregate来实现sum的功能(aggregate函数是reduce的推广,它的输出类型可以和输入类型不相同,因此可以利用aggregate实现比较复杂的数据滚动处理):

val totalPerCountry = athletesWithTime
  .map(player => (player.Country, player.Total))
  .keyBy(0)
  .timeWindow(Time.seconds(5))
  .aggregate(new MySum)

上面代码中的MySum首先需要定义一个存放中间处理结果的accumulator,然后再定义由输入数据得到accumulator的方式,最后从最终的accumulator提取出完整的结果,完整定义如下所示(由于只有session时间窗口才需要实现merge函数,而这里使用的窗口是tumbling时间窗口,所以merge函数只要返回null即可):

class MySum
  extends AggregateFunction[(String, Int), (String, Int), (String, Int)] {
  override def createAccumulator() = {
    ("", 0)
  }
  override def add(in: (String, Int), acc: (String, Int)) = {
    (in._1, in._2 + acc._2)
  }
  override def getResult(acc: (String, Int)) = {
    (acc._1, acc._2)
  }
  override def merge(acc1: (String, Int), acc2: (String, Int)) = {
    null
  }
}

在得到处理完的流数据后,将这些流数据输出到csv格式中去(注意运行代码时文件路径需要进行修改),由于flink是并行处理,所以最后输出到一个csv文件时需要设置并行度为1,这里注意因为在作业文件夹中已经存在这三个csv文件,因此重新运行时需要先删除原来的csv文件,再运行代码得到这三个csv文件:

peoplePerCountry.writeAsCsv("C:\\Users\\60549\\Desktop\\杨志鹏-31520201153900\\people_per_country.csv").setParallelism(1)
peoplePerGame.writeAsCsv("C:\\Users\\60549\\Desktop\\杨志鹏-31520201153900\\people_per_Game.csv").setParallelism(1)
totalPerCountry.writeAsCsv("C:\\Users\\60549\\Desktop\\杨志鹏-31520201153900\\total_per_country.csv").setParallelism(1)
env.execute()

这里将上面所述的完整scala代码进行展示:

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema}
import org.apache.flink.util.Collector

import java.util.Calendar

object Homework {
  def main(args: Array[String]) {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.getConfig.setAutoWatermarkInterval(1000L)
    val tEnv = StreamTableEnvironment.create(env)
    val schema = new Schema()
      .field("player", DataTypes.STRING())
      .field("Country", DataTypes.STRING())
      .field("Year", DataTypes.INT())
      .field("Game", DataTypes.STRING())
      .field("Gold", DataTypes.INT())
      .field("Silver", DataTypes.INT())
      .field("Bronze", DataTypes.INT())
      .field("Total", DataTypes.INT())
    tEnv.connect(new FileSystem().path("C:\\Users\\60549\\Desktop\\杨志鹏-31520201153900\\olympic-athletes.csv"))
      .withFormat(new Csv().deriveSchema())
      .withSchema(schema)
      .createTemporaryTable("my_table")

    val table = tEnv.from("my_table")
    val athletes = tEnv.toAppendStream[Athlete](table)
    val athletesWithTime= athletes.map(player=>new AthleteWithTime(player.player, player.Country,player.Years,
      player.Game,player.Gold,player.Silver,player.Bronze,player.Total,Calendar.getInstance.getTimeInMillis))
      .assignTimestampsAndWatermarks(new TimeAssigner)

    val peoplePerCountry = athletesWithTime
      .map(player => (player.Country,1))
      .keyBy(0)
      .timeWindow(Time.seconds(5))
      .sum(1)

    val peoplePerGame = athletesWithTime
      .map(player => (player.Game,1))
      .keyBy(0)
      .timeWindow(Time.seconds(5))
      .reduce((a,b)=>(a._1, a._2 + b._2))

    val totalPerCountry = athletesWithTime
      .map(player => (player.Country, player.Total))
      .keyBy(0)
      .timeWindow(Time.seconds(5))
      .aggregate(new MySum)

    peoplePerCountry.writeAsCsv("C:\\Users\\60549\\Desktop\\杨志鹏-31520201153900\\people_per_country.csv").setParallelism(1)
    peoplePerGame.writeAsCsv("C:\\Users\\60549\\Desktop\\杨志鹏-31520201153900\\people_per_Game.csv").setParallelism(1)
    totalPerCountry.writeAsCsv("C:\\Users\\60549\\Desktop\\杨志鹏-31520201153900\\total_per_country.csv").setParallelism(1)
    env.execute()

  }
}

case class Athlete(player:String, Country:String, Years:Int, Game:String, Gold:Int, Silver:Int, Bronze:Int, Total:Int)
case class AthleteWithTime(player:String, Country:String, Years:Int, Game:String, Gold:Int, Silver:Int, Bronze:Int, Total:Int, Time:Long)

class TimeAssigner
  extends BoundedOutOfOrdernessTimestampExtractor[AthleteWithTime](Time.seconds(5)) {

  override def extractTimestamp(r: AthleteWithTime): Long = r.Time

}

class MySum
  extends AggregateFunction[(String, Int), (String, Int), (String, Int)] {
  override def createAccumulator() = {
    ("", 0)
  }
  override def add(in: (String, Int), acc: (String, Int)) = {
    (in._1, in._2 + acc._2)
  }
  override def getResult(acc: (String, Int)) = {
    (acc._1, acc._2)
  }
  override def merge(acc1: (String, Int), acc2: (String, Int)) = {
    null
  }
}

2、python代码可视化(位于文件夹的” python代码可视化.ipynb”中)

在处理完数据并得到相应的csv文件后,可以打开jupyter notebook,写入python代码读取相关的csv文件并进行可视化。在终端输入jupyter-lab打开jupyter notebook。
首先导入相关的包,并读取csv文件:

import pandas as pd
import matplotlib.pyplot as plt
#当python文件和csv文件路径不同时要修改运行路径
people_per_country = pd.read_csv("people_per_country.csv", header=None, names=["country", "people"], index_col=0)
people_per_game = pd.read_csv("people_per_game.csv", header=None, names=["game", "people"], index_col=0)
total_per_country = pd.read_csv("total_per_country.csv", header=None, names=["country", "total"], index_col=0)

(1)画出每个国家总人数条形图和饼图:

people_per_country.plot.bar()

people_per_country.plot.pie(subplots=True, figsize=(15,15),rotatelabels=True,labeldistance=0.8,radius=0.9,fontsize=11)

(2)画出每个运动项目参加总人数的条形图和饼图:

people_per_game.plot.bar()

people_per_game.plot.pie(subplots=True, figsize=(15,15),rotatelabels=True,labeldistance=0.8,radius=0.5,fontsize=20)

(3)画出每个国家获得奖牌总数的条形图和饼图:

total_per_country.plot.bar()

total_per_country.plot.pie(subplots=True, figsize=(17,17),rotatelabels=True,labeldistance=0.8,radius=0.9,fontsize=10)