Companion code for the textbook 《数据采集与预处理》 (Data Collection and Preprocessing), by Lin Ziyu (林子雨).

Chapter 4  Distributed Messaging System Kafka

Start the ZooKeeper service (in its own Command Prompt window):
> cd c:\kafka_2.12-2.4.0
> .\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties

Start the Kafka server (in a new Command Prompt window):
> cd c:\kafka_2.12-2.4.0
> .\bin\windows\kafka-server-start.bat .\config\server.properties

Create a topic named topic_test, list the existing topics, and start a console producer (in a new Command Prompt window):
> cd c:\kafka_2.12-2.4.0
> .\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic_test
> .\bin\windows\kafka-topics.bat --list --zookeeper localhost:2181
> .\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic topic_test

Start a console consumer for topic_test (in a new Command Prompt window):
> cd c:\kafka_2.12-2.4.0
> .\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic topic_test --from-beginning

Install the kafka-python package and check that it appears in the installed package list:
> pip install kafka-python
> pip list
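A quick way to confirm that kafka-python is usable is to import it and print its version (a minimal check, assuming the package exposes the usual __version__ attribute, which recent kafka-python releases do):

import kafka
print(kafka.__version__)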

producer_test.py

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='localhost:9092')  # connect to the Kafka broker

msg = "Hello World".encode('utf-8')  # the message payload must be of type bytes
producer.send('test', msg)  # send to the topic named test
producer.close()
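Note that producer.send() is asynchronous in kafka-python: the call returns before the broker has acknowledged the message. A minimal sketch of a variant that waits for the acknowledgement (the 10-second timeout is an arbitrary value for illustration):

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='localhost:9092')
future = producer.send('test', "Hello World".encode('utf-8'))
record_metadata = future.get(timeout=10)  # block until the broker acknowledges the write
print(record_metadata.topic, record_metadata.partition, record_metadata.offset)
producer.close()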

consumer_test.py

from kafka import KafkaConsumer

consumer = KafkaConsumer('test', bootstrap_servers=['localhost:9092'], group_id=None, auto_offset_reset='earliest')  # 'earliest' (not 'smallest') is the value kafka-python accepts for reading from the beginning
for msg in consumer:
    recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
    print(recv)
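The loop above blocks indefinitely while it waits for new messages. If a script that exits on its own is preferred, kafka-python's consumer_timeout_ms parameter stops the iteration after a period with no new messages; a minimal sketch (the 10-second timeout is an arbitrary choice):

from kafka import KafkaConsumer

consumer = KafkaConsumer('test',
                         bootstrap_servers=['localhost:9092'],
                         group_id=None,
                         auto_offset_reset='earliest',
                         consumer_timeout_ms=10000)  # stop iterating after 10 s without messages
for msg in consumer:
    print("%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value))
consumer.close()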

score.csv

"Name","Score"
"Zhang San",99.0
"Li Si",45.5
"Wang Hong",82.5
"Liu Qian",76.0
"Ma Li",62.5
"Shen Teng",78.0
"Pu Wen",86.5

kafka_demo.py

# kafka_demo.py
import sys
import json
import pandas as pd
import os
from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka.errors import KafkaError

KAFKA_HOST = "localhost"  # Kafka server address
KAFKA_PORT = 9092         # Kafka server port
KAFKA_TOPIC = "topic0"    # topic used by this demo

data = pd.read_csv(os.path.join(os.getcwd(), 'score.csv'))  # load the CSV file shown above
key_value = data.to_json()  # serialize the DataFrame as a column-oriented JSON string

class Kafka_producer():
    def __init__(self, kafkahost, kafkaport, kafkatopic, key):
        self.kafkaHost = kafkahost
        self.kafkaPort = kafkaport
        self.kafkatopic = kafkatopic
        self.key = key
        self.producer = KafkaProducer(bootstrap_servers='{kafka_host}:{kafka_port}'.format(
            kafka_host=self.kafkaHost,
            kafka_port=self.kafkaPort)
        )

    def sendjsondata(self, params):
        try:
            params_message = params
            producer = self.producer
            producer.send(self.kafkatopic, key=self.key, value=params_message.encode('utf-8'))
            producer.flush()
        except KafkaError as e:
            print(e)

class Kafka_consumer():
    def __init__(self, kafkahost, kafkaport, kafkatopic, groupid, key):
        self.kafkaHost = kafkahost
        self.kafkaPort = kafkaport
        self.kafkatopic = kafkatopic
        self.groupid = groupid
        self.key = key
        self.consumer = KafkaConsumer(self.kafkatopic, group_id=self.groupid,
            auto_offset_reset='earliest',  # start from the beginning when this group has no committed offset
            bootstrap_servers='{kafka_host}:{kafka_port}'.format(
                kafka_host=self.kafkaHost,
                kafka_port=self.kafkaPort)
        )

    def consume_data(self):
        try:
            for message in self.consumer:
                yield message
        except KeyboardInterrupt as e:
            print(e)

def sortedDictValues(adict):
    # to_json() keys the rows by their string index ("0", "1", ...);
    # sort numerically so rows beyond "9" keep their original order
    items = sorted(adict.items(), key=lambda kv: int(kv[0]))
    return [value for key, value in items]

def main(xtype, group, key):
    if xtype == "p":
        # 生产模块
        producer = Kafka_producer(KAFKA_HOST, KAFKA_PORT, KAFKA_TOPIC, key)
        print("===========> producer:", producer)
        params =key_value        
        producer.sendjsondata(params)
    if xtype == 'c':
        # 消费模块
        consumer = Kafka_consumer(KAFKA_HOST, KAFKA_PORT, KAFKA_TOPIC, group,key)
        print("===========> consumer:", consumer)
        message = consumer.consume_data()
        for msg in message:
            msg=msg.value.decode('utf-8')
            python_data=json.loads(msg) #字符串转换成字典
            key_list=list(python_data)
            test_data=pd.DataFrame()
            for index in key_list: 
                if index=='Name':
                    a1=python_data[index]
                    data1 = sortedDictValues(a1)
                    test_data[index]=data1
                else:
                    a2 = python_data[index]
                    data2 = sortedDictValues(a2)
                    test_data[index] = data2
            print(test_data)

if __name__ == '__main__':
    main(xtype='p',group='py_test',key=None)
    main(xtype='c',group='py_test',key=None)
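For reference, the manual reconstruction done with sortedDictValues() can also be handled by letting pandas parse the column-oriented JSON back into a DataFrame directly; a minimal sketch (newer pandas versions prefer a file-like object, hence the StringIO wrapper):

import io
import pandas as pd

def json_to_dataframe(json_str):
    # rebuild the DataFrame from the column-oriented JSON produced by to_json()
    return pd.read_json(io.StringIO(json_str))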

producer_json.py

# producer_json.py
from kafka import KafkaProducer
import json

producer = KafkaProducer(bootstrap_servers='localhost:9092', value_serializer=lambda v: json.dumps(v).encode('utf-8'))  # connect to Kafka and serialize values as JSON

data={
  "sno":"95001",
  "name":"John",
  "sex":"M",
  "age":23
 }

producer.send('json_topic', data)  # send to the topic named json_topic
producer.close()
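The same producer can send several records to json_topic in a loop; a minimal sketch with made-up student records (for illustration only):

from kafka import KafkaProducer
import json

producer = KafkaProducer(bootstrap_servers='localhost:9092',
                         value_serializer=lambda v: json.dumps(v).encode('utf-8'))

students = [  # hypothetical records, for illustration only
    {"sno": "95002", "name": "Mary", "sex": "F", "age": 22},
    {"sno": "95003", "name": "Tom", "sex": "M", "age": 24},
]
for s in students:
    producer.send('json_topic', s)
producer.flush()
producer.close()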

consumer_json.py

# consumer_json.py
from kafka import KafkaConsumer
import json
import pymysql.cursors

consumer = KafkaConsumer('json_topic', bootstrap_servers=['localhost:9092'],group_id=None,auto_offset_reset='earliest')
for msg in consumer:
    msg1 = str(msg.value, encoding="utf-8")  # convert the byte array to a string
    record = json.loads(msg1)  # parse the JSON string into a dict

    # connect to the MySQL database
    connect = pymysql.Connect(
        host='localhost',
        port=3306,
        user='root',      # database user
        passwd='123456',  # password
        db='school',
        charset='utf8'
    )

    # obtain a cursor
    cursor = connect.cursor()

    # insert the record using a parameterized query (placeholders avoid quoting/escaping problems)
    sql = "INSERT INTO student(sno,sname,ssex,sage) VALUES (%s, %s, %s, %s)"
    data = (record['sno'], record['name'], record['sex'], record['age'])
    cursor.execute(sql, data)
    connect.commit()
    print('Record inserted successfully')

    # close the database connection
    connect.close()
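The school database and student table used above can be created in the MySQL client as follows (run these statements before starting consumer_json.py):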
mysql> CREATE DATABASE school;
mysql> USE school;
mysql> CREATE TABLE student(
    -> sno char(5),
    -> sname char(10),
    -> ssex char(2),
    -> sage int);
mysql> SHOW TABLES;
mysql> SELECT * FROM student;
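The inserted rows can also be checked from Python with pymysql, using the same connection settings as consumer_json.py (a minimal sketch):

import pymysql

connect = pymysql.Connect(host='localhost', port=3306, user='root',
                          passwd='123456', db='school', charset='utf8')
cursor = connect.cursor()
cursor.execute("SELECT * FROM student")
for row in cursor.fetchall():
    print(row)
connect.close()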