林子雨编著《数据采集与预处理》教材配套代码(教材官网)
查看所有章节代码
第4章 分布式消息系统Kafka
> cd c:\kafka_2.12-2.4.0
> .\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
> cd c:\kafka_2.12-2.4.0
> .\bin\windows\kafka-server-start.bat .\config\server.properties
> cd c:\kafka_2.12-2.4.0
> .\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic_test
> .\bin\windows\kafka-topics.bat --list --zookeeper localhost:2181
> .\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic topic_test
> cd c:\kafka_2.12-2.4.0
> .\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic topic_test --from-beginning
> pip install kafka-python
> pip list
producer_test.py
from kafka import KafkaProducer
# Connect to the Kafka broker at localhost:9092.
producer = KafkaProducer(bootstrap_servers='localhost:9092')

# The message content must be bytes, so the text is encoded before sending.
msg = "Hello World".encode('utf-8')

# Send the message to the topic named 'test', then close the producer.
producer.send(topic='test', value=msg)
producer.close()
consumer_test.py
from kafka import KafkaConsumer
# Read topic 'test' without a consumer group (group_id=None).
# 'earliest' replaces the deprecated alias 'smallest'; it is the same value
# the JSON consumer later in this file passes for auto_offset_reset.
consumer = KafkaConsumer(
    'test',
    bootstrap_servers=['localhost:9092'],
    group_id=None,
    auto_offset_reset='earliest',
)

# Print one line per received record: topic, partition, offset, key and value.
for msg in consumer:
    recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
    print(recv)
score.csv
"Name","Score"
"Zhang San",99.0
"Li Si",45.5
"Wang Hong",82.5
"Liu Qian",76.0
"Ma Li",62.5
"Shen Teng",78.0
"Pu Wen",86.5
kafka_demo.py
# kafka_demo.py
import sys
import json
import pandas as pd
import os
from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka.errors import KafkaError
KAFKA_HOST = "localhost"  # broker host
KAFKA_PORT = 9092  # broker port
KAFKA_TOPIC = "topic0"  # topic name

# Load score.csv from the current working directory. os.path.join is used
# instead of a hard-coded backslash so the path is also valid outside Windows.
data = pd.read_csv(os.path.join(os.getcwd(), 'score.csv'))

# Serialise the whole table into a single JSON string (to_json defaults).
key_value = data.to_json()
class Kafka_producer():
    """Wrap a KafkaProducer together with the topic and key it sends with."""

    def __init__(self, kafkahost, kafkaport, kafkatopic, key):
        """Store the connection settings and create the underlying producer.

        kafkahost / kafkaport are joined as "host:port" and passed as
        bootstrap_servers; kafkatopic and key are reused by sendjsondata().
        """
        self.kafkaHost = kafkahost
        self.kafkaPort = kafkaport
        self.kafkatopic = kafkatopic
        self.key = key
        broker_address = '{kafka_host}:{kafka_port}'.format(
            kafka_host=self.kafkaHost,
            kafka_port=self.kafkaPort,
        )
        self.producer = KafkaProducer(bootstrap_servers=broker_address)

    def sendjsondata(self, params):
        """Send the string ``params`` (UTF-8 encoded) to the topic and flush.

        A KafkaError raised while sending or flushing is printed, not re-raised.
        """
        try:
            self.producer.send(
                self.kafkatopic,
                key=self.key,
                value=params.encode('utf-8'),
            )
            self.producer.flush()
        except KafkaError as e:
            print(e)
class Kafka_consumer():
    """Wrap a KafkaConsumer subscribed to one topic within a consumer group."""

    def __init__(self, kafkahost, kafkaport, kafkatopic, groupid,key):
        """Store the connection settings and create the underlying consumer.

        kafkahost / kafkaport are joined as "host:port" and passed as
        bootstrap_servers; groupid is passed as group_id. ``key`` is only
        stored on the instance; this class does not read it again.
        """
        self.kafkaHost = kafkahost
        self.kafkaPort = kafkaport
        self.kafkatopic = kafkatopic
        self.groupid = groupid
        self.key = key
        broker_address = '{kafka_host}:{kafka_port}'.format(
            kafka_host=self.kafkaHost,
            kafka_port=self.kafkaPort,
        )
        self.consumer = KafkaConsumer(
            self.kafkatopic,
            group_id=self.groupid,
            bootstrap_servers=broker_address,
        )

    def consume_data(self):
        """Yield records from the consumer one at a time.

        A KeyboardInterrupt raised while iterating is printed and ends the
        generator instead of propagating.
        """
        try:
            # Explicit loop on purpose: records are handed over one by one
            # without delegating generator close/throw to the consumer object.
            for record in self.consumer:
                yield record
        except KeyboardInterrupt as interrupt:
            print(interrupt)
def sortedDictValues(adict):
    """Return the values of ``adict`` ordered by ascending key.

    When every key is a string of decimal digits (for example row labels that
    became strings after a JSON round trip), the keys are ordered by their
    numeric value, so "2" comes before "10". Plain string ordering would put
    "10" first and scramble the rows of any table with more than ten rows.
    For any other kind of key the natural ordering of the keys is used.

    Args:
        adict: mapping whose values should be returned in key order.

    Returns:
        A list of the mapping's values, sorted by key.

    Raises:
        TypeError: if the keys are not mutually comparable.
    """
    keys = list(adict)
    if all(isinstance(k, str) and k.isdecimal() for k in keys):
        # Numeric order first; the raw text breaks ties such as "01" vs "1".
        keys.sort(key=lambda k: (int(k), k))
    else:
        keys.sort()
    return [adict[k] for k in keys]
def main(xtype, group, key):
    """Run the demo in producer mode ("p") or consumer mode ("c").

    Args:
        xtype: "p" sends the module-level ``key_value`` JSON string to
            KAFKA_TOPIC; "c" consumes KAFKA_TOPIC and prints each message
            as a DataFrame. Any other value does nothing.
        group: consumer group id, used only in consumer mode.
        key: message key handed to the producer / consumer wrapper.
    """
    if xtype == "p":
        # Producer side
        producer = Kafka_producer(KAFKA_HOST, KAFKA_PORT, KAFKA_TOPIC, key)
        print("===========> producer:", producer)
        producer.sendjsondata(key_value)
    if xtype == 'c':
        # Consumer side
        consumer = Kafka_consumer(KAFKA_HOST, KAFKA_PORT, KAFKA_TOPIC, group,key)
        print("===========> consumer:", consumer)
        for record in consumer.consume_data():
            # Each message value is UTF-8 JSON text; parse it into a dict.
            python_data = json.loads(record.value.decode('utf-8'))
            test_data = pd.DataFrame()
            # Every column is rebuilt the same way; the original 'Name'
            # branch and its else branch were identical, so one path suffices.
            for column, cells in python_data.items():
                test_data[column] = sortedDictValues(cells)
            print(test_data)
if __name__ == '__main__':
    # Run the producer step ('p') first, then the consumer step ('c').
    for mode in ('p', 'c'):
        main(xtype=mode, group='py_test', key=None)
producer_json.py
# producer_json.py
from kafka import KafkaProducer
import json
def _encode_json(v):
    """Serialise ``v`` to JSON text and encode it as UTF-8 bytes."""
    return json.dumps(v).encode('utf-8')


# Connect to Kafka; values passed to send() are converted by _encode_json.
producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=_encode_json,
)

data = {
    "sno": "95001",
    "name": "John",
    "sex": "M",
    "age": 23,
}

# Send the record to the topic named json_topic, then close the producer.
producer.send('json_topic', data)
producer.close()
consumer_json.py
# consumer_json.py
from kafka import KafkaConsumer
import json
import pymysql.cursors
consumer = KafkaConsumer(
    'json_topic',
    bootstrap_servers=['localhost:9092'],
    group_id=None,
    auto_offset_reset='earliest',
)

# The values are passed to cursor.execute() separately from the SQL text so
# the driver escapes them; message content is never spliced into the statement.
sql = "INSERT INTO student(sno,sname,ssex,sage) VALUES (%s, %s, %s, %s)"

for msg in consumer:
    # The message value is UTF-8 encoded JSON text; parse it into a dict.
    student = json.loads(msg.value.decode('utf-8'))
    data = (student['sno'], student['name'], student['sex'], student['age'])

    # Connect to the database
    connect = pymysql.connect(
        host='localhost',
        port=3306,
        user='root',      # database user name
        passwd='123456',  # password
        db='school',
        charset='utf8'
    )
    try:
        # Insert the record
        with connect.cursor() as cursor:
            cursor.execute(sql, data)
        connect.commit()
        print('成功插入数据')
    finally:
        # Close the connection even if the insert or commit fails.
        connect.close()
mysql> CREATE DATABASE school;
mysql> USE school;
mysql> CREATE TABLE student(
-> sno char(5),
-> sname char(10),
-> ssex char(2),
-> sage int);
mysql> SHOW TABLES;
mysql> SELECT * FROM student;