作者:厦门大学计算机系林子雨副教授
说明:本博客是与林子雨编著《数据采集与预处理》教材配套的教学资料。
版权声明:本博客内容,未经同意,禁止转载。
操作系统:Windows10
Kafka:kafka_2.12-2.4.0
MongoDB6.0:请参考教程《在Windows10系统中安装和使用MongDB6.0》
一、任务描述
在“C:\Python38\mycode”目录下有一个stat.csv文件,里面包含如下两行内容:
1,2023-1-21,Ziyu,Lin
2,2023-1-22,Shufan,Lin
现在需要编写程序,使用Kafka采集stat.csv中的数据,并对数据进行解析,然后保存到MongoDB数据库中。
二、实现代码
在“C:\Python38\mycode”目录下新建一个代码文件producer.py,其内容如下:
from kafka import KafkaProducer
print("this is producer")
producer = KafkaProducer(bootstrap_servers=['localhost:9092'], api_version=(0, 10))
csvFilePath = 'stat.csv'
data = []
with open(csvFilePath, "rb") as csvfile:
data = csvfile.readlines()
for rec in data:
# Topic为'csvdata' 消息内容为读取的CSV文件的一行
producer.send('csvDataTopic', rec)
在“C:\Python38\mycode”目录下新建一个代码文件consumer.py,其内容如下:
from kafka import KafkaConsumer
from pymongo import MongoClient
print("this is consumer")
consumer = KafkaConsumer('csvDataTopic',bootstrap_servers=['localhost:9092'])
client = MongoClient(port=27017)
db = client.db
result = {}
csv_data = []
header_arr = ['id','timestamp','first_name','last_name']
for message in consumer:
# 收到的订阅消息处理
print(str(message.value))
csv_data = str(message.value).split(',')
data = {header_arr[i] : str(csv_data[i]) for i in range(len(header_arr))}
result = db.csvstats.insert_one(data)
三、执行过程
在Windows系统中打开第1个cmd窗口,执行如下命令启动Zookeeper服务:
> cd c:\kafka_2.12-2.4.0
> .\bin\windows\zookeeper-server-start.bat .\config\zookeeper.Properties
打开第2个cmd窗口,然后执行下面命令启动Kafka服务:
> cd c:\kafka_2.12-2.4.0
> .\bin\windows\kafka-server-start.bat .\config\server.properties
打开第3个cmd窗口,执行如下命令创建一个名为test的Topic:
> cd c:\kafka_2.12-2.4.0
> .\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic csvDataTopic
执行如下命令查看csvDataTopic是否创建成功:
> .\bin\windows\kafka-topics.bat --list --zookeeper localhost:2181
打开第4个cmd窗口,执行如下命令启动消费者:
> cd C:\Python38\mycode
> python ./consumer.py
打开IDLE,在IDLE中打开文件producer.py并执行,从而让生产者产生数据。
打开第5个cmd窗口,执行如下命令进入MongoDB Shell执行环境:
> mongosh
然后,在MongoDB Shell环境中执行如下命令查看数据:
>show dbs;
>use db;
>show collections;
>db.csvstats.find();
查询结果图如1所示。
图1 MongoDB数据库查询结果