Command lines and code from the textbook 《Spark编程基础(Python版,第2版)》 (Spark Programming Fundamentals, Python Edition, 2nd Edition), written by Lin Ziyu (林子雨) of Xiamen University (textbook official website)
All command lines and code from every chapter of the textbook are provided here and can be copied, pasted, and executed directly.
View all command lines and code from the textbook 《Spark编程基础(Python版,第2版)》
Code file StructuredNetworkWordCount.py
- #!/usr/bin/env python3
- from pyspark.sql import SparkSession
- from pyspark.sql.functions import split
- from pyspark.sql.functions import explode
- if __name__ == "__main__":
-     spark = SparkSession \
-         .builder \
-         .appName("StructuredNetworkWordCount") \
-         .getOrCreate()
-     spark.sparkContext.setLogLevel('WARN')
-     # Read a stream of lines from a socket source on localhost:9999
-     lines = spark \
-         .readStream \
-         .format("socket") \
-         .option("host", "localhost") \
-         .option("port", 9999) \
-         .load()
-     # Split each line into words; explode turns the word array into one row per word
-     words = lines.select(
-         explode(
-             split(lines.value, " ")
-         ).alias("word")
-     )
-     # Running word count over the whole stream
-     wordCounts = words.groupBy("word").count()
-     # Print the complete result table to the console every 8 seconds
-     query = wordCounts \
-         .writeStream \
-         .outputMode("complete") \
-         .format("console") \
-         .trigger(processingTime="8 seconds") \
-         .start()
-     query.awaitTermination()
- cd /usr/local/hadoop
- sbin/start-dfs.sh
- nc -lk 9999
- cd /usr/local/spark/mycode/structuredstreaming/
- /usr/local/spark/bin/spark-submit StructuredNetworkWordCount.py
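Structured Streaming uses the same DataFrame operations as batch queries, so the word count above can be compared with a minimal batch counterpart; the input path in the sketch below is a hypothetical example, not from the textbook.
- # Batch-mode sketch of the same word count (hypothetical input path, not textbook code)
- from pyspark.sql import SparkSession
- from pyspark.sql.functions import split, explode
- spark = SparkSession.builder.appName("BatchWordCountSketch").getOrCreate()
- lines = spark.read.text("file:///tmp/wordfile.txt")  # static DataFrame with one 'value' column
- words = lines.select(explode(split(lines.value, " ")).alias("word"))
- words.groupBy("word").count().show()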
csvDF = spark \
    .readStream \
    .format("csv") \
    .option("sep", ";") \
    .load("SOME_DIR")
Code file spark_ss_filesource_generate.py
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- # Import the required modules
- import os
- import shutil
- import random
- import time
- TEST_DATA_TEMP_DIR = '/tmp/'
- TEST_DATA_DIR = '/tmp/testdata/'
- ACTION_DEF = ['login', 'logout', 'purchase']
- DISTRICT_DEF = ['fujian', 'beijing', 'shanghai', 'guangzhou']
- JSON_LINE_PATTERN = '{{"eventTime": {}, "action": "{}", "district": "{}"}}\n'
- # Set up the test environment: if the data directory already exists,
- # delete the old data, then (re)create the directory
- def test_setUp():
-     if os.path.exists(TEST_DATA_DIR):
-         shutil.rmtree(TEST_DATA_DIR, ignore_errors=True)
-     os.mkdir(TEST_DATA_DIR)
- # Tear down the test environment: clean up the data directory
- def test_tearDown():
-     if os.path.exists(TEST_DATA_DIR):
-         shutil.rmtree(TEST_DATA_DIR, ignore_errors=True)
- # Generate a test file: write it to the temporary directory first,
- # then move it into the data directory
- def write_and_move(filename, data):
-     with open(TEST_DATA_TEMP_DIR + filename,
-               "wt", encoding="utf-8") as f:
-         f.write(data)
-     shutil.move(TEST_DATA_TEMP_DIR + filename,
-                 TEST_DATA_DIR + filename)
- if __name__ == "__main__":
-     test_setUp()
-     for i in range(1000):
-         filename = 'e-mall-{}.json'.format(i)
-         content = ''
-         rndcount = list(range(100))
-         random.shuffle(rndcount)
-         for _ in rndcount:
-             content += JSON_LINE_PATTERN.format(
-                 str(int(time.time())),
-                 random.choice(ACTION_DEF),
-                 random.choice(DISTRICT_DEF))
-         write_and_move(filename, content)
-         time.sleep(1)
-     test_tearDown()
Code file spark_ss_filesource.py
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- # Import the required modules
- import os
- import shutil
- from pprint import pprint
- from pyspark.sql import SparkSession
- from pyspark.sql.functions import window, asc
- from pyspark.sql.types import StructType, StructField
- from pyspark.sql.types import TimestampType, StringType
- # Path constant for the JSON files
- TEST_DATA_DIR_SPARK = 'file:///tmp/testdata/'
- if __name__ == "__main__":
-     # Define the schema: a timestamp column eventTime plus
-     # string columns for the action and the district
-     schema = StructType([
-         StructField("eventTime", TimestampType(), True),
-         StructField("action", StringType(), True),
-         StructField("district", StringType(), True)])
-     spark = SparkSession \
-         .builder \
-         .appName("StructuredEMallPurchaseCount") \
-         .getOrCreate()
-     spark.sparkContext.setLogLevel('WARN')
-     lines = spark \
-         .readStream \
-         .format("json") \
-         .schema(schema) \
-         .option("maxFilesPerTrigger", 100) \
-         .load(TEST_DATA_DIR_SPARK)
-     # Define the window
-     windowDuration = '1 minutes'
-     windowedCounts = lines \
-         .filter("action = 'purchase'") \
-         .groupBy('district', window('eventTime', windowDuration)) \
-         .count() \
-         .sort(asc('window'))
-     query = windowedCounts \
-         .writeStream \
-         .outputMode("complete") \
-         .format("console") \
-         .option('truncate', 'false') \
-         .trigger(processingTime="10 seconds") \
-         .start()
-     query.awaitTermination()
- cd /usr/local/hadoop
- sbin/start-dfs.sh
- cd /usr/local/spark/mycode/structuredstreaming/file
- python3 spark_ss_filesource_generate.py
- cd /usr/local/spark/mycode/structuredstreaming/file
- /usr/local/spark/bin/spark-submit spark_ss_filesource.py
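To spot-check the files produced by spark_ss_filesource_generate.py, the directory can also be read with an ordinary batch query; this is only a verification sketch, not textbook code.
- # Batch spot-check of the generated JSON files (verification sketch, not textbook code)
- from pyspark.sql import SparkSession
- spark = SparkSession.builder.appName("CheckTestData").getOrCreate()
- df = spark.read.json("file:///tmp/testdata/")
- df.printSchema()
- df.show(5, truncate=False)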
- cd /usr/local/kafka
- bin/zookeeper-server-start.sh config/zookeeper.properties
- cd /usr/local/kafka
- bin/kafka-server-start.sh config/server.properties
- cd /usr/local/kafka
- bin/kafka-console-consumer.sh \
- > --bootstrap-server localhost:9092 --topic wordcount-topic
- cd /usr/local/kafka
- bin/kafka-console-consumer.sh \
- > --bootstrap-server localhost:9092 --topic wordcount-result-topic
Code file spark_ss_kafka_producer.py
- #!/usr/bin/env python3
- import string
- import random
- import time
- from kafka import KafkaProducer
- if __name__ == "__main__":
-     producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
-     while True:
-         # Build a random two-letter lowercase word
-         s2 = (random.choice(string.ascii_lowercase) for _ in range(2))
-         word = ''.join(s2)
-         value = bytearray(word, 'utf-8')
-         # Send to wordcount-topic and block until the broker acknowledges
-         producer.send('wordcount-topic', value=value) \
-                 .get(timeout=10)
-         time.sleep(0.1)
- sudo apt-get install python3-pip
- sudo pip3 install kafka-python
- cd /usr/local/spark/mycode/structuredstreaming/kafka/
- python3 spark_ss_kafka_producer.py
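As an optional variant (not the textbook's version), kafka-python can perform the string-to-bytes conversion itself through a value_serializer, which removes the explicit bytearray step:
- # Optional variant: let KafkaProducer serialize strings itself (not the textbook's version)
- from kafka import KafkaProducer
- producer = KafkaProducer(
-     bootstrap_servers=['localhost:9092'],
-     value_serializer=lambda v: v.encode('utf-8'))
- producer.send('wordcount-topic', value='ab').get(timeout=10)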
Code file spark_ss_kafka_consumer.py
- #!/usr/bin/env python3
- from pyspark.sql import SparkSession
- if __name__ == "__main__":
-     spark = SparkSession \
-         .builder \
-         .appName("StructuredKafkaWordCount") \
-         .getOrCreate()
-     spark.sparkContext.setLogLevel('WARN')
-     # Subscribe to wordcount-topic and keep only the message value as a string
-     lines = spark \
-         .readStream \
-         .format("kafka") \
-         .option("kafka.bootstrap.servers", "localhost:9092") \
-         .option("subscribe", 'wordcount-topic') \
-         .load() \
-         .selectExpr("CAST(value AS STRING)")
-     # Count occurrences of each word (each Kafka message carries one word)
-     wordCounts = lines.groupBy("value").count()
-     # Re-encode each row as key/value and write it back to Kafka
-     query = wordCounts \
-         .selectExpr("CAST(value AS STRING) as key", "CONCAT(CAST(value AS STRING), ':', CAST(count AS STRING)) as value") \
-         .writeStream \
-         .outputMode("complete") \
-         .format("kafka") \
-         .option("kafka.bootstrap.servers", "localhost:9092") \
-         .option("topic", "wordcount-result-topic") \
-         .option("checkpointLocation", "file:///tmp/kafka-sink-cp") \
-         .trigger(processingTime="8 seconds") \
-         .start()
-     query.awaitTermination()
- cd /usr/local/spark/mycode/structuredstreaming/kafka/
- /usr/local/spark/bin/spark-submit \
- > --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.0 \
- > spark_ss_kafka_consumer.py
Code file spark_ss_rate.py
- #!/usr/bin/env python3
- from pyspark.sql import SparkSession
- if __name__ == "__main__":
-     spark = SparkSession \
-         .builder \
-         .appName("TestRateStreamSource") \
-         .getOrCreate()
-     spark.sparkContext.setLogLevel('WARN')
-     # The rate source generates rowsPerSecond rows per second,
-     # each with a 'timestamp' and a monotonically increasing 'value'
-     lines = spark \
-         .readStream \
-         .format("rate") \
-         .option('rowsPerSecond', 5) \
-         .load()
-     print(lines.schema)
-     query = lines \
-         .writeStream \
-         .outputMode("update") \
-         .format("console") \
-         .option('truncate', 'false') \
-         .start()
-     query.awaitTermination()
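Because the rate source always produces the two columns timestamp and value, it is convenient for quick experiments; the following sketch, which derives an extra column before printing, is only an illustration rather than textbook code.
- # Illustration only (not textbook code): derive a column from the rate source
- from pyspark.sql import SparkSession
- from pyspark.sql.functions import expr
- spark = SparkSession.builder.appName("RateSourceDemo").getOrCreate()
- rates = spark.readStream.format("rate").option("rowsPerSecond", 5).load()
- derived = rates.withColumn("value_mod_5", expr("value % 5"))
- query = derived.writeStream.outputMode("append").format("console").start()
- query.awaitTermination()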
Code file StructuredNetworkWordCountFileSink.py
- #!/usr/bin/env python3
- from pyspark.sql import SparkSession
- from pyspark.sql.functions import split
- from pyspark.sql.functions import explode
- from pyspark.sql.functions import length
- if __name__ == "__main__":
-     spark = SparkSession \
-         .builder \
-         .appName("StructuredNetworkWordCountFileSink") \
-         .getOrCreate()
-     spark.sparkContext.setLogLevel('WARN')
-     lines = spark \
-         .readStream \
-         .format("socket") \
-         .option("host", "localhost") \
-         .option("port", 9999) \
-         .load()
-     words = lines.select(
-         explode(
-             split(lines.value, " ")
-         ).alias("word")
-     )
-     # Keep only the words whose length is exactly 5
-     all_length_5_words = words.filter(length("word") == 5)
-     # Append the filtered words to Parquet files under /tmp/filesink
-     query = all_length_5_words \
-         .writeStream \
-         .outputMode("append") \
-         .format("parquet") \
-         .option("path", "file:///tmp/filesink") \
-         .option("checkpointLocation", "file:///tmp/file-sink-cp") \
-         .trigger(processingTime="8 seconds") \
-         .start()
-     query.awaitTermination()
- nc -lk 9999
- cd /usr/local/spark/mycode/structuredstreaming
- /usr/local/spark/bin/spark-submit StructuredNetworkWordCountFileSink.py
- cd /tmp/filesink
- ls
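The Parquet files listed under /tmp/filesink can be read back with an ordinary batch query to confirm what the file sink wrote; this is only a verification sketch, not textbook code.
- # Verification sketch (not textbook code): read back the Parquet files written by the file sink
- from pyspark.sql import SparkSession
- spark = SparkSession.builder.appName("ReadFileSink").getOrCreate()
- spark.read.parquet("file:///tmp/filesink").show(truncate=False)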
Code file spark_ss_test_delay.py
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- # Import the required modules
- import os
- import shutil
- from functools import partial
- from pyspark.sql import SparkSession
- from pyspark.sql.functions import window
- from pyspark.sql.types import StructType, StructField
- from pyspark.sql.types import TimestampType, StringType
- # Path constants for the CSV files
- TEST_DATA_DIR = '/tmp/testdata/'
- TEST_DATA_DIR_SPARK = 'file:///tmp/testdata/'
- # Set up the test environment: if the CSV directory already exists,
- # delete the old data, then (re)create the directory
- def test_setUp():
-     if os.path.exists(TEST_DATA_DIR):
-         shutil.rmtree(TEST_DATA_DIR, ignore_errors=True)
-     os.mkdir(TEST_DATA_DIR)
- # Tear down the test environment: clean up the CSV directory
- def test_tearDown():
-     if os.path.exists(TEST_DATA_DIR):
-         shutil.rmtree(TEST_DATA_DIR, ignore_errors=True)
- # Helper that simulates input: it takes a CSV file name and the data to write.
- # Note that the write should be atomic; if writing takes a long time, write to a
- # temporary file first and then move it into the CSV directory.
- # Here we simply write the file directly.
- def write_to_csv(filename, data):
-     with open(TEST_DATA_DIR + filename, "wt", encoding="utf-8") as f:
-         f.write(data)
- if __name__ == "__main__":
-     test_setUp()
-     # Define the schema: a string column word and a timestamp column eventTime
-     schema = StructType([
-         StructField("word", StringType(), True),
-         StructField("eventTime", TimestampType(), True)])
-     spark = SparkSession \
-         .builder \
-         .appName("StructuredNetworkWordCountWindowedDelay") \
-         .getOrCreate()
-     spark.sparkContext.setLogLevel('WARN')
-     lines = spark \
-         .readStream \
-         .format('csv') \
-         .schema(schema) \
-         .option("sep", ";") \
-         .option("header", "false") \
-         .load(TEST_DATA_DIR_SPARK)
-     # Define the window
-     windowDuration = '1 hour'
-     windowedCounts = lines \
-         .withWatermark("eventTime", "1 hour") \
-         .groupBy('word', window('eventTime', windowDuration)) \
-         .count()
-     query = windowedCounts \
-         .writeStream \
-         .outputMode("update") \
-         .format("console") \
-         .option('truncate', 'false') \
-         .trigger(processingTime="8 seconds") \
-         .start()
-     # Write the test file file1.csv
-     write_to_csv('file1.csv', """
- on-time;2023-10-01 08:00:00
- on-time;2023-10-01 08:10:00
- on-time;2023-10-01 08:20:00
- """)
-     # Process the data currently available
-     query.processAllAvailable()
-     # The maximum event time seen so far is now 2023-10-01 08:20:00
-     write_to_csv('file2.csv', """
- on-time;2023-10-01 20:00:00
- late-within-1-hour;2023-10-01 10:00:00
- late-within-1-hour;2023-10-01 10:50:00
- """)
-     # Process the data currently available
-     query.processAllAvailable()
-     # The maximum event time seen so far is now 2023-10-01 20:00:00
-     write_to_csv('file3.csv', """
- on-time;2023-10-01 20:00:00
- late-beyond-1-hour;2023-10-01 10:00:00
- late-beyond-1-hour;2023-10-01 10:50:00
- late-within-1-hour;2023-10-01 19:00:00
- """)
-     # Process the data currently available
-     query.processAllAvailable()
-     query.stop()
-     test_tearDown()
- cd /usr/local/spark/mycode/structuredstreaming/watermark/
- /usr/local/spark/bin/spark-submit spark_ss_test_delay.py
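The behaviour on file3 follows from how the watermark is computed: the maximum event time seen in previous triggers minus the delay threshold passed to withWatermark, with windows that ended before the watermark no longer being updated. A quick arithmetic check of the values used above (plain Python, only to illustrate the calculation):
- # Arithmetic check of the watermark in spark_ss_test_delay.py (illustration only)
- from datetime import datetime, timedelta
- max_event_time = datetime(2023, 10, 1, 20, 0, 0)  # largest eventTime seen after file2
- threshold = timedelta(hours=1)                    # withWatermark("eventTime", "1 hour")
- watermark = max_event_time - threshold            # 2023-10-01 19:00:00
- print(watermark)
- # In file3, the 20:00:00 and 19:00:00 rows fall into windows ending after the
- # watermark, so their counts are still updated; the 10:00:00 and 10:50:00 rows
- # belong to windows that ended before the watermark and are discarded as too late.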
Code file StructuredNetworkWordCountWithMonitor.py
- #!/usr/bin/env python3
- from pprint import pprint
- import time
- from pyspark.sql import SparkSession
- from pyspark.sql.functions import split
- from pyspark.sql.functions import explode
- if __name__ == "__main__":
-     spark = SparkSession \
-         .builder \
-         .appName("StructuredNetworkWordCount") \
-         .getOrCreate()
-     spark.sparkContext.setLogLevel('WARN')
-     lines = spark \
-         .readStream \
-         .format("socket") \
-         .option("host", "localhost") \
-         .option("port", 9999) \
-         .load()
-     words = lines.select(
-         explode(
-             split(lines.value, " ")
-         ).alias("word")
-     )
-     wordCounts = words.groupBy("word").count()
-     query = wordCounts \
-         .writeStream \
-         .outputMode("complete") \
-         .format("console") \
-         .queryName('write_to_console') \
-         .trigger(processingTime="8 seconds") \
-         .start()
-     # Poll the query every 5 seconds, printing progress when input arrived and the current status
-     while True:
-         if query.lastProgress:
-             if query.lastProgress['numInputRows'] > 0:
-                 pprint(query.lastProgress)
-         pprint(query.status)
-         time.sleep(5)
- cd /usr/local/hadoop
- sbin/start-dfs.sh
- nc -lk 9999
- cd /usr/local/spark/mycode/structuredstreaming/monitor
- /usr/local/spark/bin/spark-submit StructuredNetworkWordCountWithMonitor.py
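Besides polling a single query object as above, the session-wide StreamingQueryManager exposed as spark.streams can list every active query; the short sketch below is only an illustration of that API (run inside the same application), not textbook code.
- # Illustration only: inspect all active streaming queries via spark.streams
- for q in spark.streams.active:
-     print(q.name, q.id, q.isActive)
-     print(q.status)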