Spark2.1.0入门:DStream输出操作

大数据学习路线图

子雨大数据之Spark入门
【版权声明】博客内容由厦门大学数据库实验室拥有版权,未经允许,请勿转载!
[返回Spark教程首页]

在Spark应用中,外部系统经常需要使用到Spark DStream处理后的数据,因此,需要采用输出操作把DStream的数据输出到数据库或者文件系统中。

准备工作

这里依然以前面的“DStream转换操作”章节中介绍的NetworkWordCountStateful.scala为基础进行修改。下面,让我们先调试一遍这个程序,复习一下整个程序的细节。
请登录Linux系统,打开一个终端:

cd /usr/local/spark/mycode/streaming/stateful
cd src/main/scala
ls

可以看到目录下有两个代码文件(我们在前面章节已经创建了这两个文件):NetworkWordCountStateful.scala和StreamingExamples.scala。
其中,NetworkWordCountStateful.scala的代码如下:

package org.apache.spark.examples.streaming
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.storage.StorageLevel

object NetworkWordCountStateful {
  def main(args: Array[String]) {
    //定义状态更新函数
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.foldLeft(0)(_ + _)
      val previousCount = state.getOrElse(0)
      Some(currentCount + previousCount)
    }
        StreamingExamples.setStreamingLogLevels()  //设置log4j日志级别
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCountStateful")
    val sc = new StreamingContext(conf, Seconds(5))
    sc.checkpoint("file:///usr/local/spark/mycode/streaming/stateful/")    //设置检查点,检查点具有容错机制
    val lines = sc.socketTextStream("localhost", 9999)
    val words = lines.flatMap(_.split(" "))
    val wordDstream = words.map(x => (x, 1))
    val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)
    stateDstream.print()
    sc.start()
    sc.awaitTermination()
  }
}

下面请执行下面命令再次sbt打包编译一下(前面章节学习中已经打包编译过了,这里只是为了再次练习一下):

cd /usr/local/spark/mycode/streaming/stateful
/usr/local/sbt/sbt package

打包编译成功以后,就可以运行这个程序:

/usr/local/spark/bin/spark-submit --class "org.apache.spark.examples.streaming.NetworkWordCountStateful" /usr/local/spark/mycode/streaming/stateful/target/scala-2.11/simple-project_2.11-1.0.jar

启动后屏幕上就会显示类似下面的程序运行信息:

-------------------------------------------
Time: 1479890485000 ms
-------------------------------------------

-------------------------------------------
Time: 1479890490000 ms
-------------------------------------------

下面,请打开另外一个终端,作为单词产生的源头,提供给NetworkWordCountStateful程序进行词频统计:

nc -lk 9999
//请手动输入一些单词,可以随便输入,输入一个单词就敲入回车,比如下面是笔者输入的单词
hadoop
spark
hadoop
spark
hadoop
spark

这个时候,你再去看刚才运行NetworkWordCountStateful程序的监听窗口,就可以看到类似下面的词频统计结果:

-------------------------------------------
Time: 1479890485000 ms
-------------------------------------------
(spark,1)
(hadoop,1)

-------------------------------------------
Time: 1479890490000 ms
-------------------------------------------
(spark,2)
(hadoop,3)

现在可以结束程序运行了,可以按Ctrl+Z结束当前程序的运行。
好了,以前的知识复习结束。现在,请把刚才打开的这两个终端窗口都关闭掉。有了这个基础,我们就可以顺利开展本节内容的学习了,也就是如何把DStream输出到外部系统中。

把DStream输出到文本文件中

下面请新打开一个终端。为了不破坏以前的代码,我们单独复制上面这些代码到一个新的文件夹中,请执行下面命令:

cd /usr/local/spark/mycode/streaming
mkdir dstreamoutput
cd dstreamoutput
mkdir -p src/main/scala
cp /usr/local/spark/mycode/streaming/stateful/src/main/scala/* ./src/main/scala/
cp /usr/local/spark/mycode/streaming/stateful/simple.sbt ./

这样,我们就把代码都复制到了新的文件夹,然后,我们就在这个新的dstreamoutput文件夹中进行程序修改。
首先,我们尝试把DStream内容保存到文本文件中,可以使用如下语句:

stateDstream.saveAsTextFiles("file:///usr/local/spark/mycode/streaming/dstreamoutput/output.txt")

下面使用vim编辑器打开NetworkWordCountStateful.scala代码文件:

cd /usr/local/spark/mycode/streaming/dstreamoutput
vim src/main/scala/NetworkWordCountStateful.scala

我们要把这条保存数据的语句stateDstream.saveAsTextFiles()放入到NetworkWordCountStateful.scala代码中,修改后的代码如下(或者你可以把NetworkWordCountStateful.scala原来的代码内容全部删除,直接把下面修改后的代码复制进去):

package org.apache.spark.examples.streaming
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.storage.StorageLevel

object NetworkWordCountStateful {
  def main(args: Array[String]) {
    //定义状态更新函数
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.foldLeft(0)(_ + _)
      val previousCount = state.getOrElse(0)
      Some(currentCount + previousCount)
    }
        StreamingExamples.setStreamingLogLevels()  //设置log4j日志级别
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCountStateful")
    val sc = new StreamingContext(conf, Seconds(5))
    sc.checkpoint("file:///usr/local/spark/mycode/streaming/dstreamoutput/")    //设置检查点,检查点具有容错机制
    val lines = sc.socketTextStream("localhost", 9999)
    val words = lines.flatMap(_.split(" "))
    val wordDstream = words.map(x => (x, 1))
    val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)
    stateDstream.print()
        //下面是新增的语句,把DStream保存到文本文件中
        stateDstream.saveAsTextFiles("file:///usr/local/spark/mycode/streaming/dstreamoutput/output.txt")
    sc.start()
    sc.awaitTermination()
  }
}

程序修改完毕,现在我们可以sbt打包编译代码:

cd /usr/local/spark/mycode/streaming/dstreamoutput
/usr/local/sbt/sbt package

打包成功以后,请运行程序:

cd /usr/local/spark/mycode/streaming/dstreamoutput
/usr/local/spark/bin/spark-submit --class "org.apache.spark.examples.streaming.NetworkWordCountStateful" /usr/local/spark/mycode/streaming/dstreamoutput/target/scala-2.11/simple-project_2.11-1.0.jar

程序运行以后,屏幕上就会显示类似下面的程序运行信息:

-------------------------------------------
Time: 1479890485000 ms
-------------------------------------------

-------------------------------------------
Time: 1479890490000 ms
-------------------------------------------

下面,请打开另外一个终端,作为单词产生的源头,提供给NetworkWordCountStateful程序进行词频统计:

nc -lk 9999
//请手动输入一些单词,可以随便输入,比如下面是笔者输入的单词
hadoop
spark
hadoop
spark
hadoop
spark

这个时候,你再去看刚才运行NetworkWordCountStateful程序的监听窗口,就可以看到类似下面的词频统计结果:

-------------------------------------------
Time: 1479890485000 ms
-------------------------------------------
(spark,1)
(hadoop,1)

-------------------------------------------
Time: 1479890490000 ms
-------------------------------------------
(spark,2)
(hadoop,3)

然后,我们停止程序运行,在当前终端内按键盘Ctrlz+Z组合键,就可以结束程序运行。现在我们就看一下,这些词频结果是否被成功地输出到“/usr/local/spark/mycode/streaming/dstreamoutput/output.txt”文件中了。请执行如下命令:

cd /usr/local/spark/mycode/streaming/dstreamoutput/
ls

可以发现,在这个目录下,生成了很多文本文件,如下:

output.txt-1479951955000
output.txt-1479951960000
output.txt-1479951965000
output.txt-1479951970000
output.txt-1479951975000
output.txt-1479951980000
output.txt-1479951985000

可以看出,由于我们在代码中有一句“val sc = new StreamingContext(conf, Seconds(5))”,也就是说,每隔5秒钟统计一次词频,所以,每隔5秒钟就会生成一次词频统计结果,并输出到“/usr/local/spark/mycode/streaming/dstreamoutput/output.txt”中,每次生成的output.txt后面会自动被加上时间标记(比如1479951955000)。这里要注意,虽然我们把DStream输出到“/usr/local/spark/mycode/streaming/dstreamoutput/output.txt”中,output.txt的命名看起来像一个文件,但是,实际上,spark会生成名称为output.txt的目录,而不是文件。这个问题,我们在之前“文件数据读写”章节的学习中已经讨论过,这里不再赘述。
我们可以查看一下某个output.txt下面的内容:

 cat output.txt-1479951980000/*

会得到类似下面的结果:

(hello,1)
(spark,2)
(hadoop,2)

说明我们已经成功地把DStream输出到文本文件了。

把DStream写入到MySQL数据库中

请执行下面命令在Linux中启动MySQL数据库,并完成数据库和表的创建:

service mysql start
mysql -u root -p
//屏幕会提示你输入密码

输入密码后,你就可以进入“mysql>”命令提示符状态,然后就可以输入下面的SQL语句完成数据库和表的创建。在我们之前“通过JDBC连接数据库(DataFrame)”章节的内容中,我们已经在MySQL数据库中创建了一个名称为“spark”的数据库和名称为“student”的表。这里,我们可以直接使用这个已经创建好的“spark”数据库,然后,在这个数据库中创建一个名称为“wordcount”的表,命令如下:

mysql> use spark
mysql> create table wordcount (word char(20), count int(4));
mysql> select * from wordcount
//这个时候wordcount表是空的,没有任何记录

下面,请新打开另外一个Linux终端,我们就可以对NetworkWordCountStateful.scala代码文件进行修改,为了方便起见,你可以直接执行下面命令删除该文件,再用vim编辑器新建该文件:

cd /usr/local/spark/mycode/streaming/dstreamoutput/src/main/scala
ls
rm NetworkWordCountStateful.scala
vim NetworkWordCountStateful.scala

然后,在NetworkWordCountStateful.scala文件中加入下面代码:

package org.apache.spark.examples.streaming
import java.sql.{PreparedStatement, Connection, DriverManager}
import java.util.concurrent.atomic.AtomicInteger
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.storage.StorageLevel

object NetworkWordCountStateful {
  def main(args: Array[String]) {
    //定义状态更新函数
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.foldLeft(0)(_ + _)
      val previousCount = state.getOrElse(0)
      Some(currentCount + previousCount)
    }
    StreamingExamples.setStreamingLogLevels()  //设置log4j日志级别
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCountStateful")
    val sc = new StreamingContext(conf, Seconds(5))
    sc.checkpoint("file:///usr/local/spark/mycode/streaming/dstreamoutput/")    //设置检查点,检查点具有容错机制
    val lines = sc.socketTextStream("localhost", 9999)
    val words = lines.flatMap(_.split(" "))
    val wordDstream = words.map(x => (x, 1))
    val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)
    stateDstream.print()
        //下面是新增的语句,把DStream保存到MySQL数据库中     
     stateDstream.foreachRDD(rdd => {
      //内部函数
      def func(records: Iterator[(String,Int)]) {
        var conn: Connection = null
        var stmt: PreparedStatement = null
        try {
          val url = "jdbc:mysql://localhost:3306/spark"
          val user = "root"
          val password = "hadoop"  //笔者设置的数据库密码是hadoop,请改成你自己的mysql数据库密码
          conn = DriverManager.getConnection(url, user, password)
          records.foreach(p => {
            val sql = "insert into wordcount(word,count) values (?,?)"
            stmt = conn.prepareStatement(sql);
            stmt.setString(1, p._1.trim)
                        stmt.setInt(2,p._2.toInt)
            stmt.executeUpdate()
          })
        } catch {
          case e: Exception => e.printStackTrace()
        } finally {
          if (stmt != null) {
            stmt.close()
          }
          if (conn != null) {
            conn.close()
          }
        }
      }

      val repartitionedRDD = rdd.repartition(3)
      repartitionedRDD.foreachPartition(func)
    })

    sc.start()
    sc.awaitTermination()
  }
}

保存该文件,退出vim编辑器。这个代码的具体含义,我们过一会儿再解释,现在我们先打包编译一下程序,看看效果。
因为我们在NetworkWordCountStateful.scala代码中加入了Spark SQL的操作,所以,需要用到Spark SQL依赖包,因此,需要修改一下simple.sbt文件,同样为了方便起见,我们直接删除该文件,然后重新建该文件,请执行下面命令:

cd /usr/local/spark/mycode/streaming/dstreamoutput
rm simple.sbt
vim simple.sbt

然后在simple.sbt中加入以下代码:

name := "Simple Project"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.1.0"
libraryDependencies += "org.apache.spark" % "spark-sql_2.11" % "2.1.0"

保存该文件,退出vim编辑器。然后,执行下面命令打包编译:

cd /usr/local/spark/mycode/streaming/dstreamoutput
/usr/local/sbt/sbt package

打包编译成功后,就可以执行下面命令运行NetworkWordCountStateful程序进行词频统计,但是,需要注意,因为需要通过JDBC连接MySQL数据库,所以,根据前面“JDBC连接MySQL数据库”章节介绍的知识,需要在spark-submit命令中提供外部jar包,告诉spark程序可以在哪里找到mysql驱动程序。命令如下:

 /usr/local/spark/bin/spark-submit --class "org.apache.spark.examples.streaming.NetworkWordCountStateful" --jars /usr/local/spark/jars/mysql-connector-java-5.1.40/mysql-connector-java-5.1.40-bin.jar /usr/local/spark/mycode/streaming/dstreamoutput/target/scala-2.11/simple-project_2.11-1.0.jar

执行上面命令以后,就进入监听状态,下面我们打开另外一个终端,运行下面命令产生单词提供给NetworkWordCountStateful进行词频统计:

nc -lk 9999
//现在你就可以在当前窗口内随意输入单词,输入一个单词就回车,比如输入下面单词
hello
hadoop
spark
hello
spark

输入一些单词以后,就可以按Ctrl+Z停止nc程序。然后切换到刚才运行NetworkWordCountStateful程序的监听窗口,也按Ctrl+Z停止NetworkWordCountStateful程序运行。然后,再切换到刚才运行MySQL的终端窗口,在mysql命令提示符下输入查看命令,就可以查看spark.wordcount表内当前最新的数据:

mysql> select * from wordcount;
+--------+-------+
| word   | count |
+--------+-------+
| hello  |     1 |
| hadoop |     1 |
| hello  |     2 |
| spark  |     1 |
| hadoop |     2 |
| hello  |     2 |
| spark  |     3 |
| hadoop |     2 |
+--------+-------+

我们重点来分析其中的这段代码(其他代码之前我们都已经介绍过含义):

 stateDstream.foreachRDD(rdd => {
      //内部函数
      def func(records: Iterator[(String,Int)]) {
        var conn: Connection = null
        var stmt: PreparedStatement = null
        try {
          val url = "jdbc:mysql://localhost:3306/spark"
          val user = "root"
          val password = "hadoop"
          conn = DriverManager.getConnection(url, user, password)
          records.foreach(p => {
            val sql = "insert into wordcount(word,count) values (?,?)"
            stmt = conn.prepareStatement(sql)
            stmt.setString(1, p._1.trim)
                        stmt.setInt(2,p._2.toInt)
            stmt.executeUpdate()
          })
        } catch {
          case e: Exception => e.printStackTrace()
        } finally {
          if (stmt != null) {
            stmt.close()
          }
          if (conn != null) {
            conn.close()
          }
        }
      }

      val repartitionedRDD = rdd.repartition(3)
      repartitionedRDD.foreachPartition(func)
    })

也就是说,对于stateDstream,为了把它保存到MySQL数据库中,我们采用了如下的形式:

stateDstream.foreachRDD(function)

其中,function就是一个RDD[T]=>Unit类型的函数,对于本程序而言,就是RDD[(String,Int)]=>Unit类型的函数,也就是说,stateDstream中的每个RDD都是RDD[(String,Int)]类型(想象一下,统计结果的形式是("hadoop",3))。这样,对stateDstream中的每个RDD都会执行function中的操作(即把该RDD保存到MySQL的操作)。

下面看function的处理逻辑,在function部分,函数体要执行的处理逻辑实际上是下面的形式:

 def func(records: Iterator[(String,Int)]){……}
 val repartitionedRDD = rdd.repartition(3)
 repartitionedRDD.foreachPartition(func) 

也就是说,这里定义了一个内部函数func,它的功能是,接收records,然后把records保存到MySQL中。到这里,你可能会有疑问?为什么不是把stateDstream中的每个RDD直接拿去保存到MySQL中,还要调用rdd.repartition(3)对这些RDD重新设置分区数为3呢?这是因为,每次保存RDD到MySQL中,都需要启动数据库连接,如果RDD分区数量太大,那么就会带来多次数据库连接开销,为了减少开销,就有必要把RDD的分区数量控制在较小的范围内,所以,这里就把RDD的分区数量重新设置为3。然后,对于每个RDD分区,就调用repartitionedRDD.foreachPartition(func),把每个分区的数据通过func保存到MySQL中,这时,传递给func的输入参数就是Iterator[(String,Int)]类型的records。如果你不好理解下面这种调用形式:

 repartitionedRDD.foreachPartition(func) //这种形式func没有带任何参数,可能不太好理解,不是那么直观

实际上,这句语句和下面的语句是等价的,下面的语句形式你可能会更好理解:

 repartitionedRDD.foreachPartition(records => func(records)) 

上面这种等价的形式比较直观,为func()函数传入了一个records参数,这就正好和 def func(records: Iterator[(String,Int)])定义对应起来了,方便理解。

接下来,我们就要去剖析func()函数的功能了,我们再单独把func()函数的代码提取出来,不要和其他代码混在一起:

def func(records: Iterator[(String,Int)]) {
        var conn: Connection = null
        var stmt: PreparedStatement = null
        try {
          val url = "jdbc:mysql://localhost:3306/spark"
          val user = "root"
          val password = "hadoop"
          conn = DriverManager.getConnection(url, user, password)
          records.foreach(p => {
            val sql = "insert into wordcount(word,count) values (?,?)"
            stmt = conn.prepareStatement(sql)
            stmt.setString(1, p._1.trim)
                        stmt.setInt(2,p._2.toInt)
            stmt.executeUpdate()
          })
        } catch {
          case e: Exception => e.printStackTrace()
        } finally {
          if (stmt != null) {
            stmt.close()
          }
          if (conn != null) {
            conn.close()
          }
        }
      }

可以看出,上面这段代码的功能是,创建数据库连接conn,然后调用records.foreach(),对于records中的每条记录p,都把p插入到MySQL数据库中。这里要注意,数据库连接conn的创建,是在records.foreach()方法之前,这样可以大大减小数据库连接开销。否则,如果把数据库连接conn的创建放在records.foreach()方法之后,那么,每条记录p都需要建立一次数据库连接,这个开销会很大,肯定是不可取的,对数据库连接资源会造成很大的压力和挑战。
下面再分析一下prepareStatement中的语句。stmt.setString(1, p._1.trim),是指,为val sql = "insert into wordcount(word,count) values (?,?)" 中的第一个问号设置具体的值,也就是给word字段设置值;stmt.setInt(2,p._2.toInt)是指,为第2个问号赋值,也就是给count字段设置值。很显然,每条记录p的形式是类似("hadoop",3)这种形式,所以,需要用p._1获取到"hadoop"(只是举例,每次获取到的值都会不同),需要用p._2获取到3(只是举例,每次获取到的值都会不同)。p._1.trim中调用了trim函数,是为了去掉字符串头尾可能存在的空格。p._2.toInt是为了把获取的3,转换成整型。

到这里,上述代码基本上就没有理解难度了。本次实验顺利完成。

子雨大数据之Spark入门
扫一扫访问本博客