Spark2.1.0+入门:套接字流(DStream)(Python版)

大数据学习路线图

【版权声明】博客内容由厦门大学数据库实验室拥有版权,未经允许,请勿转载!
返回Spark教程首页
推荐纸质教材:林子雨、郑海山、赖永炫编著《Spark编程基础(Python版)》

Spark Streaming可以通过Socket端口监听并接收数据,然后进行相应处理。

请新打开一个Shell窗口,进入Shell命令提示符状态,然后执行下面命令:

cd /usr/local/spark/mycode
mkdir streaming #如果已经存在该目录,则不用创建
vim NetworkWordCount.py

上面用vim编辑器新建了NetworkWordCount.py代码文件,请在该文件中输入如下内容:

from __future__ import print_function

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: network_wordcount.py <hostname> <port>", file=sys.stderr)
        exit(-1)
    conf = SparkConf()
    conf.setAppName('PythonStreamingNetworkWordCount')
    conf.setMaster('local[2]')
    sc = SparkContext(conf = conf)
    ssc = StreamingContext(sc, 1)

    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
    counts = lines.flatMap(lambda line: line.split(" "))\
                  .map(lambda word: (word, 1))\
                  .reduceByKey(lambda a, b: a+b)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()

保存成功后,执行如下代码:

sudo nc -lk 9999

然后打开第二个终端作为监听窗口,执行如下代码:

cd /usr/local/spark/mycode/streaming
python3 NetworkWordCount.py localhost 9999

这样,就可以在nc第一个终端窗口窗口中随意输入一些单词,监听窗口就会自动获得单词数据流信息,在监听窗口每隔1秒就会打印出词频统计信息,大概会再屏幕上出现类似如下的结果:

-------------------------------------------
Time: 1479431100000 ms
-------------------------------------------
(hello,1)
(world,1)
-------------------------------------------
Time: 1479431120000 ms
-------------------------------------------
(hadoop,1)
-------------------------------------------
Time: 1479431140000 ms
-------------------------------------------
(spark,1)

如果要停止运行上述程序,只要按键盘Ctrl+Z键就可以了。