Spark2.1.0入门:RDD队列流(DStream)(Python版)

大数据学习路线图

【版权声明】博客内容由厦门大学数据库实验室拥有版权,未经允许,请勿转载!
返回Spark教程首页
推荐纸质教材:林子雨、郑海山、赖永炫编著《Spark编程基础(Python版)》

在调试Spark Streaming应用程序的时候,我们可以使用streamingContext.queueStream(queueOfRDD)创建基于RDD队列的DStream。

下面是参考Spark官网的QueueStream程序设计的程序,每隔1秒创建一个RDD,Streaming每隔2秒就对数据进行处理。
请登录Linux系统,打开一个终端,进入Shell命令提示符状态,然后执行下面命令新建代码文件:

cd /usr/local/spark/mycode/streaming/ //这个目录在前面章节操作中已经创建好了
vim TestRDDQueueStream.py

上面用vim编辑器新建了一个TestRDDQueueStream.py文件,请在该文件中输入以下代码:

import time

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":

    sc = SparkContext(appName="PythonStreamingQueueStream")
    ssc = StreamingContext(sc, 1)

    # Create the queue through which RDDs can be pushed to
    # a QueueInputDStream
    rddQueue = []
    for i in range(5):
        rddQueue += [ssc.sparkContext.parallelize([j for j in range(1, 1001)], 10)]

    # Create the QueueInputDStream and use it do some processing
    inputStream = ssc.queueStream(rddQueue)
    mappedStream = inputStream.map(lambda x: (x % 10, 1))
    reducedStream = mappedStream.reduceByKey(lambda a, b: a + b)
    reducedStream.pprint()

    ssc.start()
    time.sleep(6)
    ssc.stop(stopSparkContext=True, stopGraceFully=True)

然后执行如下代码:

python3 ./TestRDDQueueStream.py

程序就开始运行,就可以看到类似下面的结果:

-------------------------------------------                                     
Time: 1479522100000 ms
-------------------------------------------
(4,10)
(0,10)
(6,10)
(8,10)
(2,10)
(1,10)
(3,10)
(7,10)
(9,10)
(5,10)