基于Python和Flink的全球疫苗接种数据分析

大数据学习路线图

【版权声明】版权所有,严禁转载,严禁用于商业用途,侵权必究。
作者:厦门大学计算机科学与技术系2023级研究生 黄邦
指导老师:厦门大学数据库实验室 林子雨 博士/副教授
时间:2024年6月
相关教材:林子雨编著《Flink编程基础(Java版)》(访问教材官网
相关案例:Flink大数据处理分析案例集锦
本案例使用了Kaggle上的全球疫苗接种数据集,在Ubuntu虚拟机上使用Flink框架完成数据处理,使用Python语言实现,最后使用Python plotly包完成数据可视化。
数据集和代码下载:从百度网盘下载本案例的代码和数据集。(提取码是ziyu)

一、实验介绍

本实验使用了kaggle上的公开数据集,在Ubuntu虚拟机上使用Flink框架完成数据处理,使用Python语言实现,最后使用Python plotly包完成数据可视化。
在本实验中,分别完成了以下数据分析任务:

1.分析疫苗全程接种人数最高的20个国家,及其接种比例,并以柱状图和折线图的方式展现。
2.分析最为流行的疫苗接种方案,并以饼状图的形式进行展现。
3.以疫苗全程接种人数最高的10个国家为代表,分析全球疫苗接种进程,并以曲线图的方式展现。
4.分析全球疫苗接种、新增案例的变化情况,并以柱状图堆叠的方式展现。
5.分析全球疫苗累计接种、累计案例以及累计死亡病例的变化情况,并以柱状图和折线图的方式展现。
6.分析中国的疫苗接种、新增案例和死亡病例的变化情况,并以曲线图的方式展现。

二、实验环境

三、数据集

1.新冠病毒全球感染情况数据集[1]:以下称“感染表“,来自kaggle的公开数据集,记录了全球225个国家或地区从2020-01-22到2022-05-14的关于新冠病毒的确诊情况、活跃案例以及死亡人数等信息,共包含约18万条数据。预览如下:

2.新冠疫苗全球接种情况数据集[2]:以下称“疫苗表“,同样为来自kaggle的公开数据集,包括全球223个国家或地区从2020-12-02到2022-03-29的新冠疫苗的使用及接种信息,共包含约8万条数据。预览如下(省略了部分列):

数据来源:
[1] https://www.kaggle.com/datasets/josephassaker/covid19-global-dataset
[2] https://www.kaggle.com/datasets/gpreda/covid-world-vaccination-progress

四、数据预处理

源代码:./dataset/preprocess.ipynb
本实验的数据预处理主要使用python pandas来完成,且以jupyter notebook的形式完成代码编写和运行,方便查看数据的中间状态和了解处理过程。本实验的数据预处理主要分为以下几个步骤:
1.移除数据表中没有在本次实验中被用到的列,具体来说,即使用pandas drop函数去掉感染表中的8个多余列,以及疫苗表中的1个多余列。以减小数据体积,方便后续处理。
2.将两个表格的日期列进行格式对齐,并调整到相同的时间范围,方便后续分析。在实现上,通过datetime.strptime完成格式对齐,通过pandas unique和apply完成时间范围对齐。
3.将两个表格的国家列中的国家命名进行对齐。如统一对一些国家使用缩写(United Stated,USA),采用统一命名(Cape Verde,Cabo Verde)等,可通过pandas replace函数完成。
4.填充空值。具体来说,首先,对于每日新增案例(daily_new_cases)这种类型的数据空值,直接填充为0;其次,对于累计案例(cumulative_total_cases)这种类型的数据空值,则先将其分别按国家和日期排序,然后将第一次出现的空值填充为0,后续空值则以上一行的值进行填充。
最后写入数据,以上步骤在notebook中分块实现且附有注释,在此省略代码展示。

五、数据上传

在本实验中用到了HDFS,因此,在数据处理前,还需将预处理后的本地数据上传到HDFS中。假设Hadoop环境已经配置完成,且已启动Hadoop并创建用户目录。
接下来,先建立输入文件夹:

hdfs dfs -mkdir input

再分别将两个数据集文件上传到HDFS上:

hdfs dfs -put ./dataset/vaccination/preprocessed/country_vaccinations.csv input
hdfs dfs -put ./dataset/covid19/preprocessed/worldometer_coronavirus_daily_data.csv input

数据上传完成。数据处理后的结果将存放在output文件夹下,实验完成后可使用以下命令查看结果:

hdfs dfs -ls output

另外,由于本实验直接使用PyFlink进行数据处理,因此,不需要建立项目或管理依赖,可直接应该一个.py文件完成。但是,为了确保程序能够正常运行,可能需要作以下配置:
首先,需要配置HADOOP CLASSPATH,否则可能无法访问HDFS:

export HADOOP_CLASSPATH=`hadoop classpath`

其次,可能需要指定python路径,以确保python命令启动的是python3.X。方便起见,可直接使用以下命令完成指定:

sudo apt install python-is-python3

六、数据处理

源代码:./process.py
用法:python process.py -t TASK [--hdfs] [-p PATH]
其中,-t指定需要执行的任务序号,--hdfs表示是否需要从HDFS上读取和写入数据,默认使用本地文件路径,-p指定HDFS项目路径(例如本实验中直接使用用户目录作为项目路径)
本实验共完成6个数据处理任务,分别由6个函数实现。根据不同任务的复杂度和考虑到实现的简单性,混合使用了Flink的Datastream API和Table API。
首先,分别设定StreamExecutionEnvironment和StreamTableEnvironment环境,代码如下:

# Datasteam API
env = StreamExecutionEnvironment.get_execution_environment()
env.set_runtime_mode(RuntimeExecutionMode.BATCH)
env.set_parallelism(1)

# Table API
env_settings = EnvironmentSettings.in_batch_mode()
t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)

然后,使用Datastream API读取数据集(以感染表为例),代码如下:

# schema for covid-19 csv file
covd_schema = CsvSchema.builder() \
    .add_string_column('date') \
    .add_string_column('country') \
    .add_number_column('cumulative_total_cases', number_type=DataTypes.FLOAT()) \
    .add_number_column('daily_new_cases', number_type=DataTypes.FLOAT()) \
    .add_number_column('cumulative_total_deaths', number_type=DataTypes.FLOAT()) \
    .add_number_column('daily_new_deaths', number_type=DataTypes.FLOAT()) \
    .set_column_separator(',') \
    .set_skip_first_data_row(True) \
    .build()

csv_format = CsvReaderFormat.for_schema(covd_schema)
covd_ds = env.from_source(
    source=FileSource.for_record_stream_format(csv_format, covd_path).build(),
    watermark_strategy=WatermarkStrategy.no_watermarks(),
    source_name='csv-source',
)

另外,在介绍数据处理过程前,先说明本实验的数据输出方法。本实验统一使用Table API中的execute_insert函数进行数据写入,因为我觉得这样比较简单。但是,这样无法将表格头也进行写入(本人没找到合适的方式这么做)。具体代码如下:

def sink_tb(tb, schema, sink_name, path):
    if type(tb) is not Table:
        tb = t_env.from_data_stream(tb)

    t_env.create_temporary_table(
        sink_name, 
        TableDescriptor.for_connector("filesystem")
            .schema(schema)
            .option("path", path)
            .format(FormatDescriptor.for_format("csv")
                .option("field-delimiter", ",")
                .build()
            )
            .build()
    )

    tb.execute_insert(sink_name).wait()  #  fix Multiplexer hanging up
    print('SINKING: ' + sink_name)

任务一:计算疫苗全程接种人数最高的20个国家,及其接种比例。

在这个任务中,我们首先使用map操作将数据列进行缩减,同时将数据类型从Row变为Tuple,方便处理。然后,由于疫苗全程接种人数(people_fully_vaccinated)和接种比例(people_fully_vaccinated_per_hundred)属于累计数据,从而我们只需取每个国家中出现的最大的people_fully_vaccinated值,即可得到该国家的总全程接种人数,且其接种比例也随之得到。在具体实现上,通过key_by和max_by分别完成上述过程。最后,为了得到疫苗全程接种人数最高的20个国家(并从高到低进行排序),就需要用到Table API中的order_by和fetch操作。因此,需要通过StreamTableEnvironment中的from_data_stream函数将Datastream转为Table。最后,在定义好数据格式后,调用已经实现好的sink_tb函数完成数据写入。
具体实现代码如下:
def top20country_fully_vaccinated(vacc_ds: DataStream):
    # country, date, total_vaccinations, people_fully_vaccinated,
    # daily_vaccinations, people_fully_vaccinated_per_hundred, vaccines
    # -> country, people_fully_vaccinated, people_fully_vaccinated_per_hundred
    short_ds = vacc_ds.map(lambda x: (x[0], x[3], x[5]), 
                     output_type=Types.TUPLE([Types.STRING(), Types.FLOAT(), Types.FLOAT()]))

    keyed_ds = short_ds.key_by(lambda x: x[0]).max_by(1)

    keyed_tb = t_env.from_data_stream(keyed_ds)
    res_tb = keyed_tb \
        .order_by(col("f1").desc) \
        .fetch(20) \
        .alias("country", "people_fully_vaccinated", "people_fully_vaccinated_per_hundred")

    # prepare to sink
    schema = Schema.new_builder() \
        .column("country", DataTypes.STRING()) \
        .column("people_fully_vaccinated", DataTypes.FLOAT()) \
        .column("people_fully_vaccinated_per_hundred", DataTypes.FLOAT()) \
        .build()

    sink_tb(res_tb, schema, "Top20TabSink", output_path)

通过执行可视化代码(可视化源代码.ipynb文件中保留了图片输出,可直接打开该文件进行查看,并可进行交互),可到如下结果:

可以看出,中国、印度和美国分别为疫苗接种数最多的三个国家。考虑到中国人数众多,这个结果不会十分意外。但另外可以注意到的是,中国的接种比例也相当之高,远高于印度和美国等其他一些国家。后面的几个国家的接种比例则呈起伏状,与接种人数之间几乎没有直接关系。

任务二:计算最为流行的疫苗接种方案。

这个任务的实现过程比较简单。首先,同样先对数据进行map操作。然后,再根据疫苗方案进行划分并分别对总接种数进行求和,得到该疫苗方案的全球接种数。最后,再进行数据写入。
具体实现代码如下:
def popular_vaccines(vacc_ds: DataStream):
    # country, date, total_vaccinations, people_fully_vaccinated,
    # daily_vaccinations, people_fully_vaccinated_per_hundred, vaccines
    short_ds = vacc_ds.map(lambda x: (x[2], x[6]), 
                     output_type=Types.TUPLE([Types.FLOAT(), Types.STRING()]))

    # total_vaccinations, vaccines
keyed_ds = short_ds.key_by(lambda x: x[1]).sum(0)

    # prepare to sink
    schema = Schema.new_builder() \
        .column("total_vaccinations", DataTypes.FLOAT()) \
        .column("vaccines", DataTypes.STRING()) \
        .build()

    # DataStream.sink_to(FileSink) is an alternative sink method,
    # however, I got some errors when trying it out, so I choose TableSink here.    
    sink_tb(keyed_ds, schema, "PopVaccTabSink", output_path)

通过执行可视化代码,可以得到如下结果:

需要说明的是,为了美观效果,本图中对疫苗名称进行了缩写,疫苗原名称可通过可视化源代码文件进行查看。通过本图可知,使用量最高的疫苗方案为:"CanSino, Sinopharm/Beijing, Sinopharm/Wuhan, Sinovac, ZF2001"(CS,S/B,S/W,S,ZF2001),实际上,该疫苗方案的主要采用国家即为中国。另外,OTHERS表示所有占比小于0.01的疫苗方案,其占比总和与占比第三的疫苗方案相当。

任务三:计算疫苗全程接种人数最高的10个国家的疫苗接种进程。

在这个任务中,我们仍然先对数据进行map映射。然后,使用任务一中得到的top 10的国家信息,分别从数据中筛选对每个国家对应的记录,通过filter操作即可完成。最后,再分别进行数据存入。另外,需要留意的是:1)由于Flink写入的文件名很难具有区分性,无法直接得知输出的是哪个国家的疫苗接种进程数据,因此本实验中将国家列也作为结果一同输出,方便进行后续数据可视化分析;2)本实验中采用了Table API进行数据写入,其中设定的Sink名不能出现空格,否则可能出现意料之外的错误。因此,在本任务实现中使用replace函数对空格进行了替换。
具体实现代码如下:

def vaccination_progress(vacc_ds: DataStream):
    # country, date, total_vaccinations, people_fully_vaccinated,
    # daily_vaccinations, people_fully_vaccinated_per_hundred, vaccines
    # -> country, date, total_vaccinations
    short_ds = vacc_ds.map(lambda x: (x[0], x[1], x[2]), 
                     output_type=Types.TUPLE([Types.STRING(), Types.STRING(), Types.FLOAT()]))

    # prepare to sink
    schema = Schema.new_builder() \
        .column("country", DataTypes.STRING()) \
        .column("date", DataTypes.STRING()) \
        .column("total_vaccinations", DataTypes.FLOAT()) \
        .build()

    # -> country, date, total_vaccinations
    # we only calc top10 countries' vaccination progress here, and the
    # top10 countries can be obtained using 'top20country_fully_vaccinated' function
    # for each top10 countries, calc and sink its vaccination progress
    Top10countries = ["China", "India", "USA", "Brazil", "Indonesia", "Bangladesh", "Pakistan", "Japan", "Mexico", "Vietnam"]
    for country in top10countries:
        country_ds = short_ds.filter(lambda x: x[0] == country)
        sink_tb(
            country_ds, 
            schema, 
            country.replace(' ', '')+"TabSink", 
            output_path+"/vaccination_progress"
        )

通过执行可视化代码,可以得到如下结果:

从上图中可以看到,于2021年3月左右,各国的疫苗接种开始逐步推进,其中,中国在2021年的疫苗接种数上升最快,并仍在不断快速地上升中。其次则为印度,然后为美国等其他国家,他们的疫苗接种进程较长,呈逐步上升趋势。

任务四/五/六:计算疫苗接种情况、新增案例以及死亡案例的变化。

由于任务四、五、六的实现较为类似,在实现复杂度上以任务六为最高。因此,在此以任务六为例统一进行介绍,任务六考虑中国的疫苗接种、新增案例和死亡案例的变化情况。在进行数据处理的过程中,首先分别对疫苗数据和感染数据进行map映射处理。然后,再使用filter操作筛选出中国对应的数据记录,并随之将数据列进行进一步缩减(去除国家列)。接下来,再将Datastream数据转为Table格式,并对数据列进行重命名,方便后续进行合并操作。在得到Table数据后,再根据日期列将两个数据进行合并,得到新的数据表。以上两个步骤分别通过Table API中的join和select操作完成。最后,再次根据日期列对数据进行求和,得到每个月的数据情况,上述操作分别通过group_by、select和sum实现。之后,则可以进行数据写入。
另外,在本任务的实现中,还特别对日期格式进行了处理(rm_str_date_day),即将“年-月-日”格式的数据转换为“年-月”,以方便后续数据可视化并达到更美观的效果。

具体实现代码如下:

def china_daily_vaccination_case_death(vacc_ds: DataStream, covd_ds: DataStream):
    # country, date, total_vaccinations, people_fully_vaccinated,
    # daily_vaccinations, people_fully_vaccinated_per_hundred, vaccines
    # -> country, date, daily_vaccinations
    # -> from daily to monthly (for better visualization)
    vacc_ds = vacc_ds.map(lambda x: (x[0], rm_str_date_day(x[1]), x[4]), output_type=Types.TUPLE([Types.STRING(), Types.STRING(), Types.FLOAT()]))
    # -> date, daily_vaccinations
    vacc_ds = vacc_ds \
        .filter(lambda x: x[0] == "China") \
        .map(lambda x: (x[1], x[2]), 
             output_type=Types.TUPLE([Types.STRING(), Types.FLOAT()]))

    # date, country, cumulative_total_cases, 
    # daily_new_cases, cumulative_total_deaths, daily_new_deaths
    # -> country, date, daily_new_cases, daily_new_deaths
    # -> from daily to monthly (for better visualization)
    covd_ds = covd_ds.map(lambda x: (x[1], rm_str_date_day(x[0]), x[3], x[5]), output_type=Types.TUPLE([Types.STRING(), Types.STRING(), Types.FLOAT(), Types.FLOAT()]))
    # -> date, daily_new_cases, daily_new_deaths
    covd_ds = covd_ds \
        .filter(lambda x: x[0] == "China") \
        .map(lambda x: (x[1], x[2], x[3]), 
             output_type=Types.TUPLE([Types.STRING(), Types.FLOAT(), Types.FLOAT()]))

    # join the two ds (through Table api)
    vacc_tb = t_env.from_data_stream(vacc_ds).alias("a1", "b")
    covd_tb = t_env.from_data_stream(covd_ds).alias("a2", "c", "d")
    joined_tb =  vacc_tb \
        .join(covd_tb, col("a1") == col("a2")) \
        .select(col("a1"), col("b"), col("c"), col("d"))

    # sum monthly vaccinations/new-cases/new-deaths
    res_tb = joined_tb \
        .group_by(col("a1")) \
        .select(col("a1"), col("b").sum, col("c").sum, col("d").sum)
# prepare to sink
    schema = Schema.new_builder() \
        .column("date", DataTypes.STRING()) \
        .column("daily_vaccinations", DataTypes.FLOAT()) \
        .column("daily_new_cases", DataTypes.FLOAT()) \
        .column("daily_new_deaths", DataTypes.FLOAT()) \
        .build()

    sink_tb(res_tb, schema, "ZhDayVaccVirusTabSink", output_path)

通过执行可视化代码,可以得到如下结果:

从上图中可以看到,中国的死亡案例一直维持在极低的水平。在2021年间,疫苗接种情况有明显起伏,新增案例则只是稍有起伏,但未出现无法控制的情况。但从2022年3月左右开始,感染情况出现反弹,呈急速上升趋势,这可能与疫情管理政策逐步放开以及外部输入等因素有关。但是,由于本数据集只包含到2022年3月29日的数据,实际上,通过后续及时、高效的疫苗研发和接种,中国的新冠案例得到有效地控制,并逐步从新冠疫情的影响中恢复过来。
在此,另外附上任务四和任务五的可视化结果图:
任务四(疫苗与病毒):

任务五(全球视角):

七、可视化

源代码:./visualize.ipynb
本实验的可视化源代码编写在.ipynb文件中,这么做是因为考虑到为了方便进行代码阅读和可视化结果的查看,要将.ipynb文件转为.py文件也十分简单,只需将代码块中的代码分别粘贴到.py文件中即可。
本代码默认使用本地文件进行可视化,因此,您可能需要将结果数据从HDFS中拉取到本地:

hdfs dfs -get output ./ouput

另外需要注意的是,对于处理完成的数据结果,其命名不易区分。因此,在进行可视化前,需要对结果文件分别进行重命名(当然,本地./output文件夹中已经提供了处理好的结果文件)。
可视化的实现代码基于python的plotly包实现。

import plotly.graph_objects as go

其可视化步骤主要分为:建立go.Figure,添加绘图类(如Pie、Bar、Scatter等)、更新图片布局(update_layout)、展示图片(show)四部分。各任务的绘图代码较为类似,这里选取任务一的绘图代码进行展示:

# read data
df = pd.read_csv(
    './output/top20country_fully_vaccinated',
    header=None
)

df.columns = ["country", "fully_vaccinations", "vaccination_percentage"]

# helper functions 
def get_multi_line_title(title:str, subtitle:str):
    return f"{title}<br><sub>{subtitle}</sub>"

# plot
top20_df = df.reset_index()
title = get_multi_line_title("People Fully Vaccinated", "Individuals who received the all doses of the vaccines")
xcolumn = "country"
y1column = "fully_vaccinations"
y2column = "vaccination_percentage"
y1label = "Count"
y2label = "Percentage"
colors = "Blugrn"

hovertemplate1 ='<br><b>%{x}</b>'+f'<br><b>{y1label}: </b>'+'%{y}<br><extra></extra>'    
hovertemplate2 ='<br><b>%{x}</b>'+f'<br><b>{y2label}: </b>'+'%{y}<br><extra></extra>'    

fig = go.Figure(
    go.Bar(
        hoverinfo='skip',
        x=top20_df[xcolumn], 
        y=top20_df[y1column], 
        name=y1label,
        hovertemplate = hovertemplate1,
        marker=dict(
            color=top20_df[y1column],
            colorscale=colors,
        ),
    ),
)

fig.add_trace(
    go.Scatter(
        x=top20_df[xcolumn],
        y=top20_df[y2column],
        yaxis="y2",
        mode="lines+markers",
        name=y2label,
        hovertemplate=hovertemplate2,
        marker=dict(color="yellowgreen")
    )
)

fig.update_layout(
    legend=dict(orientation="h"),
    title=title,
    xaxis_title=f"Top 20 {xcolumn.title()}",
    yaxis=dict(
        title=dict(text=y1label),
        side="left",
    ),
    yaxis2=dict(
        title=dict(text=y2label),
        side="right",
        overlaying="y",
    ),
    plot_bgcolor='rgba(0,0,0,0)',
    hovermode="x"
)

fig.show()

八、个人心得

本次实验用到的数据集来自kaggle,均为与新冠疫情有关的数据集。虽然在时间跨度上仍不够大,数据不够新,疫情也已经结束,但我认为其仍有一定的分析价值。我们可以从中回顾全球以及我国的新冠历程,了解到我们通过疫苗和相关政策战胜新冠病毒,并逐渐恢复过来。疫情给我们带来了很大的破坏,但也让我们从中学到了很多,也让我们更加坚信现代医学的力量和我们国家的凝聚力与号召力。
关于本实验所使用的大数据处理框架Flink,我之前是没有用过的。因此,为了完成本次实验,我看了不少Flink相关教程,主要的参考资料还是apache官方的Flink使用文档,以及PyFlink API Reference文档。当然,在可视化上我也参考了一篇kaggle博客。我发现,Flink的功能主要由Datastream API和Table API实现,而这二者又可以相互转换。Datastream API提供了非常丰富和复杂的功能,我最喜欢咏的是其map、filter和keyby算子,但是,对于一些特别的功能,我发现使用Table API处理起来则更便捷,如排序(orderby)和组合(join)。Datastream API可以使用print函数打印数据,而Table API则只能使用print_schema函数打印表样式,这两个函数在我调试的时候帮到了我很多。另外,由于我选择的是Flink Python API来实现数据处理,其现有的博客资料相较于其他两种语言(Java、Scala)要少很多,因此我会先了解某个功能的Java实现,再查询PyFlink API Reference中的相关API来转换为Python实现。有趣的是,在这个过程中我还偶然发现了一个文档例子中的小错误,但这并不影响对其的理解。总之,学习Flink并完成本实验对我来说还是很有意义的。
我编写的源代码中使用的均为英文注释,没有经过机器翻译,因此很可能出现一些用词错误,希望不会影响阅读。另外,对Flink的学习和使用中,我也碰到了不少问题,其中最后仍没有解决的问题包括:如何输出表格头到文件中、对于任务三的实现仍比较笨(理想结果是重新构建一个以日期为第一列,各个国家为其余列的新表)、写入的文件仍需要手动重命名、以及意料之外的xxx type not support currently错误。
当然,我目前对Flink的一些理解可能还存在一些错误,只是可能我的想法恰好奏效罢了,以后会更加深入地学习。