林子雨编著《Spark编程基础(Python版)》教材第7章的命令行和代码

大数据学习路线图

林子雨、郑海山、赖永炫编著《Spark编程基础(Python版)》(教材官网)教材中的代码,在纸质教材中的印刷效果,可能会影响读者对代码的理解,为了方便读者正确理解代码或者直接拷贝代码用于上机实验,这里提供全书配套的所有代码。
查看所有章节代码

第7章 Structured Streaming

StructuredNetworkWordCount.py

#!/usr/bin/env python3

from pyspark.sql import SparkSession
from pyspark.sql.functions import split
from pyspark.sql.functions import explode


if __name__ == "__main__":
    # Entry point: count the words arriving on a local TCP socket and print
    # the per-word totals to the console.
    session = (
        SparkSession.builder
        .appName("StructuredNetworkWordCount")
        .getOrCreate()
    )
    session.sparkContext.setLogLevel('WARN')

    # Streaming DataFrame fed by the socket source at localhost:9999.
    socket_reader = session.readStream.format("socket")
    socket_reader = socket_reader.option("host", "localhost")
    socket_reader = socket_reader.option("port", 9999)
    socket_lines = socket_reader.load()

    # Split every line on a single space and emit one row per token.
    tokens = split(socket_lines.value, " ")
    word_rows = socket_lines.select(explode(tokens).alias("word"))

    counts_by_word = word_rows.groupBy("word").count()

    # Console sink, "complete" output mode, 8-second processing-time trigger.
    running_query = (
        counts_by_word.writeStream
        .outputMode("complete")
        .format("console")
        .trigger(processingTime="8 seconds")
        .start()
    )
    running_query.awaitTermination()
cd /usr/local/hadoop
sbin/start-dfs.sh
nc -lk 9999
cd /usr/local/spark/mycode/structuredstreaming/
/usr/local/spark/bin/spark-submit StructuredNetworkWordCount.py

spark_ss_filesource_generate.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# 导入需要用到的模块
import os
import shutil
import random
import time


TEST_DATA_TEMP_DIR = '/tmp/'
TEST_DATA_DIR = '/tmp/testdata/'

ACTION_DEF = ['login', 'logout', 'purchase']
DISTRICT_DEF = ['fujian', 'beijing', 'shanghai', 'guangzhou']
JSON_LINE_PATTERN = '{{"eventTime": {}, "action": "{}", "district": "{}"}}\n'


def test_setUp():
    """Set up the test environment with a freshly created TEST_DATA_DIR.

    If the directory already exists, the old data is removed first (removal
    errors are ignored); the directory is then created.
    """
    stale_data_present = os.path.exists(TEST_DATA_DIR)
    if stale_data_present:
        shutil.rmtree(TEST_DATA_DIR, ignore_errors=True)
    os.mkdir(TEST_DATA_DIR)


def test_tearDown():
    """Restore the environment by removing TEST_DATA_DIR if it exists."""
    if not os.path.exists(TEST_DATA_DIR):
        return
    shutil.rmtree(TEST_DATA_DIR, ignore_errors=True)


def write_and_move(filename, data):
    """Generate one test file named `filename` containing `data`.

    The text is first written (UTF-8) under TEST_DATA_TEMP_DIR, and the
    finished file is then moved into TEST_DATA_DIR.
    """
    staging_path = TEST_DATA_TEMP_DIR + filename
    final_path = TEST_DATA_DIR + filename

    with open(staging_path, "wt", encoding="utf-8") as out:
        out.write(data)

    shutil.move(staging_path, final_path)


if __name__ == "__main__":
    # Entry point: create the test directory, then write 1000 JSON files
    # (one per second, 100 event lines each) into it, and finally clean up.
    test_setUp()

    for i in range(1000):
        filename = 'e-mall-{}.json'.format(i)

        # Build the 100 lines with a single join instead of repeated string
        # concatenation. Each line carries the current epoch second plus a
        # randomly chosen action and district.
        content = ''.join(
            JSON_LINE_PATTERN.format(
                str(int(time.time())),
                random.choice(ACTION_DEF),
                random.choice(DISTRICT_DEF))
            for _ in range(100))
        write_and_move(filename, content)

        time.sleep(1)

    test_tearDown()

spark_ss_filesource.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# 导入需要用到的模块
import os
import shutil
from pprint import pprint

from pyspark.sql import SparkSession
from pyspark.sql.functions import window, asc
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import TimestampType, StringType


# 定义JSON文件的路径常量
TEST_DATA_DIR_SPARK = 'file:///tmp/testdata/'


if __name__ == "__main__":
    # Entry point: stream the JSON files under TEST_DATA_DIR_SPARK and print
    # windowed counts of 'purchase' events per district to the console.

    # Schema: a timestamp eventTime plus string action and district
    # (province) columns, all nullable.
    event_fields = [
        StructField("eventTime", TimestampType(), True),
        StructField("action", StringType(), True),
        StructField("district", StringType(), True),
    ]
    schema = StructType(event_fields)

    spark = (
        SparkSession.builder
        .appName("StructuredEMallPurchaseCount")
        .getOrCreate()
    )
    spark.sparkContext.setLogLevel('WARN')

    events = (
        spark.readStream
        .format("json")
        .schema(schema)
        .option("maxFilesPerTrigger", 100)
        .load(TEST_DATA_DIR_SPARK)
    )

    # Window definition
    windowDuration = '1 minutes'

    purchases = events.filter("action = 'purchase'")
    windowedCounts = (
        purchases
        .groupBy('district', window('eventTime', windowDuration))
        .count()
        .sort(asc('window'))
    )

    # Console sink, "complete" output mode, untruncated columns,
    # 10-second processing-time trigger.
    query = (
        windowedCounts.writeStream
        .outputMode("complete")
        .format("console")
        .option('truncate', 'false')
        .trigger(processingTime="10 seconds")
        .start()
    )
    query.awaitTermination()
cd /usr/local/hadoop
sbin/start-dfs.sh
cd /usr/local/spark/mycode/structuredstreaming/file
python3 spark_ss_filesource_generate.py
cd /usr/local/spark/mycode/structuredstreaming/file
/usr/local/spark/bin/spark-submit spark_ss_filesource.py
cd /usr/local/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
cd /usr/local/kafka
bin/kafka-server-start.sh config/server.properties
cd /usr/local/kafka
bin/kafka-console-consumer.sh \
    --bootstrap-server localhost:9092 --topic wordcount-topic
cd /usr/local/kafka
bin/kafka-console-consumer.sh \
    --bootstrap-server localhost:9092 --topic wordcount-result-topic

spark_ss_kafka_producer.py

#!/usr/bin/env python3

import string
import random
import time

from kafka import KafkaProducer


if __name__ == "__main__":
    # Entry point: endlessly publish random two-letter lowercase words to the
    # 'wordcount-topic' topic, roughly one every 0.1 seconds.
    producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

    while True:
        letters = [random.choice(string.ascii_lowercase) for _ in range(2)]
        payload = bytearray(''.join(letters), 'utf-8')

        # Wait (up to 10 seconds) for the result of this send before moving on.
        pending = producer.send('wordcount-topic', value=payload)
        pending.get(timeout=10)

        time.sleep(0.1)

kafka/spark_ss_kafka_consumer.py

#!/usr/bin/env python3

from pyspark.sql import SparkSession


if __name__ == "__main__":
    # Entry point: read words from the 'wordcount-topic' Kafka topic, count
    # them, and write "word:count" records to 'wordcount-result-topic'.
    spark = (
        SparkSession.builder
        .appName("StructuredKafkaWordCount")
        .getOrCreate()
    )
    spark.sparkContext.setLogLevel('WARN')

    kafka_source = (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", 'wordcount-topic')
        .load()
    )
    # Keep only the record value, cast to a string.
    lines = kafka_source.selectExpr("CAST(value AS STRING)")

    wordCounts = lines.groupBy("value").count()

    # Shape the counts into the key/value columns written to the Kafka sink.
    key_expr = "CAST(value AS STRING) as key"
    value_expr = "CONCAT(CAST(value AS STRING), ':', CAST(count AS STRING)) as value"
    result_records = wordCounts.selectExpr(key_expr, value_expr)

    query = (
        result_records.writeStream
        .outputMode("complete")
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("topic", "wordcount-result-topic")
        .option("checkpointLocation", "file:///tmp/kafka-sink-cp")
        .trigger(processingTime="8 seconds")
        .start()
    )
    query.awaitTermination()
cd /usr/local/spark/mycode/structuredstreaming/kafka/
/usr/local/spark/bin/spark-submit \
    --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 \
    spark_ss_kafka_consumer.py

spark_ss_rate.py

#!/usr/bin/env python3

from pyspark.sql import SparkSession


if __name__ == "__main__":
    # Entry point: read from the built-in "rate" source (rowsPerSecond = 5),
    # print its schema, and echo the stream to the console.
    spark = (
        SparkSession.builder
        .appName("TestRateStreamSource")
        .getOrCreate()
    )
    spark.sparkContext.setLogLevel('WARN')

    rate_reader = spark.readStream.format("rate").option('rowsPerSecond', 5)
    lines = rate_reader.load()

    print(lines.schema)

    # Console sink, "update" output mode, untruncated columns.
    console_writer = (
        lines.writeStream
        .outputMode("update")
        .format("console")
        .option('truncate', 'false')
    )
    query = console_writer.start()
    query.awaitTermination()
cd /usr/local/spark/mycode/structuredstreaming/rate/
/usr/local/spark/bin/spark-submit spark_ss_rate.py

StructuredNetworkWordCountFileSink.py

#!/usr/bin/env python3

from pyspark.sql import SparkSession
from pyspark.sql.functions import split
from pyspark.sql.functions import explode
from pyspark.sql.functions import length


if __name__ == "__main__":
    # Entry point: read lines from a local TCP socket, keep the words whose
    # length is exactly 5, and append them as Parquet files.
    spark = (
        SparkSession.builder
        .appName("StructuredNetworkWordCountFileSink")
        .getOrCreate()
    )
    spark.sparkContext.setLogLevel('WARN')

    lines = (
        spark.readStream
        .format("socket")
        .option("host", "localhost")
        .option("port", 9999)
        .load()
    )

    # Split every line on a single space and emit one row per token.
    tokens = split(lines.value, " ")
    words = lines.select(explode(tokens).alias("word"))

    all_length_5_words = words.filter(length("word") == 5)

    # Parquet file sink in "append" mode with its own checkpoint directory
    # and an 8-second processing-time trigger.
    query = (
        all_length_5_words.writeStream
        .outputMode("append")
        .format("parquet")
        .option("path", "file:///tmp/filesink")
        .option("checkpointLocation", "file:///tmp/file-sink-cp")
        .trigger(processingTime="8 seconds")
        .start()
    )
    query.awaitTermination()
nc -lk 9999
cd /usr/local/spark/mycode/structuredstreaming
/usr/local/spark/bin/spark-submit StructuredNetworkWordCountFileSink.py
cd /tmp/filesink
ls

spark_ss_test_delay.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# 导入需要用到的模块
import os
import shutil
from functools import partial

from pyspark.sql import SparkSession
from pyspark.sql.functions import window
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import TimestampType, StringType

# 定义CSV文件的路径常量
TEST_DATA_DIR = '/tmp/testdata/'
TEST_DATA_DIR_SPARK = 'file:///tmp/testdata/'


def test_setUp():
    """Set up the test environment: recreate the CSV directory TEST_DATA_DIR.

    Old data is deleted when the directory already exists (removal errors are
    ignored), after which the directory is created.
    """
    already_there = os.path.exists(TEST_DATA_DIR)
    if already_there:
        shutil.rmtree(TEST_DATA_DIR, ignore_errors=True)
    os.mkdir(TEST_DATA_DIR)


def test_tearDown():
    """Restore the environment by cleaning up the CSV directory TEST_DATA_DIR."""
    if not os.path.exists(TEST_DATA_DIR):
        return
    shutil.rmtree(TEST_DATA_DIR, ignore_errors=True)


def write_to_csv(filename, data):
    """Write simulated input `data` to the CSV file `filename` in TEST_DATA_DIR.

    Note that the write should be atomic: if writing takes a long time, the
    data should go to a temporary file first and then be moved into the CSV
    directory. Here the file is simply written in place.
    """
    target_path = TEST_DATA_DIR + filename
    with open(target_path, "wt", encoding="utf-8") as csv_file:
        csv_file.write(data)


if __name__ == "__main__":
    # Entry point: run a windowed word count with a watermark over CSV files
    # that are written one by one, processing each file before the next.
    test_setUp()

    # Schema: a string column word and a timestamp column eventTime.
    csv_columns = [
        StructField("word", StringType(), True),
        StructField("eventTime", TimestampType(), True),
    ]
    schema = StructType(csv_columns)

    spark = (
        SparkSession.builder
        .appName("StructuredNetworkWordCountWindowedDelay")
        .getOrCreate()
    )
    spark.sparkContext.setLogLevel('WARN')

    lines = (
        spark.readStream
        .format('csv')
        .schema(schema)
        .option("sep", ";")
        .option("header", "false")
        .load(TEST_DATA_DIR_SPARK)
    )

    # Window definition
    windowDuration = '1 hour'

    watermarked = lines.withWatermark("eventTime", "1 hour")
    windowedCounts = (
        watermarked
        .groupBy('word', window('eventTime', windowDuration))
        .count()
    )

    query = (
        windowedCounts.writeStream
        .outputMode("update")
        .format("console")
        .option('truncate', 'false')
        .trigger(processingTime="8 seconds")
        .start()
    )

    # Test files, in the order they are written and processed.
    batches = [
        # After this file the event time has advanced to the largest value
        # seen so far, 2018-10-01 08:20:00.
        ('file1.csv', """
正常;2018-10-01 08:00:00
正常;2018-10-01 08:10:00
正常;2018-10-01 08:20:00
"""),
        # After this file the event time has advanced to the largest value
        # seen so far, 2018-10-01 20:00:00.
        ('file2.csv', """
正常;2018-10-01 20:00:00
一小时以内延迟到达;2018-10-01 10:00:00
一小时以内延迟到达;2018-10-01 10:50:00
"""),
        ('file3.csv', """
正常;2018-10-01 20:00:00
一小时外延迟到达;2018-10-01 10:00:00
一小时外延迟到达;2018-10-01 10:50:00
一小时以内延迟到达;2018-10-01 19:00:00
"""),
    ]

    for csv_name, csv_rows in batches:
        write_to_csv(csv_name, csv_rows)
        # Process the data that is currently available.
        query.processAllAvailable()

    query.stop()

    test_tearDown()
cd /usr/local/spark/mycode/structuredstreaming/watermark/
/usr/local/spark/bin/spark-submit spark_ss_test_delay.py

StructuredNetworkWordCountWithMonitor.py

#!/usr/bin/env python3

from pprint import pprint
import time

from pyspark.sql import SparkSession
from pyspark.sql.functions import split
from pyspark.sql.functions import explode


if __name__ == "__main__":
    # Entry point: run the socket word count and, every 5 seconds, print the
    # query status plus the latest progress report when it contains input.
    spark = SparkSession \
        .builder \
        .appName("StructuredNetworkWordCount") \
        .getOrCreate()

    spark.sparkContext.setLogLevel('WARN')

    lines = spark \
        .readStream \
        .format("socket") \
        .option("host", "localhost") \
        .option("port", 9999) \
        .load()

    # Split every line on a single space and emit one row per token.
    words = lines.select(
        explode(
            split(lines.value, " ")
        ).alias("word")
    )

    wordCounts = words.groupBy("word").count()

    query = wordCounts \
        .writeStream \
        .outputMode("complete") \
        .format("console") \
        .queryName('write_to_console') \
        .trigger(processingTime="8 seconds") \
        .start()

    # Monitor only while the query is running; an unconditional loop would
    # keep polling forever after the query has stopped or failed.
    while query.isActive:
        # Take one snapshot so the report that is checked is the one printed.
        progress = query.lastProgress
        if progress and progress['numInputRows'] > 0:
            pprint(progress)

        pprint(query.status)

        time.sleep(5)

    # The query is no longer active: this returns immediately after a clean
    # stop and raises the query's exception if it terminated with an error.
    query.awaitTermination()
cd /usr/local/hadoop
sbin/start-dfs.sh
nc -lk 9999
cd /usr/local/spark/mycode/structuredstreaming/monitor
/usr/local/spark/bin/spark-submit StructuredNetworkWordCountWithMonitor.py