基于PyFlink的二手车交易数据处理与分析

大数据学习路线图

【版权声明】版权所有,严禁转载,严禁用于商业用途,侵权必究。
作者:厦门大学计算机科学与技术系2023级研究生 李鑫
指导老师:厦门大学数据库实验室 林子雨 博士/副教授
时间:2024年6月
相关教材:林子雨编著《Flink编程基础(Java版)》(访问教材官网
相关案例:Flink大数据处理分析案例集锦
本案例采用阿里天池大赛提供的某交易平台的二手车交易记录数据集。使用pandas进行数据清洗,保存到分布式文件系统HDFS中,接下来使用PyFlink进行数据分析,最后,采用matplotlib进行可视化。
数据集和代码下载:从百度网盘下载本案例的代码和数据集。(提取码是ziyu)

一、实验环境

软件环境:
(1)Linux: Ubuntu22.04.4
(2)Hadoop: 3.3.5
(3)Flink:1.17.0
(4)Python:3.8.19
(5)Scala:2.12.15
(6)JDK1.8

(一)搭建PyFlink环境:

1、搭建Anaconda3环境
请参考Anaconda的下载和使用方法完成Anaconda3的安装。


2、安装Python的开发工具Jupyter Notebook
可以参考这篇博客安装Jupyter Notebook,具体步骤如下:
使用conda创建python虚拟环境

查看python版本:

安装jupyter notebook:

设置密码:

3、下载PyFlink
直接pip总是超时警报,因此下载whl文件,参考离线下载python库whl安装文件的网址_whl安装官网-CSDN博客

验证flink下载完成:

验证在Jupyter Notebook上可以使用PyFlink:

(二)运行前开启环境:

打开ssh:ssh localhost
打开hadoop:

打开flink:

进入python3.8.19虚拟环境:

打开Jupyter Notebook:

二、数据集

本次实验使用的数据集是某交易平台的二手车交易记录,数据来自阿里云天池数据集,下载链接:https://tianchi.aliyun.com/dataset/175540
本次实验使用该数据集的训练集,总数据量150000条,为方便处理,取数据前50000条。该数据集汇聚了31列丰富的变量信息,其中包括15列匿名变量,同时,为保护用户的隐私和数据安全,数据也对name、model、brand和regionCode等敏感信息进行脱敏处理。

三、数据预处理

(一)取训练集的前5万行数据

训练集总数据量150000条,整个数据集过大,为方便处理,取数据前50000条。

(二)按列名对数据重新分列

查看原始数据发现,数据只有一列包含所有变量:

为方便后续进行数据分析,对数据进行分列:

(三)去除空值

(四)去除重复值

(五)去除匿名变量


(六)去用“-”表示的空值

观察数据集发现,数据中的空值使用“-”表示,因此去除包含“-”值的数据条

(七)数据格式统一


(八)保存清洗后的数据到文本文件


(九)以上步骤见data_preprocessing.ipynb文件
(十)将清洗过的数据集上传到HDFS中:
参照Hadoop3.3.5安装教程_单机/伪分布式配置_Hadoop3.3.5/Ubuntu22.04(20.04/18.04/16.04) 进行Hadoop伪分布式配置,配置成功后将清洗后的数据cleaned_used_car.csv上传到HDFS中:

四、使用Flink对HDFS中的数据进行处理和分析

(一)简单统计:

1.车身类型统计。根据车身类型统计二手车数量:
先过滤掉以“SaleID”开头的行。map函数用于提取bodyType字段,并将其与计数1配对作为键值对。然后,使用key_by函数根据bodyType进行分组,最后使用sum函数来对每个bodyType进行计数。

输出键值对,前者表示bodyType的类型,后者表示对该类型的计数。

存储输出到bodyType_count.txt文件中。

2.变速箱类型统计。根据变速箱类型统计自动挡和手动挡的二手车数量:
具体步骤与上一方法相同,代码如下:

输出键值对,前者表示gearbox的类型,后者表示对该类型的计数。

存储输出到gearbox_count.txt文件中。

(二)复杂统计:

按不同车身类型(bodyType)统计每个价格区间的车辆数量,并将结果输出到文件中。如统计豪华轿车bodyType=1的价格分布,将价格在0-1000,1000-5000,5000-10000,10000-15000,15000-20000,20000+的数量
1.实现过程:
①定义价格区间函数price_range,根据价格将其分类到不同区间。

②定义一个通用函数process_bodytype,用于处理不同车身类型的数据:
过滤掉以"SaleID"开头的行。按逗号分割行数据,转换为数组。过滤指定车身类型的行。根据价格列的值映射到价格区间,计数1。按价格区间分组并对数量求和。将结果输出到文件,使用文件前缀和后缀区分不同车身类型。

③循环处理所有车身类型:循环处理车身类型0到7,调用process_bodytype函数处理每种车身类型的数据。

3.分组统计:按照燃油类型对二手车进行分组,计算不同燃油类型的二手车交易的平均值
按照燃油类型对二手车进行分组统计总价格和总数量:

计算每种燃油类型的平均价格:

输出是一个三元组,其中第一项是fuelType的类型,第二项是该fuelType类型的二手车的价格平均值,第三项是该fuelType类型的二手车的总数量。并把结果存到txt文件中。

五、对分析结果可视化

1.按照车身类型分类的数量统计

(1)读取文件,并计算每个车身类型的最大值,也即每个车身类型的数量

(2)画柱状图:



可知:二手车交易中,豪华轿车共11222辆,微型车共9018辆,厢型车共8190辆,大巴车共3924辆,敞篷车共2706辆,双门汽车共2182辆,商务车共1966辆,搅拌车共278辆。
(3)绘制饼图:

2.按照变速器类型分类的数量统计:

(1)读取文件,计算每个变速器类型的最大值,也即每个变速器类型的数量

(2)画饼图:

可见,手动挡的二手车共30230辆,占比76.6%,自动挡的二手车共9256辆,占比23.4%

3.根据数据绘制不同车身类型的价格分布图

使用glob模块查找文件,根据前缀查找存储不同车身类型的价格分布数据文件,读取文件内容并处理成DataFrame,返回价格和计数的最大值。

定义价格区间,将不同车身类型的价格分布绘制成折线图:

输出不同车身类型的价格区间分布:



可见,豪华轿车,微型车,厢型车,大巴车,敞篷车,双门汽车,商务车,搅拌车的价格大都分布在5000-1000元内,且5000-10000元内的车,微型车最多。

4.各燃油类型的二手车价均值分布


可见,二手车中,汽油车最多,共25223辆,其次是柴油车,共13541辆。而对于二手车平均价格来说,混合动力的二手车平均价格最高,约为11477元,其次是柴油车,平均价格为9214元。