【版权声明】版权所有,严禁转载,严禁用于商业用途,侵权必究。
作者:厦门大学计算机科学与技术系2023级研究生 黄邦
指导老师:厦门大学数据库实验室 林子雨 博士/副教授
时间:2024年6月
相关教材:林子雨编著《Flink编程基础(Java版)》(访问教材官网)
相关案例:Flink大数据处理分析案例集锦
本案例使用了Kaggle上的全球疫苗接种数据集,在Ubuntu虚拟机上使用Flink框架完成数据处理,使用Python语言实现,最后使用Python plotly包完成数据可视化。
数据集和代码下载:从百度网盘下载本案例的代码和数据集。(提取码是ziyu)
一、实验介绍
本实验使用了kaggle上的公开数据集,在Ubuntu虚拟机上使用Flink框架完成数据处理,使用Python语言实现,最后使用Python plotly包完成数据可视化。
在本实验中,分别完成了以下数据分析任务:
1.分析疫苗全程接种人数最高的20个国家,及其接种比例,并以柱状图和折线图的方式展现。
2.分析最为流行的疫苗接种方案,并以饼状图的形式进行展现。
3.以疫苗全程接种人数最高的10个国家为代表,分析全球疫苗接种进程,并以曲线图的方式展现。
4.分析全球疫苗接种、新增案例的变化情况,并以柱状图堆叠的方式展现。
5.分析全球疫苗累计接种、累计案例以及累计死亡病例的变化情况,并以柱状图和折线图的方式展现。
6.分析中国的疫苗接种、新增案例和死亡病例的变化情况,并以曲线图的方式展现。
二、实验环境
三、数据集
1.新冠病毒全球感染情况数据集[1]:以下称“感染表“,来自kaggle的公开数据集,记录了全球225个国家或地区从2020-01-22到2022-05-14的关于新冠病毒的确诊情况、活跃案例以及死亡人数等信息,共包含约18万条数据。预览如下:
2.新冠疫苗全球接种情况数据集[2]:以下称“疫苗表“,同样为来自kaggle的公开数据集,包括全球223个国家或地区从2020-12-02到2022-03-29的新冠疫苗的使用及接种信息,共包含约8万条数据。预览如下(省略了部分列):
数据来源:
[1] https://www.kaggle.com/datasets/josephassaker/covid19-global-dataset
[2] https://www.kaggle.com/datasets/gpreda/covid-world-vaccination-progress
四、数据预处理
源代码:./dataset/preprocess.ipynb
本实验的数据预处理主要使用python pandas来完成,且以jupyter notebook的形式完成代码编写和运行,方便查看数据的中间状态和了解处理过程。本实验的数据预处理主要分为以下几个步骤:
1.移除数据表中没有在本次实验中被用到的列,具体来说,即使用pandas drop函数去掉感染表中的8个多余列,以及疫苗表中的1个多余列。以减小数据体积,方便后续处理。
2.将两个表格的日期列进行格式对齐,并调整到相同的时间范围,方便后续分析。在实现上,通过datetime.strptime完成格式对齐,通过pandas unique和apply完成时间范围对齐。
3.将两个表格的国家列中的国家命名进行对齐。如统一对一些国家使用缩写(United Stated,USA),采用统一命名(Cape Verde,Cabo Verde)等,可通过pandas replace函数完成。
4.填充空值。具体来说,首先,对于每日新增案例(daily_new_cases)这种类型的数据空值,直接填充为0;其次,对于累计案例(cumulative_total_cases)这种类型的数据空值,则先将其分别按国家和日期排序,然后将第一次出现的空值填充为0,后续空值则以上一行的值进行填充。
最后写入数据,以上步骤在notebook中分块实现且附有注释,在此省略代码展示。
五、数据上传
在本实验中用到了HDFS,因此,在数据处理前,还需将预处理后的本地数据上传到HDFS中。假设Hadoop环境已经配置完成,且已启动Hadoop并创建用户目录。
接下来,先建立输入文件夹:
hdfs dfs -mkdir input
再分别将两个数据集文件上传到HDFS上:
hdfs dfs -put ./dataset/vaccination/preprocessed/country_vaccinations.csv input
hdfs dfs -put ./dataset/covid19/preprocessed/worldometer_coronavirus_daily_data.csv input
数据上传完成。数据处理后的结果将存放在output文件夹下,实验完成后可使用以下命令查看结果:
hdfs dfs -ls output
另外,由于本实验直接使用PyFlink进行数据处理,因此,不需要建立项目或管理依赖,可直接应该一个.py文件完成。但是,为了确保程序能够正常运行,可能需要作以下配置:
首先,需要配置HADOOP CLASSPATH,否则可能无法访问HDFS:
export HADOOP_CLASSPATH=`hadoop classpath`
其次,可能需要指定python路径,以确保python命令启动的是python3.X。方便起见,可直接使用以下命令完成指定:
sudo apt install python-is-python3
六、数据处理
源代码:./process.py
用法:python process.py -t TASK [--hdfs] [-p PATH]
其中,-t指定需要执行的任务序号,--hdfs表示是否需要从HDFS上读取和写入数据,默认使用本地文件路径,-p指定HDFS项目路径(例如本实验中直接使用用户目录作为项目路径)
本实验共完成6个数据处理任务,分别由6个函数实现。根据不同任务的复杂度和考虑到实现的简单性,混合使用了Flink的Datastream API和Table API。
首先,分别设定StreamExecutionEnvironment和StreamTableEnvironment环境,代码如下:
# Datasteam API
env = StreamExecutionEnvironment.get_execution_environment()
env.set_runtime_mode(RuntimeExecutionMode.BATCH)
env.set_parallelism(1)
# Table API
env_settings = EnvironmentSettings.in_batch_mode()
t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)
然后,使用Datastream API读取数据集(以感染表为例),代码如下:
# schema for covid-19 csv file
covd_schema = CsvSchema.builder() \
.add_string_column('date') \
.add_string_column('country') \
.add_number_column('cumulative_total_cases', number_type=DataTypes.FLOAT()) \
.add_number_column('daily_new_cases', number_type=DataTypes.FLOAT()) \
.add_number_column('cumulative_total_deaths', number_type=DataTypes.FLOAT()) \
.add_number_column('daily_new_deaths', number_type=DataTypes.FLOAT()) \
.set_column_separator(',') \
.set_skip_first_data_row(True) \
.build()
csv_format = CsvReaderFormat.for_schema(covd_schema)
covd_ds = env.from_source(
source=FileSource.for_record_stream_format(csv_format, covd_path).build(),
watermark_strategy=WatermarkStrategy.no_watermarks(),
source_name='csv-source',
)
另外,在介绍数据处理过程前,先说明本实验的数据输出方法。本实验统一使用Table API中的execute_insert函数进行数据写入,因为我觉得这样比较简单。但是,这样无法将表格头也进行写入(本人没找到合适的方式这么做)。具体代码如下:
def sink_tb(tb, schema, sink_name, path):
if type(tb) is not Table:
tb = t_env.from_data_stream(tb)
t_env.create_temporary_table(
sink_name,
TableDescriptor.for_connector("filesystem")
.schema(schema)
.option("path", path)
.format(FormatDescriptor.for_format("csv")
.option("field-delimiter", ",")
.build()
)
.build()
)
tb.execute_insert(sink_name).wait() # fix Multiplexer hanging up
print('SINKING: ' + sink_name)
任务一:计算疫苗全程接种人数最高的20个国家,及其接种比例。
在这个任务中,我们首先使用map操作将数据列进行缩减,同时将数据类型从Row变为Tuple,方便处理。然后,由于疫苗全程接种人数(people_fully_vaccinated)和接种比例(people_fully_vaccinated_per_hundred)属于累计数据,从而我们只需取每个国家中出现的最大的people_fully_vaccinated值,即可得到该国家的总全程接种人数,且其接种比例也随之得到。在具体实现上,通过key_by和max_by分别完成上述过程。最后,为了得到疫苗全程接种人数最高的20个国家(并从高到低进行排序),就需要用到Table API中的order_by和fetch操作。因此,需要通过StreamTableEnvironment中的from_data_stream函数将Datastream转为Table。最后,在定义好数据格式后,调用已经实现好的sink_tb函数完成数据写入。
具体实现代码如下:
def top20country_fully_vaccinated(vacc_ds: DataStream):
# country, date, total_vaccinations, people_fully_vaccinated,
# daily_vaccinations, people_fully_vaccinated_per_hundred, vaccines
# -> country, people_fully_vaccinated, people_fully_vaccinated_per_hundred
short_ds = vacc_ds.map(lambda x: (x[0], x[3], x[5]),
output_type=Types.TUPLE([Types.STRING(), Types.FLOAT(), Types.FLOAT()]))
keyed_ds = short_ds.key_by(lambda x: x[0]).max_by(1)
keyed_tb = t_env.from_data_stream(keyed_ds)
res_tb = keyed_tb \
.order_by(col("f1").desc) \
.fetch(20) \
.alias("country", "people_fully_vaccinated", "people_fully_vaccinated_per_hundred")
# prepare to sink
schema = Schema.new_builder() \
.column("country", DataTypes.STRING()) \
.column("people_fully_vaccinated", DataTypes.FLOAT()) \
.column("people_fully_vaccinated_per_hundred", DataTypes.FLOAT()) \
.build()
sink_tb(res_tb, schema, "Top20TabSink", output_path)
通过执行可视化代码(可视化源代码.ipynb文件中保留了图片输出,可直接打开该文件进行查看,并可进行交互),可到如下结果:
可以看出,中国、印度和美国分别为疫苗接种数最多的三个国家。考虑到中国人数众多,这个结果不会十分意外。但另外可以注意到的是,中国的接种比例也相当之高,远高于印度和美国等其他一些国家。后面的几个国家的接种比例则呈起伏状,与接种人数之间几乎没有直接关系。
任务二:计算最为流行的疫苗接种方案。
这个任务的实现过程比较简单。首先,同样先对数据进行map操作。然后,再根据疫苗方案进行划分并分别对总接种数进行求和,得到该疫苗方案的全球接种数。最后,再进行数据写入。
具体实现代码如下:
def popular_vaccines(vacc_ds: DataStream):
# country, date, total_vaccinations, people_fully_vaccinated,
# daily_vaccinations, people_fully_vaccinated_per_hundred, vaccines
short_ds = vacc_ds.map(lambda x: (x[2], x[6]),
output_type=Types.TUPLE([Types.FLOAT(), Types.STRING()]))
# total_vaccinations, vaccines
keyed_ds = short_ds.key_by(lambda x: x[1]).sum(0)
# prepare to sink
schema = Schema.new_builder() \
.column("total_vaccinations", DataTypes.FLOAT()) \
.column("vaccines", DataTypes.STRING()) \
.build()
# DataStream.sink_to(FileSink) is an alternative sink method,
# however, I got some errors when trying it out, so I choose TableSink here.
sink_tb(keyed_ds, schema, "PopVaccTabSink", output_path)
通过执行可视化代码,可以得到如下结果:
需要说明的是,为了美观效果,本图中对疫苗名称进行了缩写,疫苗原名称可通过可视化源代码文件进行查看。通过本图可知,使用量最高的疫苗方案为:"CanSino, Sinopharm/Beijing, Sinopharm/Wuhan, Sinovac, ZF2001"(CS,S/B,S/W,S,ZF2001),实际上,该疫苗方案的主要采用国家即为中国。另外,OTHERS表示所有占比小于0.01的疫苗方案,其占比总和与占比第三的疫苗方案相当。
任务三:计算疫苗全程接种人数最高的10个国家的疫苗接种进程。
在这个任务中,我们仍然先对数据进行map映射。然后,使用任务一中得到的top 10的国家信息,分别从数据中筛选对每个国家对应的记录,通过filter操作即可完成。最后,再分别进行数据存入。另外,需要留意的是:1)由于Flink写入的文件名很难具有区分性,无法直接得知输出的是哪个国家的疫苗接种进程数据,因此本实验中将国家列也作为结果一同输出,方便进行后续数据可视化分析;2)本实验中采用了Table API进行数据写入,其中设定的Sink名不能出现空格,否则可能出现意料之外的错误。因此,在本任务实现中使用replace函数对空格进行了替换。
具体实现代码如下:
def vaccination_progress(vacc_ds: DataStream):
# country, date, total_vaccinations, people_fully_vaccinated,
# daily_vaccinations, people_fully_vaccinated_per_hundred, vaccines
# -> country, date, total_vaccinations
short_ds = vacc_ds.map(lambda x: (x[0], x[1], x[2]),
output_type=Types.TUPLE([Types.STRING(), Types.STRING(), Types.FLOAT()]))
# prepare to sink
schema = Schema.new_builder() \
.column("country", DataTypes.STRING()) \
.column("date", DataTypes.STRING()) \
.column("total_vaccinations", DataTypes.FLOAT()) \
.build()
# -> country, date, total_vaccinations
# we only calc top10 countries' vaccination progress here, and the
# top10 countries can be obtained using 'top20country_fully_vaccinated' function
# for each top10 countries, calc and sink its vaccination progress
Top10countries = ["China", "India", "USA", "Brazil", "Indonesia", "Bangladesh", "Pakistan", "Japan", "Mexico", "Vietnam"]
for country in top10countries:
country_ds = short_ds.filter(lambda x: x[0] == country)
sink_tb(
country_ds,
schema,
country.replace(' ', '')+"TabSink",
output_path+"/vaccination_progress"
)
通过执行可视化代码,可以得到如下结果:
从上图中可以看到,于2021年3月左右,各国的疫苗接种开始逐步推进,其中,中国在2021年的疫苗接种数上升最快,并仍在不断快速地上升中。其次则为印度,然后为美国等其他国家,他们的疫苗接种进程较长,呈逐步上升趋势。
任务四/五/六:计算疫苗接种情况、新增案例以及死亡案例的变化。
由于任务四、五、六的实现较为类似,在实现复杂度上以任务六为最高。因此,在此以任务六为例统一进行介绍,任务六考虑中国的疫苗接种、新增案例和死亡案例的变化情况。在进行数据处理的过程中,首先分别对疫苗数据和感染数据进行map映射处理。然后,再使用filter操作筛选出中国对应的数据记录,并随之将数据列进行进一步缩减(去除国家列)。接下来,再将Datastream数据转为Table格式,并对数据列进行重命名,方便后续进行合并操作。在得到Table数据后,再根据日期列将两个数据进行合并,得到新的数据表。以上两个步骤分别通过Table API中的join和select操作完成。最后,再次根据日期列对数据进行求和,得到每个月的数据情况,上述操作分别通过group_by、select和sum实现。之后,则可以进行数据写入。
另外,在本任务的实现中,还特别对日期格式进行了处理(rm_str_date_day),即将“年-月-日”格式的数据转换为“年-月”,以方便后续数据可视化并达到更美观的效果。
具体实现代码如下:
def china_daily_vaccination_case_death(vacc_ds: DataStream, covd_ds: DataStream):
# country, date, total_vaccinations, people_fully_vaccinated,
# daily_vaccinations, people_fully_vaccinated_per_hundred, vaccines
# -> country, date, daily_vaccinations
# -> from daily to monthly (for better visualization)
vacc_ds = vacc_ds.map(lambda x: (x[0], rm_str_date_day(x[1]), x[4]), output_type=Types.TUPLE([Types.STRING(), Types.STRING(), Types.FLOAT()]))
# -> date, daily_vaccinations
vacc_ds = vacc_ds \
.filter(lambda x: x[0] == "China") \
.map(lambda x: (x[1], x[2]),
output_type=Types.TUPLE([Types.STRING(), Types.FLOAT()]))
# date, country, cumulative_total_cases,
# daily_new_cases, cumulative_total_deaths, daily_new_deaths
# -> country, date, daily_new_cases, daily_new_deaths
# -> from daily to monthly (for better visualization)
covd_ds = covd_ds.map(lambda x: (x[1], rm_str_date_day(x[0]), x[3], x[5]), output_type=Types.TUPLE([Types.STRING(), Types.STRING(), Types.FLOAT(), Types.FLOAT()]))
# -> date, daily_new_cases, daily_new_deaths
covd_ds = covd_ds \
.filter(lambda x: x[0] == "China") \
.map(lambda x: (x[1], x[2], x[3]),
output_type=Types.TUPLE([Types.STRING(), Types.FLOAT(), Types.FLOAT()]))
# join the two ds (through Table api)
vacc_tb = t_env.from_data_stream(vacc_ds).alias("a1", "b")
covd_tb = t_env.from_data_stream(covd_ds).alias("a2", "c", "d")
joined_tb = vacc_tb \
.join(covd_tb, col("a1") == col("a2")) \
.select(col("a1"), col("b"), col("c"), col("d"))
# sum monthly vaccinations/new-cases/new-deaths
res_tb = joined_tb \
.group_by(col("a1")) \
.select(col("a1"), col("b").sum, col("c").sum, col("d").sum)
# prepare to sink
schema = Schema.new_builder() \
.column("date", DataTypes.STRING()) \
.column("daily_vaccinations", DataTypes.FLOAT()) \
.column("daily_new_cases", DataTypes.FLOAT()) \
.column("daily_new_deaths", DataTypes.FLOAT()) \
.build()
sink_tb(res_tb, schema, "ZhDayVaccVirusTabSink", output_path)
通过执行可视化代码,可以得到如下结果:
从上图中可以看到,中国的死亡案例一直维持在极低的水平。在2021年间,疫苗接种情况有明显起伏,新增案例则只是稍有起伏,但未出现无法控制的情况。但从2022年3月左右开始,感染情况出现反弹,呈急速上升趋势,这可能与疫情管理政策逐步放开以及外部输入等因素有关。但是,由于本数据集只包含到2022年3月29日的数据,实际上,通过后续及时、高效的疫苗研发和接种,中国的新冠案例得到有效地控制,并逐步从新冠疫情的影响中恢复过来。
在此,另外附上任务四和任务五的可视化结果图:
任务四(疫苗与病毒):
任务五(全球视角):
七、可视化
源代码:./visualize.ipynb
本实验的可视化源代码编写在.ipynb文件中,这么做是因为考虑到为了方便进行代码阅读和可视化结果的查看,要将.ipynb文件转为.py文件也十分简单,只需将代码块中的代码分别粘贴到.py文件中即可。
本代码默认使用本地文件进行可视化,因此,您可能需要将结果数据从HDFS中拉取到本地:
hdfs dfs -get output ./ouput
另外需要注意的是,对于处理完成的数据结果,其命名不易区分。因此,在进行可视化前,需要对结果文件分别进行重命名(当然,本地./output文件夹中已经提供了处理好的结果文件)。
可视化的实现代码基于python的plotly包实现。
import plotly.graph_objects as go
其可视化步骤主要分为:建立go.Figure,添加绘图类(如Pie、Bar、Scatter等)、更新图片布局(update_layout)、展示图片(show)四部分。各任务的绘图代码较为类似,这里选取任务一的绘图代码进行展示:
# read data
df = pd.read_csv(
'./output/top20country_fully_vaccinated',
header=None
)
df.columns = ["country", "fully_vaccinations", "vaccination_percentage"]
# helper functions
def get_multi_line_title(title:str, subtitle:str):
return f"{title}<br><sub>{subtitle}</sub>"
# plot
top20_df = df.reset_index()
title = get_multi_line_title("People Fully Vaccinated", "Individuals who received the all doses of the vaccines")
xcolumn = "country"
y1column = "fully_vaccinations"
y2column = "vaccination_percentage"
y1label = "Count"
y2label = "Percentage"
colors = "Blugrn"
hovertemplate1 ='<br><b>%{x}</b>'+f'<br><b>{y1label}: </b>'+'%{y}<br><extra></extra>'
hovertemplate2 ='<br><b>%{x}</b>'+f'<br><b>{y2label}: </b>'+'%{y}<br><extra></extra>'
fig = go.Figure(
go.Bar(
hoverinfo='skip',
x=top20_df[xcolumn],
y=top20_df[y1column],
name=y1label,
hovertemplate = hovertemplate1,
marker=dict(
color=top20_df[y1column],
colorscale=colors,
),
),
)
fig.add_trace(
go.Scatter(
x=top20_df[xcolumn],
y=top20_df[y2column],
yaxis="y2",
mode="lines+markers",
name=y2label,
hovertemplate=hovertemplate2,
marker=dict(color="yellowgreen")
)
)
fig.update_layout(
legend=dict(orientation="h"),
title=title,
xaxis_title=f"Top 20 {xcolumn.title()}",
yaxis=dict(
title=dict(text=y1label),
side="left",
),
yaxis2=dict(
title=dict(text=y2label),
side="right",
overlaying="y",
),
plot_bgcolor='rgba(0,0,0,0)',
hovermode="x"
)
fig.show()
八、个人心得
本次实验用到的数据集来自kaggle,均为与新冠疫情有关的数据集。虽然在时间跨度上仍不够大,数据不够新,疫情也已经结束,但我认为其仍有一定的分析价值。我们可以从中回顾全球以及我国的新冠历程,了解到我们通过疫苗和相关政策战胜新冠病毒,并逐渐恢复过来。疫情给我们带来了很大的破坏,但也让我们从中学到了很多,也让我们更加坚信现代医学的力量和我们国家的凝聚力与号召力。
关于本实验所使用的大数据处理框架Flink,我之前是没有用过的。因此,为了完成本次实验,我看了不少Flink相关教程,主要的参考资料还是apache官方的Flink使用文档,以及PyFlink API Reference文档。当然,在可视化上我也参考了一篇kaggle博客。我发现,Flink的功能主要由Datastream API和Table API实现,而这二者又可以相互转换。Datastream API提供了非常丰富和复杂的功能,我最喜欢咏的是其map、filter和keyby算子,但是,对于一些特别的功能,我发现使用Table API处理起来则更便捷,如排序(orderby)和组合(join)。Datastream API可以使用print函数打印数据,而Table API则只能使用print_schema函数打印表样式,这两个函数在我调试的时候帮到了我很多。另外,由于我选择的是Flink Python API来实现数据处理,其现有的博客资料相较于其他两种语言(Java、Scala)要少很多,因此我会先了解某个功能的Java实现,再查询PyFlink API Reference中的相关API来转换为Python实现。有趣的是,在这个过程中我还偶然发现了一个文档例子中的小错误,但这并不影响对其的理解。总之,学习Flink并完成本实验对我来说还是很有意义的。
我编写的源代码中使用的均为英文注释,没有经过机器翻译,因此很可能出现一些用词错误,希望不会影响阅读。另外,对Flink的学习和使用中,我也碰到了不少问题,其中最后仍没有解决的问题包括:如何输出表格头到文件中、对于任务三的实现仍比较笨(理想结果是重新构建一个以日期为第一列,各个国家为其余列的新表)、写入的文件仍需要手动重命名、以及意料之外的xxx type not support currently错误。
当然,我目前对Flink的一些理解可能还存在一些错误,只是可能我的想法恰好奏效罢了,以后会更加深入地学习。