【版权声明】博客内容由厦门大学数据库实验室拥有版权,未经允许,请勿转载!
[返回Spark教程首页]
Spark官网提供了两种方法来实现从RDD转换得到DataFrame,第一种方法是,利用反射来推断包含特定类型对象的RDD的schema;第二种方法是,使用编程接口,构造一个schema并将其应用在已知的RDD上。
利用反射机制推断RDD模式
在利用反射机制推断RDD模式时,需要首先定义一个case class,因为,只有case class才能被Spark隐式地转换为DataFrame。
下面是在spark-shell中执行命令以及反馈的信息:
scala> import org.apache.spark.sql.SQLContext
scala> val sqlContext = new SQLContext(sc)
scala> import sqlContext.implicits._ //导入包,支持把一个RDD隐式转换为一个DataFrame
import sqlContext.implicits._
scala> case class Person(name: String, age: Int) //定义一个case class
defined class Person
scala> val people = sc.textFile("file:///usr/local/spark/examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
16/11/15 11:19:18 INFO spark.SparkContext: Created broadcast 17 from textFile at <console>:37
people: org.apache.spark.sql.DataFrame = [name: string, age: int]
scala> people.registerTempTable("peopleTempTab") //必须注册为临时表才能供下面的查询使用
scala> val personsRDD = sqlContext.sql("select name,age from peopleTempTab where age > 20").rdd //最终生成一个RDD
personsRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[43] at rdd at <console>:35
//从上面的信息可以看出,生成的RDD的每个元素的类型是org.apache.spark.sql.Row
scala> personsRDD.foreach(t => println("Name:"+t(0),"Age:"+t(1))) //RDD中的每个元素都是一行记录,包含name和age两个字段,分别用t(0)和t(1)来获取值
(Name:Michael,Age:29)
(Name:Andy,Age:30)
使用编程方式定义RDD模式
当无法提前定义case class时,就需要采用编程方式定义RDD模式。
scala> import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SQLContext
scala> val sqlContext = new SQLContext(sc)
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@5f7bd970
scala> val people = sc.textFile("file:///usr/local/spark/examples/src/main/resources/people.txt")
people: org.apache.spark.rdd.RDD[String] = file:///usr/local/spark/examples/src/main/resources/people.txt MapPartitionsRDD[1] at textFile at <console>:28
scala> val schemaString = "name age" //定义一个模式字符串
schemaString: String = name age
scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row
scala> import org.apache.spark.sql.types.{StructType,StructField,StringType}
import org.apache.spark.sql.types.{StructType, StructField, StringType}
scala> val schema = StructType( schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true))) //根据模式字符串生成模式
schema: org.apache.spark.sql.types.StructType = StructType(StructField(name,StringType,true), StructField(age,StringType,true))
//从上面信息可以看出,schema描述了模式信息,模式中包含name和age两个字段
scala> val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim)) //对people这个RDD中的每一行元素都进行解析
rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[3] at map at <console>:32
scala> val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)
peopleDataFrame: org.apache.spark.sql.DataFrame = [name: string, age: string]
scala> peopleDataFrame.registerTempTable("peopleTempTab") //必须注册为临时表才能供下面查询使用
scala> val personsRDD = sqlContext.sql("select name,age from peopleTempTab where age > 20").rdd
personsRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[7] at rdd at <console>:32
scala> personsRDD.foreach(t => println("Name:"+t(0)+",Age:"+t(1)))
(Name:Michael,Age:29)
(Name:Andy,Age:30)
在上面的代码中,people.map(_.split(","))实际上和people.map(line => line.split(","))这种表述是等价的,作用是对people这个RDD中的每一行元素都进行解析。比如,people这个RDD的第一行是:
Michael, 29
这行内容经过people.map(_.split(","))操作后,就得到一个集合{Michael,29}。后面经过map(p => Row(p(0), p(1).trim))操作时,这时的p就是这个集合{Michael,29},这时p(0)就是Micheael,p(1)就是29,map(p => Row(p(0), p(1).trim))就会生成一个Row对象,这个对象里面包含了两个字段的值,这个Row对象就构成了rowRDD中的其中一个元素。因为people有3行文本,所以,最终,rowRDD中会包含3个元素,每个元素都是org.apache.spark.sql.Row类型。实际上,Row对象只是对基本数据类型(比如整型或字符串)的数组的封装,本质就是一个定长的字段数组。
peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema),这条语句就相当于建立了rowRDD数据集和模式之间的对应关系,从而我们就知道对于rowRDD的每行记录,第一个字段的名称是schema中的“name”,第二个字段的名称是schema中的“age”。
把RDD保存成文件
这里介绍如何把RDD保存成文本文件,后面还会介绍其他格式的保存。
进入spark-shell执行下面命令:
scala> import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SQLContext
scala> val sqlContext = new SQLContext(sc)
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@4a65c40
scala> val df = sqlContext.read.json("file:///usr/local/spark/examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> df.rdd.saveAsTextFile("file:///usr/local/spark/mycode/newpeople.txt")
可以看出,我们是把DataFrame转换成RDD,然后调用saveAsTextFile()保存成文本文件。在后面小节中,我们还会介绍其他保存方式。
上述过程执行结束后,可以打开第二个终端窗口,在Shell命令提示符下查看新生成的newpeople.txt:
cd /usr/local/spark/mycode/
ls
可以看到/usr/local/spark/mycode/这个目录下面有个newpeople.txt文件夹(注意,不是文件),这个文件夹中包含下面两个文件:
part-00000
_SUCCESS
不用理会_SUCCESS这个文件,只要看一下part-00000这个文件,可以用vim编辑器打开这个文件查看它的内容,该文件内容如下:
[null,Michael]
[30,Andy]
[19,Justin]
如果我们要再次把newpeople.txt中的数据加载到RDD中,可以直接使用newpeople.txt目录名称,而不需要使用part-00000文件,如下:
scala> val textFile = sc.textFile("file:///usr/local/spark/mycode/newpeople.txt")
textFile: org.apache.spark.rdd.RDD[String] = file:///usr/local/spark/mycode/newpeople.txt MapPartitionsRDD[11] at textFile at <console>:28
scala> textFile.foreach(println)
[null,Michael]
[30,Andy]
[19,Justin]