Spark入门:Apache Kafka作为DStream数据源

大数据学习路线图

【版权声明】博客内容由厦门大学数据库实验室拥有版权,未经允许,请勿转载!
[返回Spark教程首页]

Kafka是非常流行的日志采集系统,可以作为DStream的高级数据源。

Kafka的安装和准备工作

关于Kafka的概念和安装方法,请参考厦门大学数据库实验室博客文章《Kafka的安装和简单实例测试》。在安装的时候,要注意,到Kafka官网下载安装文件时,一定要选择和自己电脑上已经安装的scala版本号一致才可以,本教程安装的Spark版本号是1.6.2,scala版本号是2.10,所以,一定要选择Kafka版本号是2.10开头的。比如,到Kafka官网中,可以下载安装文件Kafka_2.10-0.10.1.0,前面的2.10就是支持的scala版本号,后面的0.10.1.0是Kafka自身的版本号。
这里假设你已经根据这篇博客文章安装成功了Kafka。
下面,我们启动Kafka。
请登录Linux系统(本教程统一使用hadoop用户登录),打开一个终端,输入下面命令启动Zookeeper服务:

cd /usr/local/kafka
./bin/zookeeper-server-start.sh config/zookeeper.properties

注意,执行上面命令以后,终端窗口会返回一堆信息,然后就停住不动了,没有回到shell命令提示符状态,这时,千万不要错误认为死机了,而是Zookeeper服务器启动了,正在处于服务状态。所以,千万不要关闭这个终端窗口,一旦关闭,zookeeper服务就停止了,所以,不能关闭这个终端窗口。
请另外打开第二个终端,然后输入下面命令启动Kafka服务:

cd /usr/local/kafka
bin/kafka-server-start.sh config/server.properties

同样,执行上面命令以后,终端窗口会返回一堆信息,然后就停住不动了,没有回到shell命令提示符状态,这时,千万不要错误认为死机了,而是Kafka服务器启动了,正在处于服务状态。所以,千万不要关闭这个终端窗口,一旦关闭,Kafka服务就停止了,所以,不能关闭这个终端窗口。

当然了,还有一种方式是,采用下面加了“&”的命令:

cd /usr/local/kafka
bin/kafka-server-start.sh config/server.properties &

这样,Kafka就会在后台运行,即使你关闭了这个终端,Kafka也会一直在后台运行。不过,这样做,有时候我们往往就忘记了还有Kafa在后台运行,所以,建议暂时不要用&。

下面先测试一下Kafka是否可以正常使用。再另外打开第三个终端,然后输入下面命令创建一个自定义名称为“wordsendertest”的topic(关于什么是topic,请参考厦门大学数据库实验室博客文章《Kafka的安装和简单实例测试》):

cd /usr/local/kafka
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic wordsendertest
//这个topic叫wordsendertest,2181是zookeeper默认的端口号,partition是topic里面的分区数,replication-factor是备份的数量,在kafka集群中使用,这里单机版就不用备份了
//可以用list列出所有创建的topics,来查看上面创建的topic是否存在
./bin/kafka-topics.sh --list --zookeeper localhost:2181

这个名称为“wordsendertest”的topic,就是专门负责采集发送一些单词的。
下面,我们需要用producer来产生一些数据,请在当前终端内继续输入下面命令:

./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic wordsendertest

上面命令执行后,你就可以在当前终端内用键盘输入一些英文单词,比如我们可以输入:

hello hadoop
hello spark

这些单词就是数据源,这些单词会被Kafka捕捉到以后发送给消费者。我们现在可以启动一个消费者,来查看刚才producer产生的数据。请另外打开第四个终端,输入下面命令:

cd /usr/local/kafka
./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic wordsendertest --from-beginning

可以看到,屏幕上会显示出如下结果,也就是刚才你在另外一个终端里面输入的内容:

hello hadoop
hello spark

到这里,与Kafka相关的准备工作就顺利结束了。注意,所有这些终端窗口都不要关闭,要继续留着后面使用。

Spark准备工作

Kafka和Flume等高级输入源,需要依赖独立的库(jar文件)。按照我们前面安装好的Spark版本,这些jar包都不在里面,为了证明这一点,我们现在可以测试一下。请打开一个新的终端,然后启动spark-shell:

cd /usr/local/spark
./bin/spark-shell

启动成功后,在spark-shell中执行下面import语句:

scala> import org.apache.spark.streaming.kafka._
<console>:25: error: object kafka is not a member of package org.apache.spark.streaming
         import org.apache.spark.streaming.kafka._
                                           ^

你可以看到,马上会报错,因为找不到相关的jar包。所以,现在我们就需要下载spark-streaming-kafka_2.10.jar。
现在请在Linux系统中,打开一个火狐浏览器,请点击这里访问Spark官网,里面有提供spark-streaming-kafka_2.10.jar文件的下载,其中,2.10表示scala的版本。这个下载页面会列出spark-streaming-kafka_2.10.jar的很多版本,我们这里选择1.6.2版本(因为本教程安装的Spark版本是1.6.2),你可以点击1.6.2版本的按钮,进入spark-streaming-kafka_2.10-1.6.2.jar的下载页面,点击下载。下载后的文件会被默认保存在当前Linux登录用户的下载目录下,本教程统一使用hadoop用户名登录Linux系统,所以,文件下载后会被保存到“/home/hadoop/下载”目录下面。现在,我们就把这个文件复制到Spark目录的lib目录下。请新打开一个终端,输入下面命令:

cd /usr/local/spark/lib
mkdir kafka
cd ~
cd 下载
cp ./spark-streaming-kafka_2.10-1.6.2.jar /usr/local/spark/lib/kafka

这样,我们就把spark-streaming-kafka_2.10-1.6.2.jar文件拷贝到了“/usr/local/spark/lib/kafka”目录下。
下面开始的过程,让笔者消耗了2个白天和2个黑夜,测试了网络上各种解决方案,反复失败,最终,只能自己进行“蛮力测试”,下载各种版本进行尝试,最终,得以解决,解决的秘诀是,第一个秘诀是,凡是遇到“java.lang.NoSuchMethodError”这种错误,一定是由版本不一致导致的,也就是spark、scala、spark streaming、Kafka这几个软件的版本不一致,所以,必须保证它们版本的一致性。比如,笔者电脑安装了Spark1.6.2版本,它包含的scala版本是2.10.5,那么,与之对应的straming的版本也必须是spark-streaming_2.10-1.6.2.jar,对应的Kafka也必须是spark-streaming-kafka_2.10-1.6.2.jar。第二个秘诀是,凡是遇到“Class Not Found”这种错误,一般是由缺少jar包引起的。到底缺少什么jar包,笔者在网络上找不到解决方案,也只能靠自己“蒙”,所以,下面笔者测试通过的方法,只是自己反复测试两天后“蒙”对了,但是,也说不出是什么道理。请你按照下面操作即可。

还需要在Linux系统中,打开火狐浏览器,到网络上下载另外一个文件spark-streaming_2.10-1.6.2.jar(下载地址),其中,2.10是scala版本号,1.6.2是Spark版本号。
spark-streaming_2.10-1.6.1.jar下载成功以后是被放到当前Linux登录用户的下载目录下,本教程统一使用hadoop用户名登录Linux系统,所以,文件下载后会被保存到“/home/hadoop/下载”目录下面。现在,我们就把这个文件复制到Spark目录的lib目录下。请新打开一个终端,输入下面命令:

cd /usr/local/spark/lib
cd ~
cd 下载
cp ./spark-streaming_2.10-1.6.1.jar /usr/local/spark/lib/kafka

还没完,下面还要继续把Kafka安装目录的lib目录下的所有jar文件复制到“/usr/local/spark/lib/kafka”目录下,请在终端中执行下面命令:

cd /usr/local/kafka/libs
ls
cp ./* /usr/local/spark/lib/kafka

为什么要拷贝所有的jar过来,因为笔者遇到错误以后,知道是因为缺少jar包,但是,实在不知道是缺少什么jar包,所以,就全部拷贝过来了。但是,全部拷贝过来以后,有些jar包会和spark中已经有的jar包发生冲突,程序一运行就会出现一堆的错误,根据错误提示信息,笔者大概猜测是因为冲突引起的,所以,就删除一些可能引起冲突的jar包,删除一些后,再测试程序,再出错,再删除一批,最后测试通过了,就发现,需要把下面这些jar包删除掉,这样才不会出现错误:

cd /usr/local/kafka/libs
ls
rm log4j*
rm jackson*

上面这些方法纯粹是靠蛮力反复测试才成功的,没有找到问题的本质,也没有网络资料可以帮笔者解决。但是,起码通过这种方式,程序可以顺利运行了。

现在,我们需要配置spark-env.sh文件,让spark能够在启动的时候找到spark-streaming-kafka_2.10-1.6.2.jar等5个jar文件。命令如下:

cd /usr/local/spark/conf
vim spark-env.sh

使用vim编辑器打开了spark-env.sh文件,因为这个文件之前已经反复修改过,目前里面的前面几行的内容应该是这样的:

export SPARK_CLASSPATH=$SPARK_CLASSPATH:/usr/local/spark/lib/hbase/*
export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath)

我们只要简单修改一下,把“/usr/local/spark/lib/kafka/*“增加进去,修改后的内容如下:

export SPARK_CLASSPATH=$SPARK_CLASSPATH:/usr/local/spark/lib/hbase/*:/usr/local/spark/lib/kafka/*
export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath)

保存该文件后,退出vim编辑器。然后,就可以启动spark-shell:

cd /usr/local/spark
./bin/spark-shell

启动成功后,再次执行命令:

scala> import org.apache.spark.streaming.kafka._
//会显示下面信息
import org.apache.spark.streaming.kafka._

说明导入成功了。这样,我们就已经准备好了Spark环境,它可以支持kafka相关编程了。

编写Spark程序使用Kafka数据源

下面,我们就可以进行程序编写了。请新打开一个终端,然后,执行命令创建代码目录:

cd /usr/local/spark/mycode
mkdir kafka
cd kafka
mkdir -p src/main/scala
cd src/main/scala
vim KafkaWordProducer.scala

使用vim编辑器新建了KafkaWordProducer.scala,它是产生一系列字符串的程序,会产生随机的整数序列,每个整数被当做一个单词,提供给KafkaWordCount程序去进行词频统计。请在KafkaWordProducer.scala中输入以下代码:

import java.util.HashMap
import org.apache.kafka.clients.producer.{ProducerConfig, KafkaProducer, ProducerRecord}
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf

object KafkaWordProducer {
  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> <topic> " +
        "<messagesPerSec> <wordsPerMessage>")
      System.exit(1)
    }
    val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args
    // Zookeeper connection properties
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    // Send some messages
    while(true) {
      (1 to messagesPerSec.toInt).foreach { messageNum =>
        val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString)
          .mkString(" ")
        val message = new ProducerRecord[String, String](topic, null, str)
        producer.send(message)
      }
     Thread.sleep(1000)
    }
  }
}

保存后退出vim编辑器。然后,继续在当前目录下创建KafkaWordCount.scala代码文件:

vim KafkaWordCount.scala

KafkaWordCount.scala是用于单词词频统计,它会把KafkaWordProducer发送过来的单词进行词频统计,代码内容如下:

import org.apache.spark._
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka.KafkaUtils

object KafkaWordCount{
def main(args:Array[String]){
StreamingExamples.setStreamingLogLevels()
val sc = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
val ssc = new StreamingContext(sc,Seconds(10))
ssc.checkpoint("file:///usr/local/spark/mycode/kafka/checkpoint") //设置检查点,如果存放在HDFS上面,则写成类似ssc.checkpoint("/user/hadoop/checkpoint")这种形式,但是,要启动hadoop
val zkQuorum = "localhost:2181" //Zookeeper服务器地址
val group = "1"  //topic所在的group,可以设置为自己想要的名称,比如不用1,而是val group = "test-consumer-group" 
val topics = "wordsender"  //topics的名称          
val numThreads = 1  //每个topic的分区数
val topicMap =topics.split(",").map((_,numThreads.toInt)).toMap
val lineMap = KafkaUtils.createStream(ssc,zkQuorum,group,topicMap)
val lines = lineMap.map(_._2)
val words = lines.flatMap(_.split(" "))
val pair = words.map(x => (x,1))
val wordCounts = pair.reduceByKeyAndWindow(_ + _,_ - _,Minutes(2),Seconds(10),2) //这行代码的含义在下一节的窗口转换操作中会有介绍
wordCounts.print
ssc.start
ssc.awaitTermination
}
}        

保存后退出vim编辑器。然后,继续在当前目录下创建StreamingExamples.scala代码文件:

vim StreamingExamples.scala

下面是StreamingExamples.scala的代码,用于设置log4j:

import org.apache.spark.Logging
import org.apache.log4j.{Level, Logger}
/** Utility functions for Spark Streaming examples. */
object StreamingExamples extends Logging {
  /** Set reasonable logging levels for streaming if the user has not configured log4j. */
  def setStreamingLogLevels() {
    val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
    if (!log4jInitialized) {
      // We first log something to initialize Spark's default logging, then we override the
      // logging level.
      logInfo("Setting log level to [WARN] for streaming example." +
        " To override add a custom log4j.properties to the classpath.")
      Logger.getRootLogger.setLevel(Level.WARN)
    }
  }
}

这样,我们在“/usr/local/spark/mycode/kafka/src/main/scala”目录下,就有了如下三个代码文件:

KafkaWordProducer.scala
KafkaWordCount.scala
StreamingExamples.scala

然后,请执行下面命令:

cd /usr/local/spark/mycode/kafka/
vim simple.sbt

在simple.sbt中输入以下代码:

name := "Simple Project"
version := "1.0"
scalaVersion := "2.10.5"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.2"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.6.2"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.6.2"  

保存文件退出vim编辑器。然后执行下面命令,进行打包编译:

cd /usr/local/spark/mycode/kafka/
/usr/local/sbt/sbt package

打包成功后,就可以执行程序测试效果了。
首先,请启动hadoop(有可能你前面采用了ssc.checkpoint("/user/hadoop/checkpoint")这种形式,写入HDFS):

cd /usr/local/hadoop
./sbin/start-all.sh

启动hadoop成功以后,就可以测试我们刚才生成的词频统计程序了。
要注意,我们之前已经启动了zookeeper服务,启动了kafka服务,因为我们之前那些终端窗口都没有关闭,所以,这些服务都在运行。如果你不小心关闭了之前的终端窗口,那就请回到本文前面,再启动zookeeper服务,启动kafka服务。

首先,请新打开一个终端,执行如下命令,运行“KafkaWordProducer”程序,生成一些单词(是一堆整数形式的单词):

cd /usr/local/spark
/usr/local/spark/bin/spark-submit --class "KafkaWordProducer" /usr/local/spark/mycode/kafka/target/scala-2.10/simple-project_2.10-1.0.jar localhost:9092 wordsender 3 5

注意,上面命令中,"localhost:9092 wordsender 3 5"是提供给KafkaWordProducer程序的4个输入参数,第1个参数localhost:9092是Kafka的broker的地址,第2个参数wordsender是topic的名称,我们在KafkaWordCount.scala代码中已经把topic名称写死掉,所以,KafkaWordCount程序只能接收名称为"wordsender"的topic。第3个参数“3”表示每秒发送3条消息,第4个参数“5”表示,每条消息包含5个单词(实际上就是5个整数)。
执行上面命令后,屏幕上会不断滚动出现新的单词,如下:

3 3 6 3 4
9 4 0 8 1
0 3 3 9 3
0 8 4 0 9
8 7 2 9 5
2 6 4 8 5
0 9 6 0 9
4 0 0 8 1
1 8 3 7 4
4 0 6 5 7
3 9 1 5 0
9 3 9 6 7
1 8 7 4 3
9 5 6 2 6
4 8 8 6 8
0 0 3 3 7

这个终端窗口就放在这里,不要关闭,千万不要关闭,就让它一直不断发送单词。
然后,请新打开一个终端,执行下面命令,运行KafkaWordCount程序,执行词频统计:

cd /usr/local/spark
/usr/local/spark/bin/spark-submit --class "KafkaWordCount" /usr/local/spark/mycode/kafka/target/scala-2.10/simple-project_2.10-1.0.jar

运行上面命令以后,就启动了词频统计功能,屏幕上就会显示如下信息:

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/local/spark/lib/kafka/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/local/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
-------------------------------------------
Time: 1479789000000 ms
-------------------------------------------
(4,16)
(8,14)
(6,15)
(0,10)
(2,9)
(7,17)
(5,14)
(9,9)
(3,8)
(1,8)

恭喜你,顺利完成了Spark Streaming和Kafka的集成。

子雨大数据之Spark入门
扫一扫访问本博客