基于Spark的地震数据处理与分析

大数据学习路线图

【版权声明】版权所有,严禁转载,严禁用于商业用途,侵权必究。
作者:厦门大学信息学院计算机科学系2019级研究生 胡冰
指导老师:厦门大学数据库实验室 林子雨 博士/副教授
相关教材:林子雨、郑海山、赖永炫编著《Spark编程基础(Python版)》(访问教材官网
相关案例:基于Python语言的Spark数据处理分析案例集锦(PySpark)

本案例针对全球重大地震数据进行分析,采用Python为编程语言,采用Hadoop存储数据,采用Spark对数据进行处理分析,并对结果进行数据可视化。

一、 实验环境搭建

(1)Linux:Ubuntu 16.04
(2)Hadoop:3.1.3(安装教程
(3)Spark:2.4.0(安装教程
(4)Python:Anaconda3(Anaconda及Jupyter Notebook安装教程
(5)运行环境:Jupyter Notebook
(6)安装可视化所需软件包,在Linux终端下输入以下命令即可:

sudo apt-get install python3-matplotlib
sudo apt-get install python3-pandas
sudo apt-get install python3-mpltoolkits.basemap 

二、数据准备

数据来自和鲸社区的1965-2016全球重大地震数据,文件名为earthquake.csv包括23412条地震数据,其中有很多属性大部分缺失且本次实验用不到,将其手动删除,只保留Date, Time, Latitude, Longitude, Type, Depth, Magnitude这七个属性。
数据集下载百度网盘地址为:https://pan.baidu.com/s/1tI12CzrqXB8SrlgWokFMyg 提取码: 1ieo

由于数据集中存在3行有问题的数据,因此,需要手动进行如下修改:

在Linux终端输入以下命令启动Hadoop,注意全程保持该终端开启。

cd /usr/local/hadoop
./sbin/start-dfs.sh

使用Jupyter Notebook建立一个代码文件preprocessing.py,进行数据的预处理,内容如下:

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, when, split, posexplode

conf = SparkConf().setMaster("local").setAppName("preprocessing")
sc = SparkContext(conf = conf)
sc.setLogLevel('WARN') # 减少不必要的log输出
spark = SparkSession.builder \
                    .config(conf = SparkConf()) \
                    .getOrCreate()

rawFile = 'file:///home/hadoop/HBING/earthquake.csv'
rawData = spark.read.format('csv') \
                    .options(header='true', inferschema='true') \
                    .load(rawFile)
rawData.printSchema() # 查看数据结构
print('total count: %d' % rawData.count()) # 打印总行数
# 查看每列的非空行数
rawData.agg(*[count(c).alias(c) for c in rawData.columns]).show()

# 提取数据
newData = rawData.select('Date', 'Time', 'Latitude',
                         'Longitude', 'Type', 'Depth',
                         'Magnitude', 'Magnitude Type',
                         'ID', 'Source','Location Source', 
                         'Magnitude Source', 'Status')
# 拆分'Date'到'Month',' Day', 'Year'
newData = newData.withColumn('Split Date', split(newData.Date, '/'))
attrs = sc.parallelize(['Month',' Day', 'Year']).zipWithIndex().collect()
for name, index in attrs:
    newColumn = newData['Split Date'].getItem(index)
    newData = newData.withColumn(name, newColumn)
newData = newData.drop('Split Date')
newData.show(5)

# 上传文件至HDFS
newData.write.csv('earthquakeData.csv', header='true')

在上述代码中,尽管数据集定义了很多数据特征,但是不少特征的缺失值较多,其数据过于稀疏,不利于进行分析。因此,在上述代码中,仅选择数据稠密的特征进行分析,并对Date列进行拆分。
上述代码运行结果如下:


三、数据分析及可视化

1.全部代码

使用Jupyter Notebook编写数据分析代码文件analyze.py,内容如下:

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, IndexToString
from pyspark.ml import Pipeline

import pandas as pd
import matplotlib.pyplot as plt
import mpl_toolkits.basemap

conf = SparkConf().setMaster("local").setAppName("analyze")
sc = SparkContext(conf = conf)
sc.setLogLevel('WARN') # 减少不必要的log输出
spark = SparkSession.builder.config(conf = SparkConf()).getOrCreate()

dataFile = 'earthquakeData.csv'

rawData = spark.read.format('csv') \
                    .options(header='true', inferschema='true') \
                    .load(dataFile) 

def earthquakesPerYear():
    yearDF = rawData.select('Year').dropna().groupBy(rawData['Year']).count()
    yearDF = yearDF.sort(yearDF['Year'].asc())
    yearPd = yearDF.toPandas()
    # 数据可视化
    plt.bar(yearPd['Year'], yearPd['count'], color='skyblue')
    plt.title('earthquakes per year')
    plt.xlabel('Year')
    plt.ylabel('count')
    plt.show()

def earthquakesPerMonth():
    data = rawData.select('Year', 'Month').dropna()
    yearDF = data.groupBy(['Year', 'Month']).count()
    yearPd = yearDF.sort(yearDF['Year'].asc(), yearDF['Month'].asc()).toPandas()
    # 数据可视化
    x = [m+1 for m in range(12)]
    for groupName, group in yearPd.groupby('Year'):
        # plt.plot(x, group['count'], label=groupName)
        plt.scatter(x, group['count'], color='g', alpha=0.15)
    plt.title('earthquakes per month')
    # plt.legend() # 显示label
    plt.xlabel('Month')
    plt.show()

def depthVsMagnitude():
    data = rawData.select('Depth', 'Magnitude').dropna()
    vsPd = data.toPandas()
    # 数据可视化
    plt.scatter(vsPd.Depth, vsPd.Magnitude, color='g', alpha=0.2)
    plt.title('Depth vs Magnitude')
    plt.xlabel('Depth')
    plt.ylabel('Magnitude')
    plt.show()

def magnitudeVsType():
    data = rawData.select('Magnitude', 'Magnitude Type').dropna()
    typePd = data.toPandas()
    # 准备箱体数据
    typeBox = []
    typeName = []
    for groupName, group in typePd.groupby('Magnitude Type'):
        typeName.append(groupName)
        typeBox.append(group['Magnitude'])
    # 数据可视化
    plt.boxplot(typeBox, labels=typeName)
    plt.title('Type vs Magnitude')
    plt.xlabel('Type')
    plt.ylabel('Magnitude')
    plt.show()

def location():
    data = rawData.select('Latitude', 'Longitude', 'Magnitude').dropna()
    locationPd = data.toPandas()
    # 世界地图
    basemap = mpl_toolkits.basemap.Basemap()
    basemap.drawcoastlines()
    # 数据可视化
    plt.scatter(locationPd.Longitude, locationPd.Latitude,
                color='g', alpha=0.25, s=locationPd.Magnitude)
    plt.title('Location')
    plt.xlabel('Longitude')
    plt.ylabel('Latitude')
    plt.show()

if __name__ == '__main__':
    earthquakesPerYear()
    # earthquakesPerMonth()
    # depthVsMagnitude()
    # magnitudeVsType()
    # location()

上面是分析的全部代码,下面我们分开讲解每个部分。

2.全球每年发生重大地震的次数

这个部分对应的代码如下:

def earthquakesPerYear():
   yearDF = rawData.select('Year').dropna() \
                    .groupBy(rawData['Year']).count()
   yearDF = yearDF.sort(yearDF['Year'].asc())
   yearPd = yearDF.toPandas()
   # 数据可视化
   plt.bar(yearPd['Year'], yearPd['count'], color='skyblue')
   plt.title('earthquakes per year') 
   plt.xlabel('Year') 
   plt.ylabel('count') 
   plt.show()

可视化结果如下图所示。由图可知,1965-2016年期间,每年发生重大地震的次数均超过200次,且总体趋势上,随着年份的增加地震发生的频率也提高了,从2012年开始有有所回落,但每年发生地震次数仍达到400以上。

2.全球不同年份每月发生重大地震的次数

这个部分对应的代码如下:

def earthquakesPerMonth():
   data = rawData.select('Year', 'Month').dropna()
   yearDF = data.groupBy(['Year', 'Month']).count()
   yearPd = yearDF.sort(yearDF['Year'].asc(),
                         yearDF['Month'].asc()).toPandas()
    # 数据可视化
   x = [m+1 for m in range(12)]
   for groupName, group in yearPd.groupby('Year'):
      # plt.plot(x, group['count'], label=groupName)
      plt.scatter(x, group['count'], color='g', alpha=0.15)
   plt.title('earthquakes per month')
   # plt.legend() # 显示label
   plt.xlabel('Month')
   plt.show()

可视化结果如下图所示。从图中我们可以看出,每月不同年份发生地震的次数基本都在 [10, 60] 这一区间,仅有个别年份的个别月份地震发生频率激增,最高超过200次。

3.全球重大地震深度和强度之间的关系

这个部分对应的代码如下:

def depthVsMagnitude():
   data = rawData.select('Depth', 'Magnitude').dropna()
   vsPd = data.toPandas()
    # 数据可视化
   plt.scatter(vsPd.Depth, vsPd.Magnitude, color='g', alpha=0.1)
   plt.title('Depth vs Magnitude')
   plt.xlabel('Depth')
   plt.ylabel('Magnitude')
   plt.show()

可视化结果如下图所示。在图中,地震深度和强度并不存在明显的线性关系,在 [250, 500] 这一区间内,地震的强度和频率明显少于其他深度区间。反而在深度约60以下为地震高发区,且震级强度也分布较广。

4.全球重大地震深度和类型之间的关系

这个部分对应的代码如下:

def magnitudeVsType():
   data = rawData.select('Magnitude', 'Magnitude Type').dropna()
   typePd = data.toPandas()
   #准备箱体数据
   typeBox = []
   typeName = []
   for groupName, group in typePd.groupby('Magnitude Type'):
      typeName.append(groupName)
      typeBox.append(group['Magnitude'])
   #数据可视化
   plt.boxplot(typeBox, labels=typeName)
   plt.title('Type vs Magnitude')
   plt.xlabel('Type')
   plt.ylabel('Magnitude')
   plt.show()

可视化结果如下图所示。由图可知,MH类型的地震强度普遍较其他类型更高,主要分布于 [5.7, 7.0] 之间,而MB、MS、MW、MWB、MWC和MWW类型的地震数据出现了较多的异常点,可能由于数据不服从正态分布存在较高的偏态。

5.全球经常发生重大地震的地带

这个部分对应的代码如下:

def location():
   data = rawData.select('Latitude',
                          'Longitude',
                          'Magnitude').dropna()
   locationPd = data.toPandas()
   # 世界地图
   basemap = mpl_toolkits.basemap.Basemap()
   basemap.drawcoastlines()
   # 数据可视化
   plt.scatter(locationPd.Longitude, locationPd.Latitude,
                color='g', alpha=0.25, s=locationPd.Magnitude)
   plt.title('Location')
   plt.xlabel('Longitude')
   plt.ylabel('Latitude')
   plt.show()

可视化结果如下图所示。由图中可明显看出不同的地震带把地球明显分成好几个板块,环太平洋火山地震带上,地震发生频率和震级都较其他地震带要高。