Spark2.1.0入门:把Flume作为DStream数据源

大数据技术原理与应用

【版权声明】博客内容由厦门大学数据库实验室拥有版权,未经允许,请勿转载!
[返回Spark教程首页]

Flume是非常流行的日志采集系统,可以作为DStream的高级数据源。本部分将介绍如何让Flume推送消息给Spark Streaming,Spark Streaming收到消息后进行处理。

任务描述

把Flume Source设置为netcat类型,从终端上不断给Flume Source发送各种消息,Flume把消息汇集到Sink,这里把Sink类型设置为avro,由Sink把消息推送给Spark Streaming,由我们编写的Spark Streaming应用程序对消息进行处理。

Flume的安装和准备工作

下载安装Flume

请登录Linux系统(本教程全部统一采用hadoop用户登录),下载安装Flume。关于Flume的概念和安装方法,请参考厦门大学数据库实验室博客文章《日志采集工具Flume的安装与使用方法》,必须要理解什么是Flume Source、Flume Sink和Flume Agent等。这里我们从官网下载的安装文件是apache-flume-1.7.0-bin.tar.gz(官网下载地址)。这里假设读者已经按照博客文章成功安装了Flume,并且安装在“/usr/local/flume”目录下。

配置Flume数据源

请登录Linux系统,打开一个终端,执行如下命令新建一个Flume配置文件flume-to-spark.conf:

cd /usr/local/flume
cd conf
vim flume-to-spark.conf

在flume-to-spark.conf文件中写入如下内容:

#flume-to-spark.conf: A single-node Flume configuration
        # Name the components on this agent
        a1.sources = r1
        a1.sinks = k1
        a1.channels = c1

        # Describe/configure the source
        a1.sources.r1.type = netcat
        a1.sources.r1.bind = localhost
        a1.sources.r1.port = 33333

        # Describe the sink
        a1.sinks.k1.type = avro
        a1.sinks.k1.hostname = localhost
        a1.sinks.k1.port =44444

        # Use a channel which buffers events in memory
        a1.channels.c1.type = memory
        a1.channels.c1.capacity = 1000000
        a1.channels.c1.transactionCapacity = 1000000

        # Bind the source and sink to the channel
        a1.sources.r1.channels = c1
        a1.sinks.k1.channel = c1

在上面的配置文件中,我们把Flume Source类别设置为netcat,绑定到localhost的33333端口,这样,我们后面就可以通过“telnet localhost 33333”命令向Flume Source发送消息。
同时,我们把Flume Sink类别设置为avro,绑定到localhost的44444端口,这样,Flume Source把采集到的消息汇集到Flume Sink以后,Sink会把消息推送给localhost的44444端口,而我们编写的Spark Streaming程序一直在监听localhost的44444端口,一旦有消息到达,就会被Spark Streaming应用程序取走进行处理。

特别要强调的是,上述配置文件完成以后,暂时“不要”启动Flume Agent,如果这个时候使用“flume-ng agent”命令启动agent,就会出现错误提示“localhost:44444拒绝连接”,也就是Flume Sink要发送消息给localhost的44444端口,但是,无法连接上localhost的44444端口。为什么会出现这个错误呢?因为,这个时候我们还没有启动Spark Streaming应用程序,也就没有启动localhost的44444端口,所以,Sink是无法向这个端口发送消息的。

Spark的准备工作

Kafka和Flume等高级输入源,需要依赖独立的库(jar文件)。按照我们前面安装好的Spark版本,这些jar包都不在里面,为了证明这一点,我们现在可以测试一下。请打开一个新的终端,然后启动spark-shell:

cd /usr/local/spark
./bin/spark-shell

启动成功后,在spark-shell中执行下面import语句:

scala> import org.apache.spark.streaming.flume._
<console>:25: error: object flume is not a member of package org.apache.spark.streaming
         import org.apache.spark.streaming.flume._
                                           ^

你可以看到,马上会报错,因为找不到相关的jar包。所以,现在我们就需要下载spark-streaming-flume_2.11-2.1.0.jar,其中2.11表示对应的Scala版本号,2.1.0表示Spark版本号。现在请在Linux系统中,打开一个火狐浏览器,请点击这里访问官网,里面有提供spark-streaming-flume_2.11-2.1.0.jar文件的下载。

下载后的文件会被默认保存在当前Linux登录用户的下载目录下,本教程统一使用hadoop用户名登录Linux系统,所以,文件下载后会被保存到“/home/hadoop/下载”目录下面。现在,我们在“/usr/local/spark/jars”目录下新建一个“flume”目录,就把这个文件复制到Spark目录的“/usr/local/spark/jars/flume”目录下。请新打开一个终端,输入下面命令:

cd /usr/local/spark/jars
mkdir flume
cd ~
cd 下载
cp ./spark-streaming-flume_2.11-2.1.0.jar /usr/local/spark/jars/flume

这样,我们就成功地把spark-streaming-flume_2.11-2.1.0.jar文件拷贝到了“/usr/local/spark/jars/flume”目录下。

下面还要继续把Flume安装目录的lib目录下的所有jar文件复制到“/usr/local/spark/jars/flume”目录下,请在终端中执行下面命令:

cd /usr/local/flume/lib
ls
cp ./* /usr/local/spark/jars/flume

这样,我们就已经准备好了Spark环境,它可以支持Flume相关编程了。

编写Spark程序使用Flume数据源

下面,我们就可以进行程序编写了。请新打开一个终端,然后,执行命令创建代码目录:

cd /usr/local/spark/mycode
mkdir flume
cd flume
mkdir -p src/main/scala
cd src/main/scala
vim FlumeEventCount.scala

请在FlumeEventCount.scala代码文件中输入以下代码:

package org.apache.spark.examples.streaming
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.flume._
import org.apache.spark.util.IntParam
object FlumeEventCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println(
        "Usage: FlumeEventCount <host> <port>")
      System.exit(1)
    }
    StreamingExamples.setStreamingLogLevels()
    val Array(host, IntParam(port)) = args
    val batchInterval = Milliseconds(2000)
    // Create the context and set the batch size
    val sparkConf = new SparkConf().setAppName("FlumeEventCount").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, batchInterval)
    // Create a flume stream
    val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2)
    // Print out the count of events received from this server in each batch
    stream.count().map(cnt => "Received " + cnt + " flume events." ).print()
    ssc.start()
    ssc.awaitTermination()
  }
}

保存FlumeEventCount.scala文件并退出vim编辑器。FlumeEventCount.scala程序在编译后运行时,需要我们提供host和port两个参数,程序会对指定的host和指定的port进行监听,Milliseconds(2000)设置了时间间隔为2秒,所以,该程序每隔2秒就会从指定的端口中获取由Flume Sink发给该端口的消息,然后进行处理,对消息进行统计,打印出“Received 0 flume events.”这样的信息。
然后再使用vim编辑器新建StreamingExamples.scala文件,输入如下代码,用于控制日志输出格式:

package org.apache.spark.examples.streaming
import org.apache.log4j.{Level, Logger}
import org.apache.spark.internal.Logging
object StreamingExamples extends Logging {
  /** Set reasonable logging levels for streaming if the user has not configured log4j. */
  def setStreamingLogLevels() {
    val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
    if (!log4jInitialized) {
      // We first log something to initialize Spark's default logging, then we override the
      // logging level.
      logInfo("Setting log level to [WARN] for streaming example." +
        " To override add a custom log4j.properties to the classpath.")
      Logger.getRootLogger.setLevel(Level.WARN)
    }
  }
}

保存StreamingExamples.scala文件并退出vim编辑器。

这样,我们在“/usr/local/spark/mycode/flume/src/main/scala”目录下,就有了如下两个代码文件:

FlumeEventCount.scala
StreamingExamples.scala

然后,请执行下面命令新建一个simple.sbt文件:

cd /usr/local/spark/mycode/flume/
vim simple.sbt

在simple.sbt中输入以下代码:

name := "Simple Project"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.1.0"
libraryDependencies += "org.apache.spark" % "spark-streaming-flume_2.11" % "2.1.0"  

保存文件退出vim编辑器。然后执行下面命令,进行打包编译:

cd /usr/local/spark/mycode/flume/
/usr/local/sbt/sbt package

如果看到类似如下的屏幕信息,就表示打包成功了:

hadoop@dblab-VirtualBox:/usr/local/spark/mycode/flume$ /usr/local/sbt/sbt package
OpenJDK 64-Bit Server VM warning: ignoring option MaxPermSize=256M; support was removed in 8.0
[info] Set current project to Simple Project (in build file:/usr/local/spark/mycode/flume/)
[info] Compiling 1 Scala source to /usr/local/spark/mycode/flume/target/scala-2.11/classes...
[info] Packaging /usr/local/spark/mycode/flume/target/scala-2.11/simple-project_2.11-1.0.jar ...
[info] Done packaging.
[success] Total time: 19 s, completed 2017-2-25 20:47:15
hadoop@dblab-VirtualBox:/usr/local/spark/mycode/flume$

打包成功后,就可以执行程序测试效果了。

测试程序效果

关闭之前打开的所有终端。首先,请新建第1个Linux终端,启动Spark Streaming应用程序,命令如下:

cd /usr/local/spark
./bin/spark-submit --driver-class-path /usr/local/spark/jars/*:/usr/local/spark/jars/flume/* --class "org.apache.spark.examples.streaming.FlumeEventCount" /usr/local/spark/mycode/flume/target/scala-2.11/simple-project_2.11-1.0.jar localhost 44444

通过上面命令,我们为应用程序提供host和port两个参数的值分别为localhost和44444,程序会对localhost的44444端口进行监听,Milliseconds(2000)设置了时间间隔为2秒,所以,该程序每隔2秒就会从指定的端口中获取由Flume Sink发给该端口的消息,然后进行处理,对消息进行统计,打印出“Received 0 flume events.”这样的信息。

执行该命令后,屏幕上会显示程序运行的相关信息,并会每隔2秒钟刷新一次信息,大量信息中会包含如下重要信息:

-------------------------------------------
Time: 1488029430000 ms
-------------------------------------------
Received 0 flume events.

因为目前Flume还没有启动,没有给FlumeEventCount发送任何消息,所以Flume Events的数量是0。
第1个终端不要关闭,让它一直处于监听状态。

现在,我们可以再另外新建第2个终端,在这个新的终端中启动Flume Agent,命令如下:

cd /usr/local/flume
bin/flume-ng agent --conf ./conf --conf-file ./conf/flume-to-spark.conf --name a1 -Dflume.root.logger=INFO,console

启动agent以后,该agent就会一直监听localhost的33333端口,这样,我们下面就可以通过“telnet localhost 33333”命令向Flume Source发送消息。第2个终端也不要关闭,让它一直处于监听状态。

请另外新建第3个终端,执行如下命令:

telnet localhost 33333

执行该命令以后,就可以在这个窗口里面随便敲入若干个字符和若干个回车,这些消息都会被Flume监听到,Flume把消息采集到以后汇集到Sink,然后由Sink发送给Spark的FlumeEventCount程序进行处理。然后,你就可以在运行FlumeEventCount的前面那个终端窗口内看到类似如下的统计结果:

-------------------------------------------
Time: 1488029430000 ms
-------------------------------------------
Received 0 flume events.
#这里省略了其他屏幕信息
-------------------------------------------
Time: 1488029432000 ms
-------------------------------------------
Received 8 flume events.
#这里省略了其他屏幕信息
-------------------------------------------
Time: 1488029434000 ms
-------------------------------------------
Received 21 flume events.

从屏幕信息中可以看出,我们在telnet那个终端内发送的消息,都被成功发送到Spark进行处理了。

至此,本实验顺利完成。实验结束后,要关闭各个终端,只要切换到该终端窗口,然后按键盘的Ctrl+C组合键,就可以结束程序运行。

子雨大数据之Spark入门
扫一扫访问本博客