Flume_Kafka_SparkStreaming实现词频统计

大数据学习路线图

任务描述

配置Kafka和Flume,把Flume Source类别设置为netcat,绑定到localhost的33333端口,通过“telnet localhost 33333”命令向Flume Source发送消息,然后,让Flume把消息发送给Kafka,并让Kafa发送消息到Spark Streaming,Spark Streaming组件收到各种单词消息后,对单词进行词频统计,在屏幕上打印出每个单词出现了几次。

系统环境描述(有些没有使用到,但这里还是全部描述一下):
Ubuntu16.04
Java8(jdk1.8)
Hadoop2.7.1
Spark2.1.0
Scala2.11.8
Sbt0.13.11
Maven3.3.9
Kafka2.11-0.10.1.0
Flume1.7.0

一、Kafka的安装和准备

Kafka是非常流行的日志采集系统,可以作为DStream的高级数据源,也可以作为Flume的Sink端接收来自Flume的数据。本实验同时用到这两种功能。
关于Kafka的概念和安装方法,请参考厦门大学数据库实验室博客文章《Kafka的安装和简单实例测试》。在安装的时候,要注意,到Kafka官网下载安装文件时,一定要选择和自己电脑上已经安装的scala版本号一致才可以,我自己安装的Spark版本号是2.1.0,scala版本号是2.11,所以,一定要选择Kafka版本号是2.11开头的。这里下载安装文件Kafka_2.11-0.10.1.0.tgz,前面的2.11就是支持的scala版本号,后面的0.10.1.0是Kafka自身的版本号。我们安装稳定的0.10.1.0版本,假设你已经根据上面这篇博客文章安装成功了Kafka,并安装在“/usr/local/kafka”目录下。
关于Kafka,一定要清楚Topic、Producer、Consumer等相关概念,具体请查看厦门大学数据库实验室博客文章《Kafka的安装和简单实例测试》。
接下来,我们启动并测试一下Kafka是否安装成功。
首先,使用hadoop用户登录Linux系统,Ctrl+Alt+t打开一个终端,输入以下命令启动Zookeeper服务:

cd /usr/local/kafka
./bin/zookeeper-server-start.sh config/zookeeper.properties

此时窗口会返回很长的信息,不会回到shelle命令提示符状态,不要认为卡住或死机了,而是Zookeeper服务器启动了,正在处于服务状态。所以,千万不要关闭这个终端窗口,一旦关闭,zookeeper服务就停止了。
打开第二个终端,然后输入以下命令启动Kafka服务:

cd /usr/local/kafka
bin/kafka-server-start.sh config/server.properties

同样地,这里也不会返回shell命令提示符状态,这里是Kafka服务启动了,正在服务。这个窗口也不能关闭,不然Kafka服务就终止了。可以采用加了“&”的命令:

bin/kafka-server-start.sh config/server.properties &

这样,Kafka就会在后台运行,即使你关闭了这个终端,Kafka也会一直在后台运行。但是有时候我们往往就忘记了还有Kafka在后台运行,所以,建议暂时不要用&。
打开第三个终端,输入以下命令创建名为“test”的Topic,2181是zookeeper默认的端口号,输入命令:

cd /usr/local/kafka
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

输入以下命令查看创建的topic:

bin/kafka-topics.sh --list --zookeeper localhost:2181  

可以在结果中查看到dblab这个topic存在。
然后启动一个producer来产生数据,在当前终端(第三个终端输入)输入命令:

./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

上面命令执行后,你就可以在当前终端内用键盘输入一些英文单词,并按回车发送

hello flume kafka spark
hello dblab

这些单词就是数据源,会被发送给consumer。
开启第四个终端,输入以下命令创建consumer,这里端口2181就是之前创建topic时指定的,输入命令:

cd /usr/local/kafka
./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

可以看到窗口中输出了刚才你在producer端输入的内容。

至此,Kafka的准备工作完成,并且检测了Kafka可以正确使用。按CTRL+C退出,可以只关闭producer终端和consumer终端,保留启动Zookeeper和Kafka服务的终端等待之后使用,也可以全部关闭,之后再开启服务即可,这里我选择了都关闭,让屏幕不那么乱。

二、Flume的安装和准备

Flume是非常流行的日志采集系统,可以发送消息给Kafka,这里我们介绍如何配置将消息发送给Kafka。
首先下载安装Flume。关于Flume的概念和安装方法,请参考厦门大学数据库实验室博客文章《日志采集工具Flume的安装与使用方法》,必须要理解什么是Flume Source、Flume Sink和Flume Agent等。本次实验从官网下载的安装文件是apache-flume-1.7.0-bin.tar.gz。这里假设读者已经按照博客文章成功安装了Flume,并且安装在“/usr/local/flume”目录下。这里注意如果安装了Hbase,在查看Flume版本信息的时候会出现错误“找不到或无法加载主类”,具体解决办法请查看上面的博客文章。
本次实验要使用netcat source发送消息,在实验之前,要先搞懂Flume是如何配置source、sink和channels的。之后测试Flume是否可用,这里假设笔者测试Flume可正确使用。

三、Spark准备工作

要通过Kafka连接Spark来进行Spark Streaming操作,Kafka和Flume等高级输入源,需要依赖独立的库(jar文件)。也就是说Spark需要jar包让Kafka和Spark streaming相连。按照我们前面安装好的Spark版本,这些jar包都不在里面,为了证明这一点,我们现在可以测试一下。请打开一个新的终端,输入以下命令启动spark-shell:

cd /usr/local/spark
./bin/spark-shell

启动成功后,在spark-shell中执行下面import语句:

import org.apache.spark.streaming.kafka._

你可以看到,马上会报错如下,因为找不到相关的jar包。

<console>:23: error: object kafka is not a member of package org.apache.spark.streaming
       import org.apache.spark.streaming.kafka._
                                         ^

根据Spark官网的说明,对于Spark2.1.0版本,如果要使用Kafka,则需要下载spark-streaming-kafka-0-8_2.11相关jar包。
现在请在Linux系统中,打开火狐浏览器,请点击这里访问官网,里面有提供spark-streaming-kafka-0-8_2.11-2.1.0jar文件的下载,其中,2.11表示scala的版本,2.1.0表示Spark版本号。下载后的文件会被默认保存在当前Linux登录用户hadoop的下载目录下(“/home/hadoop/下载”)。
我们就把这个文件复制到Spark目录的jars目录下。请新打开一个终端,输入下面命令:

cd /usr/local/spark/jars
mkdir kafka
cd ~
cd 下载
cp ./spark-streaming-kafka-0-8_2.11-2.1.0.jar /usr/local/spark/jars/kafka

这样,我们就把spark-streaming-kafka-0-8_2.11-2.1.1.jar文件拷贝到了“/usr/local/spark/jars/kafka”目录下。
下面还要继续把Kafka安装目录的libs目录下的所有jar文件复制到“/usr/local/spark/jars/kafka”目录下输入以下命令:

cd /usr/local/kafka/libs
ls
cp ./* /usr/local/spark/jars/kafka

至此,所有环境准备工作已全部完成,下面开始编写代码。

四、实验过程

1.编写Flume配置文件flume_to_kafka.conf
输入命令:

cd /usr/local/flume
cd conf
vim flume_to_kafka.conf

内容如下:

a1.sources=r1
a1.channels=c1
a1.sinks=k1
#Describe/configure the source 
a1.sources.r1.type=netcat
a1.sources.r1.bind=localhost
a1.sources.r1.port=33333
#Describe the sink
a1.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink  
a1.sinks.k1.kafka.topic=test  
a1.sinks.k1.kafka.bootstrap.servers=localhost:9092  
a1.sinks.k1.kafka.producer.acks=1  
a1.sinks.k1.flumeBatchSize=20  
#Use a channel which buffers events in memory  
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000000
a1.channels.c1.transactionCapacity=1000000
#Bind the source and sink to the channel
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

2.编写Spark Streaming程序(进行词频统计的程序)
首先创建scala代码的目录结构。
输入命令:

cd /usr/local/spark/mycode
mkdir flume_to_kafka
cd flume_to_kafka
mkdir -p src/main/scala
cd src/main/scala

输入命令:

vim KafkaWordCounter.scala

KafkaWordCounter.scala是用于单词词频统计,它会把从kafka发送过来的单词进行词频统计,代码内容如下:

package org.apache.spark.examples.streaming
import org.apache.spark._
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka.KafkaUtils

object KafkaWordCounter{
def main(args:Array[String]){
StreamingExamples.setStreamingLogLevels()
val sc=new SparkConf().setAppName("KafkaWordCounter").setMaster("local[2]")
val ssc=new StreamingContext(sc,Seconds(10))
ssc.checkpoint("file:///usr/local/spark/mycode/flume_to_kafka/checkpoint") //设置检查点
val zkQuorum="localhost:2181" //Zookeeper服务器地址
val group="1"  //topic所在的group,可以设置为自己想要的名称,比如不用1,而是val group = "test-consumer-group" 
val topics="test" //topics的名称          
val numThreads=1 //每个topic的分区数
val topicMap=topics.split(",").map((_,numThreads.toInt)).toMap
val lineMap=KafkaUtils.createStream(ssc,zkQuorum,group,topicMap)
val lines=lineMap.map(_._2)
val words=lines.flatMap(_.split(" "))
val pair=words.map(x => (x,1))
val wordCounts=pair.reduceByKeyAndWindow(_ + _,_ - _,Minutes(2),Seconds(10),2) 
wordCounts.print
ssc.start
ssc.awaitTermination
}
}

reduceByKeyAndWindow函数作用解释如下:
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]) 更加高效的reduceByKeyAndWindow,每个窗口的reduce值,是基于先前窗口的reduce值进行增量计算得到的;它会对进入滑动窗口的新数据进行reduce操作,并对离开窗口的老数据进行“逆向reduce”操作。但是,只能用于“可逆reduce函数”,即那些reduce函数都有一个对应的“逆向reduce函数”(以InvFunc参数传入);
此代码中就是一个窗口转换操作reduceByKeyAndWindow,其中,Minutes(2)是滑动窗口长度,Seconds(10)是滑动窗口时间间隔(每隔多长时间滑动一次窗口)。reduceByKeyAndWindow中就使用了加法和减法这两个reduce函数,加法和减法这两种reduce函数都是“可逆的reduce函数”,也就是说,当滑动窗口到达一个新的位置时,原来之前被窗口框住的部分数据离开了窗口,又有新的数据被窗口框住,但是,这时计算窗口内单词的词频时,不需要对当前窗口内的所有单词全部重新执行统计,而是只要把窗口内新增进来的元素,增量加入到统计结果中,把离开窗口的元素从统计结果中减去,这样,就大大提高了统计的效率。尤其对于窗口长度较大时,这种“逆函数”带来的效率的提高是很明显的。
3.创建StreamingExamples.scala
继续在当前目录(/usr/local/spark/mycode/flume_to_kafka/src/main/scala)下创建StreamingExamples.scala代码文件,用于设置log4j:
输入命令:

vim StreamingExamples.scala

代码内容如下:

package org.apache.spark.examples.streaming
import org.apache.spark.internal.Logging
import org.apache.log4j.{Level, Logger}
//Utility functions for Spark Streaming examples. 
object StreamingExamples extends Logging {
//Set reasonable logging levels for streaming if the user has not configured log4j. 
  def setStreamingLogLevels() {
    val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
    if (!log4jInitialized) {
      // We first log something to initialize Spark's default logging, then we override the
      // logging level.
      logInfo("Setting log level to [WARN] for streaming example." +" To override add a custom log4j.properties to the classpath.")
      Logger.getRootLogger.setLevel(Level.WARN)
    }
  }
}

4.打包文件simple.sbt
输入命令:

cd /usr/local/spark/mycode/flume_to_kafka

输入命令:

vim simple.sbt

内容如下:

name := "Simple Project"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.1.0"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-8_2.11" % "2.1.0"

要注意版本号一定要设置正确。
在/usr/local/spark/mycode/flume_to_kafka目录下输入命令:

cd /usr/local/spark/mycode/flume_to_kafka
find .

打包之前,这条命令用来查看代码结构,目录结构如下所示:

.
./simple.sbt
./src
./src/main
./src/main/scala
./src/main/scala/KafkaWordCounter.scala
./src/main/scala/StreamingExamples.scala

5.打包编译
一定要在/usr/local/spark/mycode/flume_to_kafka目录下运行打包命令
输入命令:

cd /usr/local/spark/mycode/flume_to_kafka
/usr/local/sbt/sbt package

第一次打包的过程可能会很慢,请耐心等待几分钟。打包成功后,会看到如下SUCCESS的提示。

hadoop@dblab-VirtualBox:/usr/local/spark/mycode/flume_to_kafka$ /usr/local/sbt/sbt package
OpenJDK 64-Bit Server VM warning: ignoring option MaxPermSize=256M; support was removed in 8.0
[info] Set current project to Simple Project (in build file:/usr/local/spark/mycode/flume_to_kafka/)
[info] Compiling 2 Scala sources to /usr/local/spark/mycode/flume_to_kafka/target/scala-2.11/classes...
[info] Packaging /usr/local/spark/mycode/flume_to_kafka/target/scala-2.11/simple-project_2.11-1.0.jar ...
[info] Done packaging.
[success] Total time: 7 s, completed 2018-9-5 10:03:08

6.启动zookeeper和kafka
先启动zookeeper
输入命令:

cd /usr/local/kafka
./bin/zookeeper-server-start.sh config/zookeeper.properties

屏幕显示如下,不要关闭这个终端。

打开第二个终端,然后输入下面命令启动Kafka服务:
输入命令:

cd /usr/local/kafka
bin/kafka-server-start.sh config/server.properties

屏幕显示如下,不要关闭这个终端

7.运行程序KafkaWordCounter
打开第三个终端,我们已经创建过topic,名为test(这是你之前在flume_to_kafka.conf中设置的topic名字),端口号2181。在第三个终端运行“KafkaWordCounter”程序,进行词频统计,由于现在没有启动输入,所以只有提示信息,没有结果。
输入命令:

cd /usr/local/spark
/usr/local/spark/bin/spark-submit --driver-class-path /usr/local/spark/jars/*:/usr/local/spark/jars/kafka/* --class "org.apache.spark.examples.streaming.KafkaWordCounter" /usr/local/spark/mycode/flume_to_kafka/target/scala-2.11/simple-project_2.11-1.0.jar

其中”/usr/local/spark/jars/“和”/usr/local/spark/jars/kafka/”用来指明引用的jar包,“org.apache.spark.examples.streaming.KafkaWordCounter”代表包名和类名,这是编写KafkaWordCounter.scala里面的包名和类名,最后一个参数用来说明打包文件的位置。
执行该命令后,屏幕上会显示程序运行的相关信息,并会每隔10秒钟刷新一次信息,用来输出词频统计的结果,此时还只有提示信息,如下所示:

-------------------------------------------
Time: 1536113420000 ms
-------------------------------------------

-------------------------------------------
Time: 1536113430000 ms
-------------------------------------------

-------------------------------------------
Time: 1536113440000 ms
-------------------------------------------

-------------------------------------------
Time: 1536113450000 ms
-------------------------------------------

-------------------------------------------
Time: 1536113460000 ms
-------------------------------------------

按Ctrl+z可以退出程序运行
在启动Flume之前,Zookeeper和Kafka要先启动成功,不然启动Flume会报连不上Kafka的错误。
8.启动flume agent
打开第四个终端,在这个新的终端中启动Flume Agent
输入命令:

cd /usr/local/flume
bin/flume-ng agent --conf ./conf --conf-file ./conf/flume_to_kafka.conf --name a1 -Dflume.root.logger=INFO,console

启动agent以后,该agent就会一直监听localhost的33333端口,这样,我们下面就可以通过“telnet localhost 33333”命令向Flume Source发送消息。这个终端也不要关闭,让它一直处于监听状态。

9.打开第五个终端,发送消息
输入命令:

telnet localhost 33333

这个端口33333是在flume conf文件中设置的source
在这个窗口里面随便敲入若干个字符和若干个回车,这些消息都会被Flume监听到,Flume把消息采集到以后汇集到Sink,然后由Sink发送给Kafka的topic(test)。因为spark Streaming程序不断地在监控topic,在输入终端和前面运行词频统计程序那个终端窗口内看到类似如下的统计结果:
Flume发送之前,KafkaWordCounter检测不到消息,没有统计结果,只有提示信息。
在该窗口输入数据:(每一行结尾加一个空格再按回车发送)

hello dblab 
hello xmu 
hello spark  
大数据 

可以看到统计结果如下,其中“,4)”是对换行符的统计

-------------------------------------------
Time: 1536131370000 ms
-------------------------------------------
(hello,3)
(xmu,1)
,4)
(spark,1)
(大数据,1)
(dblab,1)

输入数据:只输入一个回车
统计结果如下:

-------------------------------------------
Time: 1536131400000 ms
-------------------------------------------
(hello,3)
(xmu,1)
,5)
(spark,1)
(大数据,1)
(dblab,1)

可以看到如上所示的统计结果,其他的都没变,“4)“变成“,5)”,这是换行符的数量加一。
可以看到,统计结果在不断的累积,但是随着运行屏幕显示其实还和窗口设置有关。我试过输完最后一个单词直接敲回车发送,但是接收统计端会把换行符和最后一个单词组合起来形成错误的格式,导致统计结果有误。所以在每行输入完成后先多敲一个空格再按换行符发送数据。
比如输入“hello spark”(spark末尾不加空格)
统计结果如下:

-------------------------------------------
Time: 1536131440000 ms
-------------------------------------------
(hello,4)
(xmu,1)
,1)ark
,5)
(spark,1)
(大数据,1)
(dblab,1)

可以看到"hello"被正确的统计了,数量加一,然而“spark”却和换行符结合变成不能识别的格式“,1)ark”。
所以,这里在发送每一行数据时都要在末尾输入一个空格再按下回车。
运行时应该有5个终端处于开启状态,分别是
Zookeeper服务\Kafka服务\Spark Streaming(KafkaWordCounter)\flume agent\telnet发送端
至此,就完成了Flume、Kafka和Spark Streaming整合进行词频统计的任务。

由于用netcatsource作为Flume的输入源统计时总会加上格式不正确的换行符,这里试验了使用Avro source作为Flume的输入源进行Flume_Kafka_SparkStreaming词频统计。
只需修改flume_to_kafka.conf文件为以下内容,并在启动时有所不同。

cd /usr/local/flume/conf
vim flume_to_kafka2.conf

配置flume的sources为avro

a1.sources=r1
a1.channels=c1
a1.sinks=k1
# Describe/configure the source
a1.sources.r1.type=avro
a1.sources.r1.bind=0.0.0.0
a1.sources.r1.port=4141
# Describe the sink
a1.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic=test
a1.sinks.k1.kafka.bootstrap.servers=localhost:9092
a1.sinks.k1.kafka.producer.acks=1
a1.sinks.k1.flumeBatchSize=20
# Use a channel which buffers events in memory
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000000
a1.channels.c1.transactionCapacity=1000000
#Bind the source and sink to the channel
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

启动时,顺序如下:
1.启动zookeeper

cd /usr/local/kafka
./bin/zookeeper-server-start.sh config/zookeeper.properties

2.启动kafka

cd /usr/local/kafka
bin/kafka-server-start.sh config/server.properties

3.启动Flume Agent
这里指定我们新编写的配置文件flume_to_kafka2.conf

cd /usr/local/flume
/usr/local/flume/bin/flume-ng agent -c . -f /usr/local/flume/conf/flume_to_kafka2.conf -n a1 -Dflume.root.logger=INFO,console #启动日志控制台

4.启动Spark Streaming应用程序

cd /usr/local/spark
/usr/local/spark/bin/spark-submit --driver-class-path /usr/local/spark/jars/*:/usr/local/spark/jars/kafka/* --class "org.apache.spark.examples.streaming.KafkaWordCounter" /usr/local/spark/mycode/flume_to_kafka/target/scala-2.11/simple-project_2.11-1.0.jar

此时只显示提示信息

-------------------------------------------
Time: 1536117300000 ms
-------------------------------------------

-------------------------------------------
Time: 1536117310000 ms
-------------------------------------------

-------------------------------------------
Time: 1536117320000 ms
-------------------------------------------

5.创建指定文件
先打开另外一个终端,在/usr/local/flume下创建一个文件log.00

cd /usr/local/flume
sudo vim log.00

文件内容随意,这里输入

hello dblab
hello xmu
hello success
大数据

6.我们再打开另外一个终端,执行以下命令,4141是avro.conf文件里的端口名

cd /usr/local/flume
bin/flume-ng avro-client --conf conf -H localhost -p 4141 -F /usr/local/flume/log.00 

SparkStreaming程序统计显示如下,可以看到不存在换行符格式的问题,正确统计结果。

-------------------------------------------
Time: 1536130920000 ms
-------------------------------------------
(hello,3)
(xmu,1)
(大数据,1)
(success,1)
(dblab,1)

至此,使用Avro source作为Flume的输入源实现Flume_Kafka_SparkStreaming词频统计完成。