Spark入门:键值对RDD(Python版)

大数据学习路线图

【版权声明】博客内容由厦门大学数据库实验室拥有版权,未经允许,请勿转载!
返回Spark教程首页
推荐纸质教材:林子雨、郑海山、赖永炫编著《Spark编程基础(Python版)》

虽然RDD中可以包含任何类型的对象,但是“键值对”是一种比较常见的RDD元素类型,分组和聚合操作中经常会用到。
Spark操作中经常会用到“键值对RDD”(Pair RDD),用于完成聚合计算。普通RDD里面存储的数据类型是Int、String等,而“键值对RDD”里面存储的数据类型是“键值对”。

创建RDD之前的准备工作

在即将进行相关的实践操作之前,我们首先要登录Linux系统(本教程统一采用hadoop用户登录),然后,打开命令行“终端”,请按照下面的命令启动Hadoop中的HDFS组件:

cd  /usr/local/hadoop
./sbin/start-dfs.sh

然后,我们按照下面命令启动pyspark:

cd /usr/local/spark
./bin/pyspark

然后,新建第二个“终端”,方法是,在前面已经建设的第一个终端窗口的左上方,点击“终端”菜单,在弹出的子菜单中选择“新建终端”,就可以打开第二个终端窗口,现在,我们切换到第二个终端窗口,在第二个终端窗口中,执行以下命令,进入之前已经创建好的“/usr/local/spark/mycode/”目录,在这个目录下新建pairrdd子目录,用来存放本章的代码和相关文件:

cd /usr/local/spark/mycode/
mkdir pairrdd

然后,使用vim编辑器,在pairrdd目录下新建一个word.txt文件,你可以在文件里面随便输入几行英文语句用来测试。

经过上面的准备工作以后,我们就可以开始创建RDD了。

键值对RDD的创建

第一种创建方式:从文件中加载

我们可以采用多种方式创建键值对RDD,其中一种主要方式是使用map()函数来实现,如下:

>>>  lines = sc.textFile("file:///usr/local/spark/mycode/pairrdd/word.txt")
>>> pairRDD = lines.flatMap(lambda line : line.split(" ")).map(lambda word : (word,1))
>>> pairRDD.foreach(print)
(i,1)
(love,1)
(hadoop,1)
(i,1)
(love,1)
(Spark,1)
(Spark,1)
(is,1)
(fast,1)
(than,1)
(hadoop,1)

我们之前在“第一个Spark应用程序:WordCount”章节已经详细解释过类似代码,所以,上面代码不再做细节分析。从代码执行返回信息:pairRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[3] at map at :29,可以看出,返回的结果是键值对类型的RDD,即RDD[(String, Int)]。从pairRDD.foreach(println)执行的打印输出结果也可以看到,都是由(单词,1)这种形式的键值对。

第二种创建方式:通过并行集合(列表)创建RDD

>>> list = ["Hadoop","Spark","Hive","Spark"]
>>> rdd = sc.parallelize(list)
>>> pairRDD = rdd.map(lambda word : (word,1))
>>> pairRDD.foreach(print)
(Hadoop,1)
(Spark,1)
(Hive,1)
(Spark,1)

我们下面实例都是采用这种方式得到的pairRDD作为基础。

常用的键值对转换操作

常用的键值对转换操作包括reduceByKey()、groupByKey()、sortByKey()、join()、cogroup()等,下面我们通过实例来介绍。

reduceByKey(func)

reduceByKey(func)的功能是,使用func函数合并具有相同键的值。比如,reduceByKey((a,b) => a+b),有四个键值对("spark",1)、("spark",2)、("hadoop",3)和("hadoop",5),对具有相同key的键值对进行合并后的结果就是:("spark",3)、("hadoop",8)。可以看出,(a,b) => a+b这个Lamda表达式中,a和b都是指value,比如,对于两个具有相同key的键值对("spark",1)、("spark",2),a就是1,b就是2。
我们对上面第二种方式创建得到的pairRDD进行reduceByKey()操作,代码如下:

>>> pairRDD.reduceByKey(lambda a,b : a+b).foreach(print)
(Spark,2)
(Hive,1)
(Hadoop,1)

groupByKey()

groupByKey()的功能是,对具有相同键的值进行分组。比如,对四个键值对("spark",1)、("spark",2)、("hadoop",3)和("hadoop",5),采用groupByKey()后得到的结果是:("spark",(1,2))和("hadoop",(3,5))。
我们对上面第二种方式创建得到的pairRDD进行groupByKey()操作,代码如下:

>>> pairRDD.groupByKey()
PythonRDD[11] at RDD at PythonRDD.scala:48
>>> pairRDD.groupByKey().foreach(print)
('spark', <pyspark.resultiterable.ResultIterable object at 0x7f1869f81f60>)
('hadoop', <pyspark.resultiterable.ResultIterable object at 0x7f1869f81f60>)
('hive', <pyspark.resultiterable.ResultIterable object at 0x7f1869f81f60>)

keys()

keys()只会把键值对RDD中的key返回形成一个新的RDD。比如,对四个键值对("spark",1)、("spark",2)、("hadoop",3)和("hadoop",5)构成的RDD,采用keys()后得到的结果是一个RDD[Int],内容是{"spark","spark","hadoop","hadoop"}。
我们对上面第二种方式创建得到的pairRDD进行keys操作,代码如下:

>>> pairRDD.keys()
PythonRDD[20] at RDD at PythonRDD.scala:48
>>> pairRDD.keys().foreach(print)
Hadoop
Spark
Hive
Spark

values()

values()只会把键值对RDD中的value返回形成一个新的RDD。比如,对四个键值对("spark",1)、("spark",2)、("hadoop",3)和("hadoop",5)构成的RDD,采用values()后得到的结果是一个RDD[Int],内容是{1,2,3,5}。
我们对上面第二种方式创建得到的pairRDD进行values()操作,代码如下:

scala> pairRDD.values()
PythonRDD[22] at RDD at PythonRDD.scala:48
scala> pairRDD.values().foreach(print)
1
1
1
1

sortByKey()

sortByKey()的功能是返回一个根据键排序的RDD。
我们对上面第二种方式创建得到的pairRDD进行keys操作,代码如下:

>>> pairRDD.sortByKey()
PythonRDD[30] at RDD at PythonRDD.scala:48
>>> pairRDD.sortByKey().foreach(print)
(Hadoop,1)
(Hive,1)
(Spark,1)
(Spark,1)

mapValues(func)

我们经常会遇到一种情形,我们只想对键值对RDD的value部分进行处理,而不是同时对key和value进行处理。对于这种情形,Spark提供了mapValues(func),它的功能是,对键值对RDD中的每个value都应用一个函数,但是,key不会发生变化。比如,对四个键值对("spark",1)、("spark",2)、("hadoop",3)和("hadoop",5)构成的pairRDD,如果执行pairRDD.mapValues(lambda x : x+1),就会得到一个新的键值对RDD,它包含下面四个键值对("spark",2)、("spark",3)、("hadoop",4)和("hadoop",6)。
我们对上面第二种方式创建得到的pairRDD进行keys操作,代码如下:

>>> pairRDD.mapValues(lambda x : x+1)
PythonRDD[38] at RDD at PythonRDD.scala:48
>>> pairRDD.mapValues( lambda x : x+1).foreach(print)
(Hadoop,2)
(Spark,2)
(Hive,2)
(Spark,2)

join

join(连接)操作是键值对常用的操作。“连接”(join)这个概念来自于关系数据库领域,因此,join的类型也和关系数据库中的join一样,包括内连接(join)、左外连接(leftOuterJoin)、右外连接(rightOuterJoin)等。最常用的情形是内连接,所以,join就表示内连接。
对于内连接,对于给定的两个输入数据集(K,V1)和(K,V2),只有在两个数据集中都存在的key才会被输出,最终得到一个(K,(V1,V2))类型的数据集。

比如,pairRDD1是一个键值对集合{("spark",1)、("spark",2)、("hadoop",3)和("hadoop",5)},pairRDD2是一个键值对集合{("spark","fast")},那么,pairRDD1.join(pairRDD2)的结果就是一个新的RDD,这个新的RDD是键值对集合{("spark",1,"fast"),("spark",2,"fast")}。对于这个实例,我们下面在pyspark中运行一下:

>>> pairRDD1 = sc.parallelize([('spark',1),('spark',2),('hadoop',3),('hadoop',5)])
>>> pairRDD2 = sc.parallelize([('spark','fast')])
>>> pairRDD1.join(pairRDD2)
PythonRDD[49] at RDD at PythonRDD.scala:48 
>>> pairRDD1.join(pairRDD2).foreach(print)

一个综合实例

题目:给定一组键值对("spark",2),("hadoop",6),("hadoop",4),("spark",6),键值对的key表示图书名称,value表示某天图书销量,请计算每个键对应的平均值,也就是计算每种图书的每天平均销量。
很显然,对于上面的题目,结果是很显然的,("spark",4),("hadoop",5)。
下面,我们在pyspark中演示代码执行过程:

>>> rdd = sc.parallelize([("spark",2),("hadoop",6),("hadoop",4),("spark",6)])
>>> rdd.mapValues(lambda x : (x,1)).reduceByKey(lambda x,y : (x[0]+y[0],x[1] + y[1])).mapValues(lambda x : (x[0] / x[1])).collect()

要注意,上面语句中,mapValues(lambda x : (x,1))中出现了变量x,reduceByKey(lambda x,y : (x[0]+y[0],x[1]+ y[1]))中也出现了变量x,mapValues(lambda x : (x[0] / x[1]))也出现了变量x。但是,必须要清楚,这三个地方出现的x,虽然都具有相同的变量名称x,但是,彼此之间没有任何关系,它们都处在不同的变量作用域内。如果你觉得这样会误导自己,造成理解上的掌握,实际上,你可以把三个出现x的地方分别替换成x1、x2、x3也是可以的,但是,很显然没有必要这么做。
上面是完整的语句和执行过程,可能不太好理解,下面我们进行逐条语句分解给大家介绍。每条语句执行后返回的屏幕信息,可以帮助大家更好理解语句的执行效果,比如生成了什么类型的RDD。

(1)首先构建一个数组,数组里面包含了四个键值对,然后,调用parallelize()方法生成RDD,从执行结果反馈信息,可以看出,rdd类型是RDD[(String, Int)]。

>>> rdd = sc.parallelize([("spark",2),("hadoop",6),("hadoop",4),("spark",6)])

(2)针对构建得到的rdd,我们调用mapValues()函数,把rdd中的每个每个键值对(key,value)的value部分进行修改,把value转换成键值对(value,1),其中,数值1表示这个key在rdd中出现了1次,为什么要记录出现次数呢?因为,我们最终要计算每个key对应的平均值,所以,必须记住这个key出现了几次,最后用value的总和除以key的出现次数,就是这个key对应的平均值。比如,键值对("spark",2)经过mapValues()函数处理后,就变成了("spark",(2,1)),其中,数值1表示“spark”这个键的1次出现。下面就是rdd.mapValues()操作在spark-shell中的执行演示:
scala> rdd.mapValues(x => (x,1)).collect()
res23: Array[(String, (Int, Int))] = Array((spark,(2,1)), (hadoop,(6,1)), (hadoop,(4,1)), (spark,(6,1)))
上面语句中,collect()是一个行动操作,功能是以数组的形式返回数据集中的所有元素,当我们要实时查看一个RDD中的元素内容时,就可以调用collect()函数。

(3)然后,再对上一步得到的RDD调用reduceByKey()函数,在spark-shell中演示如下:

>>> rdd.mapValues(lambda x : (x,1)).reduceByKey(lambda x,y : (x[0]+y[0],x[1] + y[1])).collect()

这里,必须要十分准确地理解reduceByKey()函数的功能。可以参考上面我们对该函数的介绍,reduceByKey(func)的功能是使用func函数合并具有相同键的值。这里的func函数就是Lamda表达式 x,y : (x[0]+y[0],x[1] + y[1]),这个表达式中,x和y都是value,而且是具有相同key的两个键值对所对应的value,比如,在这个例子中, ("hadoop",(6,1))和("hadoop",(4,1))这两个键值对具有相同的key,所以,对于函数中的输入参数(x,y)而言,x就是(6,1),序列从0开始计算,x[0]表示这个键值对中的第1个元素6,x[1]表示这个键值对中的第二个元素1,y就是(4,1),y[0]表示这个键值对中的第1个元素4,y[1]表示这个键值对中的第二个元素1,所以,函数体(x[0]+y[0],x[1] + y[2]),相当于生成一个新的键值对(key,value),其中,key是x[0]+y[0],也就是6+4=10,value是x[1] + y[1],也就是1+1=2,因此,函数体(x[0]+y[0],x[1] + y[1])执行后得到的value是(10,2),但是,要注意,这个(10,2)是reduceByKey()函数执行后,"hadoop"这个key对应的value,也就是,实际上reduceByKey()函数执行后,会生成一个键值对("hadoop",(10,2)),其中,10表示hadoop书籍的总销量,2表示两天。同理,reduceByKey()函数执行后会生成另外一个键值对("spark",(8,2))。

(4)最后,就可以求出最终结果。我们可以对上面得到的两个键值对("hadoop",(10,2))和("spark",(8,2))所构成的RDD执行mapValues()操作,得到每种书的每天平均销量。当第一个键值对("hadoop",(10,2))输入给mapValues(x => (x[0] / x[1]))操作时,key是"hadoop",保持不变,value是(10,2),会被赋值给Lamda表达式x => (x[0] / x[1]中的x,因此,x的值就是(10,2),x[0]就是10,表示hadoop书总销量是10,x[1]就是2,表示2天,因此,hadoop书籍的每天平均销量就是x[0] / x[1],也就是5。mapValues()输出的一个键值对就是("hadoop",5)。同理,当把("spark",(8,2))输入给mapValues()时,会计算得到另外一个键值对("spark",4)。在pyspark中演示如下:

>>> rdd.mapValues(lambda x: (x,1)).reduceByKey(lambda x,y: (x[0]+y[0],x[1]+y[1])).mapValues(lambda x: (x[0]/x[1])).collect()
 [('hadoop', 5.0), ('spark', 4.0)]