Spark2.0入门:Structured Streaming操作网络流

大数据学习路线图

【版权声明】博客内容由厦门大学数据库实验室拥有版权,未经允许,请勿转载!
[返回Spark教程首页]

在Spark1.6版本,DStream是Spark Streaming的编程模型,而Spark2.0将流计算也统一到DataFrame里去了,提出了Structured Streaming编程模型;将数据源映射为类似于关系数据库中的表,然后将经过计算得到的结果映射为另一张表,完全以结构化的方式去操作流式数据,这种编程模型非常有利于处理分析结构化的数据;

Structured Streaming操作概述

由前面的章节《Spark运行架构》和《Structured Streaming简介》,我们已经了解了Spark整体的运行流程以及Structured Streaming的编程模型;Structured Streaming用DataFrame代替RDD,整体编程模型和RDD类似,每隔一段时间间隔,对输入的实时数据进行处理分析;二者最主要的区别在于,RDD的实时数据为分布式对象的集合,而DataFrame的实时数据为Row对象集合(每个Row对象代表一行记录),提供了详细的结构信息,也就是我们常说的模式,Spark可以清楚知道该数据集中包含哪些列,每列的名称和类型。

Structured Streaming程序基本步骤

编写Structured Streaming程序的基本步骤是:
1.创建SparkSession实例;
2.创建DataFrame表示从数据源输入的每一行数据;
3.DataFrame转换,类似于RDD转换操作;
4.创建StreamingQuery开启流查询;
5.调用StreamingQuery.awaitTermination()方法,等待流查询结束。

Structured Streaming操作示例

这里还是用统计单词个数的示例,来展示如何进行Structured Streaming操作;

获取SparkSession

要操作Structured Streaming,首先要获取SparkSession实例,我们可以如下创建一个SparkSession;登录Linux系统,启动spark-shell。然后输入一下代码:

scala> import org.apache.spark.sql.functions._
scala> import org.apache.spark.sql.SparkSession
val spark = SparkSession.
     | builder.
     | appName("StructuredNetworkWordCount").
     | getOrCreate()
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@1d1bf7bf

scala> import spark.implicits._

首先引入两个包,接着实例化一个SparkSession对象;appName方法设置spark应用名称,getOrCreate方法表示如果已存在一个SparkSession,则直接用现成的SparkSession,否则创建一个SparkSession,其实spark就是已经存在的SparkSession,因为每次开启spark-shell时,就默认开启了一个SparkSession;千万别忘了引入spark.implicits._包,否则会报错。

创建DataFrame

创建好SparkSession后,即可用SparkSession.readStream方法创建DataFrame,在spark-shell输入一下代码:

scala> val lines = spark.readStream.
     | format("socket").
     | option("host","localhost").
     | option("port",9999).
     | load()
lines: org.apache.spark.sql.DataFrame = [value: string]

SparkSession.readStream方法返回一个DataStreamReader实例,接着DataStreamReader.format设置数据源为网络套接字,这里还可以设置从文件获取数据;然后调用两次DataStreamReader.option分别设置套接字的主机和端口;最后调用DataStreamReader.load方法,返回一个DataFrame实例;

此时,lines可以理解为之前Spark2.0入门:Structured Streaming简介提到的"unbound table",实时到来的数据,即添加到这个"unbound table";由输出可以看出,这个DataFrame每行包含一个字符串;

DataFrame转换

获取lines这个DataFrame之后,接下来就要处理这个DataFrame;类似于RDD转换操作,DataFrame也是通过转换成新的DataFrame来处理数据;因此由之前lines获取到每行数据之后,接下来要进行分隔并进行统计;

scala> val words = lines.as[String].flatMap(_.split(" "))
words: org.apache.spark.sql.Dataset[String] = [value: string]

scala> val wordCounts = words.groupBy("value").count()
wordCounts: org.apache.spark.sql.DataFrame = [value: string, count: bigint]

lines.as[String]将DataFrame转换成DataSet,其实DataFrame只是DataSet的特例,在官方的API文档上有如下声明

type DataFrame = Dataset[Row]

接着调用flatMap将一行实时数据按空格切割成单词集合;groupBy方法表示按"value"这个属性合并,count方法表示统计出每个单词出现的次数;此时的wordCounts又转换为DataFrame,且这个DataFrame有两个属性,value和count,即每次处理完一行实时数据时,都会输出单词和该单词出现的次数;

执行流查询

如果DataFrame转换操作定义结束,接下来即可开启流查询,在spark-shell输入如下:

scala> val query = wordCounts.writeStream.
     | outputMode("complete").
     | format("console").
     | start()
query: org.apache.spark.sql.streaming.StreamingQuery = Streaming Query - query-0 [state = ACTIVE]

scala> query.awaitTermination()

wordCounts.writeStream返回一个 DataStreamWriter实例,该实例定义了将实时流查询产生结果输出到外部存储的接口;outputMode设置了'complete'模式,即每次都输出全部结果数据;format定义输出媒介,这里为控制台;最后调用start方法开启流查询并返回一个StreamingQuery实例;

最后调用StreamingQuery.awaitTermination等待查询结束;

流查询结果:

在开启查询之前,需要先在另外一个终端上开启一个NetCat简单服务程序,用于向spark流查询产生数据;在终端输入如下即可:

nc -lk 9999

表示监听本地9999端口,这样在该终端上输入数据,即可传输给spark流查询;我们先输入简单字符串"hello spark":

hadoop@charles-Aspire-4741:/usr/local/spark$ nc -lk 9999
hello spark

即可在之前开启spark流查询的端口看到如下结果:

scala> query.awaitTermination()
-------------------------------------------
Batch: 0
-------------------------------------------
+-----+-----+                                                                   
|value|count|
+-----+-----+
|hello|    1|
|spark|    1|
+-----+-----+

我们就得到了每个单词以及次数;当我们在NetCat终端再输入一行字符串"hello apache",得到如下结果:

scala> query.awaitTermination()
-------------------------------------------
Batch: 0
-------------------------------------------
+-----+-----+                                                                   
|value|count|
+-----+-----+
|hello|    1|
|spark|    1|
+-----+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+                                                                  
| value|count|
+------+-----+
| hello|    2|
|apache|    1|
| spark|    1|
+------+-----+

我们可以看到,每处理一行数据,即输出一个结果;因此当数据源传来源源不断的实时数据时,Structured streaming可以按固定时间间隔读取若干行数据,并执行流查询,输出结果;因为这个示例程序用的是complete模式,因此每次都将结果全部输出。当然也可以设置成append(增量)模式,这样每次输出即为新增的结果行;

编写独立应用

之前是在spark-shell中一行一行的执行代码,现在我们可以把程序写成单独一个scala文件,然后提交给spark执行;首先打开一个终端,输入以下命令:

cd /usr/local/spark/mycode
mkdir streaming
cd streaming
mkdir -p src/main/scala
cd src/main/scala
vim TestStructuredStreaming.scala

这样就用vim打开一个scala文件,在该文件输入以下内容:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
object WordCountStructuredStreaming{
        def main(args: Array[String]){
                val spark = SparkSession.builder.appName("StructuredNetworkWordCount").getOrCreate()
                import spark.implicits._
                val lines = spark.readStream.format("socket").option("host","localhost").option("port",9999).load()
                val words = lines.as[String].flatMap(_.split(" "))
                val wordCounts = words.groupBy("value").count()
                val query = wordCounts.writeStream.outputMode("complete").format("console").start()
                query.awaitTermination()
        }
}

代码写好之后,退出终端,然后在/usr/lcoal/spark/mycode/streaming目录下创建simple.sbt文件:

cd /usr/local/spark/mycode/streaming
vim simple.sbt # 注意,是simple.sbt,不是simple.txt

打开vim编辑器以后,输入以下内容:

name := "Simple Project"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.0.0"

然后就可以执行sbt打包编译了:

cd /usr/local/spark/mycode/streaming
/usr/local/sbt/sbt package

打包成功以后,就可以输入以下命令启动这个程序:

cd /usr/local/spark/mycode/streaming
/usr/local/spark/bin/spark-submit --class "WordCountStructuredStreaming" ./target/scala-2.11/simple-project_2.11-1.0.jar 

执行上输出程序之后,就开启了监听状态,当我们在NetCat终端输入"hello world"之后,spark应用终端即可输出"hello"和"world"单词出现的次数,和spark-shell输出一致;

以上就是Structured Streaming操作网络流所有内容.
子雨大数据之Spark入门
扫一扫访问本博客