【版权声明】博客内容由厦门大学数据库实验室拥有版权,未经允许,请勿转载!
返回Spark教程首页
推荐纸质教材:林子雨、郑海山、赖永炫编著《Spark编程基础(Python版)》
在Spark应用中,外部系统经常需要使用到Spark DStream处理后的数据,因此,需要采用输出操作把DStream的数据输出到数据库或者文件系统中。
准备工作
这里依然以前面的“DStream转换操作”章节中介绍的NetworkWordCountStateful.py为基础进行修改。下面,让我们先调试一遍这个程序,复习一下整个程序的细节。
请登录Linux系统,打开一个终端:
cd /usr/local/spark/mycode/streaming/
ls
可以看到目录下有NetworkWordCountStateful.py文件
其中,NetworkWordCountStateful.py的代码如下:
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
if __name__ == "__main__":
if len(sys.argv) != 3:
print("Usage: stateful_network_wordcount.py <hostname> <port>", file=sys.stderr)
exit(-1)
sc = SparkContext(appName="PythonStreamingStatefulNetworkWordCount")
ssc = StreamingContext(sc, 1)
ssc.checkpoint("file:///usr/local/spark/mycode/streaming/")
# RDD with initial state (key, value) pairs
initialStateRDD = sc.parallelize([(u'hello', 1), (u'world', 1)])
def updateFunc(new_values, last_sum):
return sum(new_values) + (last_sum or 0)
lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
running_counts = lines.flatMap(lambda line: line.split(" "))\
.map(lambda word: (word, 1))\
.updateStateByKey(updateFunc, initialRDD=initialStateRDD)
running_counts.pprint()
ssc.start()
ssc.awaitTermination()
运行此程序,执行如下代码:
python3 NetworkWordCountStateful.py localhost 9999
启动后屏幕上就会显示类似下面的程序运行信息:
-------------------------------------------
Time: 1479890485000 ms
-------------------------------------------
-------------------------------------------
Time: 1479890490000 ms
-------------------------------------------
下面,请打开另外一个终端,作为单词产生的源头,提供给NetworkWordCountStateful程序进行词频统计:
nc -lk 9999
//请手动输入一些单词,可以随便输入,输入一个单词就敲入回车,比如下面是笔者输入的单词
hadoop
spark
hadoop
spark
hadoop
spark
这个时候,你再去看刚才运行NetworkWordCountStateful程序的监听窗口,就可以看到类似下面的词频统计结果:
-------------------------------------------
Time: 1479890485000 ms
-------------------------------------------
(spark,1)
(hadoop,1)
-------------------------------------------
Time: 1479890490000 ms
-------------------------------------------
(spark,2)
(hadoop,3)
现在可以结束程序运行了,可以按Ctrl+Z结束当前程序的运行。
好了,以前的知识复习结束。现在,请把刚才打开的这两个终端窗口都关闭掉。有了这个基础,我们就可以顺利开展本节内容的学习了,也就是如何把DStream输出到外部系统中。
把DStream输出到文本文件中
下面请新打开一个终端。为了不破坏以前的代码,我们单独复制上面这些代码到新的文件中,执行如下代码
cp ./NetworkWordCountStateful.py ./DstreamToText.py
首先,我们尝试把DStream内容保存到文本文件中,可以使用如下语句:
running_counts.saveAsTextFiles("file:///usr/local/spark/mycode/streaming/output.txt")
下面使用vim编辑器打开DstreamToText.py代码文件:
vim ./DstreamToText.py
我们要把这条保存数据的语句stateDstream.saveAsTextFiles()放入到DstreamToText.py代码中,修改后的代码如下(或者你可以把NetworkWordCountStateful.scala原来的代码内容全部删除,直接把下面修改后的代码复制进去):
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
if __name__ == "__main__":
if len(sys.argv) != 3:
print("Usage: stateful_network_wordcount.py <hostname> <port>", file=sys.stderr)
exit(-1)
sc = SparkContext(appName="PythonStreamingStatefulNetworkWordCount")
ssc = StreamingContext(sc, 5)
ssc.checkpoint("file:///usr/local/spark/mycode/streaming/")
# RDD with initial state (key, value) pairs
initialStateRDD = sc.parallelize([(u'hello', 1), (u'world', 1)])
def updateFunc(new_values, last_sum):
return sum(new_values) + (last_sum or 0)
lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
running_counts = lines.flatMap(lambda line: line.split(" ")) .map(lambda word: (word, 1)).updateStateByKey(updateFunc, initialRDD=initialStateRD)
running_counts.saveAsTextFiles("file:///usr/local/spark/mycode/streaming/output.txt")
running_counts.pprint()
ssc.start()
ssc.awaitTermination()
运行此程序,执行如下命令:
python3 ./DstreamToText.py localhost 9999
程序运行以后,屏幕上就会显示类似下面的程序运行信息:
-------------------------------------------
Time: 1479890485000 ms
-------------------------------------------
-------------------------------------------
Time: 1479890490000 ms
-------------------------------------------
下面,请打开另外一个终端,作为单词产生的源头,提供给NetworkWordCountStateful程序进行词频统计:
nc -lk 9999
//请手动输入一些单词,可以随便输入,比如下面是笔者输入的单词
hadoop
spark
hadoop
spark
hadoop
spark
这个时候,你再去看刚才运行NetworkWordCountStateful程序的监听窗口,就可以看到类似下面的词频统计结果:
-------------------------------------------
Time: 1479890485000 ms
-------------------------------------------
(spark,1)
(hadoop,1)
-------------------------------------------
Time: 1479890490000 ms
-------------------------------------------
(spark,2)
(hadoop,3)
然后,我们停止程序运行,在当前终端内按键盘Ctrlz+Z组合键,就可以结束程序运行。现在我们就看一下,这些词频结果是否被成功地输出到“/usr/local/spark/mycode/streaming/output.txt”文件中了。请执行如下命令:
cd /usr/local/spark/mycode/streaming/
ls
可以发现,在这个目录下,生成了很多文本文件,如下:
output.txt-1479951955000
output.txt-1479951960000
output.txt-1479951965000
output.txt-1479951970000
output.txt-1479951975000
output.txt-1479951980000
output.txt-1479951985000
可以看出,由于我们在代码中有一句" ssc = StreamingContext(sc, 5)”,也就是说,每隔5秒钟统计一次词频,所以,每隔5秒钟就会生成一次词频统计结果,并输出到“/usr/local/spark/mycode/streaming/output.txt”中,每次生成的output.txt后面会自动被加上时间标记(比如1479951955000)。这里要注意,虽然我们把DStream输出到“/usr/local/spark/mycode/streaming/output.txt”中,output.txt的命名看起来像一个文件,但是,实际上,spark会生成名称为output.txt的目录,而不是文件。这个问题,我们在之前“文件数据读写”章节的学习中已经讨论过,这里不再赘述。
我们可以查看一下某个output.txt下面的内容:
cat output.txt-1479951980000/*
会得到类似下面的结果:
(hello,1)
(spark,2)
(hadoop,2)
说明我们已经成功地把DStream输出到文本文件了。
把DStream写入到MySQL数据库中
请执行下面命令在Linux中启动MySQL数据库,并完成数据库和表的创建:
service mysql start
mysql -u root -p
输入密码后,你就可以进入“mysql>”命令提示符状态,然后就可以输入下面的SQL语句完成数据库和表的创建。在我们之前“通过JDBC连接数据库(DataFrame)”章节的内容中,我们已经在MySQL数据库中创建了一个名称为“spark”的数据库和名称为“student”的表。这里,我们可以直接使用这个已经创建好的“spark”数据库,然后,在这个数据库中创建一个名称为“wordcount”的表,命令如下:
mysql> use spark
mysql> create table wordcount (word char(20), count int(4));
mysql> select * from wordcount
//这个时候wordcount表是空的,没有任何记录
由于需要python连接到mysql的模块,所以请执行如下命令:
sudo pip3 install PyMySQL
下面,请新打开另外一个Linux终端,我们就可以对NetworkWordCountStateful.py代码文件进行修改,为了方便起见,你可以直接执行下面命令删除该文件,再用vim编辑器新建该文件:
cd /usr/local/spark/mycode/streaming/
rm NetworkWordCountStateful.py
vim NetworkWordCountStateful.py
然后,在NetworkWordCountStateful.py文件中加入下面代码:
from __future__ import print_function
import sys
import pymysql
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
if __name__ == "__main__":
if len(sys.argv) != 3:
print("Usage: stateful_network_wordcount.py <hostname> <port>", file=sys.stderr)
exit(-1)
sc = SparkContext(appName="PythonStreamingStatefulNetworkWordCount")
ssc = StreamingContext(sc, 1)
ssc.checkpoint("file:///usr/local/spark/mycode/streaming/")
# RDD with initial state (key, value) pairs
initialStateRDD = sc.parallelize([(u'hello', 1), (u'world', 1)])
def updateFunc(new_values, last_sum):
return sum(new_values) + (last_sum or 0)
lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
running_counts = lines.flatMap(lambda line: line.split(" "))\
.map(lambda word: (word, 1))\
.updateStateByKey(updateFunc, initialRDD=initialStateRDD)
running_counts.pprint()
def dbfunc(records):
db = pymysql.connect("localhost","root","root","spark")
cursor = db.cursor()
def doinsert(p):
sql = "insert into wordcount(word,count) values ('%s', '%s')" % (str(p[0]), str(p[1]))
try:
cursor.execute(sql)
db.commit()
except:
db.rollback()
for item in records:
doinsert(item)
def func(rdd):
repartitionedRDD = rdd.repartition(3)
repartitionedRDD.foreachPartition(dbfunc)
running_counts.foreachRDD(func)
ssc.start()
ssc.awaitTermination()
保存该文件,退出vim编辑器。这个代码的具体含义,我们过一会儿再解释,现在我们先执行一下程序,看看效果。
python3 ./NetworkWordCountStateful.py localhost 9999
执行上面命令以后,就进入监听状态,下面我们打开另外一个终端,运行下面命令产生单词提供给NetworkWordCountStateful进行词频统计:
nc -lk 9999
//现在你就可以在当前窗口内随意输入单词,输入一个单词就回车,比如输入下面单词
hello
hadoop
spark
hello
spark
输入一些单词以后,就可以按Ctrl+Z停止nc程序。然后切换到刚才运行NetworkWordCountStateful程序的监听窗口,也按Ctrl+Z停止NetworkWordCountStateful程序运行。然后,再切换到刚才运行MySQL的终端窗口,在mysql命令提示符下输入查看命令,就可以查看spark.wordcount表内当前最新的数据:
mysql> select * from wordcount;
+--------+-------+
| word | count |
+--------+-------+
| hello | 1 |
| hadoop | 1 |
| hello | 2 |
| spark | 1 |
| hadoop | 2 |
| hello | 2 |
| spark | 3 |
| hadoop | 2 |
+--------+-------+
我们重点来分析其中的这段代码(其他代码之前我们都已经介绍过含义):
def dbfunc(records):
db = pymysql.connect("localhost","root","root","spark")
cursor = db.cursor()
def doinsert(p):
sql = "insert into wordcount(word,count) values ('%s', '%s')" % (str(p[0]), str(p[1]))
try:
cursor.execute(sql)
db.commit()
except:
db.rollback()
for item in records:
doinsert(item)
def func(rdd):
repartitionedRDD = rdd.repartition(3)
repartitionedRDD.foreachPartition(dbfunc)
running_counts.foreachRDD(func)
也就是说,对于running_counts,为了把它保存到MySQL数据库中,我们采用了如下的形式:
running_counts.foreachRDD(func)
其中,func就是一个RDD[T]=>Unit类型的函数,对于本程序而言,就是RDD[(String,Int)] : Unit类型的函数,也就是说,stateDstream中的每个RDD都是RDD[(String,Int)]类型(想象一下,统计结果的形式是("hadoop",3))。这样,对stateDstream中的每个RDD都会执行function中的操作(即把该RDD保存到MySQL的操作)。
下面看function的处理逻辑,在function部分,函数体要执行的处理逻辑实际上是下面的形式:
val repartitionedRDD = rdd.repartition(3)
repartitionedRDD.foreachPartition(dbfunc)
也就是说,这里定义了一个内部函数dbfunc,它的功能是,接收records,然后把records保存到MySQL中。到这里,你可能会有疑问?为什么不是把stateDstream中的每个RDD直接拿去保存到MySQL中,还要调用rdd.repartition(3)对这些RDD重新设置分区数为3呢?这是因为,每次保存RDD到MySQL中,都需要启动数据库连接,如果RDD分区数量太大,那么就会带来多次数据库连接开销,为了减少开销,就有必要把RDD的分区数量控制在较小的范围内,所以,这里就把RDD的分区数量重新设置为3。然后,对于每个RDD分区,就调用repartitionedRDD.foreachPartition(dbfunc),把每个分区的数据通过dbfunc保存到MySQL中,这时,传递给dbfunc的输入参数就是Iterator[(String,Int)]类型的records。如果你不好理解下面这种调用形式:
repartitionedRDD.foreachPartition(dbfunc)
# 这种形式func没有带任何参数,可能不太好理解,不是那么直观
实际上,这句语句和下面的语句是等价的,下面的语句形式你可能会更好理解:
repartitionedRDD.foreachPartition(lambda records : dbfunc(records))
上面这种等价的形式比较直观,为func()函数传入了一个records参数,这就正好和 def func(records: Iterator[(String,Int)])定义对应起来了,方便理解。