【版权声明】博客内容由厦门大学数据库实验室拥有版权,未经允许,请勿转载!
[返回Spark教程首页]
在进行Spark SQL编程之前,需要了解你当前安装的Spark是否包含Hive支持。Hive是基于Hadoop的数据仓库,可以让用户输入类似SQL语法的HiveQL语句,Hive会自动把HiveQL语句转换成底层的MapReduce任务去执行(要想了解更多数据仓库Hive的知识,可以参考厦门大学数据库实验室的Hive授课视频、Hive安装指南)。因为, 根据是否包含Hive支持,Spark提供了两个不同的入口,即HiveConext和SQLContext。
(1)SQLContext
SQLContext支持从不同的数据源加载数据,并把数据转换成DataFrame,并且支持把DataFrame转换成SQLContext自身中的表,然后使用SQL语句来操作数据。
(2)HiveContext
HiveContext继承自SQLContext。HiveConext提供了HiveQL以及其他依赖于Hive的功能的支持,而SQLContext则只支持Spark SQL功能的一个子集,那些依赖于Hive的功能则被排除在外。不过,需要注意的是,即使我们没有安装数据仓库Hive,我们也可以正常使用HiveContext。
那么,我们怎么知道当前安装的Spark是否包含Hive支持呢?一般来说,Hive在编译时可以包含Hive支持,也可以不包含Hive支持。本教程在前面的“Spark安装和使用”章节介绍的是,让大家下载二进制版本的Spark,这个版本在编译时已经包含了Hive支持。有了Hive支持以后,我们就可以访问Hive表,并且支持使用HiveQL查询语言。再次强调,让Spark包含Hive支持,并不需要我们事先安装Hive,只有当我们的Spark SQL程序需要访问Hive中的数据时(也就是借助于Hive实现数据存储时),才需要安装Hive。
下面我们就介绍如何使用SQLContext来创建DataFrame。
请进入Linux系统,打开“终端”,进入Shell命令提示符状态。
首先,请找到样例数据。 Spark已经为我们提供了几个样例数据,就保存在“/usr/local/spark/examples/src/main/resources/”这个目录下,这个目录下有两个样例数据people.json和people.txt。
people.json文件的内容如下:
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
people.txt文件的内容如下:
Michael, 29
Andy, 30
Justin, 19
下面我们就介绍如何从people.json文件中读取数据并生成DataFrame并显示数据(从people.txt文件生成DataFrame需要后面将要介绍的另外一种方式)。
请使用如下命令打开spark-shell:
cd /usr/local/spark
./bin/spark-shell
进入到spark-shell状态后执行下面命令:
scala> import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SQLContext
scala> val sqlContext = new SQLContext(sc)
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@4a65c40
scala> val df = sqlContext.read.json("file:///usr/local/spark/examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> df.show()
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
现在,我们可以执行一些常用的DataFrame操作。
//打印模式信息
scala> df.printSchema()
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
//选择多列
scala> df.select(df("name"),df("age")+1).show()
+-------+---------+
| name|(age + 1)|
+-------+---------+
|Michael| null|
| Andy| 31|
| Justin| 20|
+-------+---------+
//条件过滤
scala> df.filter(df("age") > 20 ).show()
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
//分组聚合
scala> df.groupBy("age").count().show()
+----+-----+
| age|count|
+----+-----+
|null| 1|
| 19| 1|
| 30| 1|
+----+-----+
//排序
scala> df.sort(df("age").desc).show()
+----+-------+
| age| name|
+----+-------+
| 30| Andy|
| 19| Justin|
|null|Michael|
+----+-------+
//多列排序
scala> df.sort(df("age").desc, df("name").asc).show()
+----+-------+
| age| name|
+----+-------+
| 30| Andy|
| 19| Justin|
|null|Michael|
+----+-------+
//对列进行重命名
scala> df.select(df("name").as("username"),df("age")).show()
+--------+----+
|username| age|
+--------+----+
| Michael|null|
| Andy| 30|
| Justin| 19|
+--------+----+