Code for Chapter 5 "RDD Programming" - 《Spark编程基础(Python版,第2版)》 (Spark Programming Basics, Python Edition, 2nd Edition), by Lin Ziyu


Command lines and code from the textbook 《Spark编程基础(Python版,第2版)》, written by Lin Ziyu of Xiamen University. (The textbook's official website provides the command lines and code for all chapters, which can be copied, pasted, and executed directly.)
View all command lines and code from 《Spark编程基础(Python版,第2版)》

>>> lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")
>>> lines.foreach(print)
Hadoop is good
Spark is fast
Spark is better

If you want to debug the program in PyCharm, you can write the following code:

# coding:utf8
from pyspark import SparkConf, SparkContext
if __name__ == '__main__':
    conf = SparkConf().setMaster("local[*]").setAppName("My App")
    sc = SparkContext(conf = conf)
    lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")
    lines.foreach(print)
    sc.stop()
>>> lines = sc.textFile("hdfs://hadoop01:9000/user/hadoop/word.txt")
>>> lines = sc.textFile("/user/hadoop/word.txt")
>>> lines = sc.textFile("word.txt")
>>> lines.foreach(print)
Hadoop is good
Spark is fast
Spark is better
>>> array = [1,2,3,4,5]
>>> rdd = sc.parallelize(array)
>>> rdd.foreach(print)
1
2
3
4
5
>>> lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")
>>> linesWithSpark = lines.filter(lambda line: "Spark" in line)
>>> linesWithSpark.foreach(print)
Spark is better
Spark is fast
>>> data = [1,2,3,4,5]
>>> rdd1 = sc.parallelize(data)
>>> rdd2 = rdd1.map(lambda x:x+10)
>>> rdd2.collect()
[11, 12, 13, 14, 15]
>>> lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")
>>> words = lines.map(lambda line:line.split(" "))
>>> words.foreach(print)
['Hadoop', 'is', 'good']
['Spark', 'is', 'fast']
['Spark', 'is', 'better']

In the example above, we passed a lambda expression to the map() operator. In fact, you can also define a separate function and pass it to map(); the code is as follows:

# coding:utf8
from pyspark import SparkConf, SparkContext
if __name__ == '__main__':
    conf = SparkConf().setMaster("local[*]").setAppName("My App")
    sc = SparkContext(conf = conf)
    data = [1,2,3,4,5]
    rdd1 = sc.parallelize(data)
    def add(data):
        return data + 10
    print(rdd1.map(add).collect())
    sc.stop()
# coding:utf8
from pyspark import SparkConf, SparkContext
if __name__ == '__main__':
    conf = SparkConf().setMaster("local[*]").setAppName("My App")
    sc = SparkContext(conf = conf)
    rdd1 = sc.parallelize([1, 2, 3, 4], 2)  # use two partitions
    def myFunction(iter):
        result = list()
        print("=============")
        for it in iter:
            result.append(it*2)
        return result
    rdd2 = rdd1.mapPartitions(myFunction)
    print(rdd2.collect())
    sc.stop()
>>> lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")
>>> words = lines.flatMap(lambda line:line.split(" "))
>>> words.collect()
['Hadoop', 'is', 'good', 'Spark', 'is', 'fast', 'Spark', 'is', 'better']
>>> rdd1 = sc.parallelize([[1,2], [3,4]])
>>> rdd2 = rdd1.flatMap(lambda x:x)
>>> rdd2.collect()
[1, 2, 3, 4]
>>> rdd1 = sc.parallelize(["Hadoop", "Spark", "Storm", "Flink", "Flume"])
>>> rdd2 = rdd1.groupBy(lambda x:x[0])
>>> rdd2.collect()
[('S', <pyspark.resultiterable.ResultIterable object at 0x7fda9f0a6b20>), ('H', <pyspark.resultiterable.ResultIterable object at 0x7fda9f08b310>), ('F', <pyspark.resultiterable.ResultIterable object at 0x7fda9f08b4c0>)]
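
The ResultIterable objects shown above are lazy wrappers around each group. To see the grouped elements, one option (a minimal sketch continuing from the rdd2 above, not part of the textbook listing) is to convert each group into a list with mapValues(); the order of the groups may vary:

>>> rdd2.mapValues(list).collect()
[('S', ['Spark', 'Storm']), ('H', ['Hadoop']), ('F', ['Flink', 'Flume'])]
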
>>> words = sc.parallelize([("Hadoop",1),("is",1),("good",1), \
... ("Spark",1),("is",1),("fast",1),("Spark",1),("is",1),("better",1)])
>>> words1 = words.groupByKey()
>>> words1.foreach(print)
('Hadoop', <pyspark.resultiterable.ResultIterable object at 0x7fb210552c88>)
('better', <pyspark.resultiterable.ResultIterable object at 0x7fb210552e80>)
('fast', <pyspark.resultiterable.ResultIterable object at 0x7fb210552c88>)
('good', <pyspark.resultiterable.ResultIterable object at 0x7fb210552c88>)
('Spark', <pyspark.resultiterable.ResultIterable object at 0x7fb210552f98>)
('is', <pyspark.resultiterable.ResultIterable object at 0x7fb210552e10>)
>>> words = sc.parallelize([("Hadoop",1),("is",1),("good",1),("Spark",1), \
... ("is",1),("fast",1),("Spark",1),("is",1),("better",1)])
>>> words1 = words.reduceByKey(lambda a,b:a+b)
>>> words1.foreach(print)
('good', 1)
('Hadoop', 1)
('better', 1)
('Spark', 2)
('fast', 1)
('is', 3)
>>> rdd1 = sc.parallelize([3, 1, 2, 4],1) # use 1 partition to guarantee a globally sorted result
>>> rdd2 = rdd1.sortBy(lambda x:x)
>>> rdd2.foreach(print)
1
2
3
4
>>> rdd2 = rdd1.sortBy(lambda x:x,False)  # pass False to sort in descending order
>>> rdd2.foreach(print)
4
3
2
1
>>> rdd1 = sc.parallelize(["Flink","Spark","Spark"])
>>> rdd2 = rdd1.distinct()
>>> rdd2.foreach(print)
Flink
Spark
>>> rdd1 = sc.parallelize([1,2,3])
>>> rdd2 = sc.parallelize([3,4,5])
>>> rdd3 = rdd1.union(rdd2)
>>> rdd3.collect()
[1, 2, 3, 3, 4, 5]
>>> rdd1 = sc.parallelize([1,2,3])
>>> rdd2 = sc.parallelize([3,4,5])
>>> rdd3 = rdd1.intersection(rdd2)
>>> rdd3.collect()
[3]
>>> rdd1 = sc.parallelize([1,2,3])
>>> rdd2 = sc.parallelize([3,4,5])
>>> rdd3 = rdd1.subtract(rdd2)
>>> rdd3.collect()
[1, 2]
>>> rdd1 = sc.parallelize([1,2,3])
>>> rdd2 = sc.parallelize(["Hadoop","Spark","Flink"])
>>> rdd3 = rdd1.zip(rdd2)
>>> rdd3.collect()
[(1, 'Hadoop'), (2, 'Spark'), (3, 'Flink')]
>>> rdd = sc.parallelize([1,2,3,4,5])
>>> rdd.count()
5
>>> rdd.first()
1
>>> rdd.take(3)
[1, 2, 3]
>>> rdd.reduce(lambda a,b:a+b)
15
>>> rdd.collect()
[1, 2, 3, 4, 5]
>>> rdd.foreach(lambda elem:print(elem))
1
2
3
4
5
>>> rdd = sc.parallelize([("hadoop",1),("spark",1),("spark",1)])
>>> result = rdd.countByKey()
>>> print(result)                                                               
defaultdict(<class 'int'>, {'hadoop': 1, 'spark': 2})
>>> rdd1 = sc.parallelize([1, 2, 3, 4], 2)
>>> result = rdd1.aggregate(0,lambda a,b:a+b,lambda a,b:a+b)
>>> print(result)
10
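
The zero value and the two functions passed to aggregate() can also carry more structure than a single number. As a hedged sketch (not from the textbook), the following reuses rdd1 with a (sum, count) pair as the accumulator to compute the average of its elements:

>>> sumCount = rdd1.aggregate((0,0), \
... lambda acc,v:(acc[0]+v,acc[1]+1), \
... lambda a,b:(a[0]+b[0],a[1]+b[1]))
>>> print(sumCount)
(10, 4)
>>> print(sumCount[0]/sumCount[1])
2.5
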
>>> lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")
>>> lineLengths = lines.map(lambda s:len(s))
>>> totalLength = lineLengths.reduce(lambda a,b:a+b)
>>> print(totalLength)
>>> list = ["Hadoop","Spark","Hive"]
>>> rdd = sc.parallelize(list)
>>> print(rdd.count()) # action: triggers a real computation from beginning to end
3
>>> print(','.join(rdd.collect())) # action: triggers a real computation from beginning to end
Hadoop,Spark,Hive
>>> list = ["Hadoop","Spark","Hive"]
>>> rdd = sc.parallelize(list)
>>> rdd.cache()  # calls persist(MEMORY_ONLY); however, this statement does not cache rdd yet,
                 # because at this point rdd has not been computed and generated
>>> print(rdd.count())  # the first action triggers a real computation from beginning to end;
                        # only now is the rdd.cache() above executed, placing this rdd in the cache
3
>>> print(','.join(rdd.collect()))  # the second action does not trigger a computation from the
                                    # beginning; it simply reuses the rdd cached above
Hadoop,Spark,Hive
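
cache() is equivalent to persist() with the default MEMORY_ONLY storage level. If a different storage level is needed, or the cached data should be released, persist() and unpersist() can be called explicitly. A minimal sketch (not from the textbook), continuing with the rdd above:

>>> from pyspark import StorageLevel
>>> rdd.unpersist()  # remove this RDD's data from the cache
>>> rdd.persist(StorageLevel.MEMORY_AND_DISK)  # cache in memory, spilling to disk if memory is insufficient
>>> print(rdd.count())  # the first action after persist() materializes the cache again
3
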
>>> list = [1,2,3,4,5]
>>> rdd = sc.parallelize(list,2)  # use two partitions
>>> data = sc.parallelize([1,2,3,4,5],2)
>>> len(data.glom().collect())  # show the number of partitions of the RDD data
2
>>> rdd = data.repartition(1)   # repartition the RDD data
>>> len(rdd.glom().collect())   # show the number of partitions of the RDD rdd
1
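
repartition() always performs a shuffle. When the goal is only to reduce the number of partitions, coalesce() is usually cheaper because it can merge partitions without a full shuffle. A minimal sketch (not from the textbook), continuing with the data RDD above:

>>> rdd = data.coalesce(1)  # merge down to one partition without a full shuffle
>>> len(rdd.glom().collect())
1
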

Open a Linux terminal and use the vim editor to create a code file "/usr/local/spark/mycode/rdd/TestPartitioner.py" with the following code:

from pyspark import SparkConf, SparkContext

def MyPartitioner(key):
    print("MyPartitioner is running")
    print('The key is %d' % key)
    return key%10

def main():
    print("The main function is running")
    conf = SparkConf().setMaster("local").setAppName("MyApp")
    sc = SparkContext(conf = conf)
    data = sc.parallelize(range(10),5)
    data.map(lambda x:(x,1)) \
             .partitionBy(10,MyPartitioner) \
             .map(lambda x:x[0]) \
             .saveAsTextFile("file:///usr/local/spark/mycode/rdd/partitioner")

if __name__ == '__main__':
    main()
cd /usr/local/spark/mycode/rdd
python3 TestPartitioner.py
cd /usr/local/spark/mycode/rdd
/usr/local/spark/bin/spark-submit TestPartitioner.py
>>> lines = sc. \
... textFile("file:///usr/local/spark/mycode/rdd/word.txt")
>>> wordCount = lines.flatMap(lambda line:line.split(" ")). \
... map(lambda word:(word,1)).reduceByKey(lambda a,b:a+b)
>>> print(wordCount.collect())
[('good', 1), ('Spark', 2), ('is', 3), ('better', 1), ('Hadoop', 1), ('fast', 1)]
>>> lines = sc.textFile("file:///usr/local/spark/mycode/pairrdd/word.txt")
>>> pairRDD = lines.flatMap(lambda line:line.split(" ")).map(lambda word:(word,1))
>>> pairRDD.foreach(print)
('I', 1)
('love', 1)
('Hadoop', 1)
……
>>> list = ["Hadoop","Spark","Hive","Spark"]
>>> rdd = sc.parallelize(list)
>>> pairRDD = rdd.map(lambda word:(word,1))
>>> pairRDD.foreach(print)
('Hadoop', 1)
('Spark', 1)
('Hive', 1)
('Spark', 1)
>>> pairRDD = sc.parallelize([("Hadoop",1),("Spark",1),("Hive",1),("Spark",1)])
>>> pairRDD.reduceByKey(lambda a,b:a+b).foreach(print)
('Spark', 2)
('Hive', 1)
('Hadoop', 1)
>>> list = [("spark",1),("spark",2),("hadoop",3),("hadoop",5)]
>>> pairRDD = sc.parallelize(list)
>>> pairRDD.groupByKey()
PythonRDD[27] at RDD at PythonRDD.scala:48
>>> pairRDD.groupByKey().foreach(print)
('hadoop', <pyspark.resultiterable.ResultIterable object at 0x7f2c1093ecf8>)
('spark', <pyspark.resultiterable.ResultIterable object at 0x7f2c1093ecf8>)
>>> words = ["one", "two", "two", "three", "three", "three"]
>>> wordPairsRDD = sc.parallelize(words).map(lambda word:(word, 1))
>>> wordCountsWithReduce = wordPairsRDD.reduceByKey(lambda a,b:a+b)
>>> wordCountsWithReduce.foreach(print)
('one', 1)
('two', 2)
('three', 3)
>>> wordCountsWithGroup = wordPairsRDD.groupByKey(). \
... map(lambda t:(t[0],sum(t[1])))
>>> wordCountsWithGroup.foreach(print)
('two', 2)
('three', 3)
('one', 1)
>>> list = [("Hadoop",1),("Spark",1),("Hive",1),("Spark",1)]
>>> pairRDD = sc.parallelize(list)
>>> pairRDD.keys().foreach(print)
Hadoop
Spark
Hive
Spark
>>> list = [("Hadoop",1),("Spark",1),("Hive",1),("Spark",1)]
>>> pairRDD = sc.parallelize(list)
>>> pairRDD.values().foreach(print)
1
1
1
1
>>> list = [("Hadoop",1),("Spark",1),("Hive",1),("Spark",1)]
>>> pairRDD = sc.parallelize(list)
>>> pairRDD.foreach(print)
('Hadoop', 1)
('Spark', 1)
('Hive', 1)
('Spark', 1)
>>> pairRDD.sortByKey().foreach(print)
('Hadoop', 1)
('Hive', 1)
('Spark', 1)
('Spark', 1)
>>> d1 = sc.parallelize([("c",8),("b",25),("c",17),("a",42), \
... ("b",4),("d",9),("e",17),("c",2),("f",29),("g",21),("b",9)])
>>> d1.reduceByKey(lambda a,b:a+b).sortByKey(False).collect()
[('g', 21), ('f', 29), ('e', 17), ('d', 9), ('c', 27), ('b', 38), ('a', 42)]
>>> d1 = sc.parallelize([("c",8),("b",25),("c",17),("a",42), \
... ("b",4),("d",9),("e",17),("c",2),("f",29),("g",21),("b",9)])
>>> d1.reduceByKey(lambda a,b:a+b).sortBy(lambda x:x,False).collect()
[('g', 21), ('f', 29), ('e', 17), ('d', 9), ('c', 27), ('b', 38), ('a', 42)]
>>> d1.reduceByKey(lambda a,b:a+b).sortBy(lambda x:x[0],False).collect()
[('g', 21), ('f', 29), ('e', 17), ('d', 9), ('c', 27), ('b', 38), ('a', 42)]
>>> d1.reduceByKey(lambda a,b:a+b).sortBy(lambda x:x[1],False).collect()
[('a', 42), ('b', 38), ('f', 29), ('c', 27), ('g', 21), ('e', 17), ('d', 9)]
>>> list = [("Hadoop",1),("Spark",1),("Hive",1),("Spark",1)]
>>> pairRDD = sc.parallelize(list)
>>> pairRDD1 = pairRDD.mapValues(lambda x:x+1)
>>> pairRDD1.foreach(print)
('Hadoop', 2)
('Spark', 2)
('Hive', 2)
('Spark', 2)
>>> pairRDD1 = sc. \
... parallelize([("spark",1),("spark",2),("hadoop",3),("hadoop",5)])
>>> pairRDD2 = sc.parallelize([("spark","fast")])
>>> pairRDD3 = pairRDD1.join(pairRDD2)
>>> pairRDD3.foreach(print)
('spark', (1, 'fast'))
('spark', (2, 'fast'))

To implement this functionality, create a code file "/usr/local/spark/mycode/rdd/Combine.py" and enter the following code:

# coding:utf8
from pyspark import SparkConf, SparkContext
if __name__ == '__main__':
    conf = SparkConf().setMaster("local[*]").setAppName("My App")
    sc = SparkContext(conf = conf)
    data = sc.parallelize([("company-1", 88), ("company-1", 96), ("company-1", 85), \
                           ("company-2", 94), ("company-2", 86), ("company-2", 74), \
                           ("company-3", 86), ("company-3", 88), ("company-3", 92)], 3)
    res = data.combineByKey( \
        lambda income: (income, 1), \
        lambda acc, income: (acc[0] + income, acc[1] + 1), \
        lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])). \
        map(lambda x: (x[0], x[1][0], x[1][0] / float(x[1][1])))
    res.repartition(1) \
        .saveAsTextFile("file:///usr/local/spark/mycode/rdd/combineresult")
    sc.stop()
cd /usr/local/spark/mycode/rdd
/usr/local/spark/bin/spark-submit Combine.py
>>> rdd = sc.parallelize([("spark",2),("hadoop",6),("hadoop",4),("spark",6)])
>>> rdd.mapValues(lambda x:(x,1)).\
... reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1])).\
... mapValues(lambda x:x[0]/x[1]).collect()
[('hadoop', 5.0), ('spark', 4.0)]
>>> textFile = sc.\
... textFile("file:///usr/local/spark/mycode/rdd/word.txt")
>>> textFile.first()
'Hadoop is good'
>>> textFile = sc.\
... textFile("file:///usr/local/spark/mycode/rdd/word.txt")
>>> textFile.\
... saveAsTextFile("file:///usr/local/spark/mycode/rdd/writeback")
cd /usr/local/spark
./bin/pyspark
cd /usr/local/spark
./bin/pyspark  --master local[1]
>>> textFile = sc.\
... textFile("file:///usr/local/spark/mycode/rdd/word.txt")
>>> textFile.\
... saveAsTextFile("file:///usr/local/spark/mycode/rdd/writeback")
>>> textFile = sc.textFile("hdfs://hadoop01:9000/user/hadoop/word.txt")
>>> textFile = sc.textFile("hdfs://hadoop01:9000/user/hadoop/word.txt")
>>> textFile = sc.textFile("/user/hadoop/word.txt")
>>> textFile = sc.textFile("word.txt")
>>> textFile = sc.textFile("word.txt")
>>> textFile.saveAsTextFile("writeback")
service mysql start
mysql -u root -p  # you will be prompted to enter the password
mysql> create database spark;
mysql> use spark;
mysql> create table student (id int(4), name char(20), gender char(4), age int(4));
mysql> insert into student values(1,'Xueqian','F',23);
mysql> insert into student values(2,'Weiliang','M',24);
mysql> select * from student;
conda activate pyspark
pip install pymysql

Create a new code file SparkReadMySQL.py with the following content:

from pyspark import SparkConf, SparkContext
import pymysql
if __name__ == '__main__':
    conf = SparkConf().setAppName("MySQL RDD Example")
    sc = SparkContext(conf=conf)
    host = "localhost"
    port = 3306
    user = "root"
    password = "123456"
    database = "spark"
    table = "student"
    # Create the MySQL connection
    conn = pymysql.connect(host=host, port=port, user=user, password=password, database=database)
    cursor = conn.cursor()
    # Execute the SQL query
    sql = "SELECT * FROM {}".format(table)
    cursor.execute(sql)
    # Convert the query results into an RDD
    rdd = sc.parallelize(cursor.fetchall())
    # Release the database resources
    cursor.close()
    conn.close()
    rdd.foreach(print)
    sc.stop()

Create a new code file SparkWriteMySQL.py with the following content:

from pyspark import SparkConf, SparkContext
import pymysql
if __name__ == '__main__':
    conf = SparkConf().setAppName("MySQL RDD Example")
    sc = SparkContext(conf=conf)
    rddData = sc.parallelize([(3, "Rongcheng", "M", 26), (4, "Guanhua", "M", 27)])
    def insertIntoMySQL(tuple):
        host = "localhost"
        port = 3306
        user = "root"
        password = "123456"
        database = "spark"
        table = "student"
        conn = pymysql.connect(host=host, port=port, user=user, password=password, database=database)
        cursor = conn.cursor()
        sql = "INSERT INTO {} (id, name, gender, age) VALUES (%d, '%s', '%s', %d)".format(table)
        value = (tuple[0], tuple[1], tuple[2], tuple[3])
        cursor.execute(sql % value)
        conn.commit()
        cursor.close()
        conn.close()
    rddData.foreach(insertIntoMySQL)
    sc.stop()
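
The code above opens a new database connection for every record, because insertIntoMySQL() is called once per element. A common refinement is to write with foreachPartition() so that all records in a partition share one connection; the following is a hedged sketch along those lines (same student table and credentials assumed), not code from the textbook:

# coding:utf8
# A hedged sketch (not from the textbook): one shared connection per partition
from pyspark import SparkConf, SparkContext
import pymysql

def insertPartition(records):
    # all records of one partition share a single connection
    conn = pymysql.connect(host="localhost", port=3306, user="root",
                           password="123456", database="spark")
    cursor = conn.cursor()
    sql = "INSERT INTO student (id, name, gender, age) VALUES (%s, %s, %s, %s)"
    cursor.executemany(sql, list(records))  # batch-insert the whole partition
    conn.commit()
    cursor.close()
    conn.close()

if __name__ == '__main__':
    conf = SparkConf().setAppName("MySQL RDD Example")
    sc = SparkContext(conf=conf)
    rddData = sc.parallelize([(3, "Rongcheng", "M", 26), (4, "Guanhua", "M", 27)])
    rddData.foreachPartition(insertPartition)
    sc.stop()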

The following is a sample file, file0.txt:

1,1768,50,155
2,1218,600,211
3,2239,788,242
4,3101,28,599
5,4899,290,129
6,3110,54,1201
7,4436,259,877
8,2369,7890,27

The code file "/usr/local/spark/mycode/rdd/TopN.py" that implements the above functionality is as follows:

#!/usr/bin/env python3

from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("TopN")
sc = SparkContext(conf = conf)
lines = sc.textFile("file:///usr/local/spark/mycode/rdd/file0.txt")
result1 = lines.filter(lambda line:(len(line.strip()) > 0) and (len(line.split(","))== 4))
result2 = result1.map(lambda x:x.split(",")[2])
result3 = result2.map(lambda x:(int(x),""))
result4 = result3.repartition(1)
result5 = result4.sortByKey(False)
result6 = result5.map(lambda x:x[0])
result7 = result6.take(5)
for a in result7:
    print(a)
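
As an aside, RDDs also provide a top() action that returns the largest N elements in descending order, so the same result could be obtained without the explicit repartition and sortByKey. A minimal sketch reusing result2 from the code above (not from the textbook):

result8 = result2.map(lambda x:int(x)).top(5)  # top(5) returns the 5 largest values in descending order
for a in result8:
    print(a)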

Based on the approach above, the following program can be written:

# coding:utf8
from pyspark import SparkConf, SparkContext
if __name__ == '__main__':
    conf = SparkConf().setMaster("local[*]").setAppName("My App")
    sc = SparkContext(conf = conf)
    # 1. Load the raw data: timestamp, province, city, user, product
    sourceRDD = sc.textFile("file:///home/hadoop/sales.txt")
    dataRDD = sourceRDD.filter(lambda line : len(line.split(",")) == 5)
    # 2. Restructure the raw data:
    # from (timestamp, province, city, user, product)
    # to ((province, product), 1)
    def myMapFunction1(line):
        datas = line.split(",")
        return ((datas[1], datas[4]), 1)
    mapRDD = dataRDD.map(myMapFunction1)
    # 3. Group and aggregate the data
    # to get results of the form ((province, product), sum), where sum is the aggregated total
    reduceRDD = mapRDD.reduceByKey(lambda a,b:a+b)
    # 4. Restructure the aggregated results:
    # from ((province, product), sum) to (province, (product, sum))
    def myMapFunction2(tuple):
        x = tuple[0] # the (province, product) pair
        y = tuple[1] # the sum
        return (x[0], (x[1],y))
    newMapRDD = reduceRDD.map(myMapFunction2)
    # 5. Group by province
    # to get data of the form (province, ((product1, sum1), (product2, sum2), ...))
    groupRDD = newMapRDD.groupByKey()
    # 6. Sort each group in descending order and take the top 3
    resultRDD = groupRDD.mapValues(lambda iter: sorted(list(iter), key=lambda tup: tup[1],reverse=True)[:3])
    # 7. Print the results to the console
    resultRDD.foreach(print)
    sc.stop()

The code file "/usr/local/spark/mycode/rdd/FileSort.py" that implements the above functionality is as follows:

#!/usr/bin/env python3

from pyspark import SparkConf, SparkContext

index = 0

def getindex():
    global index
    index+=1
    return index

def main():
    conf = SparkConf().setMaster("local[1]").setAppName("FileSort")
    sc = SparkContext(conf = conf)
    lines = sc.textFile("file:///usr/local/spark/mycode/rdd/filesort/file*.txt")
    index = 0
    result1 = lines.filter(lambda line:(len(line.strip()) > 0))
    result2 = result1.map(lambda x:(int(x.strip()),""))
    result3 = result2.repartition(1)
    result4 = result3.sortByKey(True)
    result5 = result4.map(lambda x:x[0])
    result6 = result5.map(lambda x:(getindex(),x))
    result6.foreach(print)
    result6.saveAsTextFile("file:///usr/local/spark/mycode/rdd/filesort/sortresult")
if __name__ == '__main__':
    main()
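
Note that the module-level counter in getindex() only produces a consistent numbering because the data has been merged into a single partition and the job runs with local[1]. A hedged alternative sketch (not from the textbook) uses zipWithIndex(), which assigns indexes without relying on shared state; it could replace the line that builds result6 inside main():

    result6 = result5.zipWithIndex().map(lambda x:(x[1]+1,x[0]))  # zipWithIndex() numbers elements starting from 0
    result6.foreach(print)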

The following is the complete code file "/usr/local/spark/mycode/rdd/SecondarySortApp.py" that implements secondary sorting:

#!/usr/bin/env python3

from operator import gt
from pyspark import SparkContext, SparkConf

class SecondarySortKey():
    def __init__(self, k):
        self.column1 = k[0]
        self.column2 = k[1]

    def __gt__(self, other): 
        if other.column1 == self.column1:
            return gt(self.column2,other.column2)
        else:
            return gt(self.column1, other.column1)

def main():
    conf = SparkConf().setAppName('spark_sort').setMaster('local[1]')
    sc = SparkContext(conf=conf)
    file="file:///usr/local/spark/mycode/rdd/secondarysort/file4.txt"
    rdd1 = sc.textFile(file)
    rdd2 = rdd1.filter(lambda x:(len(x.strip()) > 0))
    rdd3 = rdd2.map(lambda x:((int(x.split(" ")[0]),int(x.split(" ")[1])),x))
    rdd4 = rdd3.map(lambda x: (SecondarySortKey(x[0]),x[1]))
    rdd5 = rdd4.sortByKey(False)
    rdd6 = rdd5.map(lambda x:x[1])
    rdd6.foreach(print)

if __name__ == '__main__':
    main()