【版权声明】版权所有,严禁转载,严禁用于商业用途,侵权必究。
作者:厦门大学计算机科学与技术系2023级研究生 李坤华
指导老师:厦门大学数据库实验室 林子雨 博士/副教授
时间:2024年6月
相关教材:林子雨、郑海山、赖永炫编著《Spark编程基础(Python版,第2版)》(访问教材官网)
相关案例:基于Python语言的Spark数据处理分析案例集锦(PySpark)
数据集和代码下载:从百度网盘下载本案例数据集和代码。(提取码是ziyu)
本案例数据集来自阿里云天池公共数据集——淘宝展示广告点击率预估数据集。采用pandas对数据进行清洗,采用分布式文件系统HDFS进行数据存储,采用Python编写Spark程序进行数据分析,使用Matplotlib进行数据可视化。
一、数据集
数据集来自阿里云天池公共数据集:淘宝展示广告点击率预估数据集(淘宝展示广告点击率预估数据集_数据集-阿里云天池 (aliyun.com))。
数据集介绍:
该数据集中包含有4个csv文件,分别为样本数据(raw_sample.csv) 、(广告的基本信息)ad_feature.csv 、(用户的基本信息)user_profile.csv 、(用户的行为日志)raw_behavior_log.csv。
1、原始样本骨架raw_sample
淘宝网站中随机抽样了114万用户8天内的广告展示/点击日志(2600万条记录)
(1) user_id:脱敏过的用户ID;
(2) adgroup_id:脱敏过的广告单元ID;
(3) time_stamp:时间戳;
(4) pid:资源位;
(5) noclk:为1代表没有点击;为0代表点击;
(6) clk:为0代表没有点击;为1代表点击;
2、广告基本信息表ad_feature
广告的基本信息
(1) adgroup_id:脱敏过的广告ID;
(2) cate_id:脱敏过的商品类目ID;
(3) campaign_id:脱敏过的广告计划ID;
(4) customer_id:脱敏过的广告主ID;
(5) brand:脱敏过的品牌ID;
(6) price: 宝贝的价格
3、用户基本信息表user_profile
用户的基本信息
(1) userid:脱敏过的用户ID;
(2) cms_segid:微群ID;
(3) cms_group_id:cms_group_id;
(4) final_gender_code:性别 1:男,2:女;
(5) age_level:年龄层次;
(6) pvalue_level:消费档次,1:低档,2:中档,3:高档;
(7) shopping_level:购物深度,1:浅层用户,2:中度用户,3:深度用户
(8) occupation:是否大学生 ,1:是,0:否
(9) new_user_class_level:城市层级
4、用户的行为日志behavior_log
raw_sample中全部用户22天内的购物行为(共七亿条记录)
(1) user:脱敏过的用户ID;
(2) time_stamp:时间戳;
(3) btag:行为类型, 包括以下四种:ipv(浏览)、cart(加入购物车)、fav(喜欢)、buy(购买)
(4) cate:脱敏过的商品类目;
(5) brand: 脱敏过的品牌词
二、数据预处理
1、读取ad_featrue.csv
1.# 处理广告数据
2.ad_feature_file = pd.read_csv('../data/ad_feature.csv')
3.print(ad_feature_file.head(12))
4.ad_feature_file.info()
5.ad_feature_file.shape
读取ad_featrue.csv文件,查看文件信息
2、处理ad_featrue.csv
1.for i in ad_feature_file.columns:
2. null_rate = ad_feature_file[i].isna().sum() / len(ad_feature_file) * 100
3. if null_rate > 0:
4. print("{} null rate: {} %".format(i, round(null_rate, 2)))
5.
6.# 处理缺失值
7.ad_feature_file.dropna(inplace=True) # 删除含有缺失值的行
8.# 处理重复值
9.ad_feature_file.drop_duplicates(inplace=True) # 删除重复行
10.#将ad_feature_file中的brand列转为整型
11.ad_feature_file['brand'] = ad_feature_file['brand'].astype(int)
12.#保存文件
ad_feature_file.to_csv('ad_feature_data.csv', index=False)
检查是否含有缺失值
删除含有缺失值的行、删除重复行
3、读取user_profile.csv
1.# 用户基本信息
2.user_profile_file = pd.read_csv('../data/user_profile.csv')
3.print(user_profile_file.head(12))
user_profile_file.info()
4、处理user_profile.csv
1.from sklearn.impute import SimpleImputer
2.for i in user_profile_file.columns:
3. null_rate = user_profile_file[i].isna().sum() / len(user_profile_file) * 100
4. if null_rate > 0:
5. print("{} null rate: {} %".format(i, round(null_rate, 2)))
6.city_level = user_profile_file.loc[:,'new_user_class_level '].values.reshape(-1,1) # loc:对索引名进行切片
7.si = SimpleImputer(strategy = 'most_frequent') # 实例化,使用众数填补
8.user_profile_file.loc[:,'new_user_class_level '] = si.fit_transform(city_level) # fit_transform一步训练导出结果
9.user_profile_file.info()
10.# 利用KNN对消费档次进行预测填充
11.from sklearn.neighbors import KNeighborsClassifier
12.columns = ['userid', 'cms_segid', 'cms_group_id', 'final_gender_code', 'age_level',
13. 'shopping_level', 'occupation', 'new_user_class_level ','pvalue_level']
14.user_data = user_profile_file[columns]
15.# pavlue_level为空的样本作为测试集
16.pvalue_null = user_data.loc[user_data['pvalue_level'].isnull().values == True]
17.# pavlue_level不空的样本作为训练集
18.pvalue_nonull = user_data.loc[user_data['pvalue_level'].isnull().values == False]
19.X_train_user,y_train_user = pvalue_nonull.iloc[:,:-1],pvalue_nonull.iloc[:,-1]
20.X_test_user,y_test_user = pvalue_null.iloc[:,:-1],pvalue_null.iloc[:,-1]
21.knn = KNeighborsClassifier(n_neighbors=3,weights='distance')
22.knn.fit(X_train_user,y_train_user)
23.# 得到预测结果
24.y_test_user = knn.predict(X_test_user)
25.y_test_user = pd.DataFrame(y_test_user)
26.y_test_user.columns = ['pvalue_level']
27.X_test_user.reset_index(drop=True,inplace=True)
28.pvalue_null = pd.concat([X_test_user,y_test_user],axis=1)
29.user = pd.concat([pvalue_nonull,pvalue_null],ignore_index=False)
30.# 将处理过后的user信息保存
user.to_csv('user_data.csv',index=False,sep=',')
检查是否含有缺失值
我发现pvalue_level(消费水平)缺失值占到了54.24%,缺失的比重过大,所以不能够简单的将含有缺失值的行删除掉,故此采用KNN来预测缺失值,我们将不含缺失值的数据作为训练样本,含有缺失值的样本为测试样本,最终得到预测的数据。
发现new_user_class(城市层级)缺失值为32.49%,故此可以采用利用众数对缺失数据进行填充。
5、读取raw_sample.csv
1.# 样本
2.raw_sample_file = pd.read_csv('../data/raw_sample.csv', nrows=500000)
3.print(raw_sample_file.head(12))
4.raw_sample_file.info()
因为样本数据量过大,所以在本次数据分析中仅读取了50万条数据。
6、处理raw_sample.csv
1.for i in raw_sample_file.columns:
2. null_rate = raw_sample_file[i].isna().sum() / len(raw_sample_file) * 100
3. if null_rate > 0:
4. print("{} null rate: {} %".format(i, round(null_rate, 2)))
5.# 将处理过后的user信息保存
raw_sample_file.to_csv('sample_data.csv',index=False)
检查发现没有缺失值
7、合并数据
1.# 修改表dataset中列名user为userid,以便后面基于主键连接
2.raw_sample_file.rename(columns={'user':'userid'},inplace=True)
3.raw_sample_file.head()
4.
5.# 将数据集dataset与用户基本信息表user合并,基于主键userid,how='right'表示以右边表为基准连接
6.user_dataset = pd.merge(user,raw_sample_file,on='userid',how='right')
7.print(f'user_dataset表的维度:{user_dataset.shape}')
8.
9.# 将数据集与广告基本信息表ads合并,基于主键adgroup_id,how='right'表示以右边表为基准连接
10.ads_user_dataset = pd.merge(ad_feature_file,user_dataset,on='adgroup_id',how='right')
11.print(f'ads_user_dataset表的维度:{ads_user_dataset.shape}')
12.
13.ads_user_dataset.to_csv('data.csv',index=False,sep=',')
最终得到一个50行,19列的表
1.# 获取data每列的缺失值占比
2.data_null = data.isnull().sum()/len(raw_sample_file)*100
3.data_null = data_null.drop(data_null[data_null==0].index).sort_values(ascending=False) # 将缺失值占比从高到低排序
4.missing_data = pd.DataFrame({'Missing Ratio(%)':data_null})
5.print(f'dataset含有缺失值的属性个数:{len(data_null)}')
6.print(missing_data)
data.dropna(axis=0, how='any',inplace=True)
检查是否右缺失值、删除含有缺失值的数据
1.# 将数据中的时间戳形式转换为日期和时间形式
2.import datetime
3.import time
4.data['time_stamp']=pd.to_datetime(data['time_stamp'],unit='s')
5.data['time_stamp']
6.# 从转换后的数据中分别提取:日期、时间、小时,组成新的列
7.data['date'] = data['time_stamp'].dt.date
8.data['time'] = data['time_stamp'].dt.time
9.data['hour'] = data['time_stamp'].dt.hour
10.# 调整数据集data的列顺序:将'data'、'time'、'hour'这三列数据调至'time_stamp'列后
11.columns = ['adgroup_id', 'cate_id', 'campaign_id', 'customer', 'brand', 'price',
12. 'userid', 'cms_segid', 'cms_group_id', 'final_gender_code', 'age_level',
13. 'shopping_level', 'occupation', 'new_user_class_level ', 'pvalue_level',
14. 'time_stamp', 'date', 'time', 'hour', 'pid', 'nonclk', 'clk']
15.data = data[columns]
16.data = data.drop(['time_stamp','time','nonclk'],axis=1)
17.data.to_csv('user_ad_data.csv',index=False,sep=',')
处理数据中的时间戳信息,将时间戳转化为日期和时间形式以便分析。
8、读取behavior_log.csv
1.# 处理用户行为数据
2.behavior_file = pd.read_csv('../data/behavior_log.csv',nrows=500000)
3.print(behavior_file.head(12))
4.behavior_file.info()
5.behavior_file.shape
因为样本数据量过大,所以在本次数据分析中仅读取了50万条数据。
9、处理behavior_log.csv
1.for i in behavior_file.columns:
2. null_rate = behavior_file[i].isna().sum() / len(behavior_file) * 100
3. if null_rate > 0:
4. print("{} null rate: {} %".format(i, round(null_rate, 2)))
5.behavior_file.rename(columns={'user':'userid'},inplace=True)
6.behavior_file.rename(columns={'cate':'cate_id'},inplace=True)
7.behavior_file['time_stamp']=pd.to_datetime(behavior_file['time_stamp'],unit='s')
8.# 从转换后的数据中分别提取:日期、时间、小时,组成新的列
9.behavior_file['date'] = behavior_file['time_stamp'].dt.date
10.behavior_file['time'] = behavior_file['time_stamp'].dt.time
11.behavior_file['hour'] = behavior_file['time_stamp'].dt.hour
12.behavior_file = behavior_file.drop(['time_stamp'],axis=1)
13.behavior_file.to_csv('behavior_data.csv',index=False,sep=',')
为方便后续合并表,修改user列为userid,修改cate列为cate_id。
10、使用HDFS存储数据
三、数据分析
1、读取数据
1.spark = SparkSession.builder.getOrCreate()
2.df = spark.read.format("csv")\
3..option("header","true")\
4..option("inferSchema","true")\
5..option('escape','\"')\
6..load("user_ad_data.csv")
7.
8.df.createOrReplaceTempView("data")
9.df.show(4)
读取user_ad_data.csv数据
1.behavior_df = spark.read.format("csv")\
2..option("header","true")\
3..option("inferSchema","true")\
4..option('escape','\"')\
5..load("behavior_data.csv")
6.
7.behavior_df.createOrReplaceTempView("behavior")
behavior_df.show(4)
读取behavior_data.csv数据
2、分析广告点击率
1.def clk_counts(df):
2. counts = df.groupBy("clk").count()
3. total = df.count()
4. data_clk = counts.withColumn("Ratio(%)", F.round((F.col("count") / total * 100),2))
5. return data_clk
6.data_clk = clk_counts(df)
7.data_clk.show(3)
8.data_clk = data_clk.toPandas()
9.data_clk.to_csv("data_clk.csv",index=False)
可以发现整体的广告点击率在4.8%
3、分析一段时间的广告点击情况
1.data_date = df.groupBy('date').agg(
2. F.count('clk').alias('展示量'),
3. F.sum('clk').alias('点击量'),
4. F.mean('clk').alias('点击率')
5.)
6.data_date = data_date.orderBy("date")
7.data_date.show()
data_date.to_csv("data_date.csv",index=False)
通过分析可知每天的广告点击率在4.8%左右,但随着日期的增长,点击率有略微的下降。
4、分析一天中用户的点击情况
1.data_hour = df.groupBy('hour').agg(
2. F.count('clk').alias('展示量'),
3. F.sum('clk').alias('点击量'),
4. F.mean('clk').alias('点击率')
5.)
6.data_hour = data_hour.orderBy("hour")
7.data_hour.show()
8.data_hour.to_csv("data_hour.csv",index=False)
通过分析可知,用户在上午与中午与凌晨较为活跃。广告的点击率在傍晚17点有显著的提升。
5、查看点击量最多的广告
1.# 查看所有投放的广告并按点击量排序
2.data_adgroup = df.groupBy('adgroup_id').agg(
3. F.count('clk').alias('展示量'),
4. F.sum('clk').alias('点击量'),
5. F.mean('clk').alias('点击率')
6.)
7.data_adgroup = data_adgroup.orderBy("点击量", ascending=False)
8.data_adgroup.show(3)
data_adgroup.to_csv("data_adgroup.csv", index=False)
可以从表中观察到广告点击量最多的ID为44952
1.1. # 筛选adgroup_id为44952的广告
2.2. data_44952 = df[df['adgroup_id']==44952]
3.3. # 筛选【adgroup_id为44952】且【被点击】的广告
4.4. data_44952_clk = data_44952[data_44952['clk']==1]
5.5. data_44952_clk.show(3)
6.# 根据【adgroup_id】、【userid】这两列去重
7.data_44952_clk.drop_duplicates(subset=['userid','adgroup_id'])
8.data_44952_cms = data_44952.groupby('cms_segid').agg(
9. F.sum('clk').alias('点击量'),
10.).toPandas()
11.data_44952_cms_group = data_44952.groupby('cms_group_id').agg(
12. F.sum('clk').alias('点击量'),
13.).toPandas()
14.data_44952_gender = data_44952.groupby('final_gender_code').agg(
15. F.sum('clk').alias('点击量'),
16.).toPandas()
17.data_44952_age = data_44952.groupby('age_level').agg(
18. F.sum('clk').alias('点击量'),
19.).toPandas()
20.data_44952_shopping = data_44952.groupby('shopping_level').agg(
21. F.sum('clk').alias('点击量'),
22.).toPandas()
23.data_44952_occupation = data_44952.groupby('occupation').agg(
24. F.sum('clk').alias('点击量'),
25.).toPandas()
26.data_44952_city = data_44952.groupby('new_user_class_level ').agg(
27. F.sum('clk').alias('点击量'),
28.).toPandas()
29.data_44952_pvalue = data_44952.groupby('pvalue_level').agg(
30. F.sum('clk').alias('点击量'),
).toPandas()
统计观看此广告最多的用户特征
分析可得观看该广告最多的人群来源于微群2,其中女生略多与男生,绝大多数是年龄层次1与年龄层次5的人。他们大多数购物深度为中度,身份为非大学生,所在城市层次为4,消费等级为高档。
6、分析一天中用户的点击情况
1.merge_df = df.join(behavior_df,on='userid',how='left')
2.merge_df = merge_df.dropna(subset=['btag'])
3.md_btag = merge_df.groupBy('adgroup_id').pivot("btag").count()
4.# 用0填充缺失值
5.md_btag = md_btag.fillna(0)
6.md_btag = md_btag.withColumn("total", F.col("cart") + F.col("buy") + F.col("fav") + F.col("pv"))
7.# 计算购买率
8.md_btag= md_btag.withColumn("purchase_rate", F.round(F.col("buy") / F.col("total"),4))
9.# 计算加入购物车率
10.md_btag = md_btag.withColumn("cart_rate", F.round(F.col("cart") / F.col("total"),4))
11.# 计算加入喜欢率
12.md_btag = md_btag.withColumn("favor_rate", F.round(F.col("fav") / F.col("total"),4))
13.md_btag.show(4)
14.md_btag.to_csv("md_btag.csv",index=False)
通过分析能够看到各个广告的实际起到的作用。