厦门大学林子雨编著《Spark编程基础(Python版,第2版)》教材中的命令行和代码(教材官网)
提供了教材中的所有章节的命令行和代码,可以直接复制粘贴去执行。
查看《Spark编程基础(Python版,第2版)》教材中的所有命令行和代码
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
# Build the Spark configuration in a single chained expression: application
# name 'TestDStream', master 'local[2]' (local mode with two threads).
conf = SparkConf().setAppName('TestDStream').setMaster('local[2]')
sc = SparkContext(conf=conf)
# Streaming context on top of the Spark context, with a 1-second batch interval.
ssc = StreamingContext(sc, 1)
# Create the working directories for the file-stream example:
# /usr/local/spark/mycode/streaming and, inside it, logfile.
cd /usr/local/spark/mycode
mkdir streaming
cd streaming
mkdir logfile
>>> from pyspark import SparkContext
>>> from pyspark.streaming import StreamingContext
>>> ssc = StreamingContext(sc, 10)  # 10-second batch interval; `sc` is presumably the context pre-created by the pyspark shell
>>> lines = ssc. \
... textFileStream('file:///usr/local/spark/mycode/streaming/logfile')
>>> words = lines.flatMap(lambda line: line.split(' '))  # split every line on single spaces
>>> wordCounts = words.map(lambda x : (x,1)).reduceByKey(lambda a,b:a+b)  # (word, count) pairs per batch
>>> wordCounts.pprint()  # print a sample of each batch's counts
>>> ssc.start()  # start the streaming computation
>>> ssc.awaitTermination()  # block until the computation terminates
FileStreaming.py代码文件
# coding:utf8
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
if __name__ == '__main__':
    # Word count over text files that appear in a monitored directory.
    # Application name 'TestDStream', master 'local[2]' (local mode, two threads).
    conf = SparkConf().setAppName('TestDStream').setMaster('local[2]')
    sc = SparkContext(conf=conf)
    # Batch interval: 10 seconds.
    ssc = StreamingContext(sc, 10)

    # Text files placed in this directory form the input stream.
    lines = ssc.textFileStream('file:///usr/local/spark/mycode/streaming/logfile')
    # Split each line on single spaces, then count occurrences per word
    # within each batch.
    words = lines.flatMap(lambda line: line.split(' '))
    wordCounts = words.map(lambda x: (x, 1)).reduceByKey(lambda a, b: a + b)
    wordCounts.pprint()

    # Start the computation and block until it terminates.
    ssc.start()
    ssc.awaitTermination()
# Submit FileStreaming.py from the logfile directory
# (the script is expected to be saved there).
cd /usr/local/spark/mycode/streaming/logfile/
/usr/local/spark/bin/spark-submit FileStreaming.py
代码文件NetworkWordCount.py
# coding:utf8
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
if __name__ == "__main__":
    # Exactly two command-line arguments are required: the host name and the
    # port of the text source to connect to.
    if len(sys.argv) != 3:
        print("Usage: NetworkWordCount.py <hostname> <port>", file=sys.stderr)
        # sys.exit is always available; the bare exit() helper is injected by
        # the `site` module and is not guaranteed in every environment.
        sys.exit(-1)

    sc = SparkContext(appName="PythonStreamingNetworkWordCount")
    # Batch interval: 1 second.
    ssc = StreamingContext(sc, 1)

    # Read text lines from the given TCP host and port.
    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
    # Split each line on single spaces and count the words of every batch.
    counts = (
        lines.flatMap(lambda line: line.split(" "))
        .map(lambda word: (word, 1))
        .reduceByKey(lambda a, b: a + b)
    )
    counts.pprint()

    # Start the computation and block until it terminates.
    ssc.start()
    ssc.awaitTermination()
# Data source: netcat listening on port 9999 (-l listen, -k keep listening
# after a client disconnects). This command blocks, so the commands below are
# presumably meant to be run in a second terminal.
nc -lk 9999
# Submit the word-count program and point it at localhost:9999.
cd /usr/local/spark/mycode/streaming/socket
/usr/local/spark/bin/spark-submit NetworkWordCount.py localhost 9999
代码文件DataSourceSocket.py
# coding:utf8
import socket
# Create the listening socket.
server = socket.socket()
# Allow the port to be reused right after a restart; without this a quick
# rerun can fail with "Address already in use".
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# Bind to the host name and port.
server.bind(('hadoop01', 9999))
# Listen on the bound port (backlog of one pending connection).
server.listen(1)
while True:
    # Announce that the server is waiting, so its state is easy to follow.
    print("I'm waiting the connect...")
    # accept() returns a new socket dedicated to this client plus the client
    # address; all data transfer below goes through `conn`, not `server`.
    conn, addr = server.accept()
    try:
        # Report the successful connection.
        print("Connect success! Connection is from %s " % addr[0])
        # Report that data is being sent.
        print('Sending data...')
        # sendall() keeps writing until every byte has been sent, whereas
        # send() may transmit only part of the buffer.
        conn.sendall('I love hadoop I love spark hadoop is good spark is fast'.encode())
    finally:
        # Release the client socket even if sending failed.
        conn.close()
    print('Connection is broken.')
# Start the custom socket data source (it serves on hadoop01:9999).
cd /usr/local/spark/mycode/streaming/socket
/usr/local/spark/bin/spark-submit DataSourceSocket.py
# Start the word-count client against that source; since the server above
# keeps running, this is presumably done in a second terminal.
cd /usr/local/spark/mycode/streaming/socket
/usr/local/spark/bin/spark-submit NetworkWordCount.py hadoop01 9999
代码文件RDDQueueStream.py
#!/usr/bin/env python3
import time
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
if __name__ == "__main__":
    sc = SparkContext(appName="PythonStreamingQueueStream")
    # Batch interval: 2 seconds.
    ssc = StreamingContext(sc, 2)

    # Build a queue of RDDs; RDDs are pushed to the RDD queue stream through it.
    rddQueue = []
    for _ in range(5):
        # Each RDD holds the integers 1..1000, split into 10 partitions.
        rddQueue.append(ssc.sparkContext.parallelize(list(range(1, 1001)), 10))
        # NOTE(review): the flattened listing does not show whether this pause
        # belongs inside the loop; it is kept per-iteration here - confirm.
        time.sleep(1)

    # Create the RDD queue stream and count the numbers by their last digit.
    inputStream = ssc.queueStream(rddQueue)
    mappedStream = inputStream.map(lambda x: (x % 10, 1))
    reducedStream = mappedStream.reduceByKey(lambda a, b: a + b)
    reducedStream.pprint()

    ssc.start()
    # Request a graceful stop and shut down the underlying SparkContext too.
    # `stopGraceFully` is the keyword's actual spelling in the PySpark API.
    ssc.stop(stopSparkContext=True, stopGraceFully=True)
# Submit the RDD queue stream example from its own directory.
cd /usr/local/spark/mycode/streaming/rddqueue
/usr/local/spark/bin/spark-submit RDDQueueStream.py
代码文件WindowedNetworkWordCount.py
# coding:utf8
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
if __name__ == "__main__":
    sc = SparkContext(appName="PythonStreamingWindowedNetworkWordCount")
    # Batch interval: 10 seconds.
    ssc = StreamingContext(sc, 10)
    # Checkpoint directory; the windowed reduce below is given an inverse
    # function, a form for which Spark Streaming requires checkpointing.
    ssc.checkpoint("file:///usr/local/spark/mycode/streaming/socket/checkpoint")

    lines = ssc.socketTextStream("hadoop01", 9999)
    # Word counts over a 30-second window that slides every 10 seconds.
    # The first lambda adds the counts entering the window, the second
    # subtracts the counts leaving it.
    counts = (
        lines.flatMap(lambda line: line.split(" "))
        .map(lambda word: (word, 1))
        .reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x - y, 30, 10)
    )
    counts.pprint()

    # Start the computation and block until it terminates.
    ssc.start()
    ssc.awaitTermination()
# Data source: netcat listening on port 9999 (blocks while serving; the
# program connects to host "hadoop01", so nc presumably runs on that host).
cd /usr/local/spark/mycode/streaming/socket/
nc -lk 9999
# Presumably in a second terminal: submit the windowed word-count program.
cd /usr/local/spark/mycode/streaming/socket/
/usr/local/spark/bin/spark-submit WindowedNetworkWordCount.py
代码文件NetworkWordCountStateful.py
#coding:utf8
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
if __name__ == "__main__":
    sc = SparkContext(appName="PythonStreamingStatefulNetworkWordCount")
    # Batch interval: 1 second.
    ssc = StreamingContext(sc, 1)
    # Checkpoint directory for the streaming context.
    ssc.checkpoint("file:///usr/local/spark/mycode/streaming/stateful/")

    # RDD with initial state (key, value) pairs
    initialStateRDD = sc.parallelize([(u'hello', 1), (u'world', 1)])

    def updateFunc(new_values, last_sum):
        """Return the updated running count for one word.

        new_values: the counts that arrived for the word in the current batch.
        last_sum: the previous running total; a falsy value (such as None for
            a word without state) is treated as 0.
        """
        return sum(new_values) + (last_sum or 0)

    lines = ssc.socketTextStream("hadoop01", 9999)
    # Running (cumulative) word counts across batches, seeded with the
    # initial state RDD.
    running_counts = (
        lines.flatMap(lambda line: line.split(" "))
        .map(lambda word: (word, 1))
        .updateStateByKey(updateFunc, initialRDD=initialStateRDD)
    )
    running_counts.pprint()

    # Start the computation and block until it terminates.
    ssc.start()
    ssc.awaitTermination()
# Data source: netcat listening on port 9999 (blocks while serving).
nc -lk 9999
# Presumably in a second terminal: submit the stateful word-count program.
cd /usr/local/spark/mycode/streaming/stateful
/usr/local/spark/bin/spark-submit NetworkWordCountStateful.py
代码文件NetworkWordCountStatefulText.py
# coding:utf8
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
if __name__ == "__main__":
    sc = SparkContext(appName="PythonStreamingStatefulNetworkWordCount")
    # Batch interval: 1 second.
    ssc = StreamingContext(sc, 1)
    # Checkpoint directory for the streaming context.
    ssc.checkpoint("file:///usr/local/spark/mycode/streaming/stateful/")

    # RDD with initial state (key, value) pairs
    initialStateRDD = sc.parallelize([(u'hello', 1), (u'world', 1)])

    def updateFunc(new_values, last_sum):
        """Return the updated running count for one word.

        new_values: the counts that arrived for the word in the current batch.
        last_sum: the previous running total; a falsy value (such as None for
            a word without state) is treated as 0.
        """
        return sum(new_values) + (last_sum or 0)

    lines = ssc.socketTextStream("hadoop01", 9999)
    # Running (cumulative) word counts across batches, seeded with the
    # initial state RDD.
    running_counts = (
        lines.flatMap(lambda line: line.split(" "))
        .map(lambda word: (word, 1))
        .updateStateByKey(updateFunc, initialRDD=initialStateRDD)
    )
    # Save every batch's counts as text files, using this path as the
    # name prefix.
    running_counts.saveAsTextFiles(
        "file:///usr/local/spark/mycode/streaming/stateful/output")
    running_counts.pprint()

    # Start the computation and block until it terminates.
    ssc.start()
    ssc.awaitTermination()
-- Select the `spark` database and create the `wordcount` table
-- (a word column and an integer count column).
mysql> use spark;
mysql> create table wordcount (word char(20), count int(4));
# Activate the `pyspark` conda environment and install the PyMySQL driver into it.
conda activate pyspark
pip install pymysql
代码文件NetworkWordCountStatefulDB.py
# coding:utf8
from __future__ import print_function
import sys
import pymysql
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
if __name__ == "__main__":
    sc = SparkContext(appName="PythonStreamingStatefulNetworkWordCount")
    # Batch interval: 1 second.
    ssc = StreamingContext(sc, 1)
    # Checkpoint directory for the streaming context.
    ssc.checkpoint("file:///usr/local/spark/mycode/streaming/stateful")

    # RDD with initial state (key, value) pairs
    initialStateRDD = sc.parallelize([(u'hello', 1), (u'world', 1)])

    def updateFunc(new_values, last_sum):
        """Return the updated running count for one word.

        new_values: the counts that arrived for the word in the current batch.
        last_sum: the previous running total; a falsy value (such as None for
            a word without state) is treated as 0.
        """
        return sum(new_values) + (last_sum or 0)

    lines = ssc.socketTextStream("hadoop01", 9999)
    # Running (cumulative) word counts across batches, seeded with the
    # initial state RDD.
    running_counts = (
        lines.flatMap(lambda line: line.split(" "))
        .map(lambda word: (word, 1))
        .updateStateByKey(updateFunc, initialRDD=initialStateRDD)
    )
    running_counts.pprint()

    def dbfunc(records):
        """Insert every (word, count) record of one partition into MySQL.

        A connection is opened per call and always closed again. Each record
        is committed on its own; a failing insert is rolled back and reported
        on stderr, and the remaining records are still processed.
        """
        host = "localhost"
        port = 3306
        user = "root"
        password = "123456"
        database = "spark"
        # Create the MySQL connection.
        db = pymysql.connect(host=host, port=port, user=user,
                             password=password, database=database)
        try:
            cursor = db.cursor()
            # Placeholders let the driver escape the values. The words come
            # straight from the socket, so formatting them into the SQL text
            # would break on quotes and allow SQL injection.
            sql = "insert into wordcount(word,count) values (%s, %s)"
            for item in records:
                try:
                    cursor.execute(sql, (str(item[0]), str(item[1])))
                    db.commit()
                except Exception as err:
                    # Best effort: undo this insert, report it, and continue.
                    db.rollback()
                    print("Insert failed for %r: %s" % (item, err),
                          file=sys.stderr)
        finally:
            # Release the connection even when an insert raised.
            db.close()

    def func(rdd):
        """Write one batch to MySQL, using three partitions."""
        repartitionedRDD = rdd.repartition(3)
        repartitionedRDD.foreachPartition(dbfunc)

    running_counts.foreachRDD(func)

    # Start the computation and block until it terminates.
    ssc.start()
    ssc.awaitTermination()