Code for Chapter 7 of the textbook 《Spark编程基础》 (Spark Programming Fundamentals), edited by 林子雨


The code in the textbook 《Spark编程基础》 (textbook official website), edited by 林子雨, 赖永炫 and 陶继平, may be hard to read in the printed edition. To help readers understand the code correctly, or copy it directly for hands-on lab work, all of the code accompanying the book is provided here.
View the code for all chapters of the textbook

Chapter 7 Spark Streaming

scala> import  org.apache.spark.streaming._
scala> val  ssc = new StreamingContext(sc, Seconds(1))
import  org.apache.spark._
import  org.apache.spark.streaming._
val  conf = new SparkConf().setAppName("TestDStream").setMaster("local[2]")
val  ssc = new StreamingContext(conf, Seconds(1))
cd  /usr/local/spark/mycode
mkdir  streaming
cd  streaming
mkdir  logfile
scala> import  org.apache.spark.streaming._
scala> val  ssc = new StreamingContext(sc, Seconds(20))
scala> val  lines = ssc.textFileStream("file:///usr/local/spark/mycode/streaming/logfile")
scala> val  words = lines.flatMap(_.split(" "))
scala> val  wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
scala> wordCounts.print()
scala> ssc.start()
scala> ssc.awaitTermination()
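After ssc.start() is called, only files created in the monitored directory afterwards are picked up, one batch at a time. A minimal way to trigger some output from another terminal (a usage sketch; log1.txt is just an example file name):
cd  /usr/local/spark/mycode/streaming/logfile
echo  "hadoop spark streaming spark"  >  log1.txt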
cd  /usr/local/spark/mycode
mkdir  streaming
cd  streaming
mkdir  file
cd  file
mkdir  -p  src/main/scala
cd  src/main/scala
vim  TestStreaming.scala
//TestStreaming.scala
import org.apache.spark._
import org.apache.spark.streaming._
object WordCountStreaming {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("WordCountStreaming").setMaster("local[2]")  // local mode with 2 threads: one receives the data, the other processes it
    val ssc = new StreamingContext(sparkConf, Seconds(2))   // batch interval of 2 seconds
    val lines = ssc.textFileStream("file:///usr/local/spark/mycode/streaming/logfile")  // a local directory is used here; an HDFS directory works as well
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
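//simple.sbt (the file name is assumed from the Kafka example later on this page; save it in the project root directory /usr/local/spark/mycode/streaming/file)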
name := "Simple Project"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.1.0" 
cd  /usr/local/spark/mycode/streaming/file
/usr/local/sbt/sbt  package
cd  /usr/local/spark/mycode/streaming/file
/usr/local/spark/bin/spark-submit  --class  "WordCountStreaming"  /usr/local/spark/mycode/streaming/file/target/scala-2.11/simple-project_2.11-1.0.jar
cd  /usr/local/spark/mycode
mkdir  streaming  # skip this if the directory already exists
cd  streaming
mkdir  socket
cd  socket
mkdir  -p  src/main/scala  # skip this if the directory already exists
cd  /usr/local/spark/mycode/streaming/socket/src/main/scala
vim  NetworkWordCount.scala  # create the file with the vim editor
//NetworkWordCount.scala
package org.apache.spark.examples.streaming
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.storage.StorageLevel
object NetworkWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: NetworkWordCount <hostname> <port>")
      System.exit(1)
    }
    StreamingExamples.setStreamingLogLevels()
    val sparkConf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
//StreamingExamples.scala
package org.apache.spark.examples.streaming
import org.apache.spark.internal.Logging
import org.apache.log4j.{Level, Logger}
/** Utility functions for Spark Streaming examples. */
object StreamingExamples extends Logging {
  /** Set reasonable logging levels for streaming if the user has not configured log4j. */
  def setStreamingLogLevels() {
    val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
    if (!log4jInitialized) {
      // We first log something to initialize Spark's default logging, then we override the
      // logging level.
      logInfo("Setting log level to [WARN] for streaming example." +
        " To override add a custom log4j.properties to the classpath.")
      Logger.getRootLogger.setLevel(Level.WARN)
    }
  }
}
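//simple.sbt (as above, save it in the project root directory /usr/local/spark/mycode/streaming/socket)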
name := "Simple Project"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.1.0" 
cd  /usr/local/spark/mycode/streaming/socket
/usr/local/sbt/sbt  package
cd  /usr/local/spark/mycode/streaming/socket
/usr/local/spark/bin/spark-submit  --class  "org.apache.spark.examples.streaming.NetworkWordCount"  /usr/local/spark/mycode/streaming/socket/target/scala-2.11/simple-project_2.11-1.0.jar  localhost  9999
nc  -lk  9999  # run this in a separate terminal as the data source, then type some words
cd  /usr/local/spark/mycode/streaming/socket/src/main/scala
vim  DataSourceSocket.scala
//DataSourceSocket.scala
package org.apache.spark.examples.streaming
import  java.io.{PrintWriter}
import  java.net.ServerSocket 
import  scala.io.Source
object DataSourceSocket {
  def index(length: Int) = { // returns a random integer between 0 and length-1
    val rdm = new java.util.Random
    rdm.nextInt(length)
  }
  def main(args: Array[String]) {
    if (args.length != 3) {
      System.err.println("Usage: <filename> <port> <millisecond>")
      System.exit(1)
    }
    val fileName = args(0)  // path of the text file
    val lines = Source.fromFile(fileName).getLines.toList  // read all lines of the file
    val rowCount = lines.length  // number of lines in the file
    val listener = new ServerSocket(args(1).toInt)  // create a ServerSocket listening on the given port
    while (true) {
      val socket = listener.accept()
      new Thread() {
        override def run = {
          println("Got client connected from: " + socket.getInetAddress)
          val out = new PrintWriter(socket.getOutputStream(), true)
          while (true) {
            Thread.sleep(args(2).toLong)  // interval between two sends, in milliseconds
            val content = lines(index(rowCount))  // pick a random element from the lines list
            println(content)
            out.write(content + '\n')  // write the data to be sent to the client
            out.flush()  // flush it to the client
          }
          socket.close()
        }
      }.start()
    }
  }
}
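Before DataSourceSocket is packaged and run, the text file it reads from has to exist; a minimal sketch for creating it (the actual word content is arbitrary):
echo  "hadoop spark streaming flink storm"  >  /usr/local/spark/mycode/streaming/socket/word.txt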
cd  /usr/local/spark/mycode/streaming/socket
/usr/local/sbt/sbt  package
/usr/local/spark/bin/spark-submit  --class  "org.apache.spark.examples.streaming.DataSourceSocket"  /usr/local/spark/mycode/streaming/socket/target/scala-2.11/simple-project_2.11-1.0.jar   /usr/local/spark/mycode/streaming/socket/word.txt  9999  1000
cd  /usr/local/spark/mycode/streaming/socket 
/usr/local/spark/bin/spark-submit  --class  "org.apache.spark.examples.streaming.NetworkWordCount"  /usr/local/spark/mycode/streaming/socket/target/scala-2.11/simple-project_2.11-1.0.jar  localhost  9999
//TestRDDQueueStream.scala
package org.apache.spark.examples.streaming 
import org.apache.spark.SparkConf 
import org.apache.spark.rdd.RDD 
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.{Seconds, StreamingContext}
object QueueStream {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("TestRDDQueue").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    val rddQueue =new scala.collection.mutable.SynchronizedQueue[RDD[Int]]()
    val queueStream = ssc.queueStream(rddQueue)
    val mappedStream = queueStream.map(r => (r % 10, 1))
    val reducedStream = mappedStream.reduceByKey(_ + _)
    reducedStream.print()
    ssc.start()
    for (i <- 1 to 10) {
      rddQueue += ssc.sparkContext.makeRDD(1 to 100, 2)
      Thread.sleep(1000)
    }
    ssc.stop()
  }
}
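The project setup for this example is not shown on this page; assuming the same directory layout and the same simple.sbt as the earlier examples, the steps before submission would look roughly like this:
cd  /usr/local/spark/mycode/streaming
mkdir  rddqueue  # skip this if the directory already exists
cd  rddqueue
mkdir  -p  src/main/scala
vim  src/main/scala/TestRDDQueueStream.scala  # paste the code above
vim  simple.sbt  # same contents as the earlier simple.sbt
/usr/local/sbt/sbt  package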
cd  /usr/local/spark/mycode/streaming/rddqueue
/usr/local/spark/bin/spark-submit  --class  "org.apache.spark.examples.streaming.QueueStream"  /usr/local/spark/mycode/streaming/rddqueue/target/scala-2.11/simple-project_2.11-1.0.jar
cd  /usr/local/kafka
./bin/zookeeper-server-start.sh  config/zookeeper.properties
cd  /usr/local/kafka
./bin/kafka-server-start.sh  config/server.properties
cd  /usr/local/kafka
bin/kafka-server-start.sh  config/server.properties  &
cd  /usr/local/kafka
./bin/kafka-topics.sh  --create  --zookeeper  localhost:2181  --replication-factor  1  --partitions  1  --topic  wordsendertest
#the topic is named wordsendertest; 2181 is the default Zookeeper port; --partitions is the number of partitions of the topic; --replication-factor is the number of replicas, used in a Kafka cluster -- since this is a single-node setup, no extra replicas are needed
#the topics that have been created can be listed with --list, to check that the topic created above exists
./bin/kafka-topics.sh  --list  --zookeeper  localhost:2181
./bin/kafka-console-producer.sh  --broker-list  localhost:9092  --topic  wordsendertest
cd /usr/local/kafka
./bin/kafka-console-consumer.sh  --zookeeper  localhost:2181  --topic  wordsendertest  --from-beginning
cd  /usr/local/spark
./bin/spark-shell
scala> import  org.apache.spark.streaming.kafka._
<console>:25: error: object kafka is not a member of package org.apache.spark.streaming
         import org.apache.spark.streaming.kafka._
                                           ^
cd  /usr/local/spark/jars
mkdir  kafka
cd  ~/下载  # the Downloads directory (~/Downloads on an English-locale system), where the jar was downloaded
cp  ./spark-streaming-kafka-0-8_2.11-2.1.0.jar  /usr/local/spark/jars/kafka
cd  /usr/local/kafka/libs
ls
cp  ./*  /usr/local/spark/jars/kafka
cd  /usr/local/spark
./bin/spark-shell
scala> import  org.apache.spark.streaming.kafka._
//the following message should be displayed
import org.apache.spark.streaming.kafka._
cd  /usr/local/spark/mycode
mkdir  kafka
cd  kafka
mkdir  -p  src/main/scala
cd  src/main/scala
vim  KafkaWordProducer.scala
//KafkaWordProducer.scala
package org.apache.spark.examples.streaming
import java.util.HashMap
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
object KafkaWordProducer {
  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> <topic> " +
        "<messagesPerSec> <wordsPerMessage>")
      System.exit(1)
    }
    val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args
    // Kafka producer connection properties
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    // Send some messages
    while (true) {
      (1 to messagesPerSec.toInt).foreach { messageNum =>
        val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString)
          .mkString(" ")
        print(str)
        println()
        val message = new ProducerRecord[String, String](topic, null, str)
        producer.send(message)
      }
      Thread.sleep(1000)
    }
  }
}
//KafkaWordCount.scala
package org.apache.spark.examples.streaming
import org.apache.spark._
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka.KafkaUtils

object KafkaWordCount {
  def main(args: Array[String]) {
    StreamingExamples.setStreamingLogLevels()
    val sc = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(sc, Seconds(10))
    ssc.checkpoint("file:///usr/local/spark/mycode/kafka/checkpoint")  // set the checkpoint directory; to put it on HDFS, use something like ssc.checkpoint("/user/hadoop/checkpoint"), but Hadoop must then be running
    val zkQuorum = "localhost:2181"  // Zookeeper server address
    val group = "1"  // consumer group of the topic; any name works, e.g. val group = "test-consumer-group"
    val topics = "wordsender"  // topic name
    val numThreads = 1  // number of partitions per topic
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val lineMap = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
    val lines = lineMap.map(_._2)
    val words = lines.flatMap(_.split(" "))
    val pair = words.map(x => (x, 1))
    val wordCounts = pair.reduceByKeyAndWindow(_ + _, _ - _, Minutes(2), Seconds(10), 2)  // this line is explained in the next section, on window transformation operations
    wordCounts.print
    ssc.start
    ssc.awaitTermination
  }
}
cd  /usr/local/spark/mycode/kafka/
vim  simple.sbt
name := "Simple Project"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.1.0"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-8_2.11" % "2.1.0"
cd  /usr/local/spark/mycode/kafka/
/usr/local/sbt/sbt  package
cd  /usr/local/hadoop
./sbin/start-dfs.sh
cd  /usr/local/spark
/usr/local/spark/bin/spark-submit  --driver-class-path /usr/local/spark/jars/*:/usr/local/spark/jars/kafka/*  --class  "org.apache.spark.examples.streaming.KafkaWordProducer"   /usr/local/spark/mycode/kafka/target/scala-2.11/simple-project_2.11-1.0.jar  localhost:9092  wordsender  3  5
cd  /usr/local/spark
/usr/local/spark/bin/spark-submit  --driver-class-path /usr/local/spark/jars/*:/usr/local/spark/jars/kafka/*  --class  "org.apache.spark.examples.streaming.KafkaWordCount"    /usr/local/spark/mycode/kafka/target/scala-2.11/simple-project_2.11-1.0.jar
val wordCounts = pair.reduceByKeyAndWindow(_ + _,_ - _,Minutes(2),Seconds(10),2)
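The semantics of this call are easier to see in a small self-contained program. Below is a minimal sketch (WindowedWordCount is a hypothetical example, not from the textbook): word pairs from a socket stream are counted over a 2-minute window that slides every 10 seconds; _ + _ adds in the counts of batches entering the window, _ - _ subtracts the counts of batches leaving it, and a checkpoint directory must be set whenever an inverse reduce function is used.
//WindowedWordCount.scala (hypothetical example)
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}
object WindowedWordCount {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("WindowedWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.checkpoint("file:///usr/local/spark/mycode/streaming/windowcheckpoint")  // hypothetical checkpoint directory, required by the inverse reduce function
    val lines = ssc.socketTextStream("localhost", 9999)
    val pair = lines.flatMap(_.split(" ")).map(x => (x, 1))
    // window length of 2 minutes, sliding interval of 10 seconds, 2 result partitions
    val wordCounts = pair.reduceByKeyAndWindow(_ + _, _ - _, Minutes(2), Seconds(10), 2)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}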
//NetworkWordCountStateful.scala
package org.apache.spark.examples.streaming
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.storage.StorageLevel
object NetworkWordCountStateful {
  def main(args: Array[String]) {
    // define the state update function
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.foldLeft(0)(_ + _)
      val previousCount = state.getOrElse(0)
      Some(currentCount + previousCount)
    }
    StreamingExamples.setStreamingLogLevels()  // set the log4j log level
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCountStateful")
    val sc = new StreamingContext(conf, Seconds(5))
    sc.checkpoint("file:///usr/local/spark/mycode/streaming/stateful/")    // set the checkpoint directory; checkpointing provides fault tolerance
    val lines = sc.socketTextStream("localhost", 9999)
    val words = lines.flatMap(_.split(" "))
    val wordDstream = words.map(x => (x, 1))
    val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)
    stateDstream.print()
    sc.start()
    sc.awaitTermination()
  }
}
/usr/local/spark/bin/spark-submit  --class  "org.apache.spark.examples.streaming.NetworkWordCountStateful"   /usr/local/spark/mycode/streaming/stateful/target/scala-2.11/simple-project_2.11-1.0.jar
nc  -lk  9999  # type some words manually in this window
//NetworkWordCountStateful.scala, modified to save the DStream to text files
package org.apache.spark.examples.streaming
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.storage.StorageLevel
object NetworkWordCountStateful {
  def main(args: Array[String]) {
    // define the state update function
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.foldLeft(0)(_ + _)
      val previousCount = state.getOrElse(0)
      Some(currentCount + previousCount)
    }
    StreamingExamples.setStreamingLogLevels()  // set the log4j log level
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCountStateful")
    val sc = new StreamingContext(conf, Seconds(5))
    sc.checkpoint("file:///usr/local/spark/mycode/streaming/dstreamoutput/")    // set the checkpoint directory; checkpointing provides fault tolerance
    val lines = sc.socketTextStream("localhost", 9999)
    val words = lines.flatMap(_.split(" "))
    val wordDstream = words.map(x => (x, 1))
    val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)
    stateDstream.print()
    // the following statement is new: save the DStream to text files
    stateDstream.saveAsTextFiles("file:///usr/local/spark/mycode/streaming/dstreamoutput/output")
    sc.start()
    sc.awaitTermination()
  }
}
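After this version has run through a few batch intervals (with nc providing input on port 9999 as before), saveAsTextFiles writes one directory per batch under /usr/local/spark/mycode/streaming/dstreamoutput, named with the prefix output plus a timestamp. A quick way to inspect the results (a usage sketch; the timestamp below is only an illustration):
cd  /usr/local/spark/mycode/streaming/dstreamoutput
ls  # expect directories such as output-1479951955000, one per batch
cat  output-1479951955000/part-00000  # the word counts saved for that batch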
mysql> use  spark;
mysql> create  table  wordcount (word char(20), count int(4));
//NetworkWordCountStateful.scala, modified to write the results into a MySQL database
package org.apache.spark.examples.streaming
import java.sql.{PreparedStatement, Connection, DriverManager}
import java.util.concurrent.atomic.AtomicInteger
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.storage.StorageLevel
object NetworkWordCountStateful {
  def main(args: Array[String]) {
    // define the state update function
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.foldLeft(0)(_ + _)
      val previousCount = state.getOrElse(0)
      Some(currentCount + previousCount)
    }
    StreamingExamples.setStreamingLogLevels()  // set the log4j log level
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCountStateful")
    val sc = new StreamingContext(conf, Seconds(5))
    sc.checkpoint("file:///usr/local/spark/mycode/streaming/dstreamoutput/")    // set the checkpoint directory; checkpointing provides fault tolerance
    val lines = sc.socketTextStream("localhost", 9999)
    val words = lines.flatMap(_.split(" "))
    val wordDstream = words.map(x => (x, 1))
    val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)
    stateDstream.print()
    // the following statements are new: save the DStream into the MySQL database
    stateDstream.foreachRDD(rdd => {  // opening brace of the function body
      // inner function
      def func(records: Iterator[(String, Int)]) {
        var conn: Connection = null
        var stmt: PreparedStatement = null
        try {
          val url = "jdbc:mysql://localhost:3306/spark"
          val user = "root"
          val password = "hadoop"  // the database password is hadoop
          conn = DriverManager.getConnection(url, user, password)
          records.foreach(p => {
            val sql = "insert into wordcount(word,count) values (?,?)"
            stmt = conn.prepareStatement(sql)
            stmt.setString(1, p._1.trim)
            stmt.setInt(2, p._2.toInt)
            stmt.executeUpdate()
          })
        } catch {
          case e: Exception => e.printStackTrace()
        } finally {
          if (stmt != null) {
            stmt.close()
          }
          if (conn != null) {
            conn.close()
          }
        }
      }
      val repartitionedRDD = rdd.repartition(3)
      repartitionedRDD.foreachPartition(func)
    })  // closing brace of the function body
    sc.start()
    sc.awaitTermination()
  }
}
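Two practical notes not shown on this page: the MySQL server must be running, and the MySQL JDBC driver jar (for example mysql-connector-java) is assumed to be on the classpath when the application is submitted, e.g. via the --jars or --driver-class-path option of spark-submit. Once the job has processed a few batches of input from nc, the inserted rows can be checked from the MySQL client (a usage sketch):
mysql> select * from wordcount;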