Spark入门:套接字流(DStream)

大数据学习路线图

【版权声明】博客内容由厦门大学数据库实验室拥有版权,未经允许,请勿转载!
[返回Spark教程首页]

Spark Streaming可以通过Socket端口监听并接收数据,然后进行相应处理。Spark Streaming自身就提供了一个简单的样例程序,我们先直接演示这个程序,看看效果,然后再动手编写程序打包运行。

请打开一个Shell窗口,进入Shell命令提示符状态,然后执行下面命令:

nc -lk 9999

上面这个启动了nc程序的窗口,我们可以称为“nc窗口”,执行上面命令以后,这个窗口就等待我们输入,我们可以随便输入单词,这些输入的单词就形成数据源,但是,现在不要着急输入单词,因为,我们的Socket监听程序还没有启动,现在输入单词也没有效果。下面我们启动Socket监听程序,请打开另外一个终端窗口,我们称为“监听窗口”,请在这个窗口里面运行下面命令:

cd /usr/local/spark
./bin/run-example streaming.NetworkWordCount localhost 9999

执行上面命令以后,就会启动Spark Streaming自带的样例程序,这个程序的功能是,监听“nc窗口”输入的单词,然后,统计单词出现的次数。注意,这个NetworkWordCount程序每隔1秒钟自动接收一次来自nc窗口的数据流(也就是在这1秒内你在nc窗口内输入的所有单词)。这1秒和下1秒输入的单词,是独立分开统计次数的,不会累计。
上面命令执行后,监听窗口里面会每隔1秒循环打印类似下面的信息:

-------------------------------------------
Time: 1479431100000 ms
-------------------------------------------

-------------------------------------------
Time: 1479431120000 ms
-------------------------------------------

-------------------------------------------
Time: 1479431140000 ms
-------------------------------------------

注意,如果你的电脑屏幕上,不是显示上面这样非常干净的信息,而是夹杂了很多乱七八糟的信息,如下:

//这里省略若干屏幕信息,干扰你的视线
-------------------------------------------
Time: 1479431100000 ms
-------------------------------------------
//这里省略若干屏幕信息,干扰你的视线
-------------------------------------------
Time: 1479431120000 ms
-------------------------------------------
//这里省略若干屏幕信息,干扰你的视线
-------------------------------------------
Time: 1479431140000 ms
-------------------------------------------

遇到上面这问题,不要紧,这是和log4j的设置有关的。Log4j是Apache的一个开源项目,通过使用Log4j,我们可以控制日志信息输送的目的地是控制台、文件、GUI组件,甚至是套接口服务器、NT的事件记录器、UNIX Syslog守护进程等;我们也可以控制每一条日志的输出格式;通过定义每一条日志信息的级别,我们能够更加细致地控制日志的生成过程。最令人感兴趣的就是,这些可以通过一个配置文件来灵活地进行配置,而不需要修改应用的代码。

那么如何修改log4j的设置,把这些乱七八糟的信息给过滤掉,不要显示到屏幕上面呢?方法如下:
请新打开一个Shell窗口,进入Shell命令提示符状态,然后执行下面命令:

cd /usr/local/spark/conf
ls

这时,你可以看到一个名字为log4j.properties.template的文件,请执行如下命令复制一份到当前目录:

cp log4j.properties.template log4j.properties

这样就得到一个log4j.properties文件,然后,请使用vim编辑器打开这个文件,修改里面的一个配置选项(只修改log4j.rootCategor,其他配置选项不要修改):

//原来的原始配置如下
log4j.rootCategory=INFO,console
//请修改为下面格式
log4j.rootCategory=WARN,console

修改后,保存退出vim编辑器,就可以关闭当前终端窗口。
切换回到前面已经打开的监听窗口,监听程序一直在运行,屏幕上还在继续打印信息,现在我们让这个监听程序停止运行,可以按键盘的Ctrl+Z组合键,就停止运行了,回到了Shell命令提示符状态,这时,可以再次输入下面命令:

cd /usr/local/spark
./bin/run-example streaming.NetworkWordCount localhost 9999

这时就再次启动了监听程序NetworkWordCount,你可以切换到nc窗口,输入一些单词回车,再输入一些单词回车,然后再返回到监听窗口,你就可以看到监听窗口会出现类似下面的统计结果:

-------------------------------------------
Time: 1479431100000 ms
-------------------------------------------
(hello,1)
(world,1)
-------------------------------------------
Time: 1479431120000 ms
-------------------------------------------
(hadoop,1)
-------------------------------------------
Time: 1479431140000 ms
-------------------------------------------
(spark,1)

恭喜你,执行成功了。
现在要结束程序,请在监听窗口中按键盘的Ctrl+Z组合键,停止监听程序NetworkWordCount。请切换到nc窗口,按键盘的Ctrl+Z组合键,停止nc程序。下面要注意,如果你现在要马上开始再做一次刚才的测试,打算立即运行“nc -lk 9999”命令,你会发现,屏幕提示你端口已经别占用。怎么办?第一种解决方案,关闭所有终端,退出系统登录状态,重新登录一次Linux系统,再次运行“nc -lk 9999”命令启动nc程序。显然,这种方法不好,太浪费时间。所以,建议采用第二种方法,就是直接修改端口号,比如我们可以运行命令“nc -lk 9998”,但是,要注意,这时候,你的监听程序也必须监听9998端口,也就是说,必须使用下面命令:

cd /usr/local/spark
./bin/run-example streaming.NetworkWordCount localhost 9998  //注意,这里端口是9998

好的,到这里,关于如何运行Spark自带的NetworkWordCount样例程序,就介绍清楚了。下面我们看看,如果自己编写程序实现这样的功能。

请新打开一个Shell窗口,进入Shell命令提示符状态,然后执行下面命令:

cd  /usr/local/spark/examples/src/main/scala/org/apache/spark/examples/streaming/
ls

这时,你可以看到在这个目录下有个文件NetworkWordCount.scala,这个其实就是刚才执行词频统计程序的源代码,我们可以直接拷贝这个代码文件到自己的目录下,如下:

cp ./NetworkWordCount.scala  /usr/local/spark/mycode/streaming/src/main/scala  //如果这个用户目录不存在,请自己创建
cd /usr/local/spark/mycode/streaming/src/main/scala
vim NetworkWordCount.scala

上面用vim编辑器打开了NetworkWordCount.scala代码文件,可以看到该文件主要内容如下(这里只把代码内容保留,原来的注释都删除了):

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.storage.StorageLevel

object NetworkWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: NetworkWordCount <hostname> <port>")
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    // Create the context with a 1 second batch size
    val sparkConf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Create a socket stream on target ip:port and count the
    // words in input stream of \n delimited text (eg. generated by 'nc')
    // Note that no duplication in storage level only for running locally.
    // Replication necessary in distributed scenario for fault tolerance.
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

退出vim编辑器。上面的代码,不能直接拿去sbt打包编辑,因为,里面有个 StreamingExamples.setStreamingLogLevels(),StreamingExamples来自另外一个代码文件,也就是“/usr/local/spark/examples/src/main/scala/org/apache/spark/examples/streaming/StreamingExamples.scala”,请把这个StreamingExamples.scala文件也拷贝到自己的代码目录下(也就是和刚才的NetworkWordCount.scala放在同一个目录下):

cd /usr/local/spark/mycode/streaming/src/main/scala
cp /usr/local/spark/examples/src/main/scala/org/apache/spark/examples/streaming/StreamingExamples.scala ./
vim StreamingExamples.scala

可以看到,StreamingExamples.scala文件主要代码如下(省略了一些注释):

//注意,一定要把package org.apache.spark.examples.streaming这行删除掉,否则编译会出错,因为这个文件已经被我们拷贝到自己代码目录下,已经补在原来的地方

import org.apache.spark.Logging

import org.apache.log4j.{Level, Logger}

/** Utility functions for Spark Streaming examples. */
object StreamingExamples extends Logging {

  /** Set reasonable logging levels for streaming if the user has not configured log4j. */
  def setStreamingLogLevels() {
    val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
    if (!log4jInitialized) {
      // We first log something to initialize Spark's default logging, then we override the
      // logging level.
      logInfo("Setting log level to [WARN] for streaming example." +
        " To override add a custom log4j.properties to the classpath.")
      Logger.getRootLogger.setLevel(Level.WARN)
    }
  }
}

退出vim编辑器。可以看出,StreamingExamples.scala文件主要是用来对输出的日志信息进行格式设置。
下面要对代码进行sbt打包编译。这里需要一个simple.sbt文件,如果之前章节的学习已经在这个目录下建好了这个文件,这里就不需要再建,如果当前目录(/usr/local/spark/mycode/streaming/)下不存在simple.sbt,那么使用vim编辑器创建一个:

cd /usr/local/spark/mycode/streaming/
vim simple.sbt

在simple.sbt中输入以下内容:

name := "Simple Project"
version := "1.0"
scalaVersion := "2.10.5"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.2"

创建好simple.sbt文件后,退出vim编辑器。然后,执行下面命令:

cd /usr/local/spark/mycode/streaming
find .

屏幕上返回的信息,应该是类似下面的文件结构:

.
./src
./src/main
./src/main/scala
./src/main/scala/NetworkWordCount.scala
./src/main/scala/StreamingExamples.scala
./simple.sbt

然后,就可以执行sbt打包编译了,命令如下:

cd /usr/local/spark/mycode/streaming
/usr/local/sbt/sbt package

打包成功以后,就可以输入以下命令启动这个程序:

cd /usr/local/spark/mycode/streaming
/usr/local/spark/bin/spark-submit --class "NetworkWordCount" /usr/local/spark/mycode/streaming/target/scala-2.10/simple-project_2.10-1.0.jar localhost 9999

执行上面命令后,就进入了监听状态(我们把运行这个监听程序的窗口称为监听窗口),这时,你就可以像刚才一样,新打开一个窗口作为nc窗口,启动nc程序:

nc -lk 9999

这样,就可以在nc窗口中随意输入一些单词,监听窗口就会自动获得单词数据流信息,在监听窗口打印出词频统计信息。如果要停止运行上述程序,只要按键盘Ctrl+Z键就可以了。

下面我们再前进一步,这回,我们把数据源头的产生方式也修改一下,不要使用nc程序,而是采用自己编写的程序产生Socket数据源。
请在终端的Shell命令提示符下执行下面命令:

cd /usr/local/spark/mycode/streaming/src/main/scala
vim DataSourceSocket.scala

上面命令使用vim编辑器新建一个名称为DataSourceSocket.scala的代码文件,用来产生Socket数据源,请在该代码文件中输入下面代码:

import java.io.{PrintWriter}
import java.net.ServerSocket
import scala.io.Source

object DataSourceSocket {
  def index(length: Int) = {

    val rdm = new java.util.Random

    rdm.nextInt(length)
  }
  def main(args: Array[String]) {
    if (args.length != 3) {
      System.err.println("Usage: <filename> <port> <millisecond>")
      System.exit(1)
    }

    val fileName = args(0)
    val lines = Source.fromFile(fileName).getLines.toList
    val rowCount = lines.length

    val listener = new ServerSocket(args(1).toInt)
    while (true) {
      val socket = listener.accept()
      new Thread() {
        override def run = {
          println("Got client connected from: " + socket.getInetAddress)
          val out = new PrintWriter(socket.getOutputStream(), true)
          while (true) {
            Thread.sleep(args(2).toLong)
            val content = lines(index(rowCount))
            println(content)
            out.write(content + '\n')
            out.flush()
          }
          socket.close()
        }
      }.start()
    }
  }
}

退出vim编辑器。上面这个程序的功能是,从一个文本文件中随机读取某行文本作为数据源,发送出去。

然后,就可以执行sbt打包编译了,命令如下:

cd /usr/local/spark/mycode/streaming
/usr/local/sbt/sbt package

注意,实际上,这个时候,我们的“/usr/local/spark/mycode/streaming/src/main/scala”目录下,就有了三个代码文件,分别是NetworkWordCount.scala、StreamingExamples.scala和DataSourceSocket.scala。sbt打包编译是同时对这三个代码文件打包编译。打包成功以后,就可以输入命令启动数据源程序和监听程序。
下面首先启动用来生成数据源的DataSourceSocket程序,不过,DataSourceSocket程序需要把一个文本文件作为输入参数,所以,在启动这个程序之前,需要首先创建一个文本文件:

cd /usr/local/spark/mycode/streaming/
vim word.txt

在word.txt中随便输入几行英文语句,然后保存并退出vim编辑器。
下面就启动DataSourceSocket程序,这个程序需要三个参数,第一个参数是文本文件路径,第二个参数是端口地址,第三个参数是时间间隔(单位是毫秒,也就是每隔多少毫秒发送一次信息),请执行下面命令启动这个程序:

/usr/local/spark/bin/spark-submit --class "DataSourceSocket" /usr/local/spark/mycode/streaming/target/scala-2.10/simple-project_2.10-1.0.jar /usr/local/spark/mycode/streaming/word.txt 9999 1000

然后,你就会看到,这个窗口会不断打印出一些随机读取到的文本信息,这些信息也是Socket数据源,会被监听程序捕捉到。所以,下面,我们就在另外一个窗口启动监听程序:

/usr/local/spark/bin/spark-submit --class "NetworkWordCount" /usr/local/spark/mycode/streaming/target/scala-2.10/simple-project_2.10-1.0.jar localhost 9999

启动成功后,你就会看到,屏幕上不断打印出词频统计信息。成功完成实验。

子雨大数据之Spark入门
扫一扫访问本博客