Kafka和Spark Streaming的组合使用(Spark 3.2.0)

大数据学习路线图

本文节选自林子雨编著《Spark编程基础(Scala版)》(教材官网:http://dblab.xmu.edu.cn/post/spark/

作者:厦门大学计算机科学与技术系 林子雨 博士/副教授
E-mail: ziyulin@xmu.edu.cn

Spark Streaming是用来进行流计算的组件,可以把Kafka(或Flume)作为数据源,让Kafka(或Flume)产生数据发送给Spark Streaming应用程序,Spark Streaming应用程序再对接收到的数据进行实时处理,从而完成一个典型的流计算过程。这里仅以Kafka为例进行介绍。这里使用的软件版本是:kafka_2.12-2.6.0,Spark3.2.0(Scala版本是2.12.15)。

1.Kafka简介

Kafka是一种高吞吐量的分布式发布订阅消息系统,为了更好地理解和使用Kafka,这里介绍一下Kafka的相关概念:
(1)Broker:Kafka集群包含一个或多个服务器,这些服务器被称为Broker。
(2)Topic:每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个Broker上,但用户只需指定消息的Topic,即可生产或消费数据,而不必关心数据存于何处。
(3)Partition:是物理上的概念,每个Topic包含一个或多个Partition。
(4)Producer:负责发布消息到Kafka Broker。
(5)Consumer:消息消费者,向Kafka Broker读取消息的客户端。
(6)Consumer Group:每个Consumer属于一个特定的Consumer Group,可为每个Consumer指定Group Name,若不指定Group Name,则属于默认的Group。

2.Kafka准备工作

2.1 安装Kafka

访问Kafka官网下载页面(https://kafka.apache.org/downloads),下载Kafka稳定版本kafka_2.12-2.6.0.tgz。为了让Spark Streaming应用程序能够顺利使用Kafka数据源,在下载Kafka安装文件的时候要注意,Kafka版本号一定要和自己电脑上已经安装的Scala版本号一致才可以。本教材安装的Spark版本号是3.2.0,Scala版本号是2.12,所以,一定要选择Kafka版本号是2.12开头的。例如,到Kafka官网中,可以下载安装文件kafka_2.12-2.6..0.tgz,前面的2.12就是支持的Scala版本号,后面的2.6.0是Kafka自身的版本号。假设下载后的文件被放在“~/Downloads”目录下。执行如下命令完成Kafka的安装:

cd ~/Downloads
sudo tar -zxf  kafka_2.12-2.6.0.tgz -C /usr/local
cd /usr/local
sudo mv kafka_2.12-2.6.0 kafka
sudo chown -R hadoop ./kafka

2.2 启动Kafka

首先需要启动Kafka。请登录Linux系统(本教材统一使用hadoop用户登录),打开一个终端,输入下面命令启动Zookeeper服务:

cd  /usr/local/kafka
./bin/zookeeper-server-start.sh  config/zookeeper.properties

注意,执行上面命令以后,终端窗口会返回一堆信息,然后就停住不动了,没有回到Shell命令提示符状态,这时,不要误以为死机了,而是Zookeeper服务器已经启动,正在处于服务状态。所以,不要关闭这个终端窗口,一旦关闭,Zookeeper服务就停止了。
请另外打开第二个终端,然后输入下面命令启动Kafka服务:

cd  /usr/local/kafka
./bin/kafka-server-start.sh  config/server.properties

同样,执行上面命令以后,终端窗口会返回一堆信息,然后就会停住不动,没有回到Shell命令提示符状态,这时,同样不要误以为死机了,而是Kafka服务器已经启动,正在处于服务状态。所以,不要关闭这个终端窗口,一旦关闭,Kafka服务就停止了。
当然,还有一种方式是采用下面加了“&”的命令:

cd  /usr/local/kafka
bin/kafka-server-start.sh  config/server.properties  &

这样,Kafka就会在后台运行,即使关闭了这个终端,Kafka也会一直在后台运行。不过,采用这种方式时,有时候我们往往就忘记了还有Kafka在后台运行,所以,建议暂时不要用这种命令形式。

2.3 创建Topic

再打开第三个终端,然后输入下面命令创建一个自定义名称为“wordsender”的Topic:

cd /usr/local/kafka
./bin/kafka-topics.sh --create --zookeeper localhost:2181 \
> --replication-factor 1 --partitions 1 \
> --topic wordsender

然后,可以执行如下命令,查看名称为“wordsender”的Topic是否已经成功创建:

./bin/kafka-topics.sh --list --zookeeper localhost:2181

再新开一个终端(记作“监控输入终端”),执行如下命令监控Kafka收到的文本:

cd /usr/local/kafka
bin/kafka-console-consumer.sh \
> --bootstrap-server localhost:9092 --topic wordsender

到这里,与Kafka相关的准备工作就顺利结束了。注意,所有这些终端窗口都不要关闭,要继续留着后面使用。

3.Spark准备工作

Kafka和Flume等高级输入源,需要依赖独立的库(jar文件),因此,需要为Spark添加相关jar包。访问MVNREPOSITORY官网(http://mvnrepository.com),下载spark-streaming-kafka-0-10_2.12-3.2.0.jar和spark-token-provider-kafka-0-10_2.12-3.2.0.jar文件,其中,2.12表示Scala的版本号,3.2.0表示Spark版本号。然后,把这两个文件复制到Spark目录的jars目录下(即“/usr/local/spark/jars”目录)。此外,还需要把“/usr/local/kafka/libs”目录下的kafka-clients-2.6.0.jar文件复制到Spark目录的jars目录下。
spark-streaming-kafka-0-10_2.12-3.2.0.jar的下载页面:https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-10_2.12/3.2.0
spark-token-provider-kafka-0-10_2.12-3.2.0.jar的下载页面:https://mvnrepository.com/artifact/org.apache.spark/spark-token-provider-kafka-0-10_2.12/3.2.0
进入下载页面以后,如下图所示,点击红色箭头指向的“jar”,就可以下载JAR包了。

4.编写Spark Streaming程序使用Kafka数据源

4.1 编写生产者(Producer)程序

请新打开一个终端,然后,执行如下命令创建代码目录和代码文件:

cd  /usr/local/spark/mycode
mkdir  kafka
cd  kafka
mkdir  -p  src/main/scala
cd  src/main/scala
vim  KafkaWordProducer.scala

这里使用vim编辑器新建了KafkaWordProducer.scala,它是用来产生一系列字符串的程序,会产生随机的整数序列,每个整数被当作一个单词,提供给KafkaWordCount程序去进行词频统计。请在KafkaWordProducer.scala中输入以下代码:

import java.util.HashMap
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._
object KafkaWordProducer {
  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordProducer <metadataBrokerList> <topic> " +
        "<messagesPerSec> <wordsPerMessage>")
      System.exit(1)
    }
    val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args
    // Zookeeper connection properties
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
   // Send some messages
    while(true) {
      (1 to messagesPerSec.toInt).foreach { messageNum =>
        val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).
toString)
          .mkString(" ")
                    print(str)
                    println()
        val message = new ProducerRecord[String, String](topic, null, str)
        producer.send(message)
      }
     Thread.sleep(1000)
    }
  }
}

4.2 编写消费者(Consumer)程序

在“/usr/local/spark/mycode/kafka/src/main/scala”目录下创建代码文件KafkaWordCount.scala,用于单词词频统计,它会把KafkaWordProducer发送过来的单词进行词频统计,代码内容如下:

import org.apache.spark._
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

object KafkaWordCount{
  def main(args:Array[String]){
    val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
    val sc = new SparkContext(sparkConf)
    sc.setLogLevel("ERROR")
    val ssc = new StreamingContext(sc,Seconds(10))
    ssc.checkpoint("file:///usr/local/spark/mycode/kafka/checkpoint") //设置检查点,如果存放在HDFS上面,则写成类似ssc.checkpoint("/user/hadoop/checkpoint")这种形式,但是,要启动Hadoop
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "use_a_separate_group_id_for_each_stream",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (true: java.lang.Boolean)
    )
    val topics = Array("wordsender")
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )
    stream.foreachRDD(rdd => {
      val offsetRange = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      val maped: RDD[(String, String)] = rdd.map(record => (record.key,record.value))
      val lines = maped.map(_._2)
      val words = lines.flatMap(_.split(" "))
      val pair = words.map(x => (x,1))
      val wordCounts = pair.reduceByKey(_+_)
      wordCounts.foreach(println)
    })
    ssc.start
    ssc.awaitTermination
  }
}

在KafkaWordCount.scala代码中,ssc.checkpoint()用于创建检查点,实现容错功能。在Spark Streaming中,如果是文件流类型的数据源,Spark自身的容错机制可以保证数据不会发生丢失。但是,对于Flume和Kafka等数据源,当数据源源不断到达时,会首先被放入到缓存中,尚未被处理,可能会发生丢失。为了避免系统失败时发生数据丢失,可以通过ssc.checkpoint()创建检查点。但是,需要注意的是,检查点之后的数据仍然可能发生丢失,如果要保证数据不发生丢失,可以开启Spark Streaming的预写式日志(WAL:Write Ahead Logs)功能,当采用预写式日志以后,接收数据的正确性只在数据被预写到日志以后Receiver才会确认,这样,当系统发生失败导致缓存中的数据丢失时,就可以从日志中恢复丢失的数据。预写式日志需要额外的开销,因此,在默认情况下,Spark Streaming的预写式日志功能是关闭的,如果要开启该功能,需要设置SparkConf的属性"spark.streaming.receiver.writeAheadLog.enable"为"true"。ssc.checkpoint()在创建检查点的同时,系统也把检查点的文件写入路径“file:///usr/local/spark/mycode/kafka/checkpoint”作为预写式日志的存放路径。

4.3 编译打包程序

经过前面的步骤,现在在“/usr/local/spark/mycode/kafka/src/main/scala”目录下,就有了如下3个代码文件:
KafkaWordProducer.scala
KafkaWordCount.scala
StreamingExamples.scala
然后,请执行下面命令新建一个simple.sbt文件:

cd  /usr/local/spark/mycode/kafka/
vim  simple.sbt

在simple.sbt中输入以下代码:

name := "Simple Project"
version := "1.0"
scalaVersion := "2.12.15"
libraryDependencies += "org.apache.spark" %% "spark-core" % "3.2.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "3.2.0" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "3.2.0"
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "2.6.0"

然后执行下面命令,进行编译打包:

cd  /usr/local/spark/mycode/kafka/
/usr/local/sbt/sbt  package

打包成功后,就可以执行程序测试效果了。

4.5 运行程序

首先,启动Hadoop,因为如果前面KafkaWordCount.scala代码文件中采用了ssc.checkpoint
("/user/hadoop/checkpoint")这种形式,这时的检查点是被写入HDFS,因此需要启动Hadoop。启动Hadoop的命令如下:

cd  /usr/local/hadoop
./sbin/start-dfs.sh

启动Hadoop成功以后,就可以测试刚才生成的词频统计程序了。
要注意,之前已经启动了Zookeeper服务和Kafka服务,因为之前那些终端窗口都没有关闭,所以,这些服务一直都在运行。如果不小心关闭了之前的终端窗口,那就参照前面的内容,再次启动Zookeeper服务,启动Kafka服务。
然后,新打开一个终端,执行如下命令,运行“KafkaWordProducer”程序,生成一些单词(是一堆整数形式的单词):

cd  /usr/local/spark/mycode/kafka/
/usr/local/spark/bin/spark-submit  \
> --class "KafkaWordProducer"   \
> ./target/scala-2.12/simple-project_2.12-1.0.jar  \
> localhost:9092  wordsender  3  5

注意,上面命令中,“localhost:9092 wordsender 3 5”是提供给KafkaWordProducer程序的4个输入参数,第1个参数“localhost:9092”是Kafka的Broker的地址,第2个参数“wordsender”是Topic的名称,我们在KafkaWordCount.scala代码中已经把Topic名称写死掉,所以,KafkaWordCount程序只能接收名称为“wordsender”的Topic。第3个参数“3”表示每秒发送3条消息,第4个参数“5”表示每条消息包含5个单词(实际上就是5个整数)。
执行上面命令后,屏幕上会不断滚动出现类似如下的新单词:
3 3 6 3 4
9 4 0 8 1
0 3 3 9 3
0 8 4 0 9
8 7 2 9 5
……

不要关闭这个终端窗口,让它一直不断发送单词。然后,再打开一个终端,执行下面命令,运行KafkaWordCount程序,执行词频统计:

cd  /usr/local/spark/mycode/kafka/
/usr/local/spark/bin/spark-submit  \
> --class "KafkaWordCount"  \
> ./target/scala-2.12/simple-project_2.12-1.0.jar

运行上面命令以后,就启动了词频统计功能,屏幕上就会显示如下类似信息:
(4,134)
(8,117)
(7,144)
(5,128)
(6,137)
(0,158)
(2,128)
(9,134)
(3,139)
(1,131)
……
这些信息说明,Spark Streaming程序顺利接收到了Kafka发来的单词信息,并进行词频统计得到结果。