【版权声明】版权所有,严禁转载,严禁用于商业用途,侵权必究。
作者:厦门大学计算机科学与技术系2023级研究生 亢恒越
指导老师:厦门大学数据库实验室 林子雨 博士/副教授
时间:2024年6月
相关教材:林子雨编著《Flink编程基础(Java版)》(访问教材官网)
相关案例:Flink大数据处理分析案例集锦
本案例用Python语言编写了Flink程序,对Spodify数据集进行了数据处理和分析,之后对分析结果使用Pyecharts进行了可视化,并分别保存为HTML和PNG格式。
数据集和代码下载:从百度网盘下载本案例的代码和数据集。(提取码是ziyu)
一、实验环境
本次实验采用的环境和软件版本如下:
•Ubuntu 22.04
•VSCode: 1.89.1
•Hadoop: 3.3.5
•Flink(PyFlink): 1.17.0
•Python: 3.9
•JDK: 1.8
•Anaconda: 23.7.4
首先创建conda环境,名为bigdata,并在创建后启动该环境(在每次重新启动终端后,都需要启动该环境):
yes y | conda create --name bigdata python=3.9
conda activate bigdata
接着通过以下方式安装相关的依赖和包:
pip install -r requirements.txt
二、数据集
1.数据集介绍
本次实验使用了kaggle中的Most Streamed Spotify Songs 2023数据集,该数据集包含了2023年Spotify上最著名的歌曲的信息,包括歌曲名称、艺术家名称、发行日期、在Spotify、Apple Music和Deezer等平台的播放列表和榜单包含情况以及各种音频特征等共24种信息。具体的字段有:
•track_name:歌曲名称
•artist(s)_name:艺术家名称
•artist_count: 艺术家数量
•released_year:发行年份
•releasedmonth:发行月份
•...
•instrumentalness%:歌曲中器乐内容比例
•liveness%:现场表演元素的比例
•speechiness%:歌曲中说出内容比例
完整的字段信息在数据集下载页面有详细介绍,数据集保存的相对路径为:./data/spotify-2023.csv
2.数据集预处理
相关代码在data_preprocess.ipynb文件中。
首先使用pandas读取原数据集,并查看其信息:
共有953个表项和24个字段。
为便于后续处理和分析,接着通过如下代码将原本为object类型的列转化为数值:
# Convert columns of type object to numeric value
convert_cols=['streams','in_deezer_playlists']
for col in df.columns:
if col in convert_cols:
df[col]=pd.to_numeric(df[col], errors='coerce')
df.info()
对于空值和重复值,通过如下代码进行统计:
na_cnt = df.isna().sum()
print(f"Duplicated rows: {df.duplicated().sum()}")
print("Missing Value Counts in Each Column:")
print(na_cnt)
可以看到,整个数据集中没有重复的行,但有4个字段中有空值。
之后删除这四个字段为空的所有表项,并保留后续分析需要的18个字段,剩余751行:
df.dropna(subset=['key','in_shazam_charts','streams','in_deezer_playlists'],inplace=True)
print(df.isna().sum())
save_cols=['track_name', 'artist(s)_name','in_spotify_playlists',
'in_spotify_charts', 'streams', 'in_apple_playlists', 'in_apple_charts',
'in_deezer_playlists', 'in_deezer_charts','bpm',
'key','danceability_%', 'valence_%', 'energy_%',
'acousticness_%', 'instrumentalness_%', 'liveness_%', 'speechiness_%']
df= df[save_cols]
df.info()
之后保存预处理后的数据集,相对路径为:./data/spotify-cleaned.csv。
最后将该文件上传到HDFS中:第一步运行以下脚本启动HDFS。
bash start-hadoop.sh
若正常启动,则可以看到如下输出:
其具体代码为:
set -e
HADOOP_HOME="/usr/local/hadoop"
pushd ${HADOOP_HOME}
./sbin/start-dfs.sh
popd
jps
其中HADOOP_HOME指定了hadoop的安装路径,如果有不同,需要根据安装情况替换。
接着运行以下脚本将预处理后的数据集上传到 HDFS 中:
bash save-to-hdfs.sh
若成功将文件上传到HDFS,可以看到如下输出:
其具体代码为:
set -e
# path of hadoop installation
hadoop_path="/usr/local/hadoop"
pushd ${hadoop_path}
./bin/hdfs dfs -mkdir -p data
popd
${hadoop_path}/bin/hdfs dfs -put data/spotify-cleaned.csv data
${hadoop_path}/bin/hdfs dfs -ls data
同理,需要根据安装情况替换hadoop的安装路径。
三、使用Flink对数据进行处理和分析
首先运行如下代码,导入环境变量,使得Pyflink可以读取HDFS中的文件(注意,每次终端重启后,都要运行该脚本):
source prepare-env.sh
其具体代码为:
同样的,需要根据安装情况替换hadoop安装路径,并导入HADOOP_CLASSPATH这个环境变量。
所有数据分析相关的代码位于data_process.py文件中。
1.环境创建和数据读取
在代码中,首先创建执行环境:
env = StreamExecutionEnvironment.get_execution_environment()
env.set_runtime_mode(RuntimeExecutionMode.BATCH)
env.set_parallelism(1)
settings = EnvironmentSettings.new_instance().in_batch_mode().build()
t_env = StreamTableEnvironment.create(env, settings)
t_env.get_config().set("parallelism.default", "1")
接着根据数据集字段的名称和格式,创建CsvSchema,便于后续读取数据集:
# define the source
schema = csv.CsvSchema.builder() \
.add_string_column('track_name') \
.add_string_column('artist(s)_name') \
.add_number_column('in_spotify_playlists', number_type=DataTypes.BIGINT()) \
.add_number_column('in_spotify_charts', number_type=DataTypes.BIGINT()) \
.add_number_column('streams', number_type=DataTypes.DOUBLE()) \
.add_number_column('in_apple_playlists', number_type=DataTypes.BIGINT()) \
.add_number_column('in_apple_charts', number_type=DataTypes.BIGINT()) \
.add_number_column('in_deezer_playlists', number_type=DataTypes.DOUBLE()) \
.add_number_column('in_deezer_charts', number_type=DataTypes.BIGINT()) \
.add_number_column('bpm', number_type=DataTypes.BIGINT()) \
.add_string_column('key') \
.add_number_column('danceability_%', number_type=DataTypes.BIGINT()) \
.add_number_column('valence_%', number_type=DataTypes.BIGINT()) \
.add_number_column('energy_%', number_type=DataTypes.BIGINT()) \
.add_number_column('acousticness_%', number_type=DataTypes.BIGINT()) \
.add_number_column('instrumentalness_%', number_type=DataTypes.BIGINT()) \
.add_number_column('liveness_%', number_type=DataTypes.BIGINT()) \
.add_number_column('speechiness_%', number_type=DataTypes.BIGINT()) \
.set_use_header() \
.set_strict_headers() \
.build()
接下来从HDFS中读取该数据集,并使用Table类型保存:
ds = env.from_source(
source=FileSource.for_record_stream_format(csv.CsvReaderFormat.for_schema(schema),
hdfs_data_path).build(),
watermark_strategy=WatermarkStrategy.no_watermarks(),
source_name="data_source"
)
assert (ds is not None)
# get table from data stream
table = t_env.from_data_stream(ds)
其中文件的路径hdfs_data_path为:
hdfs://localhost:9000/user/hadoop/data/spotify-cleaned.csv
2.数据分析
本次对数据集的分析包括下列几个方面:
•统计播放量最高的10位歌手的播放总量和歌曲数量,保存格式为[歌手名称,播放总量,歌曲数量]。
•统计播放量最高的10位歌手的所有歌曲的平均bpm、danceability、能量级别、原声量、乐器内容量、现场表演元素量,保存格式为[歌手名称,平均bpm,danceability,能量级别,原声量,乐器内容量,现场表演元素量]。
•统计播放量最高的50位歌手在不同平台中的得分,并按照歌手的总播放量排序,其中每个平台的得分计算方式为在该平台播放列表的次数与在该平台榜单的排名之商,例如在Spotify平台的计算方式为:in_spotify_playlists/in_spotify_chart,因为播放列表的次数越高与排名越靠前表面该歌曲更受欢迎。保存格式为[歌手名称,在spotify得分,在apple得分,在deezer得分]。
•统计播放量最高的100首歌曲的音调分布,保存格式为[音调,该音调数量]。
•统计播放量最高的10首歌曲的属性,包括danceability、 积极性、能量级别、原声量、乐器内容量,现场表演元素量和口语量,保存格式为[歌曲名称,danceability,积极性,能量级别,原声量,乐器内容量,现场表演元素量,口语量]
代码分别位于如下的五个函数中:
save_top10_artists_streams()
save_top10_artists_avg_properties()
save_top50_artists_scores()
save_top100_songs_key()
save_top10_songs_properties()
(1)播放量最高的10位歌手的播放总量和歌曲数量
使用如下代码首先选出不同的歌手并统计其歌曲数量和播放总量,按照播放总量排序后获取前10行数据:
artist_stats = table.group_by(col('artist(s)_name')) \
.select(col('artist(s)_name'),
col('streams').sum.alias('total_streams'),
col('artist(s)_name').count.alias('total_songs'))
top10_artists_stats = artist_stats.order_by(col('total_streams').desc)\
.limit(10)
df = top10_artists_stats.to_pandas()
print(df)
df.to_csv('./result/top10_artists_streams.csv', index=False)
结果保存在result文件夹中的top10_artists_streams.csv文件中。
(2)播放量最高的10位歌手的所有歌曲的平均属性
首先使用如下代码选出播放量最高的10位歌手:
total_streams_per_artist = table.group_by(col("artist(s)_name")) \
.select(col("artist(s)_name").alias('top_10_artists.artist(s)_name'), col("streams").sum.alias("total_streams"))
top_10_artists = total_streams_per_artist.order_by(col("total_streams").desc) \
.limit(10)
接着选出这10位歌手的所有歌曲:
top_10_artists_songs = top_10_artists.join(table) \
.where(col("top_10_artists.artist(s)_name")
== col("artist(s)_name")) \
.select(col("artist(s)_name"),
col("bpm"),
col("danceability_%"),
col("energy_%"),
col("acousticness_%"),
col("instrumentalness_%"),
col("liveness_%"), col("streams"))
最后计算这些歌曲的平均性质,并按照播放量对歌手进行排序:
average_metrics = top_10_artists_songs.group_by(col("artist(s)_name")) \
.select(col("artist(s)_name"),
col("bpm").avg.alias("avg_bpm"),
col("danceability_%").avg.alias("avg_danceability"),
col("energy_%").avg.alias("avg_energy"),
col("acousticness_%").avg.alias("avg_acousticness"),
col("instrumentalness_%").avg.alias("avg_instrumentalness"),
col("liveness_%").avg.alias("avg_liveness"),
col("streams").sum.alias("total_streams"))\
.order_by(col("total_streams").desc)
average_metrics = average_metrics.drop_columns(col("total_streams"))
df = average_metrics.to_pandas()
print(df)
df.to_csv('./result/top10_artists_avg_properties.csv', index=False)
结果保存在result文件夹中的top10_artists_avg_properties.csv文件中。
(3)播放量最高的50位歌手在不同平台中的得分
首先通过如下代码计算得到每首歌在不同平台中的得分,计算方法如前所述:
songs_with_scores = t_env.sql_query(
"""WITH scored_songs AS (
SELECT
track_name,
`artist(s)_name`,
in_spotify_playlists / NULLIF(in_spotify_charts, 0) AS spotify_score,
in_apple_playlists / NULLIF(in_apple_charts, 0) AS apple_score,
in_deezer_playlists / NULLIF(in_deezer_charts, 0) AS deezer_score,
streams
FROM %s)
SELECT
track_name,
`artist(s)_name`,
spotify_score,
apple_score,
deezer_score,
streams
FROM scored_songs
WHERE spotify_score IS NOT NULL
AND apple_score IS NOT NULL
AND deezer_score IS NOT NULL""" % table)
接着将这些歌曲按照歌手进行分组,计算每位歌手在每个平台的总分,并按照该歌手的播放量排序:
top50_artist_scores = songs_with_scores \
.group_by(col('artist(s)_name')) \
.select(
col('artist(s)_name'),
col('spotify_score').sum.alias('total_spotify_score'),
col('apple_score').sum.alias('total_apple_score'),
col('deezer_score').sum.alias('total_deezer_score'),
col('streams').sum.alias('total_streams')
) \
.order_by(col('total_streams').desc) \
.limit(50)
top50_artist_scores = top50_artist_scores.drop_columns(
col('total_streams'))
df = top50_artist_scores.to_pandas()
print(df)
df.to_csv('./result/top50_artists_scores.csv', index=False)
注意这里选出的前50与之前按照播放量排序的结果不同,因为在计算歌曲得分时,有一部分歌曲因为在平台中的排名为0,此时该歌曲会被去除,不计入后续播放量的计算。
最后结果保存在result文件夹中的top50_artists_scores.csv文件中。
(4)播放量最高的100首歌曲的音调分布
通过如下代码首先对所有歌曲按照播放量进行排序,并获得这些歌的音调;接着计算前100首歌中每种音调的数量:
top100_songs = table.order_by(
col('streams').desc).limit(100).select(col('key'))
key_distribution = top100_songs.group_by(col('key')) \
.select(col('key'),
col('key').count.alias('total_count'))
df = key_distribution.to_pandas()
print(df)
df.to_csv('./result/top100_songs_key.csv', index=False)
结果保存在result文件夹中的top100_songs_key.csv文件中。
(5)播放量最高的10首歌曲的属性
对所有歌曲按照播放量排序,保留7个相关的属性:
top10_songs = table.order_by(col('streams').desc).limit(10)\
.select(col('track_name'),
col('streams'),
col('danceability_%'),
col('valence_%'),
col('energy_%'),
col('acousticness_%'),
col('instrumentalness_%'),
col('liveness_%'),
col('speechiness_%'))
sorted_top10_songs = top10_songs.order_by(
col('streams').desc).drop_columns(col('streams'))
df = sorted_top10_songs.to_pandas()
print(df)
df.to_csv('./result/top10_songs_properties.csv', index=False)
结果保存在result文件夹中的top10_songs_properties.csv文件中。
3.任务执行与结果
通过如下方式执行:
python data_process.py
以下分别为5个任务的结果:
四、数据可视化
数据图形化使用了Pyecharts,利用其功能结合Flink处理好的数据构建了如下图表,代码均位于result_visualization.ipynb文件中。
结果有三种类型,在原文件中渲染的结果与result/html文件夹中的结果均可以动态操作,/result/png文件夹中保存的结果为图片,放在实验报告中。
通过如下代码导入环境和设置路径:
from pyecharts import options as opts
from pyecharts.charts import Pie, Bar, Radar
import pandas as pd
from pyecharts.render import make_snapshot
from snapshot_selenium import snapshot
html_path_prefix = './result/html/'
png_path_prefix = './result/png/'
(1)播放量最高的10位歌手的播放总量和歌曲数量
df = pd.read_csv('./result/top10_artists_streams.csv')
df.rename(columns={'artist(s)_name': 'artist_name'}, inplace=True)
artist_streams = []
artist_songs = {}
for row in df.itertuples():
artist_streams.append(
(f"{row.artist_name}: {row.total_songs}", row.total_streams))
artist_songs[row.artist_name] = row.total_songs
def custom_formatter(params):
artist_name = params.name
total_streams = params.value
total_songs = artist_songs.get(artist_name, "无数据")
return f"{artist_name}: {total_songs}"
c = (
Pie(init_opts=opts.InitOpts(
width="1000px", # 设置图表宽度
height="600px", # 设置图表高度
# theme=ThemeType.LIGHT, # 设置图表主题
bg_color="#cae6ff" # 设置图表背景颜色
))
.add(
"",
artist_streams,
radius=["30", "250"],
center=["50%", "50%"],
rosetype="area",
)
.set_global_opts(title_opts=opts.TitleOpts(title="播放量最高的10位歌手总播放量及歌曲数量"),
toolbox_opts=opts.ToolboxOpts(
is_show=True, pos_right="20"),
legend_opts=opts.LegendOpts(orient="vertical",
pos_top="30%",
pos_left="left"))
)
c.render(html_path_prefix+'top10_artists_streams.html')
make_snapshot(snapshot, c.render(), png_path_prefix +
'top10_artists_streams.png')
c.render_notebook()
播放量最高的10位歌手总播放量及歌曲数量使用饼图表示,其中图例表示每位歌手的名字和歌曲数量:
(2)播放量最高的10位歌手的所有歌曲的平均属性
df = pd.read_csv('result/top10_artists_avg_properties.csv')
df.rename(columns={'artist(s)_name': 'artist_name'}, inplace=True)
properties_names = df.columns.to_list()[1:]
artists = df['artist_name'].to_list()
properties_map = {}
for row in df.itertuples():
properties_map[row.artist_name] = list(row)[2:]
bar = (
Bar(init_opts=opts.InitOpts(
width="1200px", # 设置图表宽度
height="1500px", # 设置图表高度
# theme=ThemeType.LIGHT, # 设置图表主题
bg_color="#cae6ff" # 设置图表背景颜色
))
.add_xaxis(properties_names) # 设置 x 轴数据为歌手名称
.set_global_opts(
title_opts=opts.TitleOpts(title="播放量最高的10位歌手每首歌的平均属性"),
toolbox_opts=opts.ToolboxOpts(is_show=True, pos_right="20"),
legend_opts=opts.LegendOpts(pos_top="2%"
)))
for artist_name in artists:
bar = bar.add_yaxis(
series_name=artist_name, y_axis=properties_map[artist_name], gap="0%")
print(properties_map[artist_name])
bar.render(html_path_prefix+'top10_artists_avg_properties.html')
make_snapshot(snapshot, bar.render(), png_path_prefix +
'top10_artists_avg_properties.png')
bar.render_notebook()
使用柱状图表示每个歌曲性质下,不同歌手的对比。
(3)播放量最高的50位歌手在不同平台中的得分
result_title = 'top10_artists_scores'
df = pd.read_csv('result/top50_artists_scores.csv', nrows=10)
df.rename(columns={'artist(s)_name': 'artist_name'}, inplace=True)
artist_names = df['artist_name'].to_list()
artist_names = [name.split(",")[0] for name in artist_names]
print(artist_names)
df['total_deezer_score'] = df['total_deezer_score'].astype(int)
platform_scores = df.columns.to_list()[1:]
platforms = ["spotify", "deezer"]
bar = (
Bar(init_opts=opts.InitOpts(
width="800px", # 设置图表宽度
height="800px", # 设置图表高度
# theme=ThemeType.LIGHT, # 设置图表主题
bg_color="#cae6ff" # 设置图表背景颜色
))
.add_xaxis(artist_names) # 设置 x 轴数据为歌手名称 # 设置标签显示在右侧
.set_global_opts(
xaxis_opts=opts.AxisOpts(axislabel_opts=opts.LabelOpts(rotate=-20)),
title_opts=opts.TitleOpts(title="Top10歌手在各平台的得分"),
toolbox_opts=opts.ToolboxOpts(is_show=True, pos_right="20"),
legend_opts=opts.LegendOpts(pos_top="2%"
))
)
for platform in platforms:
bar = bar.add_yaxis(
series_name=platform, y_axis=df[f"total_{platform}_score"].to_list(),
stack="stack1",
label_opts=opts.LabelOpts(position="right"),
category_gap="50%")
print(df[f"total_{platform}_score"].to_list())
bar.render(html_path_prefix+'top10_artists_scores.html')
make_snapshot(snapshot, bar.render(), png_path_prefix +
'top10_artists_scores.png')
bar.render_notebook()
为了使得图更清晰,此处仅画出播放量最高的前10位歌手的评分;且由于在apple平台中各位歌手的得分均较低,此处并没有进行可视化。
(4)播放量最高的100首歌曲的音调分布
df = pd.read_csv('result/top100_songs_key.csv')
key_counts = []
for row in df.itertuples():
key_counts.append((row.key, row.total_count))
c = (
Pie(init_opts=opts.InitOpts(
# width="1000px", # 设置图表宽度
# height="600px", # 设置图表高度
# theme=ThemeType.LIGHT, # 设置图表主题
bg_color="#cae6ff" # 设置图表背景颜色
))
.add("",
key_counts)
.set_colors(["blue", "green", "yellow", "red", "pink", "orange", "purple"])
.set_global_opts(title_opts=opts.TitleOpts(title="播放量最高前100首歌曲曲调分布"),
legend_opts=opts.LegendOpts(orient="vertical",
pos_top="30%",
pos_right="right"),
toolbox_opts=opts.ToolboxOpts(
is_show=True, pos_right="20"),)
.set_series_opts(label_opts=opts.LabelOpts(position="inside",
formatter="{b}: {c}"))
)
c.render(html_path_prefix+'top100_songs_key.html')
make_snapshot(snapshot, c.render(), png_path_prefix +
'top100_songs_key.png')
c.render_notebook()
使用饼图表示每种曲调的个数,标记在饼图中。
(5)播放量最高的10首歌曲的属性
df = pd.read_csv('result/top10_songs_properties.csv')
df.drop(columns=['instrumentalness_%'], inplace=True)
song_names = df['track_name'].to_list()
song_properties = []
for row in df.itertuples():
song_properties.append(list(row)[2:])
print(list(row)[2:])
# 提取性质名称
property_names = df.columns.to_list()[1:]
print(property_names)
# 创建 Radar 图表
radar = (
Radar(init_opts=opts.InitOpts(
width="800px", # 设置图表宽度
height="700px", # 设置图表高度
# theme=ThemeType.LIGHT, # 设置图表主题
bg_color="#cae6ff" # 设置图表背景颜色)
)).add_schema(
schema=[
opts.RadarIndicatorItem(name=prop_name, max_=100) for prop_name in property_names
],
axislabel_opt=opts.LabelOpts(is_show=True, color="#000"), # 隐藏坐标轴刻度
textstyle_opts=opts.TextStyleOpts(color="#000"), # 设置指标名称的颜色
splitline_opt=opts.SplitLineOpts(
is_show=True, linestyle_opts=opts.LineStyleOpts(color="#aaa")), # 设置分割线颜色
splitarea_opt=opts.SplitAreaOpts(is_show=True, areastyle_opts=opts.AreaStyleOpts(opacity=1)
)
))
colors = ["#fb0505", "#0090fd", "#009c4a",
"#ffbf00", "#000000", "#512bdb",
"#3a442b", "#fd05e5", "#f1cbff",
"#3a05fd"]
# colors=["blue", "green", "yellow", "red", "pink", "orange", "purple"]
# 添加每首歌曲的性质数据到雷达图中
for i, song_name in enumerate(song_names):
radar = radar.add(series_name=song_name,
data=[song_properties[i]],
label_opts=opts.LabelOpts(is_show=False),
color=colors[i % 7])
# 设置全局配置项
radar.set_global_opts(
title_opts=opts.TitleOpts(title="播放量前10的歌曲特点雷达图"),
legend_opts=opts.LegendOpts(pos_top="2.5%"), # 设置图例垂直排列,右侧显示
)
# 在 Jupyter Notebook 中显示图表
radar.render(html_path_prefix+'top10_songs_properties.html')
make_snapshot(snapshot, radar.render(), png_path_prefix +
'top10_songs_properties.png')
radar.render_notebook()
使用雷达图表示10首歌中每首歌的各个属性,每个属性的最大值均为100。
在原本的数据分析程序中,获取歌曲的7个相关性质。但由于instrumentalness各歌曲都为0,所以可视化时只选择其它6个字段。
想要更完整地体验可视化效果,可以查阅Jupyter Notebook或html文件。