Code for Chapter 5 of 《Spark编程基础》 (Spark Programming Fundamentals), by 林子雨


The printed rendering of the code in 《Spark编程基础》 (textbook website), by 林子雨, 赖永炫 and 陶继平, may make some listings hard to read. To help readers understand the code correctly, or copy it directly for hands-on practice, all of the book's companion code is provided here.
View the code for all chapters of the textbook

Chapter 5 RDD Programming

scala> val  lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")
lines: org.apache.spark.rdd.RDD[String] = file:///usr/local/spark/mycode/rdd/word.txt MapPartitionsRDD[12] at textFile at <console>:27 
scala> val  lines = sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt")
scala> val  lines = sc.textFile("/user/hadoop/word.txt")
scala> val  lines = sc.textFile("word.txt")
scala> val  array = Array(1,2,3,4,5)
scala> val  rdd = sc.parallelize(array)
scala> val  list = List(1,2,3,4,5)
scala> val  rdd = sc.parallelize(list)
scala>  val  lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")
scala>  val  linesWithSpark=lines.filter(line => line.contains("Spark"))
scala> val  data = Array(1,2,3,4,5)
scala> val  rdd1= sc.parallelize(data)
scala> val  rdd2=rdd1.map(x=>x+10)
scala> val  lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")
scala> val  words=lines.map(line => line.split(" "))
scala> val  lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")
scala> val  words=lines.flatMap(line => line.split(" "))
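The two transcripts above look similar but return different element types: map keeps one element per line, while flatMap flattens the arrays into one element per word. A small sketch of my own (it assumes the same word.txt used above) makes the contrast explicit:

scala> val  lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")
scala> val  wordArrays = lines.map(line => line.split(" "))    //RDD[Array[String]]: one array per line
scala> val  words = lines.flatMap(line => line.split(" "))     //RDD[String]: one element per word
scala> wordArrays.count()   //equals the number of lines in word.txt
scala> words.count()        //equals the total number of words in word.txt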
scala> val  rdd=sc.parallelize(Array(1,2,3,4,5))
rdd: org.apache.spark.rdd.RDD[Int]=ParallelCollectionRDD[1] at parallelize at <console>:24
scala> rdd.count()
res0: Long = 5
scala> rdd.first()
res1: Int = 1
scala> rdd.take(3)
res2: Array[Int] = Array(1,2,3)
scala> rdd.reduce((a,b)=>a+b)
res3: Int = 15
scala> rdd.collect()
res4: Array[Int] = Array(1,2,3,4,5)
scala> rdd.foreach(elem=>println(elem))
1
2
3
4
5
scala> val  lines = sc.textFile("data.txt")
scala> val  lineLengths = lines.map(s => s.length)
scala> val  totalLength = lineLengths.reduce((a, b) => a + b)
scala> val  list = List("Hadoop","Spark","Hive")
list: List[String] = List(Hadoop, Spark, Hive)
scala> val  rdd = sc.parallelize(list)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[22] at parallelize at <console>:29
scala> println(rdd.count())  //action: triggers a real computation from start to finish
3
scala> println(rdd.collect().mkString(","))  //action: triggers a real computation from start to finish
Hadoop,Spark,Hive
scala> val  list = List("Hadoop","Spark","Hive")
list: List[String] = List(Hadoop, Spark, Hive)
scala> val  rdd = sc.parallelize(list)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[22] at parallelize at <console>:29
scala> rdd.cache()  //equivalent to persist(MEMORY_ONLY); nothing is cached yet at this point, because the rdd has not been computed
scala> println(rdd.count()) //first action: triggers a real start-to-finish computation; only now does the rdd.cache() above take effect and put the rdd into the cache
3
scala> println(rdd.collect().mkString(",")) //second action: no start-to-finish computation is needed; it simply reuses the cached rdd
Hadoop,Spark,Hive
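When a cached RDD is no longer needed, it can be dropped from memory explicitly. A minimal sketch of my own, continuing the session above:

scala> rdd.unpersist()      //remove the rdd's blocks from the cache
scala> println(rdd.count()) //still works, but triggers a full start-to-finish computation again
3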
scala> val  array = Array(1,2,3,4,5)
scala> val  rdd = sc.parallelize(array,2)  //create the RDD with two partitions
scala> val  data = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt",2)
data: org.apache.spark.rdd.RDD[String] = file:///usr/local/spark/mycode/rdd/word.txt MapPartitionsRDD[12] at textFile at <console>:24
scala> data.partitions.size  //show the number of partitions of the RDD data
res2: Int=2
scala> val  rdd = data.repartition(1)  //repartition the RDD data
rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[11] at repartition at <console>:26
scala> rdd.partitions.size
res4: Int = 1
import org.apache.spark.{Partitioner, SparkContext, SparkConf}
//a custom partitioner must extend the org.apache.spark.Partitioner class
class MyPartitioner(numParts:Int) extends Partitioner{
  //override the number of partitions
  override def numPartitions: Int = numParts
  //override the function that maps a key to a partition number
  override def getPartition(key: Any): Int = {
    key.toString.toInt%10
  }
}
object TestPartitioner {
  def main(args: Array[String]) {
    val conf=new SparkConf()
    val sc=new SparkContext(conf)
    //simulate data spread over 5 partitions
    val data=sc.parallelize(1 to 10,5)
    //repartition into 10 partitions by the last digit of each key, writing one output file per partition
    data.map((_,1)).partitionBy(new MyPartitioner(10)).map(_._1).saveAsTextFile("file:///usr/local/spark/mycode/rdd/partitioner")
  }
}
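To see how MyPartitioner spreads the keys without writing files, the partition contents can be inspected interactively. A sketch of my own (it assumes the MyPartitioner class above has already been defined, for example by pasting it into spark-shell):

scala> val  data = sc.parallelize(1 to 10, 5)
scala> val  partitioned = data.map((_, 1)).partitionBy(new MyPartitioner(10))
scala> partitioned.glom().collect().foreach(p => println(p.mkString(",")))  //glom() gathers each partition into an array; key 10 lands in partition 0, keys 1 to 9 in partitions 1 to 9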
scala> val  lines = sc.         //when a statement does not fit on one line, press Enter after the trailing dot and continue on the next line
|  textFile("file:///usr/local/spark/mycode/wordcount/word.txt")
scala> val wordCount = lines.flatMap(line => line.split(" ")).
|  map(word => (word, 1)).reduceByKey((a, b) => a + b)
scala> wordCount.collect()
scala> wordCount.foreach(println)
scala> val  lines = sc.textFile("file:///usr/local/spark/mycode/pairrdd/word.txt")
lines: org.apache.spark.rdd.RDD[String] = file:///usr/local/spark/mycode/pairrdd/word.txt MapPartitionsRDD[1] at textFile at <console>:27
scala> val  pairRDD = lines.flatMap(line => line.split(" ")).map(word => (word,1))
pairRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[3] at map at <console>:29
scala> pairRDD.foreach(println)
(i,1)
(love,1)
(hadoop,1)
……
scala> val  list = List("Hadoop","Spark","Hive","Spark")
list: List[String] = List(Hadoop, Spark, Hive, Spark) 
scala> val  rdd = sc.parallelize(list)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[11] at parallelize at <console>:29 
scala> val pairRDD = rdd.map(word => (word,1))
pairRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[12] at map at <console>:31
scala> pairRDD.foreach(println)
(Hadoop,1)
(Spark,1)
(Hive,1)
(Spark,1)
scala> pairRDD.reduceByKey((a,b)=>a+b).foreach(println)
(Spark,2)
(Hive,1)
(Hadoop,1)
scala> pairRDD.groupByKey()
res15: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[15] at groupByKey at <console>:34
scala>  val  words = Array("one", "two", "two", "three", "three", "three") 
scala>  val  wordPairsRDD = sc.parallelize(words).map(word => (word, 1))
scala>  val  wordCountsWithReduce = wordPairsRDD.reduceByKey(_+_)
scala>  val  wordCountsWithGroup = wordPairsRDD.
|  groupByKey().map(t => (t._1, t._2.sum))
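Both statements above produce the same word counts; the difference is that reduceByKey combines values inside each partition before the shuffle, while groupByKey ships every (word,1) pair across the network first, so reduceByKey is usually preferred. A quick check of my own (the pairs may print in any order):

scala> wordCountsWithReduce.collect().foreach(println)  //(two,2), (one,1), (three,3), in some order
scala> wordCountsWithGroup.collect().foreach(println)   //the same three pairs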
scala> pairRDD.keys
res17: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[17] at keys at <console>:34
scala> pairRDD.keys.foreach(println)
Hadoop
Spark
Hive
Spark
scala> pairRDD.values
res0: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[2] at values at <console>:34 
scala> pairRDD.values.foreach(println)
1
1
1
1
scala> pairRDD.sortByKey()
res0: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[2] at sortByKey at <console>:34
scala> pairRDD.sortByKey().foreach(println)
(Hadoop,1)
(Hive,1)
(Spark,1)
(Spark,1)
scala> val  d1 = sc.parallelize(Array(("c",8),("b",25),("c",17),("a",42),("b",4),("d",9),("e",17),("c",2),("f",29),("g",21),("b",9)))
scala> d1.reduceByKey(_+_).sortByKey(false).collect
res2: Array[(String, Int)] = Array((g,21),(f,29),(e,17),(d,9),(c,27),(b,38),(a,42))
scala> val  d2 = sc.parallelize(Array(("c",8),("b",25),("c",17),("a",42),("b",4),("d",9),("e",17),("c",2),("f",29),("g",21),("b",9)))
scala> d2.reduceByKey(_+_).sortBy(_._2,false).collect
res4: Array[(String, Int)] = Array((a,42),(b,38),(f,29),(c,27),(g,21),(e,17),(d,9))
scala> pairRDD.mapValues(x => x+1)
res2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[4] at mapValues at <console>:34
scala> pairRDD.mapValues(x => x+1).foreach(println)
(Hadoop,2)
(Spark,2)
(Hive,2)
(Spark,2)
scala> val  pairRDD1 = sc.parallelize(Array(("spark",1),("spark",2),("hadoop",3),("hadoop",5)))
scala> val  pairRDD2 = sc.parallelize(Array(("spark","fast")))
scala> pairRDD1.join(pairRDD2)
scala> pairRDD1.join(pairRDD2).foreach(println)
(spark,(1,fast))
(spark,(2,fast))
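join is an inner join, so the two hadoop pairs in pairRDD1 are dropped because pairRDD2 has no matching key. A related sketch of my own: leftOuterJoin keeps the unmatched keys and wraps the right-hand value in an Option (the four pairs may print in any order):

scala> pairRDD1.leftOuterJoin(pairRDD2).foreach(println)
(spark,(1,Some(fast)))
(spark,(2,Some(fast)))
(hadoop,(3,None))
(hadoop,(5,None))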
//Combine.scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
object Combine {
    def main(args: Array[String]) {
        val conf = new SparkConf().setAppName("Combine").setMaster("local")
        val sc = new SparkContext(conf)
        val data = sc.parallelize(Array(("company-1",88),("company-1",96),("company-1",85),("company-2",94),("company-2",86),("company-2",74),("company-3",86),("company-3",88),("company-3",92)),3)
        val res = data.combineByKey(
            (income) => (income,1),
            ( acc:(Int,Int), income ) => ( acc._1+income, acc._2+1 ),
            ( acc1:(Int,Int), acc2:(Int,Int) ) => ( acc1._1+acc2._1, acc1._2+acc2._2 )
        ).map({ case (key, value) => (key, value._1, value._1/value._2.toFloat) })
        res.repartition(1).saveAsTextFile("file:///usr/local/spark/mycode/rdd/result")
    }
}
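combineByKey above builds a (sum, count) pair for each key: the first function creates the pair from the first income seen in a partition, the second folds further incomes from the same partition into it, and the third merges the pairs coming from different partitions. Each saved tuple is therefore (company, total income, average income); for the data hard-coded above, company-1 totals 88+96+85 = 269 (average about 89.7), company-2 totals 254 (about 84.7) and company-3 totals 266 (about 88.7). A one-line sketch for inspecting the result from spark-shell (same output path as in the program):

scala> sc.textFile("file:///usr/local/spark/mycode/rdd/result").foreach(println)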
scala> val  rdd = sc.parallelize(Array(("spark",2),("hadoop",6),("hadoop",4),("spark",6)))
scala> rdd.mapValues(x => (x,1)).reduceByKey((x,y) => (x._1+y._1,x._2 + y._2)).mapValues(x => (x._1 / x._2)).collect()
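The one-liner above computes the average value for each key. A step-by-step sketch of my own showing the same computation with the intermediate shapes spelled out (note that the final division is integer division):

scala> val  rdd = sc.parallelize(Array(("spark",2),("hadoop",6),("hadoop",4),("spark",6)))
scala> val  withCount = rdd.mapValues(x => (x, 1))                              //(key, (value, 1))
scala> val  sums = withCount.reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))  //(key, (sum, count))
scala> sums.mapValues(x => x._1 / x._2).collect()                               //spark: (2+6)/2 = 4, hadoop: (6+4)/2 = 5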
scala> val  textFile = sc.textFile("file:///usr/local/spark/mycode/wordcount/word.txt")
scala> textFile.first()
scala> val  textFile = sc.textFile("file:///usr/local/spark/mycode/wordcount/word123.txt")
scala> val  textFile = sc.textFile("file:///usr/local/spark/mycode/wordcount/word.txt")
scala> textFile.saveAsTextFile("file:///usr/local/spark/mycode/wordcount/writeback")
cd /usr/local/spark
./bin/spark-shell
scala> val  textFile = sc.textFile("file:///usr/local/spark/mycode/wordcount/word.txt",2)
scala> textFile.saveAsTextFile("file:///usr/local/spark/mycode/wordcount/writeback")
scala> val  textFile = sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt")
scala> textFile.first()
scala> val  textFile = sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt")
scala> val  textFile = sc.textFile("/user/hadoop/word.txt")
scala> val  textFile = sc.textFile("word.txt")
scala> val  textFile = sc.textFile("word.txt")
scala> textFile.saveAsTextFile("writeback")
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
scala> val  jsonStr = sc.textFile("file:///usr/local/spark/examples/src/main/resources/people.json")
scala> jsonStr.foreach(println)
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
//JSONRead.scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import scala.util.parsing.json.JSON
object JSONRead {
    def main(args: Array[String]) {
        val inputFile = "file:///usr/local/spark/examples/src/main/resources/people.json"
        val conf = new SparkConf().setAppName("JSONRead")
        val sc = new SparkContext(conf)
        val jsonStrs = sc.textFile(inputFile)
        val result = jsonStrs.map(s => JSON.parseFull(s))
        result.foreach( {r => r match {
                        case Some(map: Map[String, Any]) => println(map)
                        case None => println("Parsing failed")
                        case other => println("Unknown data structure: " + other)
                }
        }
        )
    }
}
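Instead of printing each parsed map whole, individual fields can be pulled out of it. A spark-shell sketch of my own (it assumes the same people.json file, and that scala.util.parsing.json is on the classpath, which the program above already requires; the three results may print in any order):

scala> import scala.util.parsing.json.JSON
scala> val  jsonStrs = sc.textFile("file:///usr/local/spark/examples/src/main/resources/people.json")
scala> val  parsed = jsonStrs.map(s => JSON.parseFull(s))
scala> parsed.collect { case Some(map: Map[String, Any]) => map.get("name") }.foreach(println)  //collect with a partial function is a transformation (filter + map), not the collect() action
Some(Michael)
Some(Andy)
Some(Justin)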
cd  /usr/local/hadoop
./sbin/start-dfs.sh
cd  /usr/local/hbase
./bin/start-hbase.sh  # start HBase
./bin/hbase shell  # start the HBase shell
hbase> disable  'student'
hbase> drop  'student'
hbase> create  'student','info'
# first insert the record for the first student into the student table
hbase> put  'student','1','info:name','Xueqian'
hbase> put  'student','1','info:gender','F'
hbase> put  'student','1','info:age','23'
# then insert the record for the second student
hbase> put  'student','2','info:name','Weiliang'
hbase> put  'student','2','info:gender','M'
hbase> put  'student','2','info:age','24'
cd  /usr/local/spark/jars
mkdir  hbase
cd  hbase
cp  /usr/local/hbase/lib/hbase*.jar  ./
cp  /usr/local/hbase/lib/guava-12.0.1.jar  ./
cp  /usr/local/hbase/lib/htrace-core-3.1.0-incubating.jar  ./
cp  /usr/local/hbase/lib/protobuf-java-2.5.0.jar  ./
//SparkOperateHBase.scala
import org.apache.hadoop.conf.Configuration 
import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.mapreduce.TableInputFormat 
import org.apache.hadoop.hbase.util.Bytes 
import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf 
object SparkOperateHBase {
def main(args: Array[String]) {
    val conf = HBaseConfiguration.create()
    val sc = new SparkContext(new SparkConf())
    //set the name of the table to query
    conf.set(TableInputFormat.INPUT_TABLE, "student")
    val stuRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
  classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
  classOf[org.apache.hadoop.hbase.client.Result])
    val count = stuRDD.count()
    println("Students RDD Count:" + count)
    stuRDD.cache()
    //iterate over the query results and print them
    stuRDD.foreach({ case (_,result) =>
        val key = Bytes.toString(result.getRow)
        val name = Bytes.toString(result.getValue("info".getBytes,"name".getBytes))
        val gender = Bytes.toString(result.getValue("info".getBytes,"gender".getBytes))
        val age = Bytes.toString(result.getValue("info".getBytes,"age".getBytes))
        println("Row key:"+key+" Name:"+name+" Gender:"+gender+" Age:"+age)
    })
}
}
name := "Simple Project"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0"
libraryDependencies += "org.apache.hbase" % "hbase-client" % "1.1.5"
libraryDependencies += "org.apache.hbase" % "hbase-common" % "1.1.5"
libraryDependencies += "org.apache.hbase" % "hbase-server" % "1.1.5" 
$ /usr/local/spark/bin/spark-submit  --driver-class-path /usr/local/spark/jars/hbase/*:/usr/local/hbase/conf  --class "SparkOperateHBase"  /usr/local/spark/mycode/hbase/target/scala-2.11/simple-project_2.11-1.0.jar
//SparkWriteHBase.scala
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.spark._
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
object SparkWriteHBase {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("SparkWriteHBase").setMaster("local")
    val sc = new SparkContext(sparkConf)
    val tablename = "student"
    sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, tablename)
    val job = new Job(sc.hadoopConfiguration)
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Result])
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
    //the following line builds two records
    val indataRDD = sc.makeRDD(Array("3,Rongcheng,M,26","4,Guanhua,M,27"))
    val rdd = indataRDD.map(_.split(",")).map{arr=>{
      //set the row key
      val put = new Put(Bytes.toBytes(arr(0)))
      //set the value of the info:name column
      put.add(Bytes.toBytes("info"),Bytes.toBytes("name"),Bytes.toBytes(arr(1)))
      //set the value of the info:gender column
      put.add(Bytes.toBytes("info"),Bytes.toBytes("gender"),Bytes.toBytes(arr(2)))
      //set the value of the info:age column
      put.add(Bytes.toBytes("info"),Bytes.toBytes("age"),Bytes.toBytes(arr(3).toInt))
      //build a key-value pair as one element of the rdd
      (new ImmutableBytesWritable, put)
    }}
    rdd.saveAsNewAPIHadoopDataset(job.getConfiguration())
  }
}
/usr/local/spark/bin/spark-submit  --driver-class-path /usr/local/spark/jars/hbase/*:/usr/local/hbase/conf  --class "SparkWriteHBase"  /usr/local/spark/mycode/hbase/target/scala-2.11/simple-project_2.11-1.0.jar
hbase> scan  'student'
1,1768,50,155
2,1218,600,211
3,2239,788,242
4,3101,28,599
5,4899,290,129
6,3110,54,1201
7,4436,259,877
8,2369,7890,27
//TopN.scala
import org.apache.spark.{SparkConf, SparkContext}
object TopN {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("TopN").setMaster("local")
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")
    val lines = sc.textFile("hdfs://localhost:9000/user/hadoop/spark/mycode/rdd/examples",2)
    var num = 0;
    val result = lines.filter(line => (line.trim().length > 0) && (line.split(",").length == 4))
      .map(_.split(",")(2))
      .map(x => (x.toInt,""))
      .sortByKey(false)
      .map(x => x._1).take(5)
      .foreach(x => {
        num = num + 1
        println(num + "\t" + x)
      })
  }
}
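For the eight sample rows listed above, the third field contains 50, 600, 788, 28, 290, 54, 259 and 7890, so the program prints the five largest values in descending order. A quick local check of my own of the same logic in plain Scala (no cluster needed):

scala> val  sample = List("1,1768,50,155","2,1218,600,211","3,2239,788,242","4,3101,28,599","5,4899,290,129","6,3110,54,1201","7,4436,259,877","8,2369,7890,27")
scala> sample.map(_.split(",")(2).toInt).sorted(Ordering[Int].reverse).take(5).zipWithIndex.foreach{ case (v, i) => println((i + 1) + "\t" + v) }
1	7890
2	788
3	600
4	290
5	259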
//FileSort.scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.HashPartitioner
object FileSort {
    def main(args: Array[String]) {
        val conf = new SparkConf().setAppName("FileSort")
        val sc = new SparkContext(conf)
        val dataFile = "file:///usr/local/spark/mycode/rdd/data"
        val lines = sc.textFile(dataFile,3)
        var index = 0
        val result = lines.filter(_.trim().length>0).map(n=>(n.trim.toInt,"")).partitionBy(new HashPartitioner(1)).sortByKey().map(t => {
      index += 1
            (index,t._1)
        })
        result.saveAsTextFile("file:///usr/local/spark/mycode/rdd/examples/result")
    }
}
//SecondarySortKey.scala
package cn.edu.xmu.spark
class SecondarySortKey(val first:Int, val second:Int) extends Ordered[SecondarySortKey] with Serializable {
  def compare(other:SecondarySortKey):Int = {
    if (this.first - other.first != 0) {
      this.first - other.first
    } else {
      this.second - other.second
    }
  }
}
//SecondarySortApp.scala
package cn.edu.xmu.spark
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object SecondarySortApp {
  def main(args:Array[String]){
    val conf = new SparkConf().setAppName("SecondarySortApp").setMaster("local")
    val sc = new SparkContext(conf)
    val lines = sc.textFile("file:///usr/local/spark/mycode/rdd/examples/file1.txt", 1)
    val pairWithSortKey = lines.map(line => (new SecondarySortKey(line.split(" ")(0).toInt, line.split(" ")(1).toInt), line))
    val sorted = pairWithSortKey.sortByKey(false)
    val sortedResult = sorted.map(sortedLine => sortedLine._2)
    sortedResult.collect().foreach(println)
  }
}
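SecondarySortApp sorts lines in descending order by the first number and, when the first numbers are equal, in descending order by the second. A quick local illustration of my own of that ordering (the sample lines below are made up; it assumes the SecondarySortKey class above is available, for example pasted into spark-shell without the package line):

scala> val  sample = List("5 3", "1 6", "4 9", "8 3", "4 7", "5 6", "3 2")
scala> sample.map(line => (new SecondarySortKey(line.split(" ")(0).toInt, line.split(" ")(1).toInt), line)).sortBy(_._1).reverse.map(_._2).foreach(println)
8 3
5 6
5 3
4 9
4 7
3 2
1 6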