Kafka和Structured Streaming的组合使用(Spark 3.2.0)

大数据学习路线图

本文节选自林子雨编著《Spark编程基础(Scala版)》(教材官网:http://dblab.xmu.edu.cn/post/spark/

作者:厦门大学计算机科学与技术系 林子雨 博士/副教授
E-mail: ziyulin@xmu.edu.cn

Structured Streaming是用来进行流计算的组件,可以把Kafka(或Flume)作为数据源,让Kafka(或Flume)产生数据发送给Structured Streaming应用程序,Structured Streaming应用程序再对接收到的数据进行实时处理,从而完成一个典型的流计算过程。这里仅以Kafka为例进行介绍。这里使用的软件版本是:kafka_2.12-2.6.0,Spark3.2.0(Scala版本是2.12.15)。

1.Kafka简介

Kafka是一种高吞吐量的分布式发布订阅消息系统,为了更好地理解和使用Kafka,这里介绍一下Kafka的相关概念:
(1)Broker:Kafka集群包含一个或多个服务器,这些服务器被称为Broker。
(2)Topic:每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个Broker上,但用户只需指定消息的Topic,即可生产或消费数据,而不必关心数据存于何处。
(3)Partition:是物理上的概念,每个Topic包含一个或多个Partition。
(4)Producer:负责发布消息到Kafka Broker。
(5)Consumer:消息消费者,向Kafka Broker读取消息的客户端。
(6)Consumer Group:每个Consumer属于一个特定的Consumer Group,可为每个Consumer指定Group Name,若不指定Group Name,则属于默认的Group。

2. 安装Kafka

访问Kafka官网下载页面(https://kafka.apache.org/downloads),下载Kafka稳定版本kafka_2.12-2.6.0.tgz。为了让Spark Streaming应用程序能够顺利使用Kafka数据源,在下载Kafka安装文件的时候要注意,Kafka版本号一定要和自己电脑上已经安装的Scala版本号一致才可以。本教材安装的Spark版本号是3.2.0,Scala版本号是2.12,所以,一定要选择Kafka版本号是2.12开头的。例如,到Kafka官网中,可以下载安装文件kafka_2.12-2.6..0.tgz,前面的2.12就是支持的Scala版本号,后面的2.6.0是Kafka自身的版本号。假设下载后的文件被放在“~/Downloads”目录下。执行如下命令完成Kafka的安装:

cd ~/Downloads
sudo tar -zxf  kafka_2.12-2.6.0.tgz -C /usr/local
cd /usr/local
sudo mv kafka_2.12-2.6.0 kafka
sudo chown -R hadoop ./kafka

3.准备相关jar包

访问MVNREPOSITORY官网(https://mvnrepository.com),下载spark-sql-kafka-0-10_2.12-3.2.0.jar、kafka-clients-2.6.0.jar、commons-pool2-2.9.0.jar和spark-token-provider-kafka-0-10_2.12-3.2.0.jar文件,将其放到“/usr/local/spark/jars”目录下。
spark-sql-kafka-0-10_2.12-3.2.0.jar文件下载页面:https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10_2.12/3.2.0
kafka-clients-2.6.0.jar文件下载页面:https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients/2.6.0
commons-pool2-2.9.0.jar文件下载页面:https://mvnrepository.com/artifact/org.apache.commons/commons-pool2/2.9.0
spark-token-provider-kafka-0-10_2.12-3.2.0.jar文件下载页面:https://mvnrepository.com/artifact/org.apache.spark/spark-token-provider-kafka-0-10_2.12/3.2.0

进入下载页面以后,如下图所示,点击红色箭头指向的“jar”,就可以下载JAR包了。

4. 启动Kafka

首先需要启动Kafka。在Linux系统中新建一个终端(记作“Zookeeper终端”),输入下面命令启动Zookeeper服务:

cd /usr/local/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

不要关闭这个终端窗口,一旦关闭,Zookeeper服务就停止了。另外打开第二个终端(记作“Kafka终端”),然后输入下面命令启动Kafka服务:

cd /usr/local/kafka
bin/kafka-server-start.sh config/server.properties

不要关闭这个终端窗口,一旦关闭,Kafka服务就停止了。
再新开一个终端,创建名为wordcount-topic 的主题:

cd /usr/local/kafka
./bin/kafka-topics.sh --create --zookeeper localhost:2181 \
> --replication-factor 1 --partitions 1 \
> --topic wordcount-topic

创建名为wordcount-result-topic 的主题:

./bin/kafka-topics.sh --create --zookeeper localhost:2181 \
> --replication-factor 1 --partitions 1 \
> --topic wordcount-result-topic

再新开一个终端(记作“监控输入终端”),执行如下命令监控Kafka收到的文本:

cd /usr/local/kafka
bin/kafka-console-consumer.sh \
> --bootstrap-server localhost:9092 --topic wordcount-topic

再新开一个终端(记作“监控输出终端”),执行如下命令监控输出的结果文本:

cd /usr/local/kafka
bin/kafka-console-consumer.sh \
> --bootstrap-server localhost:9092 --topic wordcount-result-topic

5.编写生产者(Producer)程序

在终端中执行以下命令创建代码目录和代码文件:

cd /usr/local/spark/mycode/structuredstreaming
mkdir kafka
cd kafka
mkdir src/main/scala
cd src/main/scala
vim KafkaDataProducer.scala

在KafkaDataProducer.scala中输入以下代码:

package org.apache.spark.example.kafka
import java.util.{Properties, Random}
import org.apache.kafka.clients.producer._
import java.io._

object KafkaDataProducer{
    def main(args:Array[String]){
        val prop = new Properties
        prop.put("bootstrap.servers","localhost:9092")
        prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        val producer = new KafkaProducer[Nothing, String](prop)
        while(true) {
            //生成字符串
            var lowercase:Array[Char] = new Array[Char](0)
            for( i <- 'a' to 'z') {
                lowercase = lowercase ++ Array(i)
            }
            var str = util.Random.shuffle(lowercase.toList).take(2)
            val word = new StringBuffer
            for(j <- str){
                word.append(j)
            }
            //word.toString是字符串
            var message = new ProducerRecord("wordcount-topic",word.toString)
            producer.send(message)
            Thread.sleep(100)
        }
    }
}

在上面的代码里,使用util.Random.shuffle随机选择小写字母列表中的两个字母,并通过StringBuffer的append操作把两个小写字母进行连接,再toString得到一个包含两个字母的单词。

6.编写消费者(Consumer)程序

在终端中执行以下命令创建KafkaDataConsumer.scala代码文件:

cd /usr/local/spark/mycode/structuredstreaming/kafka
cd src/main/scala
vim KafkaDataConsumer.scala

在KafkaDataConsumer.scala中输入以下代码:

package org.apache.spark.example.kafka
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object KafkaDataConsumer{
    def main(args: Array[String]) {
        val spark = SparkSession
            .builder
            .appName("StructuredKafkaWordCount")
            .getOrCreate()

        spark.sparkContext.setLogLevel("ERROR")

        val lines = spark
            .readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", "localhost:9092")
            .option("subscribe", "wordcount-topic")
            .load()
            .selectExpr("CAST(value AS STRING)")

        import spark.implicits._
        val wordCounts = lines.groupBy("value").count()

        val query = wordCounts 
            .selectExpr("CAST(value AS STRING) as key", "CONCAT(CAST(value AS STRING), ':', CAST(count AS STRING)) as value") 
            .writeStream
            .outputMode("complete") 
            .format("kafka") 
            .option("kafka.bootstrap.servers", "localhost:9092") 
            .option("topic", "wordcount-result-topic") 
            .option("checkpointLocation", "file:///tmp/kafka-sink-cp") 
            .trigger(Trigger.ProcessingTime("8 seconds")) 
            .start()

        query.awaitTermination()
    }
}

KafkaDataConsumer.scala代码中存在2个selectExpr。第1个selectExpr将Kafka的主题wordcount-topic内的value转换成字符串,然后对其在一个触发周期(8秒)进行groupBy操作,并用count()操作得到统计结果。为了观察输出,将结果value写到主题wordcount-result-topic的key内,并通过CONCAT函数将value、冒号和计数拼接后写入Kafka的value内。最后,在“监控输出终端”内输出Kafka的主题wordcount-result-topic的value值。

7.编译、打包、运行程序

在终端中执行以下命令新建一个simple.sbt文件:

cd /usr/local/spark/mycode/structuredstreaming/kafka
vim simple.sbt

在simple.sbt文件中输入以下代码:

name := "Simple Project"
version := "1.0"
scalaVersion := "2.12.15"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.2.0"
libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "3.2.0" % Test
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "2.6.0"

然后,执行如下命令对程序进行编译打包:

cd /usr/local/spark/mycode/structuredstreaming/kafka
/usr/local/sbt/sbt package

打包成功后,新建一个终端(记作“生产者终端”),在终端中执行如下命令运行生产者程序:

cd /usr/local/spark/mycode/structuredstreaming/kafka
/usr/local/spark/bin/spark-submit \
> --jars "/usr/local/kafka/libs/*"
> --class "org.apache.spark.example.kafka.KafkaDataProducer" \
> ./target/scala-2.12/simple-project_2.12-1.0.jar

此时在“监控输入终端”就可以看到生产者程序持续产生的由两个单词组成的单词。
再新建一个终端(记作“流计算终端”),在终端中执行如下命令运行消费者程序:

cd /usr/local/spark/mycode/structuredstreaming/kafka
/usr/local/spark/bin/spark-submit \
> --jars "/usr/local/kafka/libs/*:/usr/local/spark/jars/*"
> --class "org.apache.spark.example.kafka.KafkaDataConsumer" \
> ./target/scala-2.12/simple-project_2.12-1.0.jar

消费者程序运行起来以后,可以在“监控输出终端”看到类似如下的输出结果:
sq:3
bl:6
lo:8