基于Spark的地震数据处理与分析

大数据学习路线图

【版权声明】版权所有,严禁转载,严禁用于商业用途,侵权必究。
作者:厦门大学信息学院计算机科学系2019级研究生 胡冰
指导老师:厦门大学数据库实验室 林子雨 博士/副教授
相关教材:林子雨、郑海山、赖永炫编著《Spark编程基础(Python版)》(访问教材官网
相关案例:基于Python语言的Spark数据处理分析案例集锦(PySpark)

本案例针对全球重大地震数据进行分析,采用Python为编程语言,采用Hadoop存储数据,采用Spark对数据进行处理分析,并对结果进行数据可视化。

一、 实验环境搭建

(1)Linux:Ubuntu 16.04
(2)Hadoop:3.1.3(安装教程
(3)Spark:2.4.0(安装教程
(4)Python:Anaconda3(Anaconda及Jupyter Notebook安装教程
(5)运行环境:Jupyter Notebook
(6)安装可视化所需软件包,在Linux终端下输入以下命令即可:

  1. sudo apt-get install python3-matplotlib
  2. sudo apt-get install python3-pandas
  3. sudo apt-get install python3-mpltoolkits.basemap
Shell 命令

二、数据准备

数据来自和鲸社区的1965-2016全球重大地震数据,文件名为earthquake.csv包括23412条地震数据,其中有很多属性大部分缺失且本次实验用不到,将其手动删除,只保留Date, Time, Latitude, Longitude, Type, Depth, Magnitude这七个属性。
数据集下载百度网盘地址为:https://pan.baidu.com/s/1tI12CzrqXB8SrlgWokFMyg 提取码: 1ieo

由于数据集中存在3行有问题的数据,因此,需要手动进行如下修改:

在Linux终端输入以下命令启动Hadoop,注意全程保持该终端开启。

  1. cd /usr/local/hadoop
  2. ./sbin/start-dfs.sh
Shell 命令

使用Jupyter Notebook建立一个代码文件preprocessing.py,进行数据的预处理,内容如下:

  1. from pyspark import SparkConf, SparkContext
  2. from pyspark.sql import SparkSession
  3. from pyspark.sql.functions import count, when, split, posexplode
  4.  
  5. conf = SparkConf().setMaster("local").setAppName("preprocessing")
  6. sc = SparkContext(conf = conf)
  7. sc.setLogLevel('WARN') # 减少不必要的log输出
  8. spark = SparkSession.builder \
  9. .config(conf = SparkConf()) \
  10. .getOrCreate()
  11.  
  12. rawFile = 'file:///home/hadoop/HBING/earthquake.csv'
  13. rawData = spark.read.format('csv') \
  14. .options(header='true', inferschema='true') \
  15. .load(rawFile)
  16. rawData.printSchema() # 查看数据结构
  17. print('total count: %d' % rawData.count()) # 打印总行数
  18. # 查看每列的非空行数
  19. rawData.agg(*[count(c).alias(c) for c in rawData.columns]).show()
  20.  
  21. # 提取数据
  22. newData = rawData.select('Date', 'Time', 'Latitude',
  23. 'Longitude', 'Type', 'Depth',
  24. 'Magnitude', 'Magnitude Type',
  25. 'ID', 'Source','Location Source',
  26. 'Magnitude Source', 'Status')
  27. # 拆分'Date'到'Month',' Day', 'Year'
  28. newData = newData.withColumn('Split Date', split(newData.Date, '/'))
  29. attrs = sc.parallelize(['Month',' Day', 'Year']).zipWithIndex().collect()
  30. for name, index in attrs:
  31. newColumn = newData['Split Date'].getItem(index)
  32. newData = newData.withColumn(name, newColumn)
  33. newData = newData.drop('Split Date')
  34. newData.show(5)
  35.  
  36. # 上传文件至HDFS
  37. newData.write.csv('earthquakeData.csv', header='true')
Python

在上述代码中,尽管数据集定义了很多数据特征,但是不少特征的缺失值较多,其数据过于稀疏,不利于进行分析。因此,在上述代码中,仅选择数据稠密的特征进行分析,并对Date列进行拆分。
上述代码运行结果如下:


三、数据分析及可视化

1.全部代码

使用Jupyter Notebook编写数据分析代码文件analyze.py,内容如下:

  1. from pyspark import SparkConf, SparkContext
  2. from pyspark.sql import SparkSession
  3. from pyspark.ml.feature import StringIndexer, IndexToString
  4. from pyspark.ml import Pipeline
  5.  
  6. import pandas as pd
  7. import matplotlib.pyplot as plt
  8. import mpl_toolkits.basemap
  9.  
  10. conf = SparkConf().setMaster("local").setAppName("analyze")
  11. sc = SparkContext(conf = conf)
  12. sc.setLogLevel('WARN') # 减少不必要的log输出
  13. spark = SparkSession.builder.config(conf = SparkConf()).getOrCreate()
  14.  
  15. dataFile = 'earthquakeData.csv'
  16.  
  17. rawData = spark.read.format('csv') \
  18. .options(header='true', inferschema='true') \
  19. .load(dataFile)
  20.  
  21. def earthquakesPerYear():
  22. yearDF = rawData.select('Year').dropna().groupBy(rawData['Year']).count()
  23. yearDF = yearDF.sort(yearDF['Year'].asc())
  24. yearPd = yearDF.toPandas()
  25. # 数据可视化
  26. plt.bar(yearPd['Year'], yearPd['count'], color='skyblue')
  27. plt.title('earthquakes per year')
  28. plt.xlabel('Year')
  29. plt.ylabel('count')
  30. plt.show()
  31.  
  32. def earthquakesPerMonth():
  33. data = rawData.select('Year', 'Month').dropna()
  34. yearDF = data.groupBy(['Year', 'Month']).count()
  35. yearPd = yearDF.sort(yearDF['Year'].asc(), yearDF['Month'].asc()).toPandas()
  36. # 数据可视化
  37. x = [m+1 for m in range(12)]
  38. for groupName, group in yearPd.groupby('Year'):
  39. # plt.plot(x, group['count'], label=groupName)
  40. plt.scatter(x, group['count'], color='g', alpha=0.15)
  41. plt.title('earthquakes per month')
  42. # plt.legend() # 显示label
  43. plt.xlabel('Month')
  44. plt.show()
  45.  
  46. def depthVsMagnitude():
  47. data = rawData.select('Depth', 'Magnitude').dropna()
  48. vsPd = data.toPandas()
  49. # 数据可视化
  50. plt.scatter(vsPd.Depth, vsPd.Magnitude, color='g', alpha=0.2)
  51. plt.title('Depth vs Magnitude')
  52. plt.xlabel('Depth')
  53. plt.ylabel('Magnitude')
  54. plt.show()
  55.  
  56. def magnitudeVsType():
  57. data = rawData.select('Magnitude', 'Magnitude Type').dropna()
  58. typePd = data.toPandas()
  59. # 准备箱体数据
  60. typeBox = []
  61. typeName = []
  62. for groupName, group in typePd.groupby('Magnitude Type'):
  63. typeName.append(groupName)
  64. typeBox.append(group['Magnitude'])
  65. # 数据可视化
  66. plt.boxplot(typeBox, labels=typeName)
  67. plt.title('Type vs Magnitude')
  68. plt.xlabel('Type')
  69. plt.ylabel('Magnitude')
  70. plt.show()
  71.  
  72. def location():
  73. data = rawData.select('Latitude', 'Longitude', 'Magnitude').dropna()
  74. locationPd = data.toPandas()
  75. # 世界地图
  76. basemap = mpl_toolkits.basemap.Basemap()
  77. basemap.drawcoastlines()
  78. # 数据可视化
  79. plt.scatter(locationPd.Longitude, locationPd.Latitude,
  80. color='g', alpha=0.25, s=locationPd.Magnitude)
  81. plt.title('Location')
  82. plt.xlabel('Longitude')
  83. plt.ylabel('Latitude')
  84. plt.show()
  85.  
  86. if __name__ == '__main__':
  87. earthquakesPerYear()
  88. # earthquakesPerMonth()
  89. # depthVsMagnitude()
  90. # magnitudeVsType()
  91. # location()
Python

上面是分析的全部代码,下面我们分开讲解每个部分。

2.全球每年发生重大地震的次数

这个部分对应的代码如下:

  1. def earthquakesPerYear():
  2. yearDF = rawData.select('Year').dropna() \
  3. .groupBy(rawData['Year']).count()
  4. yearDF = yearDF.sort(yearDF['Year'].asc())
  5. yearPd = yearDF.toPandas()
  6. # 数据可视化
  7. plt.bar(yearPd['Year'], yearPd['count'], color='skyblue')
  8. plt.title('earthquakes per year')
  9. plt.xlabel('Year')
  10. plt.ylabel('count')
  11. plt.show()
Python

可视化结果如下图所示。由图可知,1965-2016年期间,每年发生重大地震的次数均超过200次,且总体趋势上,随着年份的增加地震发生的频率也提高了,从2012年开始有有所回落,但每年发生地震次数仍达到400以上。

2.全球不同年份每月发生重大地震的次数

这个部分对应的代码如下:

  1. def earthquakesPerMonth():
  2. data = rawData.select('Year', 'Month').dropna()
  3. yearDF = data.groupBy(['Year', 'Month']).count()
  4. yearPd = yearDF.sort(yearDF['Year'].asc(),
  5. yearDF['Month'].asc()).toPandas()
  6. # 数据可视化
  7. x = [m+1 for m in range(12)]
  8. for groupName, group in yearPd.groupby('Year'):
  9. # plt.plot(x, group['count'], label=groupName)
  10. plt.scatter(x, group['count'], color='g', alpha=0.15)
  11. plt.title('earthquakes per month')
  12. # plt.legend() # 显示label
  13. plt.xlabel('Month')
  14. plt.show()
Python

可视化结果如下图所示。从图中我们可以看出,每月不同年份发生地震的次数基本都在 [10, 60] 这一区间,仅有个别年份的个别月份地震发生频率激增,最高超过200次。

3.全球重大地震深度和强度之间的关系

这个部分对应的代码如下:

  1. def depthVsMagnitude():
  2. data = rawData.select('Depth', 'Magnitude').dropna()
  3. vsPd = data.toPandas()
  4. # 数据可视化
  5. plt.scatter(vsPd.Depth, vsPd.Magnitude, color='g', alpha=0.1)
  6. plt.title('Depth vs Magnitude')
  7. plt.xlabel('Depth')
  8. plt.ylabel('Magnitude')
  9. plt.show()
Python

可视化结果如下图所示。在图中,地震深度和强度并不存在明显的线性关系,在 [250, 500] 这一区间内,地震的强度和频率明显少于其他深度区间。反而在深度约60以下为地震高发区,且震级强度也分布较广。

4.全球重大地震深度和类型之间的关系

这个部分对应的代码如下:

  1. def magnitudeVsType():
  2. data = rawData.select('Magnitude', 'Magnitude Type').dropna()
  3. typePd = data.toPandas()
  4. #准备箱体数据
  5. typeBox = []
  6. typeName = []
  7. for groupName, group in typePd.groupby('Magnitude Type'):
  8. typeName.append(groupName)
  9. typeBox.append(group['Magnitude'])
  10. #数据可视化
  11. plt.boxplot(typeBox, labels=typeName)
  12. plt.title('Type vs Magnitude')
  13. plt.xlabel('Type')
  14. plt.ylabel('Magnitude')
  15. plt.show()
Python

可视化结果如下图所示。由图可知,MH类型的地震强度普遍较其他类型更高,主要分布于 [5.7, 7.0] 之间,而MB、MS、MW、MWB、MWC和MWW类型的地震数据出现了较多的异常点,可能由于数据不服从正态分布存在较高的偏态。

5.全球经常发生重大地震的地带

这个部分对应的代码如下:

  1. def location():
  2. data = rawData.select('Latitude',
  3. 'Longitude',
  4. 'Magnitude').dropna()
  5. locationPd = data.toPandas()
  6. # 世界地图
  7. basemap = mpl_toolkits.basemap.Basemap()
  8. basemap.drawcoastlines()
  9. # 数据可视化
  10. plt.scatter(locationPd.Longitude, locationPd.Latitude,
  11. color='g', alpha=0.25, s=locationPd.Magnitude)
  12. plt.title('Location')
  13. plt.xlabel('Longitude')
  14. plt.ylabel('Latitude')
  15. plt.show()
Python

可视化结果如下图所示。由图中可明显看出不同的地震带把地球明显分成好几个板块,环太平洋火山地震带上,地震发生频率和震级都较其他地震带要高。