Spark2.1.0入门:RDD队列流(DStream)(Python版)

大数据学习路线图

【版权声明】博客内容由厦门大学数据库实验室拥有版权,未经允许,请勿转载!
返回Spark教程首页
推荐纸质教材:林子雨、郑海山、赖永炫编著《Spark编程基础(Python版)》

在调试Spark Streaming应用程序的时候,我们可以使用streamingContext.queueStream(queueOfRDD)创建基于RDD队列的DStream。

下面是参考Spark官网的QueueStream程序设计的程序,每隔1秒创建一个RDD,Streaming每隔2秒就对数据进行处理。
请登录Linux系统,打开一个终端,进入Shell命令提示符状态,然后执行下面命令新建代码文件:

  1. cd /usr/local/spark/mycode/streaming/ //这个目录在前面章节操作中已经创建好了
  2. vim TestRDDQueueStream.py
Shell 命令

上面用vim编辑器新建了一个TestRDDQueueStream.py文件,请在该文件中输入以下代码:

  1. import time
  2.  
  3. from pyspark import SparkContext
  4. from pyspark.streaming import StreamingContext
  5.  
  6. if __name__ == "__main__":
  7.  
  8. sc = SparkContext(appName="PythonStreamingQueueStream")
  9. ssc = StreamingContext(sc, 1)
  10.  
  11. # Create the queue through which RDDs can be pushed to
  12. # a QueueInputDStream
  13. rddQueue = []
  14. for i in range(5):
  15. rddQueue += [ssc.sparkContext.parallelize([j for j in range(1, 1001)], 10)]
  16.  
  17. # Create the QueueInputDStream and use it do some processing
  18. inputStream = ssc.queueStream(rddQueue)
  19. mappedStream = inputStream.map(lambda x: (x % 10, 1))
  20. reducedStream = mappedStream.reduceByKey(lambda a, b: a + b)
  21. reducedStream.pprint()
  22.  
  23. ssc.start()
  24. time.sleep(6)
  25. ssc.stop(stopSparkContext=True, stopGraceFully=True)
Python

然后执行如下代码:

  1. python3 ./TestRDDQueueStream.py
Python

程序就开始运行,就可以看到类似下面的结果:

-------------------------------------------                                     
Time: 1479522100000 ms
-------------------------------------------
(4,10)
(0,10)
(6,10)
(8,10)
(2,10)
(1,10)
(3,10)
(7,10)
(9,10)
(5,10)