林子雨、赖永炫、陶继平编著《Spark编程基础》(教材官网)教材中的代码,在纸质教材中的印刷效果,可能会影响读者对代码的理解,为了方便读者正确理解代码或者直接拷贝代码用于上机实验,这里提供全书配套的所有代码。
查看教材所有章节的代码
第5章 RDD编程
scala> val lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")
lines: org.apache.spark.rdd.RDD[String] = file:///usr/local/spark/mycode/rdd/word.txt MapPartitionsRDD[12] at textFile at <console>:27
scala> val lines = sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt")
scala> val lines = sc.textFile("/user/hadoop/word.txt")
scala> val lines = sc.textFile("word.txt")
scala> val array = Array(1,2,3,4,5)
scala> val rdd = sc.parallelize(array)
scala> val list = List(1,2,3,4,5)
scala> val rdd = sc.parallelize(list)
scala> val lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")
scala> val linesWithSpark=lines.filter(line => line.contains("Spark"))
scala> val data=Array(1,2,3,4,5)
scala> val rdd1= sc.parallelize(data)
scala> val rdd2=rdd1.map(x=>x+10)
scala> val lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")
scala> val words=lines.map(line => line.split(" "))
scala> val lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")
scala> val words=lines.flatMap(line => line.split(" "))
scala> val rdd=sc.parallelize(Array(1,2,3,4,5))
rdd: org.apache.spark.rdd.RDD[Int]=ParallelCollectionRDD[1] at parallelize at <console>:24
scala> rdd.count()
res0: Long = 5
scala> rdd.first()
res1: Int = 1
scala> rdd.take(3)
res2: Array[Int] = Array(1,2,3)
scala> rdd.reduce((a,b)=>a+b)
res3: Int = 15
scala> rdd.collect()
res4: Array[Int] = Array(1,2,3,4,5)
scala> rdd.foreach(elem=>println(elem))
1
2
3
4
5
scala> val lines = sc.textFile("data.txt")
scala> val lineLengths = lines.map(s => s.length)
scala> val totalLength = lineLengths.reduce((a, b) => a + b)
scala> val list = List("Hadoop","Spark","Hive")
list: List[String] = List(Hadoop, Spark, Hive)
scala> val rdd = sc.parallelize(list)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[22] at parallelize at <console>:29
scala> println(rdd.count()) //行动操作,触发一次真正从头到尾的计算
3
scala> println(rdd.collect().mkString(",")) //行动操作,触发一次真正从头到尾的计算
Hadoop,Spark,Hive
scala> val list = List("Hadoop","Spark","Hive")
list: List[String] = List(Hadoop, Spark, Hive)
scala> val rdd = sc.parallelize(list)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[22] at parallelize at <console>:29
scala> rdd.cache() //会调用persist(MEMORY_ONLY),但是,语句执行到这里,并不会缓存rdd,因为这时rdd还没有被计算生成
scala> println(rdd.count()) //第一次行动操作,触发一次真正从头到尾的计算,这时上面的rdd.cache()才会被执行,把这个rdd放到缓存中
3
scala> println(rdd.collect().mkString(",")) //第二次行动操作,不需要触发从头到尾的计算,只需要重复使用上面缓存中的rdd
Hadoop,Spark,Hive
scala> val array = Array(1,2,3,4,5)
scala> val rdd = sc.parallelize(array,2) //设置两个分区
scala> val data = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt",2)
data: org.apache.spark.rdd.RDD[String] = file:///usr/local/spark/mycode/rdd/word.txt MapPartitionsRDD[12] at textFile at <console>:24
scala> data.partitions.size //显示data这个RDD的分区数量
res2: Int=2
scala> val rdd = data.repartition(1) //对data这个RDD进行重新分区
rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[11] at repartition at <console>:26
scala> rdd.partitions.size
res4: Int = 1
import org.apache.spark.{Partitioner, SparkContext, SparkConf}
/**
 * Custom partitioner (a subclass of org.apache.spark.Partitioner) that routes a record
 * by `key.toString.toInt` modulo the partition count, i.e. by the key's last digit when
 * there are 10 partitions.
 *
 * Keys must have a string form that parses as an Int; otherwise getPartition throws
 * NumberFormatException.
 *
 * @param numParts number of output partitions; must be positive
 */
class MyPartitioner(numParts: Int) extends Partitioner {
  require(numParts > 0, s"numParts must be positive, got $numParts")

  /** Number of partitions this partitioner produces. */
  override def numPartitions: Int = numParts

  /**
   * Returns a partition id in [0, numParts).
   * The modulus uses numParts (not a fixed 10) so ids never exceed numPartitions.
   */
  override def getPartition(key: Any): Int = {
    val mod = key.toString.toInt % numParts
    // Scala's % keeps the dividend's sign; shift negative keys back into range.
    if (mod < 0) mod + numParts else mod
  }
}
/**
 * Demo driver for MyPartitioner.
 *
 * Spreads the numbers 1..10 over 5 input partitions, then repartitions them into
 * 10 partitions by last digit. Each resulting partition is written as its own part
 * file under the output directory.
 */
object TestPartitioner {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf())
    // Five input partitions holding the numbers 1..10.
    val numbers = sc.parallelize(1 to 10, 5)
    val outputDir = "file:///usr/local/spark/mycode/rdd/partitioner"
    // Pair each number with a dummy value so partitionBy is available, route it with
    // MyPartitioner into 10 partitions, then keep only the number for output.
    numbers
      .map(n => (n, 1))
      .partitionBy(new MyPartitioner(10))
      .keys
      .saveAsTextFile(outputDir)
  }
}
scala> val lines = sc. //代码一行放不下,可以在圆点后回车,在下行继续输入
| textFile("file:///usr/local/spark/mycode/wordcount/word.txt")
scala> val wordCount = lines.flatMap(line => line.split(" ")).
| map(word => (word, 1)).reduceByKey((a, b) => a + b)
scala> wordCount.collect()
scala> wordCount.foreach(println)
scala> val lines = sc.textFile("file:///usr/local/spark/mycode/pairrdd/word.txt")
lines: org.apache.spark.rdd.RDD[String] = file:///usr/local/spark/mycode/pairrdd/word.txt MapPartitionsRDD[1] at textFile at <console>:27
scala> val pairRDD = lines.flatMap(line => line.split(" ")).map(word => (word,1))
pairRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[3] at map at <console>:29
scala> pairRDD.foreach(println)
(i,1)
(love,1)
(hadoop,1)
……
scala> val list = List("Hadoop","Spark","Hive","Spark")
list: List[String] = List(Hadoop, Spark, Hive, Spark)
scala> val rdd = sc.parallelize(list)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[11] at parallelize at <console>:29
scala> val pairRDD = rdd.map(word => (word,1))
pairRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[12] at map at <console>:31
scala> pairRDD.foreach(println)
(Hadoop,1)
(Spark,1)
(Hive,1)
(Spark,1)
scala> pairRDD.reduceByKey((a,b)=>a+b).foreach(println)
(Spark,2)
(Hive,1)
(Hadoop,1)
scala> pairRDD.groupByKey()
res15: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[15] at groupByKey at <console>:34
scala> val words = Array("one", "two", "two", "three", "three", "three")
scala> val wordPairsRDD = sc.parallelize(words).map(word => (word, 1))
scala> val wordCountsWithReduce = wordPairsRDD.reduceByKey(_+_)
scala> val wordCountsWithGroup = wordPairsRDD.
| groupByKey().map(t => (t._1, t._2.sum))
scala> pairRDD.keys
res17: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[17] at keys at <console>:34
scala> pairRDD.keys.foreach(println)
Hadoop
Spark
Hive
Spark
scala> pairRDD.values
res0: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[2] at values at <console>:34
scala> pairRDD.values.foreach(println)
1
1
1
1
scala> pairRDD.sortByKey()
res0: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[2] at sortByKey at <console>:34
scala> pairRDD.sortByKey().foreach(println)
(Hadoop,1)
(Hive,1)
(Spark,1)
(Spark,1)
scala> val d1 = sc.parallelize(Array(("c",8),("b",25),("c",17),("a",42),("b",4),("d",9),("e",17),("c",2),("f",29),("g",21),("b",9)))
scala> d1.reduceByKey(_+_).sortByKey(false).collect
res2: Array[(String, Int)] = Array((g,21),(f,29),(e,17),(d,9),(c,27),(b,38),(a,42))
scala> val d2 = sc.parallelize(Array(("c",8),("b",25),("c",17),("a",42),("b",4),("d",9),("e",17),("c",2),("f",29),("g",21),("b",9)))
scala> d2.reduceByKey(_+_).sortBy(_._2,false).collect
res4: Array[(String, Int)] = Array((a,42),(b,38),(f,29),(c,27),(g,21),(e,17),(d,9))
scala> pairRDD.mapValues(x => x+1)
res2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[4] at mapValues at <console>:34
scala> pairRDD.mapValues(x => x+1).foreach(println)
(Hadoop,2)
(Spark,2)
(Hive,2)
(Spark,2)
scala> val pairRDD1 = sc.parallelize(Array(("spark",1),("spark",2),("hadoop",3),("hadoop",5)))
scala> val pairRDD2 = sc.parallelize(Array(("spark","fast")))
scala> pairRDD1.join(pairRDD2)
scala> pairRDD1.join(pairRDD2).foreach(println)
(spark,(1,fast))
(spark,(2,fast))
//Combine.scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
/**
 * Computes, per company, the total income and the average income using combineByKey.
 * Writes tuples (company, total, average) to a single output file.
 */
object Combine {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("Combine").setMaster("local"))
    val incomes = Array(
      ("company-1", 88), ("company-1", 96), ("company-1", 85),
      ("company-2", 94), ("company-2", 86), ("company-2", 74),
      ("company-3", 86), ("company-3", 88), ("company-3", 92))
    val data = sc.parallelize(incomes, 3)

    // The combiner is a (runningSum, recordCount) pair.
    val createCombiner = (income: Int) => (income, 1)
    val mergeValue = (acc: (Int, Int), income: Int) => (acc._1 + income, acc._2 + 1)
    val mergeCombiners = (left: (Int, Int), right: (Int, Int)) => (left._1 + right._1, left._2 + right._2)

    val sumAndCount = data.combineByKey(createCombiner, mergeValue, mergeCombiners)
    val res = sumAndCount.map { case (company, (total, count)) =>
      (company, total, total / count.toFloat)
    }
    // Collapse to one partition so the result lands in a single part file.
    res.repartition(1).saveAsTextFile("file:///usr/local/spark/mycode/rdd/result")
  }
}
scala> val rdd = sc.parallelize(Array(("spark",2),("hadoop",6),("hadoop",4),("spark",6)))
scala> rdd.mapValues(x => (x,1)).reduceByKey((x,y) => (x._1+y._1,x._2 + y._2)).mapValues(x => (x._1 / x._2)).collect()
scala> val textFile = sc.textFile("file:///usr/local/spark/mycode/wordcount/word.txt")
scala> textFile.first()
scala> val textFile = sc.textFile("file:///usr/local/spark/mycode/wordcount/word123.txt")
scala> val textFile = sc.textFile("file:///usr/local/spark/mycode/wordcount/word.txt")
scala> textFile.saveAsTextFile("file:///usr/local/spark/mycode/wordcount/writeback")
cd /usr/local/spark
./bin/spark-shell
scala> val textFile = sc.textFile("file:///usr/local/spark/mycode/wordcount/word.txt",2)
scala> textFile.saveAsTextFile("file:///usr/local/spark/mycode/wordcount/writeback")
scala> val textFile = sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt")
scala> textFile.first()
scala> val textFile = sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt")
scala> val textFile = sc.textFile("/user/hadoop/word.txt")
scala> val textFile = sc.textFile("word.txt")
scala> val textFile = sc.textFile("word.txt")
scala> textFile.saveAsTextFile("writeback")
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
scala> val jsonStr = sc.textFile("file:///usr/local/spark/examples/src/main/resources/people.json")
scala> jsonStr.foreach(println)
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
//JSONRead.scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import scala.util.parsing.json.JSON
/**
 * Reads people.json line by line and parses each line with scala.util.parsing.json.JSON.
 *
 * For each line it prints one of three things:
 *   - the resulting Map, when the line parsed into a Map;
 *   - "Parsing failed", when parsing returned None;
 *   - the raw value, for any other parsed structure.
 *
 * Printing happens inside RDD.foreach, i.e. on the executors.
 */
object JSONRead {
  def main(args: Array[String]): Unit = {
    val inputFile = "file:///usr/local/spark/examples/src/main/resources/people.json"
    val conf = new SparkConf().setAppName("JSONRead")
    val sc = new SparkContext(conf)
    val jsonStrs = sc.textFile(inputFile)
    val result = jsonStrs.map(s => JSON.parseFull(s))
    result.foreach {
      // Type arguments are erased at runtime, so match on Map[_, _] rather than the
      // unchecked Map[String, Any]. The runtime check is the same and the compiler
      // warning goes away.
      case Some(map: Map[_, _]) => println(map)
      case None => println("Parsing failed")
      case other => println("Unknown data structure: " + other)
    }
  }
}
cd /usr/local/hadoop
./sbin/start-dfs.sh
cd /usr/local/hbase
./bin/start-hbase.sh #启动HBase
./bin/hbase shell #启动HBase Shell
hbase> disable 'student'
hbase> drop 'student'
hbase> create 'student','info'
# 首先录入student表的第一个学生记录
hbase> put 'student','1','info:name','Xueqian'
hbase> put 'student','1','info:gender','F'
hbase> put 'student','1','info:age','23'
# 然后录入student表的第二个学生记录
hbase> put 'student','2','info:name','Weiliang'
hbase> put 'student','2','info:gender','M'
hbase> put 'student','2','info:age','24'
cd /usr/local/spark/jars
mkdir hbase
cd hbase
cp /usr/local/hbase/lib/hbase*.jar ./
cp /usr/local/hbase/lib/guava-12.0.1.jar ./
cp /usr/local/hbase/lib/htrace-core-3.1.0-incubating.jar ./
cp /usr/local/hbase/lib/protobuf-java-2.5.0.jar ./
//SparkOperateHBase.scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
/**
 * Scans the HBase table "student" through TableInputFormat.
 *
 * Prints the number of rows, then each row's key and its info:name, info:gender and
 * info:age values, decoded with Bytes.toString.
 */
object SparkOperateHBase {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    val sc = new SparkContext(new SparkConf())
    // Table to scan.
    conf.set(TableInputFormat.INPUT_TABLE, "student")
    val stuRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])
    // Mark for caching BEFORE the first action. The count() below then materializes the
    // cache, and the foreach reuses it instead of scanning HBase a second time.
    // NOTE(review): Spark's newAPIHadoopRDD docs warn that record readers may reuse
    // Writable objects. Only the Result value is read below, never the cached key.
    stuRDD.cache()
    val count = stuRDD.count()
    println("Students RDD Count:" + count)
    // Print every row.
    stuRDD.foreach({ case (_, result) =>
      val key = Bytes.toString(result.getRow)
      val name = Bytes.toString(result.getValue("info".getBytes, "name".getBytes))
      val gender = Bytes.toString(result.getValue("info".getBytes, "gender".getBytes))
      val age = Bytes.toString(result.getValue("info".getBytes, "age".getBytes))
      println("Row key:" + key + " Name:" + name + " Gender:" + gender + " Age:" + age)
    })
  }
}
// sbt build definition for the Spark + HBase programs in this listing.
// The spark-submit commands here reference target/scala-2.11/simple-project_2.11-1.0.jar,
// which corresponds to the name/version/scalaVersion settings below.
name := "Simple Project"
version := "1.0"
// Scala binary version must match the Spark build (Spark 2.1.0 artifacts are resolved via %%).
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0"
// HBase client-side libraries. NOTE(review): presumably these should match the installed
// HBase whose jars are copied into /usr/local/spark/jars/hbase — confirm the version.
libraryDependencies += "org.apache.hbase" % "hbase-client" % "1.1.5"
libraryDependencies += "org.apache.hbase" % "hbase-common" % "1.1.5"
libraryDependencies += "org.apache.hbase" % "hbase-server" % "1.1.5"
$ /usr/local/spark/bin/spark-submit --driver-class-path /usr/local/spark/jars/hbase/*:/usr/local/hbase/conf --class "SparkOperateHBase" /usr/local/spark/mycode/hbase/target/scala-2.11/simple-project_2.11-1.0.jar
//SparkWriteHBase.scala
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.spark._
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
/**
 * Writes two student rows (row keys "3" and "4") into the HBase table "student"
 * via TableOutputFormat.
 *
 * Every column (info:name, info:gender, info:age) is stored as a UTF-8 string, so the
 * rows match the ones entered from the HBase shell. They can also be read back with
 * Bytes.toString.
 */
object SparkWriteHBase {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("SparkWriteHBase").setMaster("local")
    val sc = new SparkContext(sparkConf)
    val tablename = "student"
    sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, tablename)
    // Job.getInstance replaces the deprecated `new Job(conf)` constructor.
    val job = Job.getInstance(sc.hadoopConfiguration)
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Result])
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
    // Two input records: rowKey,name,gender,age
    val indataRDD = sc.makeRDD(Array("3,Rongcheng,M,26", "4,Guanhua,M,27"))
    val rdd = indataRDD.map(_.split(",")).map { arr =>
      // Row key.
      val put = new Put(Bytes.toBytes(arr(0)))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(arr(1)))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("gender"), Bytes.toBytes(arr(2)))
      // Validate the age as an Int (fails on bad input, as before), but store it as a
      // string. Writing the raw 4-byte Int would be unreadable via Bytes.toString and
      // inconsistent with the shell-entered rows.
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(arr(3).toInt.toString))
      // TableOutputFormat ignores the key; it uses the Put's row key.
      (new ImmutableBytesWritable, put)
    }
    rdd.saveAsNewAPIHadoopDataset(job.getConfiguration())
  }
}
/usr/local/spark/bin/spark-submit --driver-class-path /usr/local/spark/jars/hbase/*:/usr/local/hbase/conf --class "SparkWriteHBase" /usr/local/spark/mycode/hbase/target/scala-2.11/simple-project_2.11-1.0.jar
hbase> scan 'student'
1,1768,50,155
2,1218, 600,211
3,2239,788,242
4,3101,28,599
5,4899,290,129
6,3110,54,1201
7,4436,259,877
8,2369,7890,27
//TopN.scala
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Prints the five largest values of the third comma-separated field of the input files.
 *
 * Lines that are blank or do not have exactly four fields are skipped. Each output line
 * is "rank<TAB>value", with ranks starting at 1.
 */
object TopN {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("TopN").setMaster("local")
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")
    val lines = sc.textFile("hdfs://localhost:9000/user/hadoop/spark/mycode/rdd/examples", 2)
    val top5 = lines
      .filter(line => line.trim.nonEmpty && line.split(",").length == 4)
      // Trim the field: "2,1218, 600,211" has a leading space, and " 600".toInt throws.
      .map(_.split(",")(2).trim.toInt)
      // top(n) returns the n largest in descending order without a full sort.
      .top(5)
    // top5 is a local array on the driver; number it 1..5 while printing.
    top5.zipWithIndex.foreach { case (value, i) =>
      println(s"${i + 1}\t$value")
    }
  }
}
//FileSort.scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.HashPartitioner
/**
 * Reads integers (one per line; blank lines ignored) from the data directory and sorts
 * them ascending.
 *
 * Writes (rank, value) pairs to a single output file, with ranks starting at 1.
 */
object FileSort {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("FileSort")
    val sc = new SparkContext(conf)
    val dataFile = "file:///usr/local/spark/mycode/rdd/data"
    val lines = sc.textFile(dataFile, 3)
    val result = lines
      .map(_.trim)
      .filter(_.nonEmpty)
      .map(n => (n.toInt, ""))
      // One partition keeps the output in a single file.
      .sortByKey(ascending = true, numPartitions = 1)
      .keys
      // Assign ranks with zipWithIndex rather than incrementing a driver-side var inside
      // the closure. Closure vars are copied per task, so that only worked by accident.
      .zipWithIndex()
      .map { case (value, idx) => (idx.toInt + 1, value) }
    // Path fixed: was "file:///usrl/...".
    result.saveAsTextFile("file:///usr/local/spark/mycode/rdd/examples/result")
  }
}
//SecondarySortKey.scala
package cn.edu.xmu.spark
/**
 * Composite sort key ordered by `first`, then by `second` (both ascending).
 *
 * compare returns a negative, zero or positive Int per the Ordered contract.
 * Integer.compare is used instead of subtraction, which overflows (and flips sign)
 * when the operands are far apart, e.g. Int.MinValue vs 1.
 */
class SecondarySortKey(val first: Int, val second: Int) extends Ordered[SecondarySortKey] with Serializable {
  def compare(other: SecondarySortKey): Int = {
    val byFirst = Integer.compare(this.first, other.first)
    if (byFirst != 0) byFirst else Integer.compare(this.second, other.second)
  }
}
//SecondarySortApp.scala
package cn.edu.xmu.spark
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
/**
 * Secondary sort demo.
 *
 * Each non-blank line of file1.txt holds two whitespace-separated integers. Lines are
 * sorted in descending order by the first number, then by the second, and printed on
 * the driver.
 */
object SecondarySortApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SecondarySortApp").setMaster("local")
    val sc = new SparkContext(conf)
    val lines = sc.textFile("file:///usr/local/spark/mycode/rdd/examples/file1.txt", 1)
    val pairWithSortKey = lines
      // Skip blank lines, which would otherwise fail on "".toInt.
      .filter(_.trim.nonEmpty)
      .map { line =>
        // Split once; tolerate runs of spaces/tabs between the two numbers.
        val fields = line.trim.split("\\s+")
        (new SecondarySortKey(fields(0).toInt, fields(1).toInt), line)
      }
    val sorted = pairWithSortKey.sortByKey(false)
    val sortedResult = sorted.map(sortedLine => sortedLine._2)
    sortedResult.collect().foreach(println)
  }
}