The code in the textbook 《Spark编程基础》 (Spark Programming Fundamentals), written by 林子雨, 赖永炫, and 陶继平 (see the textbook's official website), may be hard to read in its printed form. To help readers understand the code correctly, or copy it directly for hands-on lab work, all of the code that accompanies the book is provided here.
Chapter 7 Spark Streaming
scala> import org.apache.spark.streaming._
scala> val ssc = new StreamingContext(sc, Seconds(1))
import org.apache.spark._
import org.apache.spark.streaming._
val conf = new SparkConf().setAppName("TestDStream").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(1))
cd /usr/local/spark/mycode
mkdir streaming
cd streaming
mkdir logfile
scala> import org.apache.spark.streaming._
scala> val ssc = new StreamingContext(sc, Seconds(20))
scala> val lines = ssc.textFileStream("file:///usr/local/spark/mycode/streaming/logfile")
scala> val words = lines.flatMap(_.split(" "))
scala> val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
scala> wordCounts.print()
scala> ssc.start()
scala> ssc.awaitTermination()
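Note that textFileStream only picks up files that appear in the monitored directory after ssc.start() has been called. A minimal way to trigger some output, assuming the directory created above (the file name and contents are just an illustration):
#in another terminal, after the streaming computation has started
echo "hello spark hello streaming" > /usr/local/spark/mycode/streaming/logfile/log1.txt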
cd /usr/local/spark/mycode
mkdir streaming
cd streaming
mkdir file
cd file
mkdir -p src/main/scala
cd src/main/scala
vim TestStreaming.scala
//TestStreaming.scala
import org.apache.spark._
import org.apache.spark.streaming._
object WordCountStreaming {
def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("WordCountStreaming").setMaster("local[2]") //local mode with 2 threads: one receives the data, the other processes it
val ssc = new StreamingContext(sparkConf, Seconds(2)) //batch interval of 2 seconds
val lines = ssc.textFileStream("file:///usr/local/spark/mycode/streaming/logfile") //a local directory is used here; an HDFS path would also work
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
//simple.sbt (create this file under /usr/local/spark/mycode/streaming/file)
name := "Simple Project"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.1.0"
cd /usr/local/spark/mycode/streaming/file
/usr/local/sbt/sbt package
cd /usr/local/spark/mycode/streaming/file
/usr/local/spark/bin/spark-submit --class "WordCountStreaming" /usr/local/spark/mycode/streaming/file/target/scala-2.11/simple-project_2.11-1.0.jar
cd /usr/local/spark/mycode
mkdir streaming #skip this if the directory already exists
cd streaming
mkdir socket
cd socket
mkdir -p src/main/scala #skip this if the directory already exists
cd /usr/local/spark/mycode/streaming/socket/src/main/scala
vim NetworkWordCount.scala #create the file with the vim editor
//NetworkWordCount.scala
package org.apache.spark.examples.streaming
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.storage.StorageLevel
object NetworkWordCount {
def main(args: Array[String]) {
if (args.length < 2) {
System.err.println("Usage: NetworkWordCount <hostname> <port>")
System.exit(1)
}
StreamingExamples.setStreamingLogLevels()
val sparkConf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(1))
val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
//StreamingExamples.scala
package org.apache.spark.examples.streaming
import org.apache.spark.internal.Logging
import org.apache.log4j.{Level, Logger}
/** Utility functions for Spark Streaming examples. */
object StreamingExamples extends Logging {
/** Set reasonable logging levels for streaming if the user has not configured log4j. */
def setStreamingLogLevels() {
val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
if (!log4jInitialized) {
// We first log something to initialize Spark's default logging, then we override the
// logging level.
logInfo("Setting log level to [WARN] for streaming example." +
" To override add a custom log4j.properties to the classpath.")
Logger.getRootLogger.setLevel(Level.WARN)
}
}
}
//simple.sbt (create this file under /usr/local/spark/mycode/streaming/socket)
name := "Simple Project"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.1.0"
cd /usr/local/spark/mycode/streaming/socket
/usr/local/sbt/sbt package
cd /usr/local/spark/mycode/streaming/socket
/usr/local/spark/bin/spark-submit --class "org.apache.spark.examples.streaming.NetworkWordCount" /usr/local/spark/mycode/streaming/socket/target/scala-2.11/simple-project_2.11-1.0.jar localhost 9999
nc -lk 9999 #run in a separate terminal; type some lines of words for the streaming program to count
cd /usr/local/spark/mycode/streaming/socket/src/main/scala
vim DataSourceSocket.scala
//DataSourceSocket.scala
package org.apache.spark.examples.streaming
import java.io.{PrintWriter}
import java.net.ServerSocket
import scala.io.Source
object DataSourceSocket {
def index(length: Int) = { //return a random integer between 0 and length-1
val rdm = new java.util.Random
rdm.nextInt(length)
}
def main(args: Array[String]) {
if (args.length != 3) {
System.err.println("Usage: <filename> <port> <millisecond>")
System.exit(1)
}
val fileName = args(0) //the file path
val lines = Source.fromFile(fileName).getLines.toList //read all lines of the file
val rowCount = lines.length //number of lines in the file
val listener = new ServerSocket(args(1).toInt) //create a ServerSocket listening on the given port
while (true) {
val socket = listener.accept()
new Thread() {
override def run = {
println("Got client connected from: " + socket.getInetAddress)
val out = new PrintWriter(socket.getOutputStream(), true)
while (true) {
Thread.sleep(args(2).toLong) //interval (in milliseconds) between two sends
val content = lines(index(rowCount)) //pick a random line from the list
println(content)
out.write(content + '\n') //write the data to be sent to the client
out.flush() //flush so the data actually reaches the client
}
socket.close()
}
}.start()
}
}
}
cd /usr/local/spark/mycode/streaming/socket
/usr/local/sbt/sbt package
#word.txt must already exist under /usr/local/spark/mycode/streaming/socket and contain some lines of words
/usr/local/spark/bin/spark-submit --class "org.apache.spark.examples.streaming.DataSourceSocket" /usr/local/spark/mycode/streaming/socket/target/scala-2.11/simple-project_2.11-1.0.jar /usr/local/spark/mycode/streaming/socket/word.txt 9999 1000
cd /usr/local/spark/mycode/streaming/socket #in a new terminal
/usr/local/spark/bin/spark-submit --class "org.apache.spark.examples.streaming.NetworkWordCount" /usr/local/spark/mycode/streaming/socket/target/scala-2.11/simple-project_2.11-1.0.jar localhost 9999
//TestRDDQueueStream.scala
package org.apache.spark.examples.streaming
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.{Seconds, StreamingContext}
object QueueStream {
def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("TestRDDQueue").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(2))
val rddQueue = new scala.collection.mutable.SynchronizedQueue[RDD[Int]]()
val queueStream = ssc.queueStream(rddQueue)
val mappedStream = queueStream.map(r => (r % 10, 1))
val reducedStream = mappedStream.reduceByKey(_ + _)
reducedStream.print()
ssc.start()
for (i <- 1 to 10){
rddQueue += ssc.sparkContext.makeRDD(1 to 100,2)
Thread.sleep(1000)
}
ssc.stop()
}
}
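The spark-submit command below assumes that TestRDDQueueStream.scala has already been placed in an sbt project under /usr/local/spark/mycode/streaming/rddqueue and packaged, mirroring the earlier socket project. A minimal sketch of those assumed steps:
cd /usr/local/spark/mycode/streaming
mkdir -p rddqueue/src/main/scala #put TestRDDQueueStream.scala in this directory
cd rddqueue
vim simple.sbt #same contents as the earlier simple.sbt
/usr/local/sbt/sbt package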
cd /usr/local/spark/mycode/streaming/rddqueue
/usr/local/spark/bin/spark-submit --class "org.apache.spark.examples.streaming.QueueStream" /usr/local/spark/mycode/streaming/rddqueue/target/scala-2.11/simple-project_2.11-1.0.jar
cd /usr/local/kafka
./bin/zookeeper-server-start.sh config/zookeeper.properties #this terminal stays occupied by the ZooKeeper process
cd /usr/local/kafka #in a new terminal
./bin/kafka-server-start.sh config/server.properties
cd /usr/local/kafka
./bin/kafka-server-start.sh config/server.properties & #alternatively, start the Kafka server in the background
cd /usr/local/kafka
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic wordsendertest
#The topic is named wordsendertest; 2181 is ZooKeeper's default port; --partitions is the number of partitions of the topic; --replication-factor is the number of replicas, which matters on a Kafka cluster; since this is a single-node setup, no replication is needed
#Use --list to show all topics that have been created and check that the topic created above exists
./bin/kafka-topics.sh --list --zookeeper localhost:2181
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic wordsendertest #type some messages here; each line entered is sent as one message
cd /usr/local/kafka
./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic wordsendertest --from-beginning
cd /usr/local/spark
./bin/spark-shell
scala> import org.apache.spark.streaming.kafka._
<console>:25: error: object kafka is not a member of package org.apache.spark.streaming
import org.apache.spark.streaming.kafka._
^
cd /usr/local/spark/jars
mkdir kafka
cd ~/下载 #the Downloads directory; spark-streaming-kafka-0-8_2.11-2.1.0.jar is assumed to have been downloaded here (it is available from Maven Central)
cp ./spark-streaming-kafka-0-8_2.11-2.1.0.jar /usr/local/spark/jars/kafka
cd /usr/local/kafka/libs
ls
cp ./* /usr/local/spark/jars/kafka
cd /usr/local/spark
./bin/spark-shell
scala> import org.apache.spark.streaming.kafka._
//the import now succeeds and the following is displayed
import org.apache.spark.streaming.kafka._
cd /usr/local/spark/mycode
mkdir kafka
cd kafka
mkdir -p src/main/scala
cd src/main/scala
vim KafkaWordProducer.scala
//KafkaWordProducer.scala
package org.apache.spark.examples.streaming
import java.util.HashMap
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
object KafkaWordProducer {
def main(args: Array[String]) {
if (args.length < 4) {
System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> <topic> " +
"<messagesPerSec> <wordsPerMessage>")
System.exit(1)
}
val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args
// Kafka producer properties (broker list and key/value serializers)
val props = new HashMap[String, Object]()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, String](props)
// Send some messages
while(true) {
(1 to messagesPerSec.toInt).foreach { messageNum =>
val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString)
.mkString(" ")
print(str)
println()
val message = new ProducerRecord[String, String](topic, null, str)
producer.send(message)
}
Thread.sleep(1000)
}
}
}
//KafkaWordCount.scala
package org.apache.spark.examples.streaming
import org.apache.spark._
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka.KafkaUtils
object KafkaWordCount{
def main(args:Array[String]){
StreamingExamples.setStreamingLogLevels()
val sc = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
val ssc = new StreamingContext(sc,Seconds(10))
ssc.checkpoint("file:///usr/local/spark/mycode/kafka/checkpoint") //set a checkpoint directory; to place it on HDFS, write something like ssc.checkpoint("/user/hadoop/checkpoint") instead, but Hadoop must then be running
val zkQuorum = "localhost:2181" //ZooKeeper server address
val group = "1" //consumer group of the topic; any name can be used, e.g. val group = "test-consumer-group" instead of "1"
val topics = "wordsender" //name(s) of the topic(s)
val numThreads = 1 //number of partitions (consumer threads) per topic
val topicMap =topics.split(",").map((_,numThreads.toInt)).toMap
val lineMap = KafkaUtils.createStream(ssc,zkQuorum,group,topicMap)
val lines = lineMap.map(_._2)
val words = lines.flatMap(_.split(" "))
val pair = words.map(x => (x,1))
val wordCounts = pair.reduceByKeyAndWindow(_ + _,_ - _,Minutes(2),Seconds(10),2) //this line is explained in the next section, on window transformation operations
wordCounts.print
ssc.start
ssc.awaitTermination
}
}
cd /usr/local/spark/mycode/kafka/
vim simple.sbt
name := "Simple Project"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.1.0"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-8_2.11" % "2.1.0"
cd /usr/local/spark/mycode/kafka/
/usr/local/sbt/sbt package
cd /usr/local/hadoop
./sbin/start-dfs.sh
cd /usr/local/spark
/usr/local/spark/bin/spark-submit --driver-class-path /usr/local/spark/jars/*:/usr/local/spark/jars/kafka/* --class "org.apache.spark.examples.streaming.KafkaWordProducer" /usr/local/spark/mycode/kafka/target/scala-2.11/simple-project_2.11-1.0.jar localhost:9092 wordsender 3 5
cd /usr/local/spark
/usr/local/spark/bin/spark-submit --driver-class-path /usr/local/spark/jars/*:/usr/local/spark/jars/kafka/* --class "org.apache.spark.examples.streaming.KafkaWordCount" /usr/local/spark/mycode/kafka/target/scala-2.11/simple-project_2.11-1.0.jar
val wordCounts = pair.reduceByKeyAndWindow(_ + _,_ - _,Minutes(2),Seconds(10),2)
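For reference, the arguments of this call can be read as follows (a commented restatement of the same line, not a change to it):
val wordCounts = pair.reduceByKeyAndWindow(
  _ + _,       //reduce function: add the counts of new data entering the window
  _ - _,       //inverse reduce function: subtract the counts of old data leaving the window
  Minutes(2),  //window length: 2 minutes
  Seconds(10), //sliding interval: 10 seconds
  2)           //number of partitions of the resulting DStream
//the variant with an inverse function requires checkpointing, which KafkaWordCount enables via ssc.checkpoint(...)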
//NetworkWordCountStateful.scala
package org.apache.spark.examples.streaming
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.storage.StorageLevel
object NetworkWordCountStateful {
def main(args: Array[String]) {
//define the state update function
val updateFunc = (values: Seq[Int], state: Option[Int]) => {
val currentCount = values.foldLeft(0)(_ + _)
val previousCount = state.getOrElse(0)
Some(currentCount + previousCount)
}
StreamingExamples.setStreamingLogLevels() //set the log4j logging level
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCountStateful")
val sc = new StreamingContext(conf, Seconds(5))
sc.checkpoint("file:///usr/local/spark/mycode/streaming/stateful/") //set a checkpoint directory; checkpointing provides fault tolerance
val lines = sc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
val wordDstream = words.map(x => (x, 1))
val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)
stateDstream.print()
sc.start()
sc.awaitTermination()
}
}
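The spark-submit command below assumes that NetworkWordCountStateful.scala (together with a copy of StreamingExamples.scala, since the code calls StreamingExamples.setStreamingLogLevels()) has been placed in an sbt project under /usr/local/spark/mycode/streaming/stateful and packaged, in the same way as the earlier socket project. A sketch of those assumed steps:
cd /usr/local/spark/mycode/streaming
mkdir -p stateful/src/main/scala #put NetworkWordCountStateful.scala and StreamingExamples.scala in this directory
cd stateful
vim simple.sbt #same contents as the earlier simple.sbt
/usr/local/sbt/sbt package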
/usr/local/spark/bin/spark-submit --class "org.apache.spark.examples.streaming.NetworkWordCountStateful" /usr/local/spark/mycode/streaming/stateful/target/scala-2.11/simple-project_2.11-1.0.jar
nc -lk 9999 #type some words manually in this window
//NetworkWordCountStateful.scala, modified to save the DStream to text files
package org.apache.spark.examples.streaming
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.storage.StorageLevel
object NetworkWordCountStateful {
def main(args: Array[String]) {
//define the state update function
val updateFunc = (values: Seq[Int], state: Option[Int]) => {
val currentCount = values.foldLeft(0)(_ + _)
val previousCount = state.getOrElse(0)
Some(currentCount + previousCount)
}
StreamingExamples.setStreamingLogLevels() //set the log4j logging level
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCountStateful")
val sc = new StreamingContext(conf, Seconds(5))
sc.checkpoint("file:///usr/local/spark/mycode/streaming/dstreamoutput/") //set a checkpoint directory; checkpointing provides fault tolerance
val lines = sc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
val wordDstream = words.map(x => (x, 1))
val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)
stateDstream.print()
//The following newly added statement saves the DStream to text files
stateDstream.saveAsTextFiles("file:///usr/local/spark/mycode/streaming/dstreamoutput/output")
sc.start()
sc.awaitTermination()
}
}
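After re-packaging and re-running this modified version as before and typing some words into nc, the saved results can be inspected on the local file system. saveAsTextFiles writes one directory per batch, named with the given prefix followed by the batch timestamp, so the wildcard below simply matches all of them:
ls /usr/local/spark/mycode/streaming/dstreamoutput/
cat /usr/local/spark/mycode/streaming/dstreamoutput/output-*/part-*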
mysql> use spark;
mysql> create table wordcount (word char(20), count int(4));
//NetworkWordCountStateful.scala, modified to write the word counts to a MySQL database
package org.apache.spark.examples.streaming
import java.sql.{PreparedStatement, Connection, DriverManager}
import java.util.concurrent.atomic.AtomicInteger
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.storage.StorageLevel
object NetworkWordCountStateful {
def main(args: Array[String]) {
//define the state update function
val updateFunc = (values: Seq[Int], state: Option[Int]) => {
val currentCount = values.foldLeft(0)(_ + _)
val previousCount = state.getOrElse(0)
Some(currentCount + previousCount)
}
StreamingExamples.setStreamingLogLevels() //set the log4j logging level
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCountStateful")
val sc = new StreamingContext(conf, Seconds(5))
sc.checkpoint("file:///usr/local/spark/mycode/streaming/dstreamoutput/") //set a checkpoint directory; checkpointing provides fault tolerance
val lines = sc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
val wordDstream = words.map(x => (x, 1))
val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)
stateDstream.print()
//The following newly added statements save the DStream to the MySQL database
stateDstream.foreachRDD(rdd => { //opening brace of the function body
//inner function
def func(records: Iterator[(String,Int)]) {
var conn: Connection = null
var stmt: PreparedStatement = null
try {
val url = "jdbc:mysql://localhost:3306/spark"
val user = "root"
val password = "hadoop" //the database password is hadoop
conn = DriverManager.getConnection(url, user, password)
records.foreach(p => {
val sql = "insert into wordcount(word,count) values (?,?)"
stmt = conn.prepareStatement(sql);
stmt.setString(1, p._1.trim)
stmt.setInt(2,p._2.toInt)
stmt.executeUpdate()
})
} catch {
case e: Exception => e.printStackTrace()
} finally {
if (stmt != null) {
stmt.close()
}
if (conn != null) {
conn.close()
}
}
}
val repartitionedRDD = rdd.repartition(3)
repartitionedRDD.foreachPartition(func)
}) //closing brace of the function body
sc.start()
sc.awaitTermination()
}
}
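To run this MySQL-writing version, the project is re-packaged as before and submitted with the MySQL JDBC driver on the classpath; the driver jar name and location below are assumptions that depend on what is installed on your machine. The inserted word counts can then be checked from the mysql client.
cd /usr/local/spark/mycode/streaming/stateful
/usr/local/sbt/sbt package
#the MySQL JDBC driver jar path and version below are assumptions; adjust them to your installation (add --driver-class-path as well if the driver is not found)
/usr/local/spark/bin/spark-submit --jars /usr/local/spark/jars/mysql-connector-java-5.1.40-bin.jar --class "org.apache.spark.examples.streaming.NetworkWordCountStateful" /usr/local/spark/mycode/streaming/stateful/target/scala-2.11/simple-project_2.11-1.0.jar
nc -lk 9999 #in another terminal, type some words
mysql> select * from wordcount; #in the mysql client, check the inserted results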