Spark2.1.0+入门:文件流(DStream)(Python版)

大数据学习路线图

【版权声明】博客内容由厦门大学数据库实验室拥有版权,未经允许,请勿转载!
返回Spark教程首页
推荐纸质教材:林子雨、郑海山、赖永炫编著《Spark编程基础(Python版)》

Spark支持从兼容HDFS API的文件系统中读取数据,创建数据流。

为了能够演示文件流的创建,我们需要首先创建一个日志目录,并在里面放置两个模拟的日志文件。请在Linux系统中打开另一个终端,进入Shell命令提示符状态:

cd /usr/local/spark/mycode
mkdir streaming
cd streaming
mkdir logfile
cd logfile

然后,在logfile中新建两个日志文件log1.txt和log2.txt,里面可以随便输入一些内容。
比如,我们在log1.txt中输入以下内容:

I love Hadoop
I love Spark
Spark is fast

下面我们就进入pyspark创建文件流。请另外打开一个终端窗口,启动进入pyspark。

>>> from operator import add
>>> from pyspark import SparkContext
>>> from pyspark.streaming import StreamingContext
>>> ssc = StreamingContext(sc, 20)
>>> lines = ssc.textFileStream('file:///usr/local/spark/mycode/streaming/logfile')
>>> words = lines.flatMap(lambda line: line.split(' '))
>>> wordCounts = words.map(lambda x : (x,1)).reduceByKey(add)
>>> wordCounts.pprint()
>>> ssc.start() 
>>>#实际上,当你输入这行回车后,Spark Streaming就开始进行循环监听,下面的ssc.awaitTermination()是无法输入到屏幕上的,但是,为了程序完整性,这里还是给出ssc.awaitTermination()
>>> ssc.awaitTermination()

所以,上面在pyspark中执行的程序,一旦你输入ssc.start()以后,程序就开始自动进入循环监听状态,屏幕上会显示一堆的信息,如下:

//这里省略若干屏幕信息
-------------------------------------------
Time: 1479431100000 ms
-------------------------------------------
//这里省略若干屏幕信息
-------------------------------------------
Time: 1479431120000 ms
-------------------------------------------
//这里省略若干屏幕信息
-------------------------------------------
Time: 1479431140000 ms
-------------------------------------------

从上面的屏幕显示信息可以看出,Spark Streaming每隔20秒就监听一次。但是,你这时会感到奇怪,既然启动监听了,为什么程序没有把我们刚才放置在"/usr/local/spark/mycode/streaming/logfile"目录下的log1.txt和log2.txt这两个文件中的内容读取出来呢?原因是,监听程序只监听"/usr/local/spark/mycode/streaming/logfile"目录下在程序启动后新增的文件,不会去处理历史上已经存在的文件。所以,为了能够让程序读取文件内容并显示到屏幕上,让我们能够看到效果,这时,我们需要到"/usr/local/spark/mycode/streaming/logfile"目录下再新建一个log3.txt文件,请打开另外一个终端窗口(我们称为shell窗口),当前正在执行监听工作的spark-shell窗口依然保留。请在shell窗口中执行Linux操作,在"/usr/local/spark/mycode/streaming/logfile"目录下再新建一个log3.txt文件,里面随便输入一些英文单词,创建完成以后,再切换回到spark-shell窗口。请等待20秒(因为我们刚才设置的是每隔20秒就监听一次,如果你创建文件动作很快,可能20秒还没到)。现在你会发现屏幕上不断输出新的信息,导致你无法看清楚单词统计结果是否已经被打印到屏幕上。所以,你现在必须停止这个监听程序,否则它一直在pyspark窗口中不断循环监听,停止的方法是,按键盘Ctrl+D,或者Ctrl+C。停止以后,就彻底停止,并且退出了pyspark状态,回到了Shell命令提示符状态。然后,你就可以看到屏幕上,在一大堆输出信息中,你可以找到打印出来的单词统计信息。

好了,上面我们是在spark-shell中直接执行代码,但是,很多时候,我们需要编写独立应用程序进行监听,所以,下面我们介绍如何采用独立应用程序的方式实现上述监听文件夹的功能。

请打开一个Linux终端窗口,进入shell命令提示符状态,然后,执行下面命令:

cd /usr/local/spark/mycode/streaming
vim TestStreaming.py

在TestStreaming.py中输入如下代码:

from operator import add
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
conf = SparkConf()
conf.setAppName('TestDStream')
conf.setMaster('local[2]')
sc = SparkContext(conf = conf)
ssc = StreamingContext(sc, 20)
lines = ssc.textFileStream('file:///usr/local/spark/mycode/streaming/logfile')
words = lines.flatMap(lambda line: line.split(' '))
wordCounts = words.map(lambda x : (x,1)).reduceByKey(add)
wordCounts.pprint()
ssc.start()
ssc.awaitTermination()

保存成功后,执行在命令行中执行如下代码:

python3 TestStreaming.py