返回本案例首页
查看前一步骤操作步骤一:实验环境准备
《Spark+Kafka构建实时分析Dashboard案例——步骤二:数据处理和Python操作Kafka》
开发团队:厦门大学数据库实验室 联系人:林子雨老师ziyulin@xmu.edu.cn
版权声明:版权归厦门大学数据库实验室所有,请勿用于商业用途;未经授权,其他网站请勿转载
本教程介绍大数据课程实验案例“Spark+Kafka构建实时分析Dashboard”的第二个步骤,数据处理和Python操作Kafka。在本篇博客中,首先介绍如何预处理数据,以及如何使用Python操作Kafka。
所需知识储备
简单使用Python,了解Kafka的使用
训练技能
Python基本使用,Python操作Kafka代码库kafka-python使用
任务清单
- 利用Python预处理数据
- Python操作Kafka
数据预处理
数据集介绍
本案例采用的数据集压缩包为data_format.zip点击这里下载data_format.zip数据集,该数据集压缩包是淘宝2015年双11前6个月(包含双11)的交易数据(交易数据有偏移,但是不影响实验的结果),里面包含3个文件,分别是用户行为日志文件user_log.csv 、回头客训练集train.csv 、回头客测试集test.csv. 在这个案例中只是用user_log.csv这个文件,下面列出文件user_log.csv的数据格式定义:
用户行为日志user_log.csv,日志中的字段定义如下:
1. user_id | 买家id
2. item_id | 商品id
3. cat_id | 商品类别id
4. merchant_id | 卖家id
5. brand_id | 品牌id
6. month | 交易时间:月
7. day | 交易事件:日
8. action | 行为,取值范围{0,1,2,3},0表示点击,1表示加入购物车,2表示购买,3表示关注商品
9. age_range | 买家年龄分段:1表示年龄=50,0和NULL则表示未知
10. gender | 性别:0表示女性,1表示男性,2和NULL表示未知
11. province| 收获地址省份
数据具体格式如下:
user_id,item_id,cat_id,merchant_id,brand_id,month,day,action,age_range,gender,province
328862,323294,833,2882,2661,08,29,0,0,1,内蒙古
328862,844400,1271,2882,2661,08,29,0,1,1,山西
328862,575153,1271,2882,2661,08,29,0,2,1,山西
328862,996875,1271,2882,2661,08,29,0,1,1,内蒙古
328862,1086186,1271,1253,1049,08,29,0,0,2,浙江
328862,623866,1271,2882,2661,08,29,0,0,2,黑龙江
328862,542871,1467,2882,2661,08,29,0,5,2,四川
328862,536347,1095,883,1647,08,29,0,7,1,吉林
这个案例实时统计每秒中男女生购物人数,因此针对每条购物日志,我们只需要获取gender即可,然后发送给Kafka,接下来Structured Streaming再接收gender进行处理。
数据预处理
本案例使用Python对数据进行预处理,并将处理后的数据直接通过Kafka生产者发送给Kafka,这里需要先安装Python操作Kafka的代码库,请在Ubuntu中打开一个命令行终端,执行如下Shell命令来安装Python操作Kafka的代码库(备注:如果之前已经安装过,则这里不需要安装):
conda activate env_name #这里env_name应替换为创建的conda环境名称
conda install kafka-python
接着可以写如下Python代码,文件名为producer.py:(具体的工程文件结构参照步骤一)
# coding: utf-8
import csv
import time
from kafka import KafkaProducer
# 实例化一个KafkaProducer示例,用于向Kafka投递消息
producer = KafkaProducer(bootstrap_servers='localhost:9092')
# 打开数据文件
csvfile = open("../data/user_log.csv","r")
# 生成一个可用于读取csv文件的reader
reader = csv.reader(csvfile)
for line in reader:
gender = line[9] # 性别在每行日志代码的第9个元素
if gender == 'gender':
continue # 去除第一行表头
time.sleep(0.1) # 每隔0.1秒发送一行数据
# 发送数据,topic为'sex'
producer.send('sex',line[9].encode('utf8'))
上述代码很简单,首先是先实例化一个Kafka生产者。然后读取用户日志文件,每次读取一行,接着每隔0.1秒发送给Kafka,这样1秒发送10条购物日志。这里发送给Kafka的topic为'sex'。
Python操作Kafka
我们可以写一个KafkaConsumer测试数据是否投递成功,代码如下,文件名为consumer.py
from kafka import KafkaConsumer
consumer = KafkaConsumer('sex')
for msg in consumer:
print((msg.value).decode('utf8'))
在开启上述KafkaProducer和KafkaConsumer之前,需要先开启Kafka,命令如下:
cd /usr/local/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
打开一个新的命令行窗口,输入命令如下:
cd /usr/local/kafka
bin/kafka-server-start.sh config/server.properties
在Kafka开启之后,即可开启KafkaProducer和KafkaConsumer。开启方法如下:
请在Ubuntu中,打开一个命令行 终端窗口,执行如下命令:
cd /home/hadoop/mydir/labproject/scripts #进入到代码目录
python3 producer.py #启动生产者发送消息给Kafaka
然后,请在Ubuntu中,打开另外一个命令行 终端窗口,执行如下命令:
cd /home/hadoop/mydir/labproject/scripts #进入到代码目录
python3 consumer.py #启动消费者从Kafaka接收消息
运行上面这条命令以后,这时,你会看到屏幕上会输出一行又一行的数字,类似下面的样子:
2
1
1
1
2
0
2
1
……
当然,也可以不用上面这种命令行方式来启动生产者和消费者,如果你目前使用的是开发工具PyCharm,则也可以直接在代码区域右键,点击Run 'producer'以及Run 'consumer',来运行生产者和消费者。如果生产者和消费者运行成功,则在consumer窗口会输出如下信息:
如果有上述的输出,恭喜你,Python操作Kafka运行成功。接下来,下篇文章将分析Structured Streaming如何处理Kafka的实时数据。
下篇文章的链接为Spark+Kafka构建实时分析Dashboard案例——步骤三:Structured Streaming实时处理数据(scala版本)
下篇文章的链接为Spark+Kafka构建实时分析Dashboard案例——步骤三:Structured Streaming实时处理数据(python版本)