该版本是原先教程的python版本。
查看前一步骤操作步骤二:数据处理和Python操作Kafka
查看scala版本scala版本:Structured Streaming实时处理数据
《Spark+Kafka构建实时分析Dashboard案例——步骤三:Structured Streaming实时处理数据(python版本)》
开发团队:厦门大学数据库实验室 联系人:林子雨老师ziyulin@xmu.edu.cn
版权声明:版权归厦门大学数据库实验室所有,请勿用于商业用途;未经授权,其他网站请勿转载
本教程介绍大数据课程实验案例“Spark+Kafka构建实时分析Dashboard”的第三个步骤,Structured Streaming实时处理数据。在本篇博客中,将介绍如何利用Structed Streaming实时接收处理Kafka数据以及将处理后的结果发给的Kafka。
所需知识储备
会使用python编写Structured Streaming程序,Kafka原理。
训练技能
编写Structured Streaming程序,熟悉pySpark操作Kafka。
任务清单
- Structured Streaming实时处理Kafka数据;
- 将处理后的结果发送给Kafka;
编程思想
本案例在于实时统计每秒中男女生购物人数,而Structured Streaming接收的数据为1,1,0,2...,其中0代表女性,1代表男性,所以对于2或者null值,则不考虑。其实通过分析,可以发现这个就是典型的wordcount问题,而且是基于Spark流计算。女生的数量,即为0的个数,男生的数量,即为1的个数。
因此利用Structured Streaming的groupBy接口,设置窗口大小为1,滑动步长为1,这样统计出的0和1的个数即为每秒男生女生的人数。
编程实现
配置Spark开发Kafka环境
kafka的安装可以参考Kafka的安装和简单实例测试。下面主要介绍配置Spark开发Kafka环境。首先点击下载,下载Spark连接Kafka的代码库。然后把下载的代码库放到目录/usr/local/spark/jars目录下,命令如下:
sudo mv ~/下载/spark-streaming_2.12-3.2.0.jar /usr/local/spark/jars
sudo mv ~/下载/spark-streaming-kafka-0-10_2.12-3.2.0.jar /usr/local/spark/jars
然后在/usr/local/spark/jars目录下新建kafka目录,把/usr/local/kafka/libs下所有函数库复制到/usr/local/spark/jars/kafka目录下,命令如下
cd /usr/local/spark/jars
mkdir kafka
cd kafka
cp /usr/local/kafka/libs/* .
然后,修改 Spark 配置文件,命令如下
cd /usr/local/spark/conf
sudo vim spark-env.sh
把 Kafka 相关 jar 包的路径信息增加到 spark-env.sh,修改后的 spark-env.sh 类似如下:
export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoopclasspath):/usr/local/spark/jars/kafka/*:/usr/local/kafka/libs/*
因为我使用的是anaconda中创建的python环境,所以介绍一下,怎么为spark设置python环境。
需要修改conf目录下的spark_env.sh:在这个文件的开头添加:
export PYSPARK_PYTHON=/home/hadoop/anaconda3/envs/py37/bin/python
#其中,py37应该为自行创建的环境名称
这里的py37是我在anaconda中自行创建的环境,读者在自己实验的时候,要找到你自己创建的环境。
如果后续运行程序时候还出现缺少模块的,请继续参照kafka安装:
conda activate env_name #这里env_name应替换为创建的conda环境名称
conda install kafka
执行上述步骤之后,Spark开发Kafka环境即已配置好,下面介绍如何编码实现。
建立pySpark项目
首先在/usr/local/spark/mycode新建项目目录
cd /usr/local/spark/mycode
mkdir kafka
然后在kafka这个目录下创建一个kafka_test.py文件。
from kafka import KafkaProducer
from pyspark.streaming import StreamingContext
#from pyspark.streaming.kafka import KafkaUtils
from pyspark import SparkConf, SparkContext
import json
import sys
from pyspark.sql import DataFrame
from pyspark.sql import SparkSession
from pyspark.sql.functions import window
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import TimestampType, StringType
from pyspark.sql.functions import col, column, expr
def KafkaWordCount(zkQuorum, group, topics, numThreads):
spark = SparkSession \
.builder \
.appName("KafkaWordCount") \
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
topicAry = topics.split(",")
# 将topic转换为hashmap形式,而python中字典就是一种hashmap
topicMap = {}
for topic in topicAry:
topicMap[topic] = numThreads
#lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(lambda x : x[1])
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "sex") \
.load()
df.selectExpr( "CAST(timestamp AS timestamp)","CAST(value AS STRING)")
#lines = df.selectExpr("CAST(value AS STRING)")
windowedCounts = df \
.withWatermark("timestamp", "1 seconds") \
.groupBy(
window(col("timestamp"), "1 seconds" ,"1 seconds"),
col("value")) \
.count()
wind = windowedCounts.selectExpr( "CAST(value AS STRING)","CAST(count AS STRING)")
query = wind.writeStream.option("checkpointLocation", "/check").outputMode("append").foreach(sendmsg).start()
query.awaitTermination()
query.stop()
# 格式转化,将格式变为[{1: 3}]
def Get_dic(row):
res = []
#for elm in row:
tmp = {row[0]: row[1]}
res.append(tmp)
print(res)
return json.dumps(res)
def sendmsg(row):
print(row)
if row.count != 0:
msg = Get_dic(row)
# 实例化一个KafkaProducer示例,用于向Kafka投递消息
producer = KafkaProducer(bootstrap_servers='localhost:9092')
producer.send("result", msg.encode('utf8'))
# 很重要,不然不会更新
producer.flush()
if __name__ == '__main__':
# 输入的四个参数分别代表着
# 1.zkQuorum为zookeeper地址
# 2.group为消费者所在的组
# 3.topics该消费者所消费的topics
# 4.numThreads开启消费topic线程的个数
if (len(sys.argv) < 5):
print("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
exit(1)
zkQuorum = sys.argv[1]
group = sys.argv[2]
topics = sys.argv[3]
numThreads = int(sys.argv[4])
print(group, topics)
KafkaWordCount(zkQuorum, group, topics, numThreads)
上述代码注释已经也很清楚了,下面在简要说明下:
1. 首先按每秒的频率读取Kafka消息;
2. 然后对每秒的数据执行wordcount算法,统计出0的个数,1的个数,2的个数;
3. 最后将上述结果封装成json发送给Kafka。
另外,需要注意,上面代码中有一段如下代码:
.option("checkpointLocation", "/check")
这行代码表示把检查点文件写入分布式文件系统HDFS,所以一定要事先启动Hadoop。如果没有启动Hadoop,则后面运行时会出现“拒绝连接”的错误提示。如果你还没有启动Hadoop,则可以现在在Ubuntu终端中,使用如下Shell命令启动Hadoop:
cd /usr/local/hadoop #这是hadoop的安装目录
./sbin/start-dfs.sh
运行项目
编写好程序之后,接下来编写运行脚本,在/usr/local/spark/mycode/kafka目录下新建startup.sh文件,输入如下内容:
/usr/local/spark/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0 /usr/local/spark/mycode/kafka/kafka_test.py 127.0.0.1:2181 1 sex 1
其中最后四个为输入参数,含义如下
1. 127.0.0.1:2181为Zookeeper地址
2. 1 为consumer group标签
3. sex为消费者接收的topic
4. 1 为消费者线程数
最后在/usr/local/spark/mycode/kafka目录下,运行如下命令即可执行刚编写好的Structured Streaming程序
sh startup.sh
程序运行成功之后,下面通过步骤二的KafkaProducer和KafkaConsumer来检测程序。
测试程序
下面开启之前编写的KafkaProducer投递消息,然后将KafkaConsumer中接收的topic改为result,验证是否能接收topic为result的消息,更改之后的KafkaConsumer为
from kafka import KafkaConsumer
consumer = KafkaConsumer('result')
for msg in consumer:
print((msg.value).decode('utf8'))
在同时开启Structured Streaming项目,KafkaProducer以及KafkaConsumer之后,可以在KafkaConsumer运行窗口看到如下输出:
到此为止,Structured Streaming程序编写完成,下篇文章将分析如何处理得到的最终结果。