基于Spark的气象监测数据分析

大数据学习路线图

【版权声明】版权所有,严禁转载,严禁用于商业用途,侵权必究。
作者:厦门大学信息学院2021级研究生 杨浩哲
指导老师:厦门大学数据库实验室 林子雨 博士/副教授
相关教材:林子雨、郑海山、赖永炫编著《Spark编程基础(Scala版)》
【查看基于Scala语言的Spark数据分析案例集锦】

本案例针对气象监测数据进行分析,采用Scala为编程语言,采用Hadoop存储数据,采用Spark对数据进行处理分析,并对结果进行数据可视化。

一、实验概述

1.实验环境

本次实验主要通过综合运用大数据处理框架 Spark、 Hadoop及数据可视化技术,对数据进行存储、处理和分析,实验的主要环境配置及其版本信息如下:
(1)操作系统Linux:Ubuntu18.06
(2)Hadoop:3.1.3
(3)Spark:3.2.0
(4)Python:3.6.9
(5)Scala:2.12.15
(6)IntelliJ IDEA:2022.1(sbt)
其中,Hadoop安装参考教程(https://dblab.xmu.edu.cn/blog/2441-2/),Spark安装教程参考(https://dblab.xmu.edu.cn/blog/2501-2/)。使用Idea创建项目参考教程(https://dblab.xmu.edu.cn/blog/1492-2/)。环境安装结果如下(以Spark为例):

2.数据说明

实验数据来源于2021年中国研究生数学建模竞赛B题(data1.csv、data2.csv),为空气质量监测站点实际监测得到的数据。可以从百度网盘下载数据集(提取码:ziyu)。
其中,data1.csv为某监测点的逐小时数据,包括:监测时间、SO2监测浓度(μg/m³)、NO2监测浓度(μg/m³)、PM10监测浓度(μg/m³)、PM2.5监测浓度(μg/m³)、O3监测浓度(μg/m³)、CO监测浓度(mg/m³)、温度(℃)、湿度(%)、气压(MBar)、风速(m/s)、风向(°)、云量、长波辐射(W/m²),数值类型为浮点型,共计19432条;data2.csv为某监测点的逐日数据,包括:监测时间、SO2监测浓度(μg/m³)、NO2监测浓度(μg/m³)、PM10监测浓度(μg/m³)、PM2.5监测浓度(μg/m³)、O3监测浓度(μg/m³)、CO监测浓度(mg/m³),数值类型为浮点型,共计822条。

3.内容概述

本次实验的主要内容如下:
(1)对数据集进行数据预处理,填补缺失值,剔除异常值,保存到HDFS 中,使用编程语言为Python;
(2)使用 Spark 对数据进行分析,其中数据分析包括3个部分,相关系分析包括3个部分,使用的语言为Scala;
(3)对分析结果进行可视化呈现,如汇总数据可视化等,使用的语言为Python。

二、数据预处理

1. 预处理分析

表格中的数据存在部分缺失的问题。因为实测数据具有一定时间连续性,所以针对连续缺失的部分,可通过设定标准来剔除连续缺失数过大的部分,而对其他缺失部分可以采用填补附近值来保留前后数据时序性的信息,一方面可以保留更多地数据,另一方面保证了数据的连续性。具体而言,数据文件包含监测点A的“逐小时污染物浓度与气象实测数据”,其中存在的问题是因监测站点设备调试、维护等原因,实测数据在连续时间内存在部分或全部缺失、或者部分气象指标在某些监测站点无法获取的情况。
针对上述问题,对于非负指标的负数值数据,观察到其前后数据都为0,推测是在测量0值时出现错误,故补0即可。实测数据在连续时间内存在部分或全部丢失,认定若实测数据存在连续大于 3条及以上的情况丢失了重要的时序信息,故进行数据删除;其他情况考虑到由于数据是具有时序连续性,所以考虑结合历史时间条件来对需要补全的数据进行填充,本次实验均对连续丢失数不大于3条的数据,按照前向一小时数据进行复制填充,来补全数据。

2. 代码介绍

文件为data_pre.py,对data1.csv进行:填充操作、删除空行操作以及前向填充操作,存储文件为res.csv,供后续分析,操作命令:python3 data_pre.py,图2.1为运行截图:

使用hdfs dfs -put res.csv即可上传至HDFS,以下为文件目录:

其读取路径为:“hdfs://localhost:9000/user/hadoop/res.csv”。为了操作方便,后续数据分析暂时先使用Linux本地路径。

三、数据分析

1. 筛选六大污染物浓度排名前20的时段(Task1.scala)

六大污染物为SO2、NO2、PM10、PM2.5、O3以及CO,该部分数据分析主要包括:
(1)读入res.csv,创建临时视图,从临时视图中选取字段:监测时间,SO2监测浓度(μg/m3)
(2)将选取的数据按照浓度降序排列,并选取前20的数据;
(3)将上述前20数据再按年度时间进行升序排序,存入文件。
部分代码如下,以SO2分析为例:

df.createOrReplaceTempView("SO2")
val SO2_1 = spark.sql("select `监测时间`,`SO2监测浓度(μg/m3)`from SO2 where `SO2监测浓度(μg/m3)` > 0 " + "order by `SO2监测浓度(μg/m3)` desc limit 20 "  )
val SO2_2 = SO2_1.orderBy("监测时间")
SO2_2.write.option("header",true).mode("overwrite").csv("file:///home/hadoop/IdeaProjects/work/Task1/SO2_20.csv")

以下为运行截图:

2. 计算PM2.5浓度在五大浓度限值区间的分布(Task2.scala)

该部分数据分析主要针对PM2.5浓度数据,其中浓度限值区间如表3.1所示,但此处没有严格要求限制为每日数据,而是只使用其区间信息。由于在第六区间及之后,PM2.5浓度数据条目为0,所以仅计算在前五个区间的分布,分析主要包括:
(1)读入res.csv,创建临时视图,从临时视图中选取字段PM2.5监测浓度(μg/m3)
(2)使用count函数计算各个区间的分布数量;
(3)存入新的dataframe,表头为(区间等级,数量)。
部分代码如下:

val PM25_total = spark.sql("select `PM2.5监测浓度(μg/m3)` from PM25 " + "where `PM2.5监测浓度(μg/m3)` >= 0" ).count()
val PM25_1 = spark.sql("select `PM2.5监测浓度(μg/m3)` from PM25 " +  "where `PM2.5监测浓度(μg/m3)` >= 0 and `PM2.5监测浓度(μg/m3)` <= 35" ).count()

以下为运行截图:

3. 计算每日首要污染物及其污染程度(Task3.scala)

该部分数据分析使用data2.csv,利用各污染物浓度计算每日的AQI值,得到相应的首要污染物和空气质量等级。根据《环境空气质量指数(AQI)技术规定(试行)》(HJ633-2012),空气质量指数(AQI)可用于判别空气质量等级。首先需得到各项污染物的空气质量分指数(IAQI),其计算公式如下:

式中各符号含义如下:

对于AQI的计算公式如下:

空气质量等级范围根据AQI数值划分,等级对应的AQI范围见表3.2。

代码细节为:
(1)首先利用sql语句挑选出所需要的浓度数据,再使用df.collect将其转换成数组;
(2)利用上述公式对表格数据进行计算,存入ArrayBuffer;
(3)将ArrayBuffer转换成Array类型,然后转换成Seq类型,最后转化为RDD;
(4)创建schema以及使用map映射,转化为dataframe格式并储存。
以下为部分代码:

var arr = arrBuf.toArray
var arrRDD = spark.sparkContext.parallelize(arr.toSeq)

val fields = Array(StructField("日期",StringType,true), StructField("AQI",IntegerType,true), StructField("首要污染物",StringType,true),StructField("空气质量等级",StringType,true),StructField("空气质量等级数",StringType,true))
val schema = StructType(fields)
val rowRDD = arrRDD.map(attributes => Row(attributes(0), attributes(1),attributes(2),attributes(3),attributes(4)))
val df_PM25 = spark.createDataFrame(rowRDD, schema)

以下为运行截图:

四、关系分析

1. 分析SO2浓度与NO2浓度的关系(Task4.scala)

该部分数据分析使用res.csv,即污染物浓度的逐小时数据。其中使用到了Spark ML中计算皮尔逊相关系数的函数包,主要步骤如下:
(1)使用sql语句,从res.csv中选择SO2和NO2的逐小时浓度数据;
(2)保存其数据至csv文件,供后续可视化使用;
(3)使用describe函数,计算其数量、均值、标准差以及最值;
(4)将选择的数据映射为Double类型;
(5)使用Statistics.corr(SO2, NO2, "pearson")函数,计算其相关系数。
计算结果为:SO2浓度与NO2浓度的皮尔逊相关系数为0.41。
以下是部分代码:

val df1 = spark.sql("select `监测时间`,`SO2监测浓度(μg/m3)`,`NO2监测浓度(μg/m3)`from SO2 where `SO2监测浓度(μg/m3)` > 0 " )
df1.write.option("header",true).mode("overwrite").csv("file:///home/hadoop/IdeaProjects/work/Task4/SO2_NO2.csv")
 //df1.describe().show
val df_real = df1.select("SO2监测浓度(μg/m3)","NO2监测浓度(μg/m3)")
val rdd_real = df_real.rdd.map(x=>(x(0).toString.toDouble ,x(1).toString.toDouble ))
val SO2 = rdd_real.map(x=>x._1.toDouble )
val NO2 = rdd_real.map(x=>x._2.toDouble )
val cor_pearson:Double = Statistics.corr(SO2, NO2, "pearson")

以下是运行结果:

  1. 分析PM10浓度与空气湿度的关系(Task5.scala)
    因为PM10浓度的变化与空气含水量密切相关,比如当相对湿度大于60%以上时, PM10颗粒会吸湿增大,所以该部分试图分析PM10浓度与空气湿度的关系。主要步骤参考上一部分:
    (1)使用sql语句,从res.csv中选择PM10浓度和空气湿度的逐小时浓度数据;
    (2)保存其数据至csv文件,供后续可视化使用;
    (3)使用describe函数,计算其数量、均值、标准差以及最值;
    (4)将选择的数据映射为Double类型;
    (5)使用Statistics.corr(PM10, Air, "pearson")函数,计算其相关系数。
    计算结果为:PM10浓度与空气湿度的皮尔逊相关系数为 -0.41。
    以下是部分代码:

    val df_real = df1.select("PM10监测浓度(μg/m3)","湿度(%)")
    val rdd_real = df_real.rdd.map(x=>(x(0).toString.toDouble ,x(1).toString.toDouble ))
    val PM10 = rdd_real.map(x=>x._1.toDouble )
    val Air = rdd_real.map(x=>x._2.toDouble )
    val cor_pearson:Double = Statistics.corr(PM10, Air, "pearson")

    以下是运行结果:

  1. 分析O3浓度与云量的关系(Task6.scala)
    因为云量及其云内液体的吸收对O3含量有一定影响,所以该部分试图对O3浓度与云量的关系进行分析,主要使用斯皮尔曼相关系数。主要步骤参考上一部分:
    (1)使用sql语句,从res.csv中选择O3浓度和云量的逐小时浓度数据;
    (2)保存其数据至csv文件,供后续可视化使用;
    (3)使用describe函数,计算其数量、均值、标准差以及最值;
    (4)将选择的数据映射为Double类型;
    (5)使用Statistics.corr(O3, Cloud, "spearman")函数,计算其相关系数。
    计算结果为:O3浓度与云量的斯皮尔曼相关系数为 0.04,关联性不高。
    以下是部分代码:

    val df_real = df1.select("O3监测浓度(μg/m3)","云量")
    val rdd_real = df_real.rdd.map(x=>(x(0).toString.toDouble ,x(1).toString.toDouble ))
    val O3 = rdd_real.map(x=>x._1.toDouble )
    val Cloud = rdd_real.map(x=>x._2.toDouble )
    val cor_spearman:Double = Statistics.corr(O3, Cloud, "spearman")

    以下是运行结果:

五、数据可视化

1.污染物浓度排名前20的时段数据可视化(draw_task1.py)

数据可视化工具为matplotlib库,以下是污染物浓度排名前20的时段数据可视化结果:

2.PM2.5浓度在浓度限值区间的分布数据可视化(draw_task2.py)

由于第四区间及之后区间的数据量过少,所以此处仅仅对前3个区间分布进行可视化:

3.每日AQI值数据曲线可视化(draw_task3.py)

该部分为每日AQI值数据曲线的可视化:

4.SO2浓度与NO2浓度变化趋势可视化(draw_task4.py)

下图为SO2浓度与NO2浓度变化趋势的可视化曲线图,其中只取了前400条数据:

5.PM10浓度与空气湿度变化趋势可视化(draw_task5.py)

下图为PM10浓度与空气湿度变化趋势的可视化曲线图,其中只取了前400条数据:

6.O3浓度与云量变化趋势可视化(draw_task6.py)

下图为O3浓度与云量变化趋势的可视化曲线图,其中只取了前400条数据: