Spark入门:文件流(DStream)

大数据学习路线图

【版权声明】博客内容由厦门大学数据库实验室拥有版权,未经允许,请勿转载!
[返回Spark教程首页]

Spark支持从兼容HDFS API的文件系统中读取数据,创建数据流。

为了能够演示文件流的创建,我们需要首先创建一个日志目录,并在里面放置两个模拟的日志文件。请在Linux系统中打开另一个终端,进入Shell命令提示符状态:

cd /usr/local/spark/mycode
mkdir streaming
cd streaming
mkdir logfile
cd logfile

然后,在logfile中新建两个日志文件log1.txt和log2.txt,里面可以随便输入一些内容。
比如,我们在log1.txt中输入以下内容:

I love Hadoop
I love Spark
Spark is fast

下面我们就进入spark-shell创建文件流。请另外打开一个终端窗口,启动进入spark-shell。

scala> import org.apache.spark.streaming._
scala> val ssc = new StreamingContext(sc, Seconds(20))
scala> val lines = ssc.textFileStream("file:///usr/local/spark/mycode/streaming/logfile")
scala> val words = lines.flatMap(_.split(" "))
scala> val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) 
scala> wordCounts.print()  
scala> ssc.start()  //实际上,当你输入这行回车后,Spark Streaming就开始进行循环监听,下面的ssc.awaitTermination()是无法输入到屏幕上的,但是,为了程序完整性,这里还是给出ssc.awaitTermination()
scala> ssc.awaitTermination()

所以,上面在spark-shell中执行的程序,一旦你输入ssc.start()以后,程序就开始自动进入循环监听状态,屏幕上会显示一堆的信息,如下:

//这里省略若干屏幕信息
-------------------------------------------
Time: 1479431100000 ms
-------------------------------------------
//这里省略若干屏幕信息
-------------------------------------------
Time: 1479431120000 ms
-------------------------------------------
//这里省略若干屏幕信息
-------------------------------------------
Time: 1479431140000 ms
-------------------------------------------

从上面的屏幕显示信息可以看出,Spark Streaming每隔20秒就监听一次。但是,你这时会感到奇怪,既然启动监听了,为什么程序没有把我们刚才放置在"/usr/local/spark/mycode/streaming/logfile"目录下的log1.txt和log2.txt这两个文件中的内容读取出来呢?原因是,监听程序只监听"/usr/local/spark/mycode/streaming/logfile"目录下在程序启动后新增的文件,不会去处理历史上已经存在的文件。所以,为了能够让程序读取文件内容并显示到屏幕上,让我们能够看到效果,这时,我们需要到"/usr/local/spark/mycode/streaming/logfile"目录下再新建一个log3.txt文件,请打开另外一个终端窗口(我们称为shell窗口),当前正在执行监听工作的spark-shell窗口依然保留。请在shell窗口中执行Linux操作,在"/usr/local/spark/mycode/streaming/logfile"目录下再新建一个log3.txt文件,里面随便输入一些英文单词,创建完成以后,再切换回到spark-shell窗口。请等待20秒(因为我们刚才设置的是每隔20秒就监听一次,如果你创建文件动作很快,可能20秒还没到)。现在你会发现屏幕上不断输出新的信息,导致你无法看清楚单词统计结果是否已经被打印到屏幕上。所以,你现在必须停止这个监听程序,否则它一直在spark-shell窗口中不断循环监听,停止的方法是,按键盘Ctrl+D,或者Ctrl+C。停止以后,就彻底停止,并且退出了spark-shell状态,回到了Shell命令提示符状态。然后,你就可以看到屏幕上,在一大堆输出信息中,你可以找到打印出来的单词统计信息。

好了,上面我们是在spark-shell中直接执行代码,但是,很多时候,我们需要编写独立应用程序进行监听,所以,下面我们介绍如何采用独立应用程序的方式实现上述监听文件夹的功能。
我们采用scala语言编写程序,而且要采用sbt打包编译,因此,必须符合sbt打包的规范(可以点击这里参考前面章节内容复习一下如何使用sbt打包编译scala程序)。当然,不复习以前的知识,直接按照我们下面的步骤来,你也可以顺利实现sbt打包编译。
请打开一个Linux终端窗口,进入shell命令提示符状态,然后,执行下面命令:

cd /usr/local/spark/mycode
mkdir streaming
cd streaming
mkdir -p src/main/scala
cd src/main/scala
vim TestStreaming.scala

这样就用vim编辑器新建一个TestStreaming.scala代码文件,请在里面输入以下代码:

import org.apache.spark._ 
import org.apache.spark.streaming._

object WordCountStreaming {  
  def main(args: Array[String]) {  
    val sparkConf = new SparkConf().setAppName("WordCountStreaming").setMaster("local[2]")//设置为本地运行模式,2个线程,一个监听,另一个处理数据    
    val ssc = new StreamingContext(sparkConf, Seconds(20))// 时间间隔为20秒    
    val lines = ssc.textFileStream("file:///usr/local/spark/mycode/streaming/logfile")  //这里采用本地文件,当然你也可以采用HDFS文件
    val words = lines.flatMap(_.split(" "))  
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)  
    wordCounts.print()  
    ssc.start()  
    ssc.awaitTermination()  
  }  
}  

代码文件写好后,就可以保存并退出vim编辑器。然后,执行下面命令:

cd /usr/local/spark/mycode/streaming
vim simple.sbt //注意,是simple.sbt,不是simple.txt

打开vim编辑器以后,在simple.sbt文件中输入以下代码:

name := "Simple Project"
version := "1.0"
scalaVersion := "2.10.5"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.2"

千万要注意,上面是经过笔者测试的正确代码,千万不要使用apache官网的代码(会出错,折腾了笔者4个小时)。下面是官网给出的格式,但是会编译错误:

libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.6.2"

上面这个官网给出的格式,有两个错误的地方,一个是"org.apache.spark"后面只有1个百分号,错了,应该是两个百分号。另外,"spark-streaming_2.10"包含了版本号,错误,不应该有版本号,如果加了版本号,编译就无法通过,会报错。

创建好simple.sbt文件后,退出vim编辑器。然后,执行下面命令:

cd /usr/local/spark/mycode/streaming
find .

屏幕上返回的信息,应该是类似下面的文件结构:

.
./src
./src/main
./src/main/scala
./src/main/scala/TestStreaming.scala
./simple.sbt

然后,就可以执行sbt打包编译了,命令如下:

cd /usr/local/spark/mycode/streaming
/usr/local/sbt/sbt package

打包成功以后,就可以输入以下命令启动这个程序:

cd /usr/local/spark/mycode/streaming
/usr/local/spark/bin/spark-submit --class "WordCountStreaming" /usr/local/spark/mycode/streaming/target/scala-2.10/simple-project_2.10-1.0.jar 

执行上面命令后,就进入了监听状态(我们把运行这个监听程序的窗口称为监听窗口),这时,你就可以像刚才一样,切换到另外一个Shell窗口,在"/usr/local/spark/mycode/streaming/logfile"目录下再新建一个log5.txt文件,文件里面随便输入一些单词,保存好文件退出vim编辑器。然后,再次切换回“监听窗口”,等待20秒以后,按键盘Ctrl+C或者Ctrl+D停止监听程序,就可以看到监听窗口的屏幕上会打印出单词统计信息。

子雨大数据之Spark入门
扫一扫访问本博客