本文节选自林子雨编著《Spark编程基础(Scala版)》(教材官网:http://dblab.xmu.edu.cn/post/spark/)
作者:厦门大学计算机科学与技术系 林子雨 博士/副教授
E-mail: ziyulin@xmu.edu.cn
Structured Streaming是用来进行流计算的组件,可以把Kafka(或Flume)作为数据源,让Kafka(或Flume)产生数据发送给Structured Streaming应用程序,Structured Streaming应用程序再对接收到的数据进行实时处理,从而完成一个典型的流计算过程。这里仅以Kafka为例进行介绍。这里使用的软件版本是:kafka_2.12-2.6.0,Spark3.2.0(Scala版本是2.12.15)。
1.Kafka简介
Kafka是一种高吞吐量的分布式发布订阅消息系统,为了更好地理解和使用Kafka,这里介绍一下Kafka的相关概念:
(1)Broker:Kafka集群包含一个或多个服务器,这些服务器被称为Broker。
(2)Topic:每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个Broker上,但用户只需指定消息的Topic,即可生产或消费数据,而不必关心数据存于何处。
(3)Partition:是物理上的概念,每个Topic包含一个或多个Partition。
(4)Producer:负责发布消息到Kafka Broker。
(5)Consumer:消息消费者,向Kafka Broker读取消息的客户端。
(6)Consumer Group:每个Consumer属于一个特定的Consumer Group,可为每个Consumer指定Group Name,若不指定Group Name,则属于默认的Group。
2. 安装Kafka
访问Kafka官网下载页面(https://kafka.apache.org/downloads),下载Kafka稳定版本kafka_2.12-2.6.0.tgz。为了让Spark Streaming应用程序能够顺利使用Kafka数据源,在下载Kafka安装文件的时候要注意,Kafka版本号一定要和自己电脑上已经安装的Scala版本号一致才可以。本教材安装的Spark版本号是3.2.0,Scala版本号是2.12,所以,一定要选择Kafka版本号是2.12开头的。例如,到Kafka官网中,可以下载安装文件kafka_2.12-2.6..0.tgz,前面的2.12就是支持的Scala版本号,后面的2.6.0是Kafka自身的版本号。假设下载后的文件被放在“~/Downloads”目录下。执行如下命令完成Kafka的安装:
cd ~/Downloads
sudo tar -zxf kafka_2.12-2.6.0.tgz -C /usr/local
cd /usr/local
sudo mv kafka_2.12-2.6.0 kafka
sudo chown -R hadoop ./kafka
3.准备相关jar包
访问MVNREPOSITORY官网(https://mvnrepository.com),下载spark-sql-kafka-0-10_2.12-3.2.0.jar、kafka-clients-2.6.0.jar、commons-pool2-2.9.0.jar和spark-token-provider-kafka-0-10_2.12-3.2.0.jar文件,将其放到“/usr/local/spark/jars”目录下。
spark-sql-kafka-0-10_2.12-3.2.0.jar文件下载页面:https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10_2.12/3.2.0
kafka-clients-2.6.0.jar文件下载页面:https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients/2.6.0
commons-pool2-2.9.0.jar文件下载页面:https://mvnrepository.com/artifact/org.apache.commons/commons-pool2/2.9.0
spark-token-provider-kafka-0-10_2.12-3.2.0.jar文件下载页面:https://mvnrepository.com/artifact/org.apache.spark/spark-token-provider-kafka-0-10_2.12/3.2.0
进入下载页面以后,如下图所示,点击红色箭头指向的“jar”,就可以下载JAR包了。
4. 启动Kafka
首先需要启动Kafka。在Linux系统中新建一个终端(记作“Zookeeper终端”),输入下面命令启动Zookeeper服务:
cd /usr/local/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
不要关闭这个终端窗口,一旦关闭,Zookeeper服务就停止了。另外打开第二个终端(记作“Kafka终端”),然后输入下面命令启动Kafka服务:
cd /usr/local/kafka
bin/kafka-server-start.sh config/server.properties
不要关闭这个终端窗口,一旦关闭,Kafka服务就停止了。
再新开一个终端,创建名为wordcount-topic 的主题:
cd /usr/local/kafka
./bin/kafka-topics.sh --create --zookeeper localhost:2181 \
> --replication-factor 1 --partitions 1 \
> --topic wordcount-topic
创建名为wordcount-result-topic 的主题:
./bin/kafka-topics.sh --create --zookeeper localhost:2181 \
> --replication-factor 1 --partitions 1 \
> --topic wordcount-result-topic
再新开一个终端(记作“监控输入终端”),执行如下命令监控Kafka收到的文本:
cd /usr/local/kafka
bin/kafka-console-consumer.sh \
> --bootstrap-server localhost:9092 --topic wordcount-topic
再新开一个终端(记作“监控输出终端”),执行如下命令监控输出的结果文本:
cd /usr/local/kafka
bin/kafka-console-consumer.sh \
> --bootstrap-server localhost:9092 --topic wordcount-result-topic
5.编写生产者(Producer)程序
在终端中执行以下命令创建代码目录和代码文件:
cd /usr/local/spark/mycode/structuredstreaming
mkdir kafka
cd kafka
mkdir src/main/scala
cd src/main/scala
vim KafkaDataProducer.scala
在KafkaDataProducer.scala中输入以下代码:
package org.apache.spark.example.kafka
import java.util.{Properties, Random}
import org.apache.kafka.clients.producer._
import java.io._
object KafkaDataProducer{
def main(args:Array[String]){
val prop = new Properties
prop.put("bootstrap.servers","localhost:9092")
prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[Nothing, String](prop)
while(true) {
//生成字符串
var lowercase:Array[Char] = new Array[Char](0)
for( i <- 'a' to 'z') {
lowercase = lowercase ++ Array(i)
}
var str = util.Random.shuffle(lowercase.toList).take(2)
val word = new StringBuffer
for(j <- str){
word.append(j)
}
//word.toString是字符串
var message = new ProducerRecord("wordcount-topic",word.toString)
producer.send(message)
Thread.sleep(100)
}
}
}
在上面的代码里,使用util.Random.shuffle随机选择小写字母列表中的两个字母,并通过StringBuffer的append操作把两个小写字母进行连接,再toString得到一个包含两个字母的单词。
6.编写消费者(Consumer)程序
在终端中执行以下命令创建KafkaDataConsumer.scala代码文件:
cd /usr/local/spark/mycode/structuredstreaming/kafka
cd src/main/scala
vim KafkaDataConsumer.scala
在KafkaDataConsumer.scala中输入以下代码:
package org.apache.spark.example.kafka
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger
object KafkaDataConsumer{
def main(args: Array[String]) {
val spark = SparkSession
.builder
.appName("StructuredKafkaWordCount")
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
val lines = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "wordcount-topic")
.load()
.selectExpr("CAST(value AS STRING)")
import spark.implicits._
val wordCounts = lines.groupBy("value").count()
val query = wordCounts
.selectExpr("CAST(value AS STRING) as key", "CONCAT(CAST(value AS STRING), ':', CAST(count AS STRING)) as value")
.writeStream
.outputMode("complete")
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "wordcount-result-topic")
.option("checkpointLocation", "file:///tmp/kafka-sink-cp")
.trigger(Trigger.ProcessingTime("8 seconds"))
.start()
query.awaitTermination()
}
}
KafkaDataConsumer.scala代码中存在2个selectExpr。第1个selectExpr将Kafka的主题wordcount-topic内的value转换成字符串,然后对其在一个触发周期(8秒)进行groupBy操作,并用count()操作得到统计结果。为了观察输出,将结果value写到主题wordcount-result-topic的key内,并通过CONCAT函数将value、冒号和计数拼接后写入Kafka的value内。最后,在“监控输出终端”内输出Kafka的主题wordcount-result-topic的value值。
7.编译、打包、运行程序
在终端中执行以下命令新建一个simple.sbt文件:
cd /usr/local/spark/mycode/structuredstreaming/kafka
vim simple.sbt
在simple.sbt文件中输入以下代码:
name := "Simple Project"
version := "1.0"
scalaVersion := "2.12.15"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.2.0"
libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "3.2.0" % Test
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "2.6.0"
然后,执行如下命令对程序进行编译打包:
cd /usr/local/spark/mycode/structuredstreaming/kafka
/usr/local/sbt/sbt package
打包成功后,新建一个终端(记作“生产者终端”),在终端中执行如下命令运行生产者程序:
cd /usr/local/spark/mycode/structuredstreaming/kafka
/usr/local/spark/bin/spark-submit \
> --jars "/usr/local/kafka/libs/*"
> --class "org.apache.spark.example.kafka.KafkaDataProducer" \
> ./target/scala-2.12/simple-project_2.12-1.0.jar
此时在“监控输入终端”就可以看到生产者程序持续产生的由两个单词组成的单词。
再新建一个终端(记作“流计算终端”),在终端中执行如下命令运行消费者程序:
cd /usr/local/spark/mycode/structuredstreaming/kafka
/usr/local/spark/bin/spark-submit \
> --jars "/usr/local/kafka/libs/*:/usr/local/spark/jars/*"
> --class "org.apache.spark.example.kafka.KafkaDataConsumer" \
> ./target/scala-2.12/simple-project_2.12-1.0.jar
消费者程序运行起来以后,可以在“监控输出终端”看到类似如下的输出结果:
sq:3
bl:6
lo:8
…