在Windows10中使用Kafka采集数据保存到Redis数据库中

大数据学习路线图

作者:厦门大学计算机系林子雨副教授
说明:本博客是与林子雨编著《数据采集与预处理》教材配套的教学资料。
版权声明:本博客内容,未经同意,禁止转载。
操作系统:Windows10
Redis:7.0.8

一、Redis简介

Redis是一个键值(key-value)存储系统,即键值对非关系型数据库,和Memcached类似,目前正在被越来越多的互联网公司采用。Redis作为一个高性能的键值数据库,不仅在很大程度上弥补了memcached这类键值存储的不足,而且在部分场合下可以对关系数据库起到很好的补充作用。Redis提供了Python、Ruby、Erlang、PHP客户端,使用很方便。
  Redis支持存储的值(value)类型包括string(字符串)、list(链表)、set(集合)和zset(有序集合)。这些数据类型都支持push/pop、add/remove以及取交集、并集和差集等丰富的操作,而且这些操作都是原子性的。在此基础上,Redis支持各种不同方式的排序。与memcached一样,为了保证效率,Redis中的数据都是缓存在内存中的,它会周期性地把更新的数据写入磁盘,或者把修改操作写入追加的记录文件;此外,Redis还实现了主从(master-slave)同步。

二、安装Redis

到github网站下载Redis for Windows安装文件Redis-7.0.8-Windows-x64.tar.gz(下载地址:https://github.com/redis-windows/redis-windows/releases/tag/7.0.8 ),解压缩到指定目录,比如解压缩到C盘根目录下(如图1所示)。

图1 Redis安装目录
打开一个cmd窗口,输入如下命令:

> cd C:\Redis-7.0.8-Windows-x64
> redis-server redis.conf

执行上述命令以后,会出现如图1所示结果。这个cmd窗口不能关闭,如果关闭,Redis服务就停止了。

图2 Redis启动界面
新建一个cmd窗口,输入如下命令启动Redis客户端:

> cd C:\Redis-7.0.8-Windows-x64
> redis-cli.exe -h 127.0.0.1 -p 6379

启动以后的效果如图3所示。客户端连上服务器之后,会显示“127.0.0.1:6379>”的命令提示符信息,表示服务器的IP地址为127.0.0.1,端口为6379。

图3 Redis客户端启动后的效果

三、Redis操作实例

假设有三个表,即Student、Course和SC,三个表的字段(列)和数据如图4所示。

(a)Student表

(b)Course表

(c)SC表
图4 三个表的字段和数据
Redis数据库是以<key,value>的形式存储数据,把三个表的数据存入Redis数据库时,key和value的确定方法如下:

key=表名:主键值:列名
value=列值

例如,把每个表的第一行记录保存到Redis数据中,需要执行的命令和执行结果如图5所示。

图5 向Redis中插入数据
可以执行类似的命令,把三个表所有数据都插入到Redis数据库中,完整命令如下:

set Student:95001:Sname 李勇
set Student:95001:Ssex 男
set Student:95001:Sage 22
set Student:95001:Sdept CS

set Student:95002:Sname 刘晨
set Student:95002:Ssex 女
set Student:95002:Sage 19
set Student:95002:Sdept IS

set Student:95003:Sname 王敏
set Student:95003:Ssex 女
set Student:95003:Sage 18
set Student:95003:Sdept MA

set Student:95004:Sname 张立
set Student:95004:Ssex 男
set Student:95004:Sage 19
set Student:95004:Sdept IS

set Course:1:Cname 数据库
set Course:1:Credit 4

set Course:2:Cname 数学
set Course:2:Credit 2

set Course:3:Cname 信息系统
set Course:3:Credit 4

set Course:4:Cname 操作系统
set Course:4:Credit 3

set Course:5:Cname 数据结构
set Course:5:Credit 4

set Course:6:Cname 数据处理
set Course:6:Credit 2

set Course:7:Cname PASCAL语言
set Course:7:Credit 4

set SC:95001:1:Grade 92
set SC:95001:2:Grade 85
set SC:95001:3:Grade 88
set SC:95002:2:Grade 90
set SC:95002:3:Grade 80

然后,针对这些已经录入的数据,下面将简单演示如何进行增删改查操作。Redis支持5种数据类型,不同数据类型,增删改查可能不同,这里用最简单的数据类型字符串作为演示。

1. 插入数据

向Redis插入一条数据,只需要先设计好key和value,然后用set命令插入数据即可。例如,在Course表中插入一门新的课程“算法”,4学分,操作命令和结果如图6所示。

图6 插入数据

2. 修改数据

Redis并没有修改数据的命令,所以,如果在Redis中修改一条数据,只能采用一种变通的方式,即在使用set命令时,使用同样的key,然后用新的value值来覆盖旧的数据。例如,把刚才新添加的“算法”课程名称修改为“编译原理”,操作命令和结果如图7所示。

图7 修改数据

从图7可以看出,当存入中文时,查询结果是一个编码,并不是中文。此时, 新建一个cmd窗口,使用如下命令把值存储到一个文本文件1.txt中,则在1.txt中又可以看到中文的“编译原理”:

>  cd C:\Redis-7.0.8-Windows-x64
>  redis-cli get Course:8:Cname > 1.txt

3. 删除数据

Redis有专门删除数据的命令——del命令,命令格式为“del 键”。所以,如果要删除之前新增的课程“编译原理”,只需输入命令“del Course:8:Cname” ,如图8所示,当输入“del Course:8:Cname” 时,返回“1”,说明成功删除一条数据,当再次输入get命令时,输出为空,说明删除成功。

图8 删除数据

4. 查询数据

Redis最简单的查询方式是使用get命令,上面几个操作中都已经使用过get命令,这里不再赘述。

四、使用Python操作Redis

要使用python 操作Redis ,需要先安装python的redis组件,安装命令如下:

> pip install redis

打开IDLE,在命令提示符后面执行如下语句操作Redis:

>>> import redis
>>> r = redis.Redis(host='localhost', port=6379, db=0)
>>> r.set('foo', 'bar')
True
>>> r.get('foo')
b'bar'
>>>

也可以编写代码文件操作Redis,具体代码如下:

import redis
r = redis.Redis(host='localhost', port=6379, db=0)
r.set('university', 'XMU')
print(str(r.get('university'),"utf-8"))

五、使用Kafka采集数据保存到Redis数据库中

(一)任务描述

在“C:\Python38\mycode”目录下有一个student.txt文件,里面包含如下两行内容:

95009,Xiaoming,Male,21,CS
95010,Xiaowang,Female,22,MA

现在需要编写程序,使用Kafka采集student.txt中的数据,并对数据进行解析,然后保存到Redis数据库中。

(二)实现代码

要使用python 操作Redis ,需要先安装python的redis组件,安装命令如下:

> pip install redis

在“C:\Python38\mycode”目录下新建一个代码文件kafka-redis-producer.py,其内容如下:

from kafka import KafkaProducer
import json

print("this is producer")
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
txtFilePath = 'student.txt'

data = []
with open(txtFilePath, "rb") as txtfile:
    data = txtfile.readlines()
    for rec in data:
        # Topic为'redisTopic' 消息内容为读取的student.txt文件的一行
        list_data = str(rec,"utf-8").replace('\\n','').replace('\n','').replace('\r','').split(',')
        dict_data = {"sno":list_data[0],"sname":list_data[1],"ssex":list_data[2],"sage":list_data[3],"sdept":list_data[4]}
        print(dict_data)
value=json.dumps(dict_data).encode(encoding="utf-8")
        producer.send('redisTopic', key=b'123', value=value)

在“C:\Python38\mycode”目录下新建一个代码文件Kafka-redis-consumer.py,其内容如下:

import json
import redis
from kafka import KafkaConsumer

pool=redis.ConnectionPool(host='localhost',port=6379,db=0)
r=redis.Redis(connection_pool=pool, charset='UTF-8', encoding='UTF-8')

#写入Redis 
def dict2redis(d):
    print("store to redis")
    r.set("Student:"+d["sno"]+":Sname",d["sname"])
    r.set("Student:"+d["sno"]+":Ssex",d["ssex"])
    r.set("Student:"+d["sno"]+":Sage",d["sage"])
    r.set("Student:"+d["sno"]+":Sdept",d["sdept"])    

consumer = KafkaConsumer('redisTopic',bootstrap_servers=['localhost:9092'])
for message in consumer:    
    dict=json.loads(str(message.value.decode('utf-8')))
    print(dict)
    dict2redis(dict)

(三)执行过程

首先要确保Redis数据库服务已经启动。
在Windows系统中打开第1个cmd窗口,执行如下命令启动Zookeeper服务:

> cd c:\kafka_2.12-2.4.0
> .\bin\windows\zookeeper-server-start.bat .\config\zookeeper.Properties

打开第2个cmd窗口,然后执行下面命令启动Kafka服务:

> cd c:\kafka_2.12-2.4.0
> .\bin\windows\kafka-server-start.bat .\config\server.properties

打开第3个cmd窗口,执行如下命令运行kafka-redis-consumer.py:

> cd C:\Python38\mycode
> python kafka-redis-consumer.py

打开IDLE,执行代码文件kafka-redis-producer.py,让生产者产生数据(如图9所示)。需要注意的是,运行kafka-redis-producer.py时,会自动创建redisTopic,不需要手动创建。这时,在消费者窗口,会打印出接收到的数据和程序执行情况(如图10所示)。

图9 生成数据

图10 消费者窗口
打开第4个cmd窗口,输入如下命令启动Redis客户端:

> cd C:\Redis-7.0.8-Windows-x64
> redis-cli.exe -h 127.0.0.1 -p 6379

在Redis数据库中执行get命令查询数据,如图11所示,数据已经成功更新到Redis数据库中。

图11 查询Redis数据库中的数据