基于Covid-19传播数据的Spark数据处理分析

大数据学习路线图

【版权声明】版权所有,严禁转载,严禁用于商业用途,侵权必究。
作者:厦门大学信息学院2021级研究生 费翔
基于Scala语言的Spark数据处理分析案例
案例制作:厦门大学数据库实验室
指导老师:厦门大学信息学院计算机系数据库实验室 林子雨 博士/副教授 E-mail: ziyulin@xmu.edu.cn
相关教材:林子雨,赖永炫,陶继平《Spark编程基础(Scala版)》(访问教材官网)
【查看基于Scala语言的Spark数据分析案例集锦】

一、实验环境

Ubuntu 18.04 LTS
Hadoop 3.1.3
Spark 3.2.0
Scala 2.12.15
Python 3.6.9
此外,使用了Python3相关库进行数据分析与可视化操作,并将实验结果保存为notebook格式以方便后续查看。具体使用的库包括:

  1. NumPy:Python3语言的扩展程序库,支持大量的维度数据处理操作,并能够方便、高效地对数据进行运算
  2. Pandas:Python3语言用于数据分析的拓展程序库,提供高性能、易于使用的数据结构和数据分析工具。
  3. Matplotlib:在Python3绘图领域最为广泛使用的套件,能够方便地实现数据的可视化,并支持用户自定义配置文件与多种输出格式。
  4. Seaborn:基于Matplotlib进行高级封装对可视化库,能够绘制更加集成化、具有更高风格自由度的图表。
  5. Jupyter notebook:基于网页的帮助实现交互计算的应用程序,能够使用交互式的笔记本文件格式,实现将代码、文字与数据输出结果进行结合,从而实现一站式的数据处理与分析流程。
  6. SciPy:开源的Python算法库与数学工具包,包含了基于NumPy的科学计算库。

    二、数据预处理

    数据集清洗

    本项目使用的数据集来自数据网站Kaggle的Covid-19新型冠状肺炎病毒传播数据集,可以直接从百度网盘下载(提取码:ziyu),该数据集包括共67字段,但其中不少字段可以由其它字段数据计算得到,例如“每百万人病例数量 = 1,000,000 * 总病例数 / 国家总人口数”,因此对数据字段进行选择。
    字段名称 含义
    Continent 所在洲
    Location 国家/地区名
    Date 数据日期
    Total_cases 总病例数
    New_cases 当日新增病例数
    Total_deaths 总死亡数
    New_deaths 当日新增死亡数
    Total_vaccinations 接种疫苗总剂次
    GDP_per_capita 人均GDP
    Population 总人口数
    Population_density 人口密度
    Median_age 年龄中位数
    Life_expectancy 期望寿命
    Human_dev_index 人类发展指数
    同时,由于数据量较大,但其中很多字段都包含大量空值,因此对数据分析时也需要进行进一步的处理。原数据集中共67字段,但其中大部分字段均缺失了超过六成的数据。对这部分字段数据进行分析时,需要做进一步的判断,删除无效数据。剔除可以通过计算得到的等效字段后,最终选定如上表所示的字段用于分析。

    数据转换为DataFrame

    随后,将数据转换为便于使用的DataFrame格式,以便于后续使用Spark进行分析。首先,启动hadoop伪分布式运行,并将处理好的csv文件存储到HDFS中:

    cd /usr/local/hadoop  
    ./sbin/start-dfs.sh  
    bin/hadoop fs -mkdir -p /dbcovid/data  
    bin/hadoop fs -put ~/Downloads/covid.csv /dbcovid/data 

    此时,将csv文件保存至HDFS对应的目录下。可以通过Spark-shell进行检查,通过对路径使用hdfs.exists()方法,确认文件是否存在。随后,需要将保存的数据转换为DataFrame类型。借助课程提供的Spark版本,可以十分容易地直接完成读取:

    var datapath = "hdfs://localhost:9000/dbcovid/data/covid.csv"  
    var df = spark.read.option("header", "true").option("inferSchema", "true").csv(datapath) 

    由于考虑到csv文件包含了字段名,因此需要设置header属性为True,否则默认将第一行也会读取为记录值。而对inferSchema属性默认设置的False值,会将所有输入值均作为字符串类型进行保存,因此可以设置为True值以帮助保留浮点数的数据类型。值得注意的是,得益于Spark的处理性能,即使虚拟机仅设置了2GB内存,依然能够在一秒钟内完成创建。此时,可以通过df.printSchema()方法检查得到的数据。

    三、使用Spark进行数据分析

    在完成数据保存为DataFrame后,可以使用Spark对其进行数据分析。本章首先介绍数据的整体结构,然后对独立字段进行分析,最后对不同字段之间的相关性进行分析。考虑到后续可视化分析需要,此处将语句执行结果获取的数据保存为json文件,使用Spark提供的DataFrame.write.json()方法可以轻松实现。
    为了便于使用Spark SQL,需要从DataFrame类型的数据上,建立临时视图:

    df.createOrReplaceTempView("data")  

    使用Spark SQL可以便于完成后续的操作,具体使用的分析语句见后文。对数据集进行分析得到,该数据集共有166k行记录,其中每一行对应某一个国家在特定日期的疫情情况。数据共包含六大洲、225个国家,以及不属于上述划分的部分数据如全球总数、各大洲总数。对于每一个国家而言,数据记录了最多达774天的病例情况,包括新增/总阳性病例数量,新增/总死亡数量。对于部分国家还包含了总疫苗接种剂次的记录。此外,也包含了国家的基本情况,例如人均GDP、人口、人口密度、年龄中位数、期望寿命与人类发展指数。通过这些数据及其相互联系,可以获得一个国家疫情的传播情况,并与其国家各项指标进行综合分析。
    获得上述部分统计指标数值的代码如下:

    spark.sql("select DISTINCT continent from data") -- 统计洲数量  
    spark.sql("select COUNT(DISTINCT location) from data") -- 统计国家数量  
    spark.sql("select COUNT(date) from data where location=='China'").show() -- 统计日期总数  

    独立指标分析

    因此,首先定义字符串数组,并在后续的实验中使用循环遍历数组中的国家/地区即可。

    var locs = List("China", "United States", "European Union", "Russia", "Japan", "United Kingdom", "Singapore") 
  7. (重点国家)新增病例/死亡数量
    for (loc <- locs){  
    spark.sql("select new_cases from data where location='"+loc+"'").write.json("/dbcovid/result/new_cases/"+loc+"/")  
    spark.sql("select new_deaths from data where location='"+loc+"'").write.json("/dbcovid/result/new_deaths/"+loc+"/")  
    } 
  8. (重点国家)累计病例/死亡数量
    for (loc <- locs){  
    spark.sql("select total_cases from data where location='"+loc+"'").write.json("/dbcovid/result/total_cases/"+loc+"/")  
    spark.sql("select total_deaths from data where location='"+loc+"'").write.json("/dbcovid/result/total_deaths/"+loc+"/")  
    }  
  9. (重点国家)疫苗接种剂次
    for (loc <- locs){  
    spark.sql("select total_vaccinations from data where location='"+loc+"'").write.json("/dbcovid/result/total_vacc/"+loc+"/")  
    }  
  10. (全部国家)每百万人累计病例/死亡数量
    spark.sql("select location,max(1000000*total_cases/population) from data where continent is not null group by location").write.json("/dbcovid/result/million_cases/")  
    spark.sql("select location,max(1000000*total_deaths/population) from data where continent is not null group by location").write.json("/dbcovid/result/million_deaths/") 

    相关性分析

  11. (重点国家)每日新增病例数与疫苗接种剂次的关系
    for (loc <- locs){  
    spark.sql("select new_cases,total_vaccinations from data where location='"+loc+"'").write.json("/dbcovid/result/cases_with_vaccs/"+loc+"/")  
    }  
  12. (所有国家)每百万人病例/死亡数量与人均GDP的关系
    spark.sql("select location,max(1000000*total_cases/population),max(gdp_per_capita) from data where continent is not null group by location").write.json("/dbcovid/result/million_cases_with_gdp/")  
    spark.sql("select location,max(1000000*total_deaths/population),max(gdp_per_capita) from data where continent is not null group by location").write.json("/dbcovid/result/million_deaths_with_gdp/")  
  13. (所有国家)每百万人病例/死亡数量与年龄中位数的关系
    spark.sql("select location,max(1000000*total_cases/population),max(median_age) from data where continent is not null group by location").write.json("/dbcovid/result/million_cases_with_age/")
    spark.sql("select location,max(1000000*total_deaths/population),max(median_age) from data where continent is not null group by location").write.json("/dbcovid/result/million_deaths_with_age/") 
  14. (所有国家)每百万人病例/死亡数量与人口密度的关系
    spark.sql("select location,max(1000000*total_cases/population),max(population_density) from data where continent is not null group by location").write.json("/dbcovid/result/million_cases_with_density/")  
    spark.sql("select location,max(1000000*total_deaths/population),max(population_density) from data where continent is not null group by location").write.json("/dbcovid/result/million_deaths_with_density/")  
  15. (所有国家)每百万人病例/死亡数量与期望寿命的关系
    spark.sql("select location,max(1000000*total_cases/population),max(life_expectancy) from data where continent is not null group by location").write.json("/dbcovid/result/million_cases_with_lifeexp/")  
    spark.sql("select location,max(1000000*total_deaths/population),max(life_expectancy) from data where continent is not null group by location").write.json("/dbcovid/result/million_deaths_with_lifeexp/") 
  16. (所有国家)每百万人病例/死亡数量与人类发展指数的关系
    spark.sql("select location,max(1000000*total_cases/population),max(human_development_index) from data where continent is not null group by location").write.json("/dbcovid/result/million_cases_with_hdi/")  
    spark.sql("select location,max(1000000*total_deaths/population),max(human_development_index) from data where continent is not null group by location").write.json("/dbcovid/result/million_deaths_with_hdi/")

    数据检查与导出

    将相应的数据从HDFS导出到系统文件

    bin/hdfs dfs -get /dbcovid/result ~/covid_data/

    四、可视化分析

    独立指标分析

    
    import numpy as np
    import pandas as pd
    import matplotlib.pyplot as plt
    import seaborn as sns
    import json
    from scipy.ndimage import gaussian_filter1d
    # 定义重点国家/地区
    locs = ["China", "United States", "European Union", "Russia", "Japan", "United Kingdom", "Singapore"]
    # 重点国家新增病例数量
    path_root = "result/new_cases/"

数据导入

all_data = []

for loc in locs:
path = path_root + loc + ".json"
tmp = pd.read_json(path, lines=True).values.squeeze() # turn to NumPy type
all_data.append(tmp)
all_data = np.array(
[
list(i) + [float("nan")] * (max([len(j) for j in all_data]) - len(i))
for i in all_data
]
)

数据 NaN 值处理

for tmp in all_data:
if np.isnan(tmp[0]):
tmp[0] = 0
for i in range(len(tmp) - 1):
if np.isnan(tmp[i + 1]):
tmp[i + 1] = tmp[i]

数据平滑

for i in range(len(all_data)):
all_data[i] = gaussian_filter1d(all_data[i], sigma=2.5)

保存为 DataFrame

df = pd.DataFrame(all_data).transpose()
df.columns = locs

绘图

plt.figure(figsize=(12, 8))
plt.xlabel('Days')
plt.ylabel('Number of new cases')
sns.lineplot(data=df, dashes=False)
plt.show()

重点国家新增死亡数量

path_root = "result/new_deaths/"

all_data = []

for loc in locs:
path = path_root + loc + ".json"
tmp = pd.read_json(path, lines=True).values.squeeze() # turn to NumPy type
all_data.append(tmp)
all_data = np.array(
[
list(i) + [float("nan")] * (max([len(j) for j in all_data]) - len(i))
for i in all_data
]
)

for tmp in all_data:
if np.isnan(tmp[0]):
tmp[0] = 0
for i in range(len(tmp) - 1):
if np.isnan(tmp[i + 1]):
tmp[i + 1] = tmp[i]

for i in range(len(all_data)):
all_data[i] = gaussian_filter1d(all_data[i], sigma=2.5)

df = pd.DataFrame(all_data).transpose()
df.columns = locs

plt.figure(figsize=(12, 8))
plt.xlabel('Days')
plt.ylabel('Number of new deaths')
sns.lineplot(data=df, dashes=False)
plt.show()

重点国家累计病例数量

path_root = "result/total_cases/"

数据导入

all_data = []

for loc in locs:
path = path_root + loc + ".json"
tmp = pd.read_json(path, lines=True).values.squeeze() # NumPy type
all_data.append(tmp)
all_data = np.array(
[
list(i) + [float("nan")] * (max([len(j) for j in all_data]) - len(i))
for i in all_data
]
)

数据 NaN 值处理

for tmp in all_data:
if np.isnan(tmp[0]):
tmp[0] = 0
for i in range(len(tmp) - 1):
if np.isnan(tmp[i + 1]):
tmp[i + 1] = tmp[i]

本数据不使用平滑。

保存为 DataFrame

df = pd.DataFrame(all_data).transpose()
df.columns = locs

绘图

plt.figure(figsize=(12, 8))
plt.xlabel('Days')
plt.ylabel('Number of all cases')
sns.lineplot(data=df, dashes=False)
plt.show()

重点国家累计死亡数量

path_root = "result/total_deaths/"

数据导入

all_data = []

for loc in locs:
path = path_root + loc + ".json"
tmp = pd.read_json(path, lines=True).values.squeeze() # NumPy type
all_data.append(tmp)
all_data = np.array(
[
list(i) + [float("nan")] * (max([len(j) for j in all_data]) - len(i))
for i in all_data
]
)

数据 NaN 值处理

for tmp in all_data:
if np.isnan(tmp[0]):
tmp[0] = 0
for i in range(len(tmp) - 1):
if np.isnan(tmp[i + 1]):
tmp[i + 1] = tmp[i]

本数据不使用平滑。

保存为 DataFrame

df = pd.DataFrame(all_data).transpose()
df.columns = locs

绘图

plt.figure(figsize=(12, 8))
plt.xlabel('Days')
plt.ylabel('Number of all deaths')
sns.lineplot(data=df, dashes=False)
plt.show()

重点国家累计疫苗接种剂次

path_root = "result/total_vacc/"

数据导入

all_data = []

for loc in locs:
path = path_root + loc + ".json"
tmp = pd.read_json(path, lines=True).values.squeeze() # NumPy type
all_data.append(tmp)
all_data = np.array(
[
list(i) + [float("nan")] * (max([len(j) for j in all_data]) - len(i))
for i in all_data
]
)

数据 NaN 值处理

for tmp in all_data:
if np.isnan(tmp[0]):
tmp[0] = 0
for i in range(len(tmp) - 1):
if np.isnan(tmp[i + 1]):
tmp[i + 1] = tmp[i]

本数据不使用平滑。

保存为 DataFrame

df = pd.DataFrame(all_data).transpose()
df.columns = locs

绘图

plt.figure(figsize=(12, 8))
plt.xlabel('Days')
plt.ylabel('Number of all voccinations')
fig = sns.lineplot(data=df, dashes=False)
fig.set_xlim(300, 775)
plt.show()

全部国家每百万人累计病例

读取文件

path = "result/million_cases.json"
all_data = pd.read_json(path, lines=True)
all_data.columns = ['location', 'data']
tmp_data = pd.DataFrame(all_data, columns=['data']).values.squeeze() # NumPy type

处理 NaN 缺失值

number_data = [x for x in tmp_data if not np.isnan(x)]

保存密度数据

countdata = [0 for in range(15)]
for i in number_data:
count_data[int(i/5e4)] += 1

保存为 DataFrame

df = pd.DataFrame(count_data).transpose()

绘图

plt.figure(figsize=(12, 8))
sns.barplot(data=df)
plt.xlabel('Number of cases per million (*75k)')
plt.ylabel('Number of locations')
plt.show()

全部国家每百万人累计死亡

读取文件

path = "result/million_deaths.json"
all_data = pd.read_json(path, lines=True)
all_data.columns = ['location', 'data']
tmp_data = pd.DataFrame(all_data, columns=['data']).values.squeeze() # NumPy type

处理 NaN 缺失值

number_data = [x for x in tmp_data if not np.isnan(x)]

保存密度数据

countdata = [0 for in range(16)]
for i in number_data:
count_data[int(i/400)] += 1

保存为 DataFrame

df = pd.DataFrame(count_data).transpose()

绘图

plt.figure(figsize=(12, 8))
sns.barplot(data=df)
plt.xlabel('Number of deaths per million (*400)')
plt.ylabel('Number of locations')
plt.show()

新增病例数量
![](https://dblab.xmu.edu.cn/blog/wp-content/uploads/2022/09/新增病例数量.png)
新增死亡数量
![](https://dblab.xmu.edu.cn/blog/wp-content/uploads/2022/09/新增死亡数量.png)
累计病例数量
![](https://dblab.xmu.edu.cn/blog/wp-content/uploads/2022/09/累计病例数量.png)
累计死亡数量
![](https://dblab.xmu.edu.cn/blog/wp-content/uploads/2022/09/累计死亡数量.png)
累计疫苗数量
![](https://dblab.xmu.edu.cn/blog/wp-content/uploads/2022/09/累计疫苗数量.png)
百万人累计病例
![](https://dblab.xmu.edu.cn/blog/wp-content/uploads/2022/09/百万人累计病例.png)
百万人累计死亡
![](https://dblab.xmu.edu.cn/blog/wp-content/uploads/2022/09/百万人累计死亡.png)
### 相关性分析
```python
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import json
from scipy.ndimage import gaussian_filter1d

sns.set_style("darkgrid")
# 定义重点国家/地区
locs = ["China", "United States", "European Union", "Russia", "Japan", "United Kingdom", "Singapore"]

# 重点国家新增病例数量与疫苗剂次关系
path_root = "result/cases_with_vaccs/"

# 各国家独立保存图像
for loc in locs:
    path = path_root + loc + ".json"
    data = pd.read_json(path, lines=True).values.squeeze()  # turn to NumPy type
    data = np.array(data).T

    # 数据 NaN 值处理
    for tmp in data:
        if np.isnan(tmp[0]):
            tmp[0] = 0
        for i in range(len(tmp) - 1):
            if np.isnan(tmp[i + 1]):
                tmp[i + 1] = tmp[i]

    # 数据平滑
    data[0] = gaussian_filter1d(data[0], sigma=2.5)

    # 保存 DataFrame
    df0 = pd.DataFrame(data[0])
    df0.columns = ['Number of daily new cases']
    df1 = pd.DataFrame(data[1])
    df1.columns = ['Number of total vaccinations']

    # 绘图
    fig = plt.figure(figsize=(12, 8))
    plt.title(loc)
    ax1 = fig.add_subplot(111)
    df0.plot(ax=ax1, style='-')
    plt.xlabel('Day')
    ax1.set_ylabel('Number of daily new cases')
    # ax1.set_yticks(np.arange(0, 1.5e6, 1e5))
    plt.legend(loc=2) # 10

    ax2 = ax1.twinx()
    df1.plot(ax=ax2, style='-', color='green')
    ax2.set_ylabel('Number of total vaccinations')
    # ax2.set_yticks(np.arange(0, 3.5e9, 5e8))
    plt.legend(loc=1) # 10

    plt.savefig(u'新增与疫苗关系_'+loc+'.png', bbox_inches='tight')

# 全部国家每百万人累计病例&人均GDP关系

# 读取文件
path = "result/million_cases_with_gdp.json"
all_data = pd.read_json(path, lines=True)
all_data.columns = ['location', 'data', 'gdp']

all_data.dropna(inplace=True)

# 绘图
plt.figure(figsize=(12, 8))
sns.scatterplot(data=all_data, x='gdp', y='data')
plt.xlabel('GDP_per_capita')
plt.ylabel('Number of cases per million')
plt.show()

# 全部国家每百万人累计死亡&人均GDP关系

# 读取文件
path = "result/million_deaths_with_gdp.json"
all_data = pd.read_json(path, lines=True)
all_data.columns = ['location', 'data', 'gdp']

all_data.dropna(inplace=True)

# 绘图
plt.figure(figsize=(12, 8))
sns.scatterplot(data=all_data, x='gdp', y='data')
plt.xlabel('GDP_per_capita')
plt.ylabel('Number of deaths per million')
plt.show()

# 全部国家每百万人累计病例&年龄中位数关系

# 读取文件
path = "result/million_cases_with_age.json"
all_data = pd.read_json(path, lines=True)
all_data.columns = ['location', 'data', 'age']

all_data.dropna(inplace=True)

# 绘图
plt.figure(figsize=(12, 8))
sns.scatterplot(data=all_data, x='age', y='data')
plt.xlabel('age')
plt.ylabel('Number of cases per million')
plt.show()

# 全部国家每百万人累计死亡&年龄中位数关系

# 读取文件
path = "result/million_deaths_with_age.json"
all_data = pd.read_json(path, lines=True)
all_data.columns = ['location', 'data', 'age']

all_data.dropna(inplace=True)

# 绘图
plt.figure(figsize=(12, 8))
sns.scatterplot(data=all_data, x='age', y='data')
plt.xlabel('age')
plt.ylabel('Number of deaths per million')
plt.show()

# 全部国家每百万人累计病例&人口密度关系

# 读取文件
path = "result/million_cases_with_density.json"
all_data = pd.read_json(path, lines=True)
all_data.columns = ['location', 'data', 'den']

all_data.dropna(inplace=True)

# 绘图
plt.figure(figsize=(12, 8))
fig = sns.scatterplot(data=all_data, x='den', y='data')
# fig.set_xticks(np.arange(0,5000,250))
fig.set_xlim(-100, 2000)
plt.xlabel('Population density')
plt.ylabel('Number of cases per million')
plt.show()

# 全部国家每百万人累计死亡&人口密度关系

# 读取文件
path = "result/million_deaths_with_density.json"
all_data = pd.read_json(path, lines=True)
all_data.columns = ['location', 'data', 'den']

all_data.dropna(inplace=True)

# 绘图
plt.figure(figsize=(12, 8))
fig = sns.scatterplot(data=all_data, x='den', y='data')
# fig.set_xticks(np.arange(0,5000,250))
fig.set_xlim(-100, 2000)
plt.xlabel('Population density')
plt.ylabel('Number of deaths per million')
plt.show()

# 全部国家每百万人累计病例&期望寿命关系

# 读取文件
path = "result/million_cases_with_lifeexp.json"
all_data = pd.read_json(path, lines=True)
all_data.columns = ['location', 'data', 'exp']

all_data.dropna(inplace=True)

# 绘图
plt.figure(figsize=(12, 8))
fig = sns.scatterplot(data=all_data, x='exp', y='data')
plt.xlabel('Life expectancy')
plt.ylabel('Number of cases per million')
plt.show()

# 全部国家每百万人累计死亡&期望寿命关系

# 读取文件
path = "result/million_deaths_with_lifeexp.json"
all_data = pd.read_json(path, lines=True)
all_data.columns = ['location', 'data', 'exp']

all_data.dropna(inplace=True)

# 绘图
plt.figure(figsize=(12, 8))
fig = sns.scatterplot(data=all_data, x='exp', y='data')
plt.xlabel('Life expectancy')
plt.ylabel('Number of deaths per million')
plt.show()

# 全部国家每百万人累计病例&hdi关系

# 读取文件
path = "result/million_cases_with_hdi.json"
all_data = pd.read_json(path, lines=True)
all_data.columns = ['location', 'data', 'hdi']

all_data.dropna(inplace=True)

# 绘图
plt.figure(figsize=(12, 8))
fig = sns.scatterplot(data=all_data, x='hdi', y='data')
plt.xlabel('Human Development Index')
plt.ylabel('Number of cases per million')
plt.show()

# 全部国家每百万人累计死亡&hdi关系

# 读取文件
path = "result/million_deaths_with_hdi.json"
all_data = pd.read_json(path, lines=True)
all_data.columns = ['location', 'data', 'hdi']

all_data.dropna(inplace=True)

# 绘图
plt.figure(figsize=(12, 8))
fig = sns.scatterplot(data=all_data, x='hdi', y='data')
plt.xlabel('Human Development Index')
plt.ylabel('Number of deaths per million')
plt.show()

重点国家新增病例数量与疫苗剂次关系







全部国家每百万人累计病例&人均GDP关系

全部国家每百万人累计死亡&人均GDP关系

全部国家每百万人累计病例&年龄中位数关系

全部国家每百万人累计死亡&年龄中位数关系

全部国家每百万人累计病例&人口密度关系

全部国家每百万人累计死亡&人口密度关系

全部国家每百万人累计病例&期望寿命关系

全部国家每百万人累计死亡&期望寿命关系

全部国家每百万人累计病例&hdi关系

全部国家每百万人累计死亡&hdi关系