Kafka采集数据保存到MongoDB中

大数据学习路线图

作者:厦门大学计算机系林子雨副教授
说明:本博客是与林子雨编著《数据采集与预处理》教材配套的教学资料。
版权声明:本博客内容,未经同意,禁止转载。
操作系统:Windows10
Kafka:kafka_2.12-2.4.0
MongoDB6.0:请参考教程《在Windows10系统中安装和使用MongDB6.0

一、任务描述

在“C:\Python38\mycode”目录下有一个stat.csv文件,里面包含如下两行内容:

1,2023-1-21,Ziyu,Lin
2,2023-1-22,Shufan,Lin

现在需要编写程序,使用Kafka采集stat.csv中的数据,并对数据进行解析,然后保存到MongoDB数据库中。

二、实现代码

在“C:\Python38\mycode”目录下新建一个代码文件producer.py,其内容如下:

from kafka import KafkaProducer

print("this is producer")
producer = KafkaProducer(bootstrap_servers=['localhost:9092'], api_version=(0, 10))
csvFilePath = 'stat.csv'

data = []
with open(csvFilePath, "rb") as csvfile:
    data = csvfile.readlines()
    for rec in data:
        # Topic为'csvdata' 消息内容为读取的CSV文件的一行        
        producer.send('csvDataTopic', rec)        

在“C:\Python38\mycode”目录下新建一个代码文件consumer.py,其内容如下:

from kafka import KafkaConsumer
from pymongo import MongoClient

print("this is consumer")
consumer = KafkaConsumer('csvDataTopic',bootstrap_servers=['localhost:9092'])
client = MongoClient(port=27017)
db = client.db

result = {}
csv_data = []
header_arr = ['id','timestamp','first_name','last_name']

for message in consumer:
    # 收到的订阅消息处理
    print(str(message.value))
    csv_data = str(message.value).split(',')
    data = {header_arr[i] : str(csv_data[i]) for i in range(len(header_arr))}
    result = db.csvstats.insert_one(data)   

三、执行过程

在Windows系统中打开第1个cmd窗口,执行如下命令启动Zookeeper服务:

> cd c:\kafka_2.12-2.4.0
> .\bin\windows\zookeeper-server-start.bat .\config\zookeeper.Properties

打开第2个cmd窗口,然后执行下面命令启动Kafka服务:

> cd c:\kafka_2.12-2.4.0
> .\bin\windows\kafka-server-start.bat .\config\server.properties

打开第3个cmd窗口,执行如下命令创建一个名为test的Topic:

> cd c:\kafka_2.12-2.4.0
> .\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic csvDataTopic

执行如下命令查看csvDataTopic是否创建成功:

> .\bin\windows\kafka-topics.bat --list --zookeeper localhost:2181

打开第4个cmd窗口,执行如下命令启动消费者:

> cd C:\Python38\mycode
> python ./consumer.py

打开IDLE,在IDLE中打开文件producer.py并执行,从而让生产者产生数据。
打开第5个cmd窗口,执行如下命令进入MongoDB Shell执行环境:

> mongosh

然后,在MongoDB Shell环境中执行如下命令查看数据:

>show dbs;
>use db;
>show collections;
>db.csvstats.find();

查询结果图如1所示。

图1 MongoDB数据库查询结果