访问林子雨编著《数据采集与预处理》教材官网
本文介绍在Linux系统中安装和使用Kafka的具体方法。
1.Kafka简介
Kafka是一种高吞吐量的分布式发布订阅消息系统,为了更好地理解和使用Kafka,这里介绍一下Kafka的相关概念:
(1)Broker:Kafka集群包含一个或多个服务器,这些服务器被称为Broker。
(2)Topic:每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个Broker上,但用户只需指定消息的Topic,即可生产或消费数据,而不必关心数据存于何处。
(3)Partition:是物理上的概念,每个Topic包含一个或多个Partition。
(4)Producer:负责发布消息到Kafka Broker。
(5)Consumer:消息消费者,向Kafka Broker读取消息的客户端。
(6)Consumer Group:每个Consumer属于一个特定的Consumer Group,可为每个Consumer指定Group Name,若不指定Group Name,则属于默认的Group。
2.安装Kafka
访问Kafka官网下载页面(https://kafka.apache.org/downloads),下载Kafka稳定版本kafka_2.12-2.4.0.tgz,前面的2.12就是支持的Scala版本号,后面的2.4.0是Kafka自身的版本号。假设下载后的文件被放在“~/Downloads”目录下。执行如下命令完成Kafka的安装:
cd ~/Downloads
sudo tar -zxf kafka_2.12-2.4.0.tgz -C /usr/local
cd /usr/local
sudo mv kafka_2.12-2.4.0 kafka
sudo chown -R hadoop ./kafka
3.启动Kafka
首先需要启动Kafka。请登录Linux系统(本教材统一使用hadoop用户登录),打开一个终端,输入下面命令启动Zookeeper服务:
cd /usr/local/kafka
./bin/zookeeper-server-start.sh config/zookeeper.properties
注意,执行上面命令以后,终端窗口会返回一堆信息,然后就停住不动了,没有回到Shell命令提示符状态,这时,不要误以为死机了,而是Zookeeper服务器已经启动,正在处于服务状态。所以,不要关闭这个终端窗口,一旦关闭,Zookeeper服务就停止了。
请另外打开第二个终端,然后输入下面命令启动Kafka服务:
cd /usr/local/kafka
./bin/kafka-server-start.sh config/server.properties
同样,执行上面命令以后,终端窗口会返回一堆信息,然后就会停住不动,没有回到Shell命令提示符状态,这时,同样不要误以为死机了,而是Kafka服务器已经启动,正在处于服务状态。所以,不要关闭这个终端窗口,一旦关闭,Kafka服务就停止了。
当然,还有一种方式是采用下面加了“&”的命令:
cd /usr/local/kafka
bin/kafka-server-start.sh config/server.properties &
这样,Kafka就会在后台运行,即使关闭了这个终端,Kafka也会一直在后台运行。不过,采用这种方式时,有时候我们往往就忘记了还有Kafka在后台运行,所以,建议暂时不要用这种命令形式。
4.创建Topic
再打开第三个终端,然后输入下面命令创建一个自定义名称为“wordsender”的Topic:
cd /usr/local/kafka
./bin/kafka-topics.sh --create --zookeeper localhost:2181 \
> --replication-factor 1 --partitions 1 \
> --topic wordsender
然后,可以执行如下命令,查看名称为“wordsender”的Topic是否已经成功创建:
./bin/kafka-topics.sh --list --zookeeper localhost:2181
5.创建生产者
下面用生产者(Producer)来产生一些数据,请在当前终端内继续输入下面命令:
./bin/kafka-console-producer.sh --broker-list localhost:9092 \
> --topic wordsender
上面命令执行后,就可以在当前终端(假设名称为“生产者终端”)内用键盘输入一些英文单词,比如可以输入:
hello hadoop
hello flink
这些单词就是数据源,会被Kafka捕捉到以后发送给消费者。
6.创建消费者
再新开一个终端(记作“监控输入终端”),执行如下命令监控Kafka收到的文本:
cd /usr/local/kafka
bin/kafka-console-consumer.sh \
> --bootstrap-server localhost:9092 --topic wordsender --from-beginning
可以看到,屏幕上会显示出如下结果,也就是刚才在另外一个终端里面输入的内容:
hello hadoop
hello flink