基于Flink的地震数据处理

大数据学习路线图

【版权声明】版权所有,严禁转载,严禁用于商业用途,侵权必究。
本案例由厦门大学计算机系2020级研究生龚国文同学制作,这里对他表示衷心的感谢!
相关教材:林子雨、陶继平编著《Flink编程基础(Scala版)》(官网
相关案例:基于Scala语言的Flink数据处理分析案例集锦
本实验针对全球重大地震数据进行分析,使用scala语言进行flink编程来处理数据,最后结果使用python进行可视化

本实验涉及到的所有数据集和代码,可以从百度网盘下载。提取码:ziyu。

一、 实验环境搭建

1.操作系统:Windows10
2.scala:2.12.12

3.flink:1.11.2
4.python:3.8.8

5.运行环境
- 数据处理:IDEA => flink编程;
- 可视化:命令行运行.py文件

6.安装可视化需要的包
(1)pip install matplotlib
(2)下面是可视化第三部分额外用到的包:
pip install geos;
在下方链接中下载pyproj:
https://www.lfd.uci.edu/~gohlke/pythonlibs/#pyproj
找到对应python版本的下载:

在下方链接中下载basemap:
https://www.lfd.uci.edu/~gohlke/pythonlibs/#basemap
找到对应python版本的下载:

进入下载保存的pyproj和basemap的路径,运行“pip install 包名” 命令进行安装:

二、数据准备

参考往年作业:基于Spark的地震数据处理与分析

数据来自和鲸社区的1965-2016全球重大地震数据,文件名为earthquake.csv包括23412条地震数据,数据中各个字段为:Date, Time, Latitude, Longitude, Type, Depth, Magnitude,分别表示
日期,时间,维度,精度,类型,深度,地震等级。数据集中有三行数据有问题,参照往年作业进行了修改:

三、数据处理

1.IDEA中创建项目

参考教程 第4章 Flink环境搭建和使用方法 中的4.3.4节:使用IntelliJ IDEA开发Flink应用程序,链接:
http://dblab.xmu.edu.cn/wp-content/uploads/2021/01/Chapter4-%E5%8E%A6%E9%97%A8%E5%A4%A7%E5%AD%A6-%E6%9E%97%E5%AD%90%E9%9B%A8-Flink%E7%BC%96%E7%A8%8B%E5%9F%BA%E7%A1%80Scala%E7%89%88-%E7%AC%AC4%E7%AB%A0-Flink%E7%8E%AF%E5%A2%83%E6%90%AD%E5%BB%BA%E5%92%8C%E4%BD%BF%E7%94%A8%E6%96%B9%E6%B3%95%EF%BC%882021%E5%B9%B43%E6%9C%88%E7%89%88%E6%9C%AC%EF%BC%89.pdf

2.代码

pom.xml文件和源码已和本实验报告一起打包发送。
代码如下(代码每个部分的含义已在注释中给出):

package cn.edu.xmu.dblab
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala._

object EarthQuakes {
  def main(args: Array[String]): Unit = {
    //第1步:建立执行环境
    val env = ExecutionEnvironment.getExecutionEnvironment
    //第2步:指定数据文件路径
    val filePath="file:///earthquake.csv"
    //第3步:读取数据
    val csv = env.readCsvFile[EarthQuakesLog](filePath,ignoreFirstLine = true)
    //第3步:对数据集指定转换操作
    //每条数据映射到(年份, 1)
    val yearMap : DataSet[(String,Int)] = csv.map(x=>x.Date).map(yearSplit(_))
    //对每年地震数量统计
    val yearCount = yearMap.groupBy(0).sum(1)

    //每条数据映射到(月份, 年份, 1)
    val monthMap : DataSet[(String,String,Int)] = csv.map(x=>x.Date).map(monthSplit(_))
    //对分组(月份, 年份)统计数量
    val monthCount = monthMap.groupBy(0,1).sum(2)

    //取出(纬度, 经度, 地震等级)
    val location : DataSet[(Double,Double,Double)] = csv.map(x=>(x.Latitude,x.Longitude,x.Magnitude))

    // 第4步:输出结果
    yearCount.writeAsCsv("file:///yearCount")
    monthCount.writeAsCsv("file:///monthCount")
    location.writeAsCsv("file:///location")
    env.execute()
  }
  case class
  EarthQuakesLog(Date:String, Time:String, Latitude:Double, Longitude:Double,
                 Type:String, Depth:Double, Magnitude:Double)

  def yearSplit(x:String) : (String,Int) = {
    val strs = x.split("/")
    return (strs(2),1)
  }
  def monthSplit(x:String) : (String,String,Int) = {
    val strs = x.split("/")
    return (strs(0),strs(2),1)
  }

}

实际的数据文件earthquake.csv的路径以及分析之后的输出路径可修改为需要的路径。
分析结果写入本地之后,实际上将结果保存为了多个文件:

每个文件保存了部分结果:

四、可视化

使用python进行可视化,.py文件已和本实验报告一起打包发送
使用时需要修改需要读取的数据文件实际的保存路径:

1.全球每年发生重大地震的次数

可视化代码:

import os
import matplotlib.pyplot as plt

def file_name(path):
    for root,dirs,files in os.walk(path):
        pass
    return files

def readData(file_path,yearcounts):
    f=open(file_path,'r')
    for term in f:
        #print(term)
        term=term.strip('\n')
        temp=term.split(',')
        yearcounts.append((int(temp[0]),int(temp[1])))
    f.close()

path="D:\\yearCount"
files=file_name(path)

years=[]
counts=[]
yearcounts=[]

#读取文件夹下所有文件
for file in files:
    file_path = path + "\\" + file
    #对每个文件, 读出其中数据
    readData(file_path,yearcounts)
#按年份进行排序 (升序)
yearcounts = sorted(yearcounts)

#画图
for year,count in yearcounts:
    years.append(year)
    counts.append(count)

plt.title('Earthquakes Per Year')
plt.xlabel('Year')
plt.ylabel('Count')
plt.plot(years,counts)
plt.show()

运行:

结果:

可以看到从1965年到2016每年地震次数有上升的趋势

2.全球不同年份每月发生重大地震的次数

可视化代码:

import os
import matplotlib.pyplot as plt

def file_name(path):
    for root,dirs,files in os.walk(path):
        pass
    return files

def readData(file_path,x,y):
    f=open(file_path,'r')
    for term in f:
        term=term.strip('\n')
        temp=term.split(',')
        x.append(int(temp[0]))
        y.append(int(temp[2]))
    f.close()

path="D:\\monthCount"
files=file_name(path)

month=[]
count=[]

#读取文件夹下所有文件
for file in files:
    file_path = path + "\\" + file
    #对每个文件, 读出其中数据
    readData(file_path,month,count)

#画图
plt.title('Earthquakes Per Month')
plt.xlabel('Month')
plt.ylabel('Count')
plt.scatter(month, count, alpha=0.2)
plt.show()

运行:

结果:

从图中可以看到每年每个月的地震次数在[10,60]这个区间

3.全球经常发生重大地震的地带

可视化代码:

import os
import matplotlib.pyplot as plt
import mpl_toolkits.basemap

def file_name(path):
    for root,dirs,files in os.walk(path):
        pass
    return files

def readData(file_path,latitudes,longitudes,magnitudes):
    f=open(file_path,'r')
    for term in f:
        #print(term)
        term=term.strip('\n')
        temp=term.split(',')
        latitudes.append(float(temp[0]))
        longitudes.append(float(temp[1]))
        magnitudes.append(float(temp[2]))
    f.close()

path="D:\\location"
files=file_name(path)

latitudes=[]
longitudes=[]
magnitudes=[]

#读取文件夹下所有文件
for file in files:
    file_path = path + "\\" + file
    #对每个文件, 读出其中数据
    readData(file_path,latitudes,longitudes,magnitudes)

#世界地图
basemap = mpl_toolkits.basemap.Basemap()
basemap.drawcoastlines()
#画图
plt.title('Location')
plt.xlabel('Longitude')
plt.ylabel('Latitude')
plt.scatter(longitudes, latitudes, alpha=0.1, s=magnitudes)
plt.show()

运行:

结果: