Code for Chapter 8 "Structured Streaming", from 《Spark编程基础(Python版,第2版)》 (Spark Programming Fundamentals, Python Edition, 2nd Edition), compiled by Lin Ziyu


Command lines and code from the textbook 《Spark编程基础(Python版,第2版)》, compiled by Lin Ziyu of Xiamen University. The textbook's official website provides the command lines and code for every chapter, which can be copied and pasted directly for execution.

Code file StructuredNetworkWordCount.py

#!/usr/bin/env python3

from pyspark.sql import SparkSession
from pyspark.sql.functions import split
from pyspark.sql.functions import explode

if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("StructuredNetworkWordCount") \
        .getOrCreate()

    spark.sparkContext.setLogLevel('WARN')

    lines = spark \
        .readStream \
        .format("socket") \
        .option("host", "localhost") \
        .option("port", 9999) \
        .load()

    words = lines.select(
        explode(
            split(lines.value, " ")
        ).alias("word")
    )

    wordCounts = words.groupBy("word").count()

    query = wordCounts \
        .writeStream \
        .outputMode("complete") \
        .format("console") \
        .trigger(processingTime="8 seconds") \
        .start()

    query.awaitTermination()

cd /usr/local/hadoop
sbin/start-dfs.sh

nc -lk 9999

cd /usr/local/spark/mycode/structuredstreaming/
/usr/local/spark/bin/spark-submit StructuredNetworkWordCount.py
csvDF = spark \
    .readStream \
    .format("csv") \
    .option("sep", ";") \
    .load("SOME_DIR")
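
For a streaming CSV source such as the one above, Spark by default requires an explicit schema (automatic schema inference is disabled for streaming file sources unless spark.sql.streaming.schemaInference is enabled). The following is a minimal sketch of the same read with a schema attached; the column names are hypothetical placeholders, and SOME_DIR stands for the monitored directory as in the snippet above.

#!/usr/bin/env python3

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession \
    .builder \
    .appName("CsvStreamSketch") \
    .getOrCreate()

# Hypothetical two-column schema; replace with the actual columns of your CSV files
csvSchema = StructType([
    StructField("name", StringType(), True),
    StructField("city", StringType(), True)])

csvDF = spark \
    .readStream \
    .format("csv") \
    .schema(csvSchema) \
    .option("sep", ";") \
    .load("SOME_DIR")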

Code file spark_ss_filesource_generate.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# Import the required modules
import os
import shutil
import random
import time

TEST_DATA_TEMP_DIR = '/tmp/'
TEST_DATA_DIR = '/tmp/testdata/'

ACTION_DEF = ['login', 'logout', 'purchase']
DISTRICT_DEF = ['fujian', 'beijing', 'shanghai', 'guangzhou']
JSON_LINE_PATTERN = '{{"eventTime": {}, "action": "{}", "district": "{}"}}\n'

# Set up the test environment: if the data directory already exists,
# delete the old data, then (re)create the directory
def test_setUp():
    if os.path.exists(TEST_DATA_DIR):
        shutil.rmtree(TEST_DATA_DIR, ignore_errors=True)
    os.mkdir(TEST_DATA_DIR)

# Tear down the test environment: clean up the data directory
def test_tearDown():
    if os.path.exists(TEST_DATA_DIR):
        shutil.rmtree(TEST_DATA_DIR, ignore_errors=True)

# Generate a test file: write it to a temporary location first,
# then move it into the monitored directory so that it appears atomically
def write_and_move(filename, data):
    with open(TEST_DATA_TEMP_DIR + filename,
              "wt", encoding="utf-8") as f:
        f.write(data)

    shutil.move(TEST_DATA_TEMP_DIR + filename,
                TEST_DATA_DIR + filename)

if __name__ == "__main__":
    test_setUp()

    for i in range(1000):
        filename = 'e-mall-{}.json'.format(i)

        content = ''
        rndcount = list(range(100))
        random.shuffle(rndcount)
        for _ in rndcount:
            content += JSON_LINE_PATTERN.format(
                str(int(time.time())),
                random.choice(ACTION_DEF),
                random.choice(DISTRICT_DEF))
        write_and_move(filename, content)

        time.sleep(1)

    test_tearDown()

Code file spark_ss_filesource.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# Import the required modules
import os
import shutil
from pprint import pprint

from pyspark.sql import SparkSession
from pyspark.sql.functions import window, asc
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import TimestampType, StringType

# Path constant for the JSON files
TEST_DATA_DIR_SPARK = 'file:///tmp/testdata/'

if __name__ == "__main__":
    # Define the schema: eventTime (timestamp), action (string) and district (string)
    schema = StructType([
        StructField("eventTime", TimestampType(), True),
        StructField("action", StringType(), True),
        StructField("district", StringType(), True)])

    spark = SparkSession \
        .builder \
        .appName("StructuredEMallPurchaseCount") \
        .getOrCreate()

    spark.sparkContext.setLogLevel('WARN')

    lines = spark \
        .readStream \
        .format("json") \
        .schema(schema) \
        .option("maxFilesPerTrigger", 100) \
        .load(TEST_DATA_DIR_SPARK)

    # Define the window duration
    windowDuration = '1 minutes'

    windowedCounts = lines \
        .filter("action = 'purchase'") \
        .groupBy('district', window('eventTime', windowDuration)) \
        .count() \
        .sort(asc('window'))

    query = windowedCounts \
        .writeStream \
        .outputMode("complete") \
        .format("console") \
        .option('truncate', 'false') \
        .trigger(processingTime="10 seconds") \
        .start()

    query.awaitTermination()

cd /usr/local/hadoop
sbin/start-dfs.sh

cd /usr/local/spark/mycode/structuredstreaming/file
python3 spark_ss_filesource_generate.py

cd /usr/local/spark/mycode/structuredstreaming/file
/usr/local/spark/bin/spark-submit spark_ss_filesource.py

cd /usr/local/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

cd /usr/local/kafka
bin/kafka-server-start.sh config/server.properties

cd /usr/local/kafka
bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 --topic wordcount-topic

cd /usr/local/kafka
bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 --topic wordcount-result-topic

Code file spark_ss_kafka_producer.py

#!/usr/bin/env python3

import string
import random
import time

from kafka import KafkaProducer

if __name__ == "__main__":
    producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

    while True:
        s2 = (random.choice(string.ascii_lowercase) for _ in range(2))
        word = ''.join(s2)
        value = bytearray(word, 'utf-8')

        producer.send('wordcount-topic', value=value) \
            .get(timeout=10)

        time.sleep(0.1)

sudo apt-get install python3-pip

sudo pip3 install kafka-python

cd /usr/local/spark/mycode/structuredstreaming/kafka/
python3 spark_ss_kafka_producer.py

Code file spark_ss_kafka_consumer.py

#!/usr/bin/env python3

from pyspark.sql import SparkSession

if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("StructuredKafkaWordCount") \
        .getOrCreate()

    spark.sparkContext.setLogLevel('WARN')

    lines = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", 'wordcount-topic') \
        .load() \
        .selectExpr("CAST(value AS STRING)")

    wordCounts = lines.groupBy("value").count()

    query = wordCounts \
        .selectExpr("CAST(value AS STRING) as key",
                    "CONCAT(CAST(value AS STRING), ':', CAST(count AS STRING)) as value") \
        .writeStream \
        .outputMode("complete") \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("topic", "wordcount-result-topic") \
        .option("checkpointLocation", "file:///tmp/kafka-sink-cp") \
        .trigger(processingTime="8 seconds") \
        .start()

    query.awaitTermination()

cd /usr/local/spark/mycode/structuredstreaming/kafka/
/usr/local/spark/bin/spark-submit \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.0 \
  spark_ss_kafka_consumer.py

Code file spark_ss_rate.py

#!/usr/bin/env python3

from pyspark.sql import SparkSession

if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("TestRateStreamSource") \
        .getOrCreate()

    spark.sparkContext.setLogLevel('WARN')

    lines = spark \
        .readStream \
        .format("rate") \
        .option('rowsPerSecond', 5) \
        .load()

    print(lines.schema)

    query = lines \
        .writeStream \
        .outputMode("update") \
        .format("console") \
        .option('truncate', 'false') \
        .start()

    query.awaitTermination()
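
The rate source above is also convenient for testing aggregations locally without nc or Kafka. The following is a minimal sketch built on the assumption that the standard rate source emits a timestamp column and a monotonically increasing value column; the bucket key below is an arbitrary demo choice, not something from the textbook listing.

#!/usr/bin/env python3

from pyspark.sql import SparkSession
from pyspark.sql.functions import window

spark = SparkSession \
    .builder \
    .appName("RateWindowCountSketch") \
    .getOrCreate()

spark.sparkContext.setLogLevel('WARN')

# The rate source emits rows with a 'timestamp' and a 'value' column
rate = spark \
    .readStream \
    .format("rate") \
    .option("rowsPerSecond", 5) \
    .load()

# Count rows per 10-second event-time window, keyed by value % 3 (an arbitrary demo key)
counts = rate \
    .groupBy(window("timestamp", "10 seconds"), (rate.value % 3).alias("bucket")) \
    .count()

query = counts \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .option("truncate", "false") \
    .start()

query.awaitTermination()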

Code file StructuredNetworkWordCountFileSink.py

#!/usr/bin/env python3

from pyspark.sql import SparkSession
from pyspark.sql.functions import split
from pyspark.sql.functions import explode
from pyspark.sql.functions import length

if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("StructuredNetworkWordCountFileSink") \
        .getOrCreate()

    spark.sparkContext.setLogLevel('WARN')

    lines = spark \
        .readStream \
        .format("socket") \
        .option("host", "localhost") \
        .option("port", 9999) \
        .load()

    words = lines.select(
        explode(
            split(lines.value, " ")
        ).alias("word")
    )

    all_length_5_words = words.filter(length("word") == 5)

    query = all_length_5_words \
        .writeStream \
        .outputMode("append") \
        .format("parquet") \
        .option("path", "file:///tmp/filesink") \
        .option("checkpointLocation", "file:///tmp/file-sink-cp") \
        .trigger(processingTime="8 seconds") \
        .start()

    query.awaitTermination()

nc -lk 9999

cd /usr/local/spark/mycode/structuredstreaming
/usr/local/spark/bin/spark-submit StructuredNetworkWordCountFileSink.py

cd /tmp/filesink
ls
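
To inspect the contents of the file sink rather than just the file names, the Parquet output can be read back in ordinary batch mode. The following is a minimal sketch, assuming the streaming query above has already written at least one batch to /tmp/filesink.

#!/usr/bin/env python3

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("InspectFileSinkSketch") \
    .getOrCreate()

# Read the Parquet files written by the streaming query as a static DataFrame
df = spark.read.parquet("file:///tmp/filesink")
df.show(truncate=False)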

Code file spark_ss_test_delay.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# Import the required modules
import os
import shutil
from functools import partial

from pyspark.sql import SparkSession
from pyspark.sql.functions import window
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import TimestampType, StringType

# Path constants for the CSV files
TEST_DATA_DIR = '/tmp/testdata/'
TEST_DATA_DIR_SPARK = 'file:///tmp/testdata/'

# Set up the test environment: if the CSV directory already exists,
# delete the old data, then (re)create the directory
def test_setUp():
    if os.path.exists(TEST_DATA_DIR):
        shutil.rmtree(TEST_DATA_DIR, ignore_errors=True)
    os.mkdir(TEST_DATA_DIR)

# Tear down the test environment: clean up the CSV directory
def test_tearDown():
    if os.path.exists(TEST_DATA_DIR):
        shutil.rmtree(TEST_DATA_DIR, ignore_errors=True)

# Simulate the input by writing a CSV file with the given name and data.
# Note that the write should be atomic: if writing takes a long time,
# write to a temporary file first and then move it into the CSV directory.
# Here we simply write the file directly.
def write_to_csv(filename, data):
    with open(TEST_DATA_DIR + filename, "wt", encoding="utf-8") as f:
        f.write(data)

if __name__ == "__main__":
    test_setUp()

    # Define the schema: two columns, word (string) and eventTime (timestamp)
    schema = StructType([
        StructField("word", StringType(), True),
        StructField("eventTime", TimestampType(), True)])

    spark = SparkSession \
        .builder \
        .appName("StructuredNetworkWordCountWindowedDelay") \
        .getOrCreate()

    spark.sparkContext.setLogLevel('WARN')

    lines = spark \
        .readStream \
        .format('csv') \
        .schema(schema) \
        .option("sep", ";") \
        .option("header", "false") \
        .load(TEST_DATA_DIR_SPARK)

    # Define the window duration
    windowDuration = '1 hour'

    windowedCounts = lines \
        .withWatermark("eventTime", "1 hour") \
        .groupBy('word', window('eventTime', windowDuration)) \
        .count()

    query = windowedCounts \
        .writeStream \
        .outputMode("update") \
        .format("console") \
        .option('truncate', 'false') \
        .trigger(processingTime="8 seconds") \
        .start()

    # Write the test file file1.csv
    write_to_csv('file1.csv', """
normal;2023-10-01 08:00:00
normal;2023-10-01 08:10:00
normal;2023-10-01 08:20:00
""")

    # Process the data currently available
    query.processAllAvailable()

    # The event time now advances to the latest value seen so far, 2023-10-01 08:20:00

    write_to_csv('file2.csv', """
normal;2023-10-01 20:00:00
late-within-one-hour;2023-10-01 10:00:00
late-within-one-hour;2023-10-01 10:50:00
""")

    # Process the data currently available
    query.processAllAvailable()

    # The event time now advances to the latest value seen so far, 2023-10-01 20:00:00

    write_to_csv('file3.csv', """
normal;2023-10-01 20:00:00
late-beyond-one-hour;2023-10-01 10:00:00
late-beyond-one-hour;2023-10-01 10:50:00
late-within-one-hour;2023-10-01 19:00:00
""")

    # Process the data currently available
    query.processAllAvailable()

    query.stop()

    test_tearDown()

cd /usr/local/spark/mycode/structuredstreaming/watermark/
/usr/local/spark/bin/spark-submit spark_ss_test_delay.py

Code file StructuredNetworkWordCountWithMonitor.py

#!/usr/bin/env python3

from pprint import pprint
import time

from pyspark.sql import SparkSession
from pyspark.sql.functions import split
from pyspark.sql.functions import explode

if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("StructuredNetworkWordCount") \
        .getOrCreate()

    spark.sparkContext.setLogLevel('WARN')

    lines = spark \
        .readStream \
        .format("socket") \
        .option("host", "localhost") \
        .option("port", 9999) \
        .load()

    words = lines.select(
        explode(
            split(lines.value, " ")
        ).alias("word")
    )

    wordCounts = words.groupBy("word").count()

    query = wordCounts \
        .writeStream \
        .outputMode("complete") \
        .format("console") \
        .queryName('write_to_console') \
        .trigger(processingTime="8 seconds") \
        .start()

    # Poll the query's progress and status every 5 seconds
    while True:
        if query.lastProgress:
            if query.lastProgress['numInputRows'] > 0:
                pprint(query.lastProgress)

        pprint(query.status)

        time.sleep(5)

cd /usr/local/hadoop
sbin/start-dfs.sh

nc -lk 9999

cd /usr/local/spark/mycode/structuredstreaming/monitor
/usr/local/spark/bin/spark-submit StructuredNetworkWordCountWithMonitor.py
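
As an alternative to polling query.lastProgress and query.status in a loop, as StructuredNetworkWordCountWithMonitor.py above does, progress can be pushed to a listener. The following is a minimal sketch, assuming a Spark version (3.4 or later) in which pyspark.sql.streaming.StreamingQueryListener is available; the class and attribute names below follow that API rather than the textbook listing.

#!/usr/bin/env python3

from pyspark.sql import SparkSession
from pyspark.sql.streaming import StreamingQueryListener

spark = SparkSession \
    .builder \
    .appName("ListenerMonitorSketch") \
    .getOrCreate()

class ProgressPrinter(StreamingQueryListener):
    def onQueryStarted(self, event):
        print("query started: {}".format(event.id))

    def onQueryProgress(self, event):
        # event.progress carries the same information as query.lastProgress
        print("numInputRows: {}".format(event.progress.numInputRows))

    def onQueryTerminated(self, event):
        print("query terminated: {}".format(event.id))

# Register the listener; it is called back as streaming queries on this session make progress
spark.streams.addListener(ProgressPrinter())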