基于Python和Spark的淘宝展示广告点击率预估

大数据学习路线图

【版权声明】版权所有,严禁转载,严禁用于商业用途,侵权必究。
作者:厦门大学计算机科学与技术系2023级研究生 李坤华
指导老师:厦门大学数据库实验室 林子雨 博士/副教授
时间:2024年6月
相关教材:林子雨、郑海山、赖永炫编著《Spark编程基础(Python版,第2版)》(访问教材官网
相关案例:基于Python语言的Spark数据处理分析案例集锦(PySpark)
数据集和代码下载:从百度网盘下载本案例数据集和代码。(提取码是ziyu)
本案例数据集来自阿里云天池公共数据集——淘宝展示广告点击率预估数据集。采用pandas对数据进行清洗,采用分布式文件系统HDFS进行数据存储,采用Python编写Spark程序进行数据分析,使用Matplotlib进行数据可视化。

一、数据集

数据集来自阿里云天池公共数据集:淘宝展示广告点击率预估数据集(淘宝展示广告点击率预估数据集_数据集-阿里云天池 (aliyun.com))。
数据集介绍:
该数据集中包含有4个csv文件,分别为样本数据(raw_sample.csv) 、(广告的基本信息)ad_feature.csv 、(用户的基本信息)user_profile.csv 、(用户的行为日志)raw_behavior_log.csv。

1、原始样本骨架raw_sample

淘宝网站中随机抽样了114万用户8天内的广告展示/点击日志(2600万条记录)
(1) user_id:脱敏过的用户ID;
(2) adgroup_id:脱敏过的广告单元ID;
(3) time_stamp:时间戳;
(4) pid:资源位;
(5) noclk:为1代表没有点击;为0代表点击;
(6) clk:为0代表没有点击;为1代表点击;

2、广告基本信息表ad_feature

广告的基本信息
(1) adgroup_id:脱敏过的广告ID;
(2) cate_id:脱敏过的商品类目ID;
(3) campaign_id:脱敏过的广告计划ID;
(4) customer_id:脱敏过的广告主ID;
(5) brand:脱敏过的品牌ID;
(6) price: 宝贝的价格

3、用户基本信息表user_profile

用户的基本信息
(1) userid:脱敏过的用户ID;
(2) cms_segid:微群ID;
(3) cms_group_id:cms_group_id;
(4) final_gender_code:性别 1:男,2:女;
(5) age_level:年龄层次;
(6) pvalue_level:消费档次,1:低档,2:中档,3:高档;
(7) shopping_level:购物深度,1:浅层用户,2:中度用户,3:深度用户
(8) occupation:是否大学生 ,1:是,0:否
(9) new_user_class_level:城市层级

4、用户的行为日志behavior_log

raw_sample中全部用户22天内的购物行为(共七亿条记录)
(1) user:脱敏过的用户ID;
(2) time_stamp:时间戳;
(3) btag:行为类型,  包括以下四种:ipv(浏览)、cart(加入购物车)、fav(喜欢)、buy(购买)
(4) cate:脱敏过的商品类目;
(5) brand: 脱敏过的品牌词

二、数据预处理

1、读取ad_featrue.csv

1.# 处理广告数据  
2.ad_feature_file = pd.read_csv('../data/ad_feature.csv')  
3.print(ad_feature_file.head(12))  
4.ad_feature_file.info()  
5.ad_feature_file.shape 

读取ad_featrue.csv文件,查看文件信息

2、处理ad_featrue.csv

1.for i in ad_feature_file.columns:  
2.    null_rate = ad_feature_file[i].isna().sum() / len(ad_feature_file) * 100  
3.    if null_rate > 0:  
4.        print("{} null rate: {} %".format(i, round(null_rate, 2)))  
5.  
6.# 处理缺失值  
7.ad_feature_file.dropna(inplace=True)  # 删除含有缺失值的行  
8.# 处理重复值  
9.ad_feature_file.drop_duplicates(inplace=True)  # 删除重复行  
10.#将ad_feature_file中的brand列转为整型  
11.ad_feature_file['brand'] = ad_feature_file['brand'].astype(int)  
12.#保存文件  
ad_feature_file.to_csv('ad_feature_data.csv', index=False)

检查是否含有缺失值

删除含有缺失值的行、删除重复行

3、读取user_profile.csv

1.# 用户基本信息  
2.user_profile_file = pd.read_csv('../data/user_profile.csv')  
3.print(user_profile_file.head(12))  
user_profile_file.info() 

4、处理user_profile.csv

1.from sklearn.impute import SimpleImputer  
2.for i in user_profile_file.columns:  
3.    null_rate = user_profile_file[i].isna().sum() / len(user_profile_file) * 100  
4.    if null_rate > 0:  
5.        print("{} null rate: {} %".format(i, round(null_rate, 2)))  
6.city_level = user_profile_file.loc[:,'new_user_class_level '].values.reshape(-1,1)  # loc:对索引名进行切片     
7.si = SimpleImputer(strategy = 'most_frequent')  # 实例化,使用众数填补  
8.user_profile_file.loc[:,'new_user_class_level '] = si.fit_transform(city_level) # fit_transform一步训练导出结果  
9.user_profile_file.info()  
10.# 利用KNN对消费档次进行预测填充  
11.from sklearn.neighbors import KNeighborsClassifier  
12.columns = ['userid', 'cms_segid', 'cms_group_id', 'final_gender_code', 'age_level',  
13.       'shopping_level', 'occupation', 'new_user_class_level ','pvalue_level']  
14.user_data = user_profile_file[columns]  
15.# pavlue_level为空的样本作为测试集  
16.pvalue_null = user_data.loc[user_data['pvalue_level'].isnull().values == True]  
17.# pavlue_level不空的样本作为训练集  
18.pvalue_nonull = user_data.loc[user_data['pvalue_level'].isnull().values == False]  
19.X_train_user,y_train_user = pvalue_nonull.iloc[:,:-1],pvalue_nonull.iloc[:,-1]  
20.X_test_user,y_test_user = pvalue_null.iloc[:,:-1],pvalue_null.iloc[:,-1]  
21.knn = KNeighborsClassifier(n_neighbors=3,weights='distance')  
22.knn.fit(X_train_user,y_train_user)  
23.# 得到预测结果  
24.y_test_user = knn.predict(X_test_user)  
25.y_test_user = pd.DataFrame(y_test_user)  
26.y_test_user.columns = ['pvalue_level']  
27.X_test_user.reset_index(drop=True,inplace=True)  
28.pvalue_null = pd.concat([X_test_user,y_test_user],axis=1)  
29.user = pd.concat([pvalue_nonull,pvalue_null],ignore_index=False)  
30.# 将处理过后的user信息保存  
user.to_csv('user_data.csv',index=False,sep=',') 

检查是否含有缺失值

我发现pvalue_level(消费水平)缺失值占到了54.24%,缺失的比重过大,所以不能够简单的将含有缺失值的行删除掉,故此采用KNN来预测缺失值,我们将不含缺失值的数据作为训练样本,含有缺失值的样本为测试样本,最终得到预测的数据。
发现new_user_class(城市层级)缺失值为32.49%,故此可以采用利用众数对缺失数据进行填充。

5、读取raw_sample.csv

1.# 样本  
2.raw_sample_file = pd.read_csv('../data/raw_sample.csv', nrows=500000)  
3.print(raw_sample_file.head(12))  
4.raw_sample_file.info()  

因为样本数据量过大,所以在本次数据分析中仅读取了50万条数据。

6、处理raw_sample.csv

1.for i in raw_sample_file.columns:  
2.    null_rate = raw_sample_file[i].isna().sum() / len(raw_sample_file) * 100  
3.    if null_rate > 0:  
4.        print("{} null rate: {} %".format(i, round(null_rate, 2)))  
5.# 将处理过后的user信息保存  
raw_sample_file.to_csv('sample_data.csv',index=False) 

检查发现没有缺失值

7、合并数据

1.# 修改表dataset中列名user为userid,以便后面基于主键连接  
2.raw_sample_file.rename(columns={'user':'userid'},inplace=True)  
3.raw_sample_file.head()  
4.  
5.# 将数据集dataset与用户基本信息表user合并,基于主键userid,how='right'表示以右边表为基准连接  
6.user_dataset = pd.merge(user,raw_sample_file,on='userid',how='right')  
7.print(f'user_dataset表的维度:{user_dataset.shape}')  
8.  
9.# 将数据集与广告基本信息表ads合并,基于主键adgroup_id,how='right'表示以右边表为基准连接  
10.ads_user_dataset = pd.merge(ad_feature_file,user_dataset,on='adgroup_id',how='right')  
11.print(f'ads_user_dataset表的维度:{ads_user_dataset.shape}')  
12.  
13.ads_user_dataset.to_csv('data.csv',index=False,sep=',') 


最终得到一个50行,19列的表

1.# 获取data每列的缺失值占比  
2.data_null = data.isnull().sum()/len(raw_sample_file)*100  
3.data_null = data_null.drop(data_null[data_null==0].index).sort_values(ascending=False)     # 将缺失值占比从高到低排序  
4.missing_data = pd.DataFrame({'Missing Ratio(%)':data_null})  
5.print(f'dataset含有缺失值的属性个数:{len(data_null)}')  
6.print(missing_data)  
data.dropna(axis=0, how='any',inplace=True)

检查是否右缺失值、删除含有缺失值的数据

1.# 将数据中的时间戳形式转换为日期和时间形式  
2.import datetime  
3.import time  
4.data['time_stamp']=pd.to_datetime(data['time_stamp'],unit='s')  
5.data['time_stamp']  
6.# 从转换后的数据中分别提取:日期、时间、小时,组成新的列  
7.data['date'] = data['time_stamp'].dt.date  
8.data['time'] = data['time_stamp'].dt.time  
9.data['hour'] = data['time_stamp'].dt.hour  
10.# 调整数据集data的列顺序:将'data'、'time'、'hour'这三列数据调至'time_stamp'列后  
11.columns = ['adgroup_id', 'cate_id', 'campaign_id', 'customer', 'brand', 'price',  
12.       'userid', 'cms_segid', 'cms_group_id', 'final_gender_code', 'age_level',  
13.       'shopping_level', 'occupation', 'new_user_class_level ', 'pvalue_level',  
14.       'time_stamp', 'date', 'time', 'hour', 'pid', 'nonclk', 'clk']  
15.data = data[columns]  
16.data = data.drop(['time_stamp','time','nonclk'],axis=1)  
17.data.to_csv('user_ad_data.csv',index=False,sep=',')  

处理数据中的时间戳信息,将时间戳转化为日期和时间形式以便分析。

8、读取behavior_log.csv

1.# 处理用户行为数据  
2.behavior_file = pd.read_csv('../data/behavior_log.csv',nrows=500000)  
3.print(behavior_file.head(12))  
4.behavior_file.info()  
5.behavior_file.shape  

因为样本数据量过大,所以在本次数据分析中仅读取了50万条数据。

9、处理behavior_log.csv

1.for i in behavior_file.columns:  
2.    null_rate = behavior_file[i].isna().sum() / len(behavior_file) * 100  
3.    if null_rate > 0:  
4.        print("{} null rate: {} %".format(i, round(null_rate, 2)))  
5.behavior_file.rename(columns={'user':'userid'},inplace=True)  
6.behavior_file.rename(columns={'cate':'cate_id'},inplace=True)  
7.behavior_file['time_stamp']=pd.to_datetime(behavior_file['time_stamp'],unit='s')  
8.# 从转换后的数据中分别提取:日期、时间、小时,组成新的列  
9.behavior_file['date'] = behavior_file['time_stamp'].dt.date  
10.behavior_file['time'] = behavior_file['time_stamp'].dt.time  
11.behavior_file['hour'] = behavior_file['time_stamp'].dt.hour  
12.behavior_file = behavior_file.drop(['time_stamp'],axis=1)  
13.behavior_file.to_csv('behavior_data.csv',index=False,sep=',')  

为方便后续合并表,修改user列为userid,修改cate列为cate_id。

10、使用HDFS存储数据


三、数据分析

1、读取数据

1.spark = SparkSession.builder.getOrCreate()  
2.df = spark.read.format("csv")\  
3..option("header","true")\  
4..option("inferSchema","true")\  
5..option('escape','\"')\  
6..load("user_ad_data.csv")  
7.         
8.df.createOrReplaceTempView("data")  
9.df.show(4) 

读取user_ad_data.csv数据

1.behavior_df = spark.read.format("csv")\  
2..option("header","true")\  
3..option("inferSchema","true")\  
4..option('escape','\"')\  
5..load("behavior_data.csv")  
6.         
7.behavior_df.createOrReplaceTempView("behavior")  
behavior_df.show(4)

读取behavior_data.csv数据

2、分析广告点击率

1.def clk_counts(df):  
2.    counts = df.groupBy("clk").count()  
3.    total = df.count()  
4.    data_clk = counts.withColumn("Ratio(%)", F.round((F.col("count") / total * 100),2))  
5.    return data_clk  
6.data_clk = clk_counts(df)  
7.data_clk.show(3)  
8.data_clk = data_clk.toPandas()  
9.data_clk.to_csv("data_clk.csv",index=False)



可以发现整体的广告点击率在4.8%

3、分析一段时间的广告点击情况

1.data_date = df.groupBy('date').agg(  
2.    F.count('clk').alias('展示量'),  
3.    F.sum('clk').alias('点击量'),  
4.    F.mean('clk').alias('点击率')  
5.)  
6.data_date = data_date.orderBy("date")  
7.data_date.show() 
data_date.to_csv("data_date.csv",index=False) 



通过分析可知每天的广告点击率在4.8%左右,但随着日期的增长,点击率有略微的下降。

4、分析一天中用户的点击情况

1.data_hour = df.groupBy('hour').agg(  
2.    F.count('clk').alias('展示量'),  
3.    F.sum('clk').alias('点击量'),  
4.    F.mean('clk').alias('点击率')  
5.)  
6.data_hour = data_hour.orderBy("hour")  
7.data_hour.show()  
8.data_hour.to_csv("data_hour.csv",index=False)



通过分析可知,用户在上午与中午与凌晨较为活跃。广告的点击率在傍晚17点有显著的提升。

5、查看点击量最多的广告

1.# 查看所有投放的广告并按点击量排序  
2.data_adgroup = df.groupBy('adgroup_id').agg(  
3.    F.count('clk').alias('展示量'),  
4.    F.sum('clk').alias('点击量'),  
5.    F.mean('clk').alias('点击率')  
6.)  
7.data_adgroup = data_adgroup.orderBy("点击量", ascending=False)  
8.data_adgroup.show(3)
data_adgroup.to_csv("data_adgroup.csv", index=False) 


可以从表中观察到广告点击量最多的ID为44952

1.1.  # 筛选adgroup_id为44952的广告    
2.2.  data_44952 = df[df['adgroup_id']==44952]    
3.3.  # 筛选【adgroup_id为44952】且【被点击】的广告    
4.4.  data_44952_clk = data_44952[data_44952['clk']==1]    
5.5.  data_44952_clk.show(3)   
6.# 根据【adgroup_id】、【userid】这两列去重  
7.data_44952_clk.drop_duplicates(subset=['userid','adgroup_id'])  
8.data_44952_cms = data_44952.groupby('cms_segid').agg(  
9.    F.sum('clk').alias('点击量'),  
10.).toPandas()  
11.data_44952_cms_group = data_44952.groupby('cms_group_id').agg(  
12.    F.sum('clk').alias('点击量'),  
13.).toPandas()  
14.data_44952_gender = data_44952.groupby('final_gender_code').agg(  
15.    F.sum('clk').alias('点击量'),  
16.).toPandas()  
17.data_44952_age = data_44952.groupby('age_level').agg(  
18.    F.sum('clk').alias('点击量'),  
19.).toPandas()  
20.data_44952_shopping = data_44952.groupby('shopping_level').agg(  
21.    F.sum('clk').alias('点击量'),  
22.).toPandas()  
23.data_44952_occupation = data_44952.groupby('occupation').agg(  
24.    F.sum('clk').alias('点击量'),  
25.).toPandas()  
26.data_44952_city = data_44952.groupby('new_user_class_level ').agg(  
27.    F.sum('clk').alias('点击量'),  
28.).toPandas()  
29.data_44952_pvalue = data_44952.groupby('pvalue_level').agg(  
30.    F.sum('clk').alias('点击量'),  
).toPandas()

统计观看此广告最多的用户特征

分析可得观看该广告最多的人群来源于微群2,其中女生略多与男生,绝大多数是年龄层次1与年龄层次5的人。他们大多数购物深度为中度,身份为非大学生,所在城市层次为4,消费等级为高档。

6、分析一天中用户的点击情况

1.merge_df = df.join(behavior_df,on='userid',how='left')  
2.merge_df = merge_df.dropna(subset=['btag'])  
3.md_btag = merge_df.groupBy('adgroup_id').pivot("btag").count()  
4.# 用0填充缺失值  
5.md_btag = md_btag.fillna(0)  
6.md_btag = md_btag.withColumn("total", F.col("cart") + F.col("buy") + F.col("fav") + F.col("pv"))  
7.# 计算购买率  
8.md_btag= md_btag.withColumn("purchase_rate", F.round(F.col("buy") / F.col("total"),4))  
9.# 计算加入购物车率  
10.md_btag = md_btag.withColumn("cart_rate", F.round(F.col("cart") / F.col("total"),4))  
11.# 计算加入喜欢率  
12.md_btag = md_btag.withColumn("favor_rate", F.round(F.col("fav") / F.col("total"),4))  
13.md_btag.show(4)  
14.md_btag.to_csv("md_btag.csv",index=False)



通过分析能够看到各个广告的实际起到的作用。