Spark2.0入门:Structured Streaming操作文件流

大数据技术原理与应用

【版权声明】博客内容由厦门大学数据库实验室拥有版权,未经允许,请勿转载!
[返回Spark教程首页]

Structured Streaming目前的支持的数据源有两种,一种是文件,另一种是网络套接字;Spark2.0入门:Structured Streaming操作概述这篇教程已经分析了如何从网络套接字读取并分析数据。因此,这篇文章来分析下,如何从文件流读取数据进行Structured Streaming。

基于文件数据源的Structured Streaming

类似于创建静态的DataFrame,我们可以通过设置data format,schema,option等等来创建流式DataFrame;对于网络套接字,客户端可以产生实时数据,并通过网络传输给spark进行实时处理;那么对于文件是如何进行实时产生数据了?我们可以设想这样一个场景,我们的spark应用监听着计算机上一个目录,而web服务在这个目录上实时产生日志文件,这样对于spark应用来说,日志文件就是实时数据;

下面用一个例子来解释如何进行基于文件数据源的Structured Streaming。

在开始编写spark应用之前,需要先创建一个目录,用于存放处理数据文件;先cd到/usr/local/spark目录,然后创建resource目录,命令如下:

cd /usr/local/spark
mkdir resource

Structured Streaming支持的文件类型有text,csv,json,parquet;这篇文章分析如何从json文件读取分析数据;我们先在resource目录下新建一个json文件,定义数据类型,如下:

cd resource
vim people.json
//在people.json文件输入如下数据:
{"name":"json","age":23,"hobby":"running"}
{"name":"charles","age":32,"hobby":"basketball"}
{"name":"tom","age":28,"hobby":"football"}
{"name":"lili","age":24,"hobby":"running"}
{"name":"bob","age":20,"hobby":"swimming"}

这个示例数据主要描述一个人的姓名,年龄和爱好;示例程序用于统计年龄小于25岁的people爱好排行榜。接下来就开始Structured Streaming编程;

创建DataFrame

因为输入的数据是结构化数据,因此在创建DataFrame时,需要先定义好schema,这样spark程序才知道如何解析json数据;定义模式如下:

scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._

scala> val userSchema = new StructType().add("name","string").add("age","integer").add("hobby","string")
userSchema: org.apache.spark.sql.types.StructType = StructType(StructField(name,StringType,true), StructField(age,IntegerType,true), StructField(hobby,StringType,true))

这个userSchema有三个属性,姓名,年龄和爱好;接下来就可以创建DataFrame:

scala> val userDF=spark.readStream.schema(userSchema).json("./resource/")
userDF: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more field]

我们通过spark.readStream方法返回DataStreamReader实例,通过DataStreamReader实例的方法schema和json分别定义了数据源的模式和目录;

DataFrame转换操作

创建好初始DataFrame之后,接下来可以定义DataFrame转换操作;因为这个示例用于统计年龄小于25岁的people爱好排行榜,因此先过滤大于等于25岁的people,然后合并爱好,并统计爱好的次数,代码如下:

scala>  val userlittler25DF=userDF.filter($"age"<25)
userlittler25DF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [name: string, age: int ... 1 more field]

scala> val hobbyDF = userlittler25DF.groupBy("hobby").count()
hobbyDF: org.apache.spark.sql.DataFrame = [hobby: string, count: bigint]

根据业务需求,我们定义DataFrame,并通过转换成新的DataFrame,完成流查询;这时还没执行真正的计算查询,而是定义好流计算执行过程,后续调用start方法才真正开启流计算查询;

开启流查询

ok,到这里,我们就可以开启查询过程,代码如下:

scala> val query = hobbyDF.writeStream.outputMode("complete").format("console").start()
query: org.apache.spark.sql.streaming.StreamingQuery = Streaming Query - query-1 [state = ACTIVE]

这里,设置输出模式为complete,输出到控制台;由[state=ACTIVE],可知,我们的spark应用已开启,以下是执行结果

scala> -------------------------------------------
Batch: 0
-------------------------------------------
+--------+-----+                                                                
|   hobby|count|
+--------+-----+
| running|    2|
|swimming|    1|
+--------+-----+

接下来,由结果可以看出,年龄小于25岁的people当中,有两个爱好running,一个爱好swimming;我们在/usr/local/spark/resource新创建一个文件,并输入以下数据,

{"name":"hello","age":18,"hobby":"swimming"}
{"name":"world","age":19,"hobby":"running"}

在执行spark应用终端可以得到如下结果:

scala> -------------------------------------------
Batch: 0
-------------------------------------------
+--------+-----+                                                                
|   hobby|count|
+--------+-----+
| running|    2|
|swimming|    1|
+--------+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+--------+-----+                                                                
|   hobby|count|
+--------+-----+
| running|    3|
|swimming|    2|
+--------+-----+

由于是complete模式,因此当有新数据时,spark应用处理新的数据,产生新的结果,并和原来的结果进行合并输出;

编写独立应用

之前是在spark-shell中一行一行的执行代码,现在我们可以把程序写成单独一个scala文件,然后提交给spark执行;首先打开一个终端,输入以下命令:

cd /usr/local/spark/mycode
mkdir streaming
cd streaming
mkdir -p src/main/scala
cd src/main/scala
vim TestStructuredStreaming.scala

这样就用vim打开一个scala文件,在该文件输入以下内容:

import org.apache.spark.sql.types._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
object WordCountStructuredStreaming{
        def main(args: Array[String]){
                val spark = SparkSession.builder.appName("StructuredNetworkWordCount").getOrCreate()
                import spark.implicits._
                val userSchema = new StructType().add("name","string").add("age","integer").add("hobby","string")
                val userDF = spark.readStream.schema(userSchema).json("/usr/local/spark/resource/")
                val userlittler25DF = userDF.filter($"age"<25)
                val hobbyDF = userlittler25DF.groupBy("hobby").count()
                val query = hobbyDF.writeStream.outputMode("complete").format("console").start()
                query.awaitTermination()
        }
}

代码写好之后,退出终端,然后在/usr/lcoal/spark/mycode/streaming目录下创建simple.sbt文件:

cd /usr/local/spark/mycode/streaming
vim simple.sbt # 注意,是simple.sbt,不是simple.txt

打开vim编辑器以后,输入以下内容:

name := "Simple Project"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.0.0"

然后就可以执行sbt打包编译了:

cd /usr/local/spark/mycode/streaming
/usr/local/sbt/sbt package

打包成功以后,就可以输入以下命令启动这个程序:

cd /usr/local/spark/mycode/streaming
/usr/local/spark/bin/spark-submit --class "WordCountStructuredStreaming" ./target/scala-2.11/simple-project_2.11-1.0.jar

执行上输出程序之后,就开启了监听状态,因为之前已经在/usr/local/spark/resource保存了数据,因此spark程序跑起来之后,直接输出年龄小于25岁的people,爱好排行榜,和在spark-shell中得到的结果一致。

以上就是Structured Streaming基于文件数据源的基本操作;

子雨大数据之Spark入门
扫一扫访问本博客