林子雨编著《Spark编程基础(Python版)》教材第6章的命令行和代码

大数据学习路线图

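林子雨、郑海山、赖永炫编著《Spark编程基础(Python版)》(教材官网)教材中的代码在纸质教材中的印刷效果可能会影响读者对代码的理解。为了方便读者正确理解代码，或者直接拷贝代码用于上机实验，这里提供全书配套的所有代码。注意：行首的“>>>”“...”（Python交互式环境）、“mysql>”（MySQL客户端）以及“>”（Shell续行）都是提示符，拷贝时不要包含提示符本身，否则命令会出错，甚至会因Shell重定向而覆盖文件。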
查看所有章节代码

第6章 Spark Streaming

>>> # In the pyspark shell, `sc` (the SparkContext) is provided by the shell.
>>> from pyspark.streaming import StreamingContext
>>> # Create a streaming context with a 1-second batch interval.
>>> ssc = StreamingContext(sc, 1)
# Standalone-program version: a script must build its own SparkContext,
# because only the interactive pyspark shell creates `sc` automatically.
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext

# SparkConf setters return the conf object, so they can be chained.
# 'local[2]' runs Spark locally with two worker threads.
conf = SparkConf().setAppName('TestDStream').setMaster('local[2]')
sc = SparkContext(conf=conf)
# Streaming context with a 1-second batch interval.
ssc = StreamingContext(sc, 1)
# Create the working directory for this chapter's streaming examples,
# plus the `logfile` directory that the file-stream examples monitor.
cd  /usr/local/spark/mycode
mkdir  streaming
cd  streaming
mkdir  logfile
>>> from pyspark import SparkContext
>>> from pyspark.streaming import StreamingContext
>>> # 10-second batch interval; `sc` is the SparkContext provided by the pyspark shell.
>>> ssc = StreamingContext(sc, 10)
>>> # Watch the logfile directory for new text files.
>>> lines = ssc. \
... textFileStream('file:///usr/local/spark/mycode/streaming/logfile')
>>> words = lines.flatMap(lambda line: line.split(' '))
>>> # Count occurrences of each word within every batch and print them.
>>> wordCounts = words.map(lambda x : (x,1)).reduceByKey(lambda a,b:a+b)
>>> wordCounts.pprint()
>>> # Start processing; awaitTermination() blocks until the context stops.
>>> ssc.start()
>>> ssc.awaitTermination()

FileStreaming.py

#!/usr/bin/env python3
"""Count words in new text files that appear in a monitored directory.

Every 10 seconds, Spark Streaming checks the ``logfile`` directory and
prints word counts for the new files it finds there.
"""

from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext

# 'local[2]' runs Spark locally with two worker threads.
conf = SparkConf().setAppName('TestDStream').setMaster('local[2]')
sc = SparkContext(conf=conf)
# Batch interval of 10 seconds.
ssc = StreamingContext(sc, 10)

# Stream of lines from new text files in the monitored directory.
lines = ssc.textFileStream('file:///usr/local/spark/mycode/streaming/logfile')
# Split on single spaces and pair every word with a count of 1.
pairs = lines.flatMap(lambda text: text.split(' ')).map(lambda token: (token, 1))
wordCounts = pairs.reduceByKey(lambda left, right: left + right)
wordCounts.pprint()

ssc.start()
ssc.awaitTermination()
# Run FileStreaming.py with spark-submit (assumes the script was saved in
# this directory). While it runs, add new text files to the logfile
# directory to see per-batch word counts.
cd /usr/local/spark/mycode/streaming/logfile/
/usr/local/spark/bin/spark-submit FileStreaming.py

NetworkWordCount.py

#!/usr/bin/env python3

from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: NetworkWordCount.py <hostname> <port>", file=sys.stderr)
        exit(-1)
    sc = SparkContext(appName="PythonStreamingNetworkWordCount")
    ssc = StreamingContext(sc, 1)
    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
    counts = lines.flatMap(lambda line: line.split(" ")) \
                  .map(lambda word: (word, 1)) \
                  .reduceByKey(lambda a, b: a+b)
    counts.pprint()
    ssc.start()
    ssc.awaitTermination()
# Terminal 1: start a netcat server listening on port 9999 (-k keeps it
# listening across connections), then type words into it.
nc  -lk  9999
# Terminal 2: run the streaming word count against that server.
cd /usr/local/spark/mycode/streaming/socket
/usr/local/spark/bin/spark-submit NetworkWordCount.py localhost 9999

DataSourceSocket.py

#!/usr/bin/env python3
"""Minimal TCP data source for NetworkWordCount.py.

Listens on localhost:9999. For every client that connects, it sends one
fixed line of text and then closes that connection. Runs until interrupted.
"""

import socket

# Create the listening socket.
server = socket.socket()
# Allow an immediate restart without "Address already in use" while the
# previous run's connections are still in TIME_WAIT.
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# Bind to the IP address and port.
server.bind(('localhost', 9999))
# Listen on the bound port (backlog of 1 pending connection).
server.listen(1)
try:
    while True:
        # Print a marker so it is easy to see that the server is waiting.
        print("I'm waiting the connect...")
        # accept() returns a new socket for this client plus its address;
        # all transfers below must go through that per-client socket.
        conn, addr = server.accept()
        # The with-block closes the client socket even if sending fails.
        with conn:
            print("Connect success! Connection is from %s " % addr[0])
            print('Sending data...')
            # sendall() keeps writing until every byte is sent; send() may
            # transmit only part of the buffer.
            conn.sendall('I love hadoop I love spark hadoop is good spark is fast'.encode())
        print('Connection is broken.')
finally:
    server.close()
# Terminal 1: start the custom socket data source. It is a plain Python
# program that does not use Spark (presumably `python3 DataSourceSocket.py`
# works as well).
cd  /usr/local/spark/mycode/streaming/socket
/usr/local/spark/bin/spark-submit DataSourceSocket.py
# Terminal 2: run the word count against that source.
cd  /usr/local/spark/mycode/streaming/socket
/usr/local/spark/bin/spark-submit NetworkWordCount.py localhost 9999

RDDQueueStream.py

#!/usr/bin/env python3
"""Stream a fixed queue of RDDs and count values by their last digit.

Five RDDs, each holding 1..1000, are fed to a queue stream one per
2-second batch. Each batch prints how many numbers end in each digit 0-9.
"""

import time
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":
    batchInterval = 2  # seconds
    sc = SparkContext(appName="PythonStreamingQueueStream")
    ssc = StreamingContext(sc, batchInterval)
    # Build the whole RDD queue up front. queueStream() does not see changes
    # made to the list after the stream is created, so every RDD must exist
    # before that call; pausing between appends only delays startup.
    rddQueue = [ssc.sparkContext.parallelize(list(range(1, 1001)), 10)
                for _ in range(5)]
    # Create the RDD queue stream (by default one RDD is consumed per batch).
    inputStream = ssc.queueStream(rddQueue)
    mappedStream = inputStream.map(lambda x: (x % 10, 1))
    reducedStream = mappedStream.reduceByKey(lambda a, b: a + b)
    reducedStream.pprint()
    ssc.start()
    # Keep the driver alive while the queued batches run. Stopping right
    # after start() would end the job before most RDDs are processed.
    # Allow one batch per queued RDD plus one extra interval of slack.
    time.sleep((len(rddQueue) + 1) * batchInterval)
    ssc.stop(stopSparkContext=True, stopGraceFully=True)
# Run the RDD queue-stream example.
cd  /usr/local/spark/mycode/streaming/rddqueue
/usr/local/spark/bin/spark-submit RDDQueueStream.py
# Terminal 1: start ZooKeeper (runs in the foreground).
cd  /usr/local/kafka
./bin/zookeeper-server-start.sh  config/zookeeper.properties
# Terminal 2: start the Kafka broker (runs in the foreground).
cd  /usr/local/kafka
./bin/kafka-server-start.sh  config/server.properties
# Presumably an alternative to the previous command: the trailing "&"
# runs the broker in the background instead. Do not run both.
cd  /usr/local/kafka
./bin/kafka-server-start.sh  config/server.properties  &
# Create a topic. Continuation lines must not start with ">": pasted into
# a shell, a leading ">" is a redirection and silently breaks the command.
cd  /usr/local/kafka
./bin/kafka-topics.sh  --create  --zookeeper  localhost:2181  \
  --replication-factor  1  --partitions  1  --topic  wordsendertest
# The topic is named wordsendertest; 2181 is ZooKeeper's default port.
# --partitions is the number of partitions in the topic. --replication-factor
# is the number of replicas, which matters on a Kafka cluster; this is a
# single-node setup, so no extra replicas are needed.
# Use --list to show all topics and check that the one above exists.
./bin/kafka-topics.sh  --list  --zookeeper  localhost:2181
# Start a console producer; each line typed is sent to the topic.
./bin/kafka-console-producer.sh  --broker-list  localhost:9092 \
  --topic  wordsendertest
# In another terminal, start a console consumer that reads the topic from
# the beginning. NOTE: the --zookeeper option of the console consumer only
# exists in older Kafka releases; newer ones use --bootstrap-server.
cd /usr/local/kafka
./bin/kafka-console-consumer.sh  --zookeeper  localhost:2181  \
  --topic  wordsendertest  --from-beginning
# Copy the Spark-Kafka integration jar and Kafka's own libraries into a
# dedicated directory under Spark's jars.
cd  /usr/local/spark/jars
mkdir  kafka
# Assumes the integration jar was downloaded to ~/下载 (the Downloads folder).
cd  ~/下载
cp  ./spark-streaming-kafka-0-8_2.11-2.4.0.jar  /usr/local/spark/jars/kafka
cd  /usr/local/kafka/libs
ls
cp  ./*  /usr/local/spark/jars/kafka
# Edit spark-env.sh and set SPARK_DIST_CLASSPATH to the line below. It also
# references Hadoop and HBase installations under /usr/local.
cd  /usr/local/spark/conf
vim spark-env.sh
export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath):$(/usr/local/hbase/bin/hbase classpath):/usr/local/spark/jars/hbase/*:/usr/local/spark/examples/jars/*:/usr/local/spark/jars/kafka/*:/usr/local/kafka/libs/*

KafkaWordCount.py

#!/usr/bin/env python3

from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils 

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: KafkaWordCount.py <zk> <topic>", file=sys.stderr)
        exit(-1)
    sc = SparkContext(appName="PythonStreamingKafkaWordCount")
    ssc = StreamingContext(sc, 1)
    zkQuorum, topic = sys.argv[1:]
    kvs = KafkaUtils. \
createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
    lines = kvs.map(lambda x: x[1])
    counts = lines.flatMap(lambda line: line.split(" ")) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda a, b: a+b)
    counts.pprint()
    ssc.start()
    ssc.awaitTermination()
# Run KafkaWordCount.py against ZooKeeper at localhost:2181 and the topic
# wordsendertest. The continuation line must not start with ">": pasted
# into a shell, that is a redirection that would overwrite KafkaWordCount.py.
cd  /usr/local/spark/mycode/streaming/kafka/
/usr/local/spark/bin/spark-submit  \
  ./KafkaWordCount.py localhost:2181 wordsendertest

WindowedNetworkWordCount.py

#!/usr/bin/env python3

from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: WindowedNetworkWordCount.py <hostname> <port>", file=sys.stderr)
        exit(-1)
    sc = SparkContext(appName="PythonStreamingWindowedNetworkWordCount")
    ssc = StreamingContext(sc, 10)
    ssc.checkpoint("file:///usr/local/spark/mycode/streaming/socket/checkpoint")
    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
    counts = lines.flatMap(lambda line: line.split(" "))\
                  .map(lambda word: (word, 1))\
                  . reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x - y, 30, 10)
    counts.pprint()
    ssc.start()
    ssc.awaitTermination()
# Terminal 1: start a netcat server on port 9999 and type words into it.
cd /usr/local/spark/mycode/streaming/socket/
nc -lk 9999
# Terminal 2: run the windowed word count against that server.
cd /usr/local/spark/mycode/streaming/socket/
/usr/local/spark/bin/spark-submit  WindowedNetworkWordCount.py localhost 9999

NetworkWordCountStateful.py

#!/usr/bin/env python3

from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: NetworkWordCountStateful.py <hostname> <port>", file=sys.stderr)
        exit(-1)
    sc = SparkContext(appName="PythonStreamingStatefulNetworkWordCount")
    ssc = StreamingContext(sc, 1)
    ssc.checkpoint("file:///usr/local/spark/mycode/streaming/stateful/") 
    # RDD with initial state (key, value) pairs
    initialStateRDD = sc.parallelize([(u'hello', 1), (u'world', 1)])

    def updateFunc(new_values, last_sum):
        return sum(new_values) + (last_sum or 0) 

    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
    running_counts = lines.flatMap(lambda line: line.split(" "))\
                          .map(lambda word: (word, 1))\
                          .updateStateByKey(updateFunc, initialRDD=initialStateRDD) 
    running_counts.pprint()
    ssc.start()
    ssc.awaitTermination()
# Terminal 1: start a netcat server on port 9999 and type words into it.
nc  -lk  9999
# Terminal 2: run the stateful word count against that server.
cd /usr/local/spark/mycode/streaming/stateful
/usr/local/spark/bin/spark-submit NetworkWordCountStateful.py localhost 9999

NetworkWordCountStatefulText.py

#!/usr/bin/env python3

from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: NetworkWordCountStateful.py <hostname> <port>", file=sys.stderr)
        exit(-1)
    sc = SparkContext(appName="PythonStreamingStatefulNetworkWordCount")
    ssc = StreamingContext(sc, 1)
    ssc.checkpoint("file:///usr/local/spark/mycode/streaming/stateful/")
    # RDD with initial state (key, value) pairs
    initialStateRDD = sc.parallelize([(u'hello', 1), (u'world', 1)])
    def updateFunc(new_values, last_sum):
        return sum(new_values) + (last_sum or 0)
    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
    running_counts = lines.flatMap(lambda line: line.split(" "))\
                          .map(lambda word: (word, 1))\
                          .updateStateByKey(updateFunc, initialRDD=initialStateRDD)
    running_counts.saveAsTextFiles("file:///usr/local/spark/mycode/streaming/stateful/output")
    running_counts.pprint()
    ssc.start()
    ssc.awaitTermination()
# In the MySQL client: create the table that NetworkWordCountStatefulDB.py
# inserts into (the `spark` database must already exist for `use` to work).
# NOTE(review): char(20) cannot hold longer words; depending on sql_mode
# such inserts are rejected or truncated.
mysql> use  spark;
mysql> create  table  wordcount (word char(20), count int(4));
# In a shell: install pip for Python 3 and the PyMySQL driver.
sudo apt-get update
sudo apt-get install python3-pip
pip3 -V
sudo pip3 install PyMySQL

NetworkWordCountStatefulDB.py

#!/usr/bin/env python3

from __future__ import print_function 
import sys 
import pymysql 
from pyspark import SparkContext
from pyspark.streaming import StreamingContext 

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: NetworkWordCountStateful <hostname> <port>", file=sys.stderr)
        exit(-1)
    sc = SparkContext(appName="PythonStreamingStatefulNetworkWordCount")
    ssc = StreamingContext(sc, 1)
    ssc.checkpoint("file:///usr/local/spark/mycode/streaming/stateful") 
    # RDD with initial state (key, value) pairs
    initialStateRDD = sc.parallelize([(u'hello', 1), (u'world', 1)]) 

    def updateFunc(new_values, last_sum):
        return sum(new_values) + (last_sum or 0) 

    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
    running_counts = lines.flatMap(lambda line: line.split(" "))\
                          .map(lambda word: (word, 1))\
                          .updateStateByKey(updateFunc, initialRDD=initialStateRDD) 
    running_counts.pprint() 

    def dbfunc(records):
        db = pymysql.connect("localhost","root","123456","spark")
        cursor = db.cursor() 
        def doinsert(p):
            sql = "insert into wordcount(word,count) values ('%s', '%s')" % (str(p[0]), str(p[1]))
            try:
                cursor.execute(sql)
                db.commit()
            except:
                db.rollback()
        for item in records:
            doinsert(item) 

    def func(rdd):
        repartitionedRDD = rdd.repartition(3)
        repartitionedRDD.foreachPartition(dbfunc)

    running_counts.foreachRDD(func)
    ssc.start()
    ssc.awaitTermination()