Commands and code from 《Spark编程基础(Python版,第2版)》 (Spark Programming Fundamentals, Python Edition, 2nd Edition), written by 林子雨 (Lin Ziyu) of 厦门大学 (Xiamen University); this listing comes from the textbook's official website.
It provides the commands and code for all chapters of the textbook, which can be copied and pasted directly for execution.
>>> lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")
>>> lines.foreach(print)
Hadoop is good
Spark is fast
Spark is better
To debug the program in PyCharm, you can write the following code:
# coding:utf8
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setMaster("local[*]").setAppName("My App")
    sc = SparkContext(conf = conf)
    lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")
    lines.foreach(print)
    sc.stop()
>>> lines = sc.textFile("hdfs://hadoop01:9000/user/hadoop/word.txt")
>>> lines = sc.textFile("/user/hadoop/word.txt")
>>> lines = sc.textFile("word.txt")
>>> lines.foreach(print)
Hadoop is good
Spark is fast
Spark is better
>>> array = [1,2,3,4,5]
>>> rdd = sc.parallelize(array)
>>> rdd.foreach(print)
1
2
3
4
5
>>> lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")
>>> linesWithSpark = lines.filter(lambda line: "Spark" in line)
>>> linesWithSpark.foreach(print)
Spark is better
Spark is fast
>>> data = [1,2,3,4,5]
>>> rdd1 = sc.parallelize(data)
>>> rdd2 = rdd1.map(lambda x:x+10)
>>> rdd2.collect()
[11, 12, 13, 14, 15]
>>> lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")
>>> words = lines.map(lambda line:line.split(" "))
>>> words.foreach(print)
['Hadoop', 'is', 'good']
['Spark', 'is', 'fast']
['Spark', 'is', 'better']
In the example above, what we pass to the map() operator is a lambda expression. Alternatively, a separately defined function can be passed to map(), as shown in the following code:
# coding:utf8
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setMaster("local[*]").setAppName("My App")
    sc = SparkContext(conf = conf)
    data = [1,2,3,4,5]
    rdd1 = sc.parallelize(data)

    def add(data):
        return data + 10

    print(rdd1.map(add).collect())
    sc.stop()
# coding:utf8
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setMaster("local[*]").setAppName("My App")
    sc = SparkContext(conf = conf)
    rdd1 = sc.parallelize([1, 2, 3, 4], 2)  # set two partitions

    def myFunction(iter):
        result = list()
        print("=============")
        for it in iter:
            result.append(it*2)
        return result

    rdd2 = rdd1.mapPartitions(myFunction)
    print(rdd2.collect())
    sc.stop()
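mapPartitions() only requires that the passed function return an iterable, so a generator version of the function above (a minimal sketch) avoids building the whole result list in memory:
def myFunction(iterator):
    # yield each transformed element instead of materializing the whole partition as a list
    for it in iterator:
        yield it * 2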
>>> lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")
>>> words = lines.flatMap(lambda line:line.split(" "))
>>> words.collect()
['Hadoop', 'is', 'good', 'Spark', 'is', 'fast', 'Spark', 'is', 'better']
>>> rdd1 = sc.parallelize([[1,2], [3,4]])
>>> rdd2 = rdd1.flatMap(lambda x:x)
>>> rdd2.collect()
[1, 2, 3, 4]
>>> rdd1 = sc.parallelize(["Hadoop", "Spark", "Storm", "Flink", "Flume"])
>>> rdd2 = rdd1.groupBy(lambda x:x[0])
>>> rdd2.collect()
[('S', <pyspark.resultiterable.ResultIterable object at 0x7fda9f0a6b20>), ('H', <pyspark.resultiterable.ResultIterable object at 0x7fda9f08b310>), ('F', <pyspark.resultiterable.ResultIterable object at 0x7fda9f08b4c0>)]
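The ResultIterable objects above only show their memory addresses. To inspect the grouped elements, one option (a minimal sketch; the output order may differ) is to convert each group to a list with mapValues():
>>> rdd2.mapValues(list).collect()
[('S', ['Spark', 'Storm']), ('H', ['Hadoop']), ('F', ['Flink', 'Flume'])]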
>>> words = sc.parallelize([("Hadoop",1),("is",1),("good",1), \
... ("Spark",1),("is",1),("fast",1),("Spark",1),("is",1),("better",1)])
>>> words1 = words.groupByKey()
>>> words1.foreach(print)
('Hadoop', <pyspark.resultiterable.ResultIterable object at 0x7fb210552c88>)
('better', <pyspark.resultiterable.ResultIterable object at 0x7fb210552e80>)
('fast', <pyspark.resultiterable.ResultIterable object at 0x7fb210552c88>)
('good', <pyspark.resultiterable.ResultIterable object at 0x7fb210552c88>)
('Spark', <pyspark.resultiterable.ResultIterable object at 0x7fb210552f98>)
('is', <pyspark.resultiterable.ResultIterable object at 0x7fb210552e10>)
>>> words = sc.parallelize([("Hadoop",1),("is",1),("good",1),("Spark",1), \
... ("is",1),("fast",1),("Spark",1),("is",1),("better",1)])
>>> words1 = words.reduceByKey(lambda a,b:a+b)
>>> words1.foreach(print)
('good', 1)
('Hadoop', 1)
('better', 1)
('Spark', 2)
('fast', 1)
('is', 3)
>>> rdd1 = sc.parallelize([3, 1, 2, 4],1) # use 1 partition to guarantee a global ordering
>>> rdd2 = rdd1.sortBy(lambda x:x)
>>> rdd2.foreach(print)
1
2
3
4
>>> rdd2 = rdd1.sortBy(lambda x:x,False) # pass False to sort in descending order
>>> rdd2.foreach(print)
4
3
2
1
>>> rdd1 = sc.parallelize(["Flink","Spark","Spark"])
>>> rdd2 = rdd1.distinct()
>>> rdd2.foreach(print)
Flink
Spark
>>> rdd1 = sc.parallelize([1,2,3])
>>> rdd2 = sc.parallelize([3,4,5])
>>> rdd3 = rdd1.union(rdd2)
>>> rdd3.collect()
[1, 2, 3, 3, 4, 5]
>>> rdd1 = sc.parallelize([1,2,3])
>>> rdd2 = sc.parallelize([3,4,5])
>>> rdd3 = rdd1.intersection(rdd2)
>>> rdd3.collect()
[3]
>>> rdd1 = sc.parallelize([1,2,3])
>>> rdd2 = sc.parallelize([3,4,5])
>>> rdd3 = rdd1.subtract(rdd2)
>>> rdd3.collect()
[1, 2]
>>> rdd1 = sc.parallelize([1,2,3])
>>> rdd2 = sc.parallelize(["Hadoop","Spark","Flink"])
>>> rdd3 = rdd1.zip(rdd2)
>>> rdd3.collect()
[(1, 'Hadoop'), (2, 'Spark'), (3, 'Flink')]
>>> rdd = sc.parallelize([1,2,3,4,5])
>>> rdd.count()
5
>>> rdd.first()
1
>>> rdd.take(3)
[1, 2, 3]
>>> rdd.reduce(lambda a,b:a+b)
15
>>> rdd.collect()
[1, 2, 3, 4, 5]
>>> rdd.foreach(lambda elem:print(elem))
1
2
3
4
5
>>> rdd = sc.parallelize([("hadoop",1),("spark",1),("spark",1)])
>>> result = rdd.countByKey()
>>> print(result)
defaultdict(<class 'int'>, {'hadoop': 1, 'spark': 2})
>>> rdd1 = sc.parallelize([1, 2, 3, 4], 2)
>>> result = rdd1.aggregate(0,lambda a,b:a+b,lambda a,b:a+b)
>>> print(result)
10
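In the call above, 0 is the zero value, the second argument aggregates elements within each partition, and the third merges the partial results from different partitions. The zero value can also be a tuple, which makes it possible to compute the sum and the element count in a single pass; a minimal sketch with the same rdd1:
>>> rdd1.aggregate((0,0), lambda acc,x:(acc[0]+x,acc[1]+1), \
... lambda a,b:(a[0]+b[0],a[1]+b[1]))
(10, 4)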
>>> lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")
>>> lineLengths = lines.map(lambda s:len(s))
>>> totalLength = lineLengths.reduce(lambda a,b:a+b)
>>> print(totalLength)
42
>>> list = ["Hadoop","Spark","Hive"]
>>> rdd = sc.parallelize(list)
>>> print(rdd.count()) # action: triggers a full computation from the beginning
3
>>> print(','.join(rdd.collect())) # action: triggers another full computation from the beginning
Hadoop,Spark,Hive
>>> list = ["Hadoop","Spark","Hive"]
>>> rdd = sc.parallelize(list)
>>> rdd.cache()  # equivalent to persist(MEMORY_ONLY); at this point the RDD is not cached yet,
                 # because it has not been computed and materialized
>>> print(rdd.count())  # first action: triggers a full computation from the beginning;
                        # only now does rdd.cache() take effect and put the RDD into the cache
3
>>> print(','.join(rdd.collect()))  # second action: no full recomputation is needed,
                                    # the cached RDD is simply reused
Hadoop,Spark,Hive
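When the cached RDD is no longer needed, the cache can be released manually (a minimal sketch; in the shell the call simply echoes the RDD):
>>> rdd.unpersist()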
>>> list = [1,2,3,4,5]
>>> rdd = sc.parallelize(list,2) # set two partitions
>>> data = sc.parallelize([1,2,3,4,5],2)
>>> len(data.glom().collect()) # show the number of partitions of the RDD data
2
>>> rdd = data.repartition(1) # repartition the RDD data
>>> len(rdd.glom().collect()) # show the number of partitions of the new RDD rdd
1
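When the goal is only to reduce the number of partitions, coalesce() can be used instead of repartition() to avoid a full shuffle; a minimal sketch with the same data RDD:
>>> rdd = data.coalesce(1)  # reduce the number of partitions without a full shuffle
>>> len(rdd.glom().collect())
1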
Open a Linux terminal and use the vim editor to create a code file "/usr/local/spark/mycode/rdd/TestPartitioner.py" with the following code:
from pyspark import SparkConf, SparkContext

def MyPartitioner(key):
    print("MyPartitioner is running")
    print('The key is %d' % key)
    return key%10

def main():
    print("The main function is running")
    conf = SparkConf().setMaster("local").setAppName("MyApp")
    sc = SparkContext(conf = conf)
    data = sc.parallelize(range(10),5)
    data.map(lambda x:(x,1)) \
        .partitionBy(10,MyPartitioner) \
        .map(lambda x:x[0]) \
        .saveAsTextFile("file:///usr/local/spark/mycode/rdd/partitioner")

if __name__ == '__main__':
    main()
cd /usr/local/spark/mycode/rdd
python3 TestPartitioner.py
cd /usr/local/spark/mycode/rdd
/usr/local/spark/bin/spark-submit TestPartitioner.py
>>> lines = sc. \
... textFile("file:///usr/local/spark/mycode/rdd/word.txt")
>>> wordCount = lines.flatMap(lambda line:line.split(" ")). \
... map(lambda word:(word,1)).reduceByKey(lambda a,b:a+b)
>>> print(wordCount.collect())
[('good', 1), ('Spark', 2), ('is', 3), ('better', 1), ('Hadoop', 1), ('fast', 1)]
>>> lines = sc.textFile("file:///usr/local/spark/mycode/pairrdd/word.txt")
>>> pairRDD = lines.flatMap(lambda line:line.split(" ")).map(lambda word:(word,1))
>>> pairRDD.foreach(print)
('I', 1)
('love', 1)
('Hadoop', 1)
……
>>> list = ["Hadoop","Spark","Hive","Spark"]
>>> rdd = sc.parallelize(list)
>>> pairRDD = rdd.map(lambda word:(word,1))
>>> pairRDD.foreach(print)
('Hadoop', 1)
('Spark', 1)
('Hive', 1)
('Spark', 1)
>>> pairRDD = sc.parallelize([("Hadoop",1),("Spark",1),("Hive",1),("Spark",1)])
>>> pairRDD.reduceByKey(lambda a,b:a+b).foreach(print)
('Spark', 2)
('Hive', 1)
('Hadoop', 1)
>>> list = [("spark",1),("spark",2),("hadoop",3),("hadoop",5)]
>>> pairRDD = sc.parallelize(list)
>>> pairRDD.groupByKey()
PythonRDD[27] at RDD at PythonRDD.scala:48
>>> pairRDD.groupByKey().foreach(print)
('hadoop', <pyspark.resultiterable.ResultIterable object at 0x7f2c1093ecf8>)
('spark', <pyspark.resultiterable.ResultIterable object at 0x7f2c1093ecf8>)
>>> words = ["one", "two", "two", "three", "three", "three"]
>>> wordPairsRDD = sc.parallelize(words).map(lambda word:(word, 1))
>>> wordCountsWithReduce = wordPairsRDD.reduceByKey(lambda a,b:a+b)
>>> wordCountsWithReduce.foreach(print)
('one', 1)
('two', 2)
('three', 3)
>>> wordCountsWithGroup = wordPairsRDD.groupByKey(). \
... map(lambda t:(t[0],sum(t[1])))
>>> wordCountsWithGroup.foreach(print)
('two', 2)
('three', 3)
('one', 1)
>>> list = [("Hadoop",1),("Spark",1),("Hive",1),("Spark",1)]
>>> pairRDD = sc.parallelize(list)
>>> pairRDD.keys().foreach(print)
Hadoop
Spark
Hive
Spark
>>> list = [("Hadoop",1),("Spark",1),("Hive",1),("Spark",1)]
>>> pairRDD = sc.parallelize(list)
>>> pairRDD.values().foreach(print)
1
1
1
1
>>> list = [("Hadoop",1),("Spark",1),("Hive",1),("Spark",1)]
>>> pairRDD = sc.parallelize(list)
>>> pairRDD.foreach(print)
('Hadoop', 1)
('Spark', 1)
('Hive', 1)
('Spark', 1)
>>> pairRDD.sortByKey().foreach(print)
('Hadoop', 1)
('Hive', 1)
('Spark', 1)
('Spark', 1)
>>> d1 = sc.parallelize([("c",8),("b",25),("c",17),("a",42), \
... ("b",4),("d",9),("e",17),("c",2),("f",29),("g",21),("b",9)])
>>> d1.reduceByKey(lambda a,b:a+b).sortByKey(False).collect()
[('g', 21), ('f', 29), ('e', 17), ('d', 9), ('c', 27), ('b', 38), ('a', 42)]
>>> d1 = sc.parallelize([("c",8),("b",25),("c",17),("a",42), \
... ("b",4),("d",9),("e",17),("c",2),("f",29),("g",21),("b",9)])
>>> d1.reduceByKey(lambda a,b:a+b).sortBy(lambda x:x,False).collect()
[('g', 21), ('f', 29), ('e', 17), ('d', 9), ('c', 27), ('b', 38), ('a', 42)]
>>> d1.reduceByKey(lambda a,b:a+b).sortBy(lambda x:x[0],False).collect()
[('g', 21), ('f', 29), ('e', 17), ('d', 9), ('c', 27), ('b', 38), ('a', 42)]
>>> d1.reduceByKey(lambda a,b:a+b).sortBy(lambda x:x[1],False).collect()
[('a', 42), ('b', 38), ('f', 29), ('c', 27), ('g', 21), ('e', 17), ('d', 9)]
>>> list = [("Hadoop",1),("Spark",1),("Hive",1),("Spark",1)]
>>> pairRDD = sc.parallelize(list)
>>> pairRDD1 = pairRDD.mapValues(lambda x:x+1)
>>> pairRDD1.foreach(print)
('Hadoop', 2)
('Spark', 2)
('Hive', 2)
('Spark', 2)
>>> pairRDD1 = sc. \
... parallelize([("spark",1),("spark",2),("hadoop",3),("hadoop",5)])
>>> pairRDD2 = sc.parallelize([("spark","fast")])
>>> pairRDD3 = pairRDD1.join(pairRDD2)
>>> pairRDD3.foreach(print)
('spark', (1, 'fast'))
('spark', (2, 'fast'))
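join() performs an inner join, so keys such as "hadoop" that appear in only one RDD are dropped. To keep them, leftOuterJoin() can be used; a minimal sketch (the output order may vary):
>>> pairRDD1.leftOuterJoin(pairRDD2).foreach(print)
('spark', (1, 'fast'))
('spark', (2, 'fast'))
('hadoop', (3, None))
('hadoop', (5, None))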
To implement this functionality, create a code file "/usr/local/spark/mycode/rdd/Combine.py" and enter the following code:
# coding:utf8
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setMaster("local[*]").setAppName("My App")
    sc = SparkContext(conf = conf)
    data = sc.parallelize([("company-1", 88), ("company-1", 96), ("company-1", 85), \
                           ("company-2", 94), ("company-2", 86), ("company-2", 74), \
                           ("company-3", 86), ("company-3", 88), ("company-3", 92)], 3)
    res = data.combineByKey(
        lambda income: (income, 1),                                   # createCombiner: start each key as (income, count=1)
        lambda acc, income: (acc[0] + income, acc[1] + 1),            # mergeValue: accumulate within a partition
        lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])     # mergeCombiners: merge results across partitions
    ).map(lambda x: (x[0], x[1][0], x[1][0] / float(x[1][1])))        # (company, total income, average income)
    res.repartition(1) \
        .saveAsTextFile("file:///usr/local/spark/mycode/rdd/combineresult")
    sc.stop()
cd /usr/local/spark/mycode/rdd
/usr/local/spark/bin/spark-submit Combine.py
>>> rdd = sc.parallelize([("spark",2),("hadoop",6),("hadoop",4),("spark",6)])
>>> rdd.mapValues(lambda x:(x,1)).\
... reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1])).\
... mapValues(lambda x:x[0]/x[1]).collect()
[('hadoop', 5.0), ('spark', 4.0)]
>>> textFile = sc.\
... textFile("file:///usr/local/spark/mycode/rdd/word.txt")
>>> textFile.first()
'Hadoop is good'
>>> textFile = sc.\
... textFile("file:///usr/local/spark/mycode/rdd/word.txt")
>>> textFile.\
... saveAsTextFile("file:///usr/local/spark/mycode/rdd/writeback")
cd /usr/local/spark
./bin/pyspark
cd /usr/local/spark
./bin/pyspark --master local[1]
>>> textFile = sc.\
... textFile("file:///usr/local/spark/mycode/rdd/word.txt")
>>> textFile.\
... saveAsTextFile("file:///usr/local/spark/mycode/rdd/writeback")
>>> textFile = sc.textFile("hdfs://hadoop01:9000/user/hadoop/word.txt")
>>> textFile = sc.textFile("hdfs://hadoop01:9000/user/hadoop/word.txt")
>>> textFile = sc.textFile("/user/hadoop/word.txt")
>>> textFile = sc.textFile("word.txt")
>>> textFile = sc.textFile("word.txt")
>>> textFile.saveAsTextFile("writeback")
service mysql start
mysql -u root -p #屏幕会提示输入密码
mysql> create database spark;
mysql> use spark;
mysql> create table student (id int(4), name char(20), gender char(4), age int(4));
mysql> insert into student values(1,'Xueqian','F',23);
mysql> insert into student values(2,'Weiliang','M',24);
mysql> select * from student;
conda activate pyspark
pip install pymysql
Create a new code file SparkReadMySQL.py with the following content:
from pyspark import SparkConf, SparkContext
import pymysql

if __name__ == '__main__':
    conf = SparkConf().setAppName("MySQL RDD Example")
    sc = SparkContext(conf=conf)
    host = "localhost"
    port = 3306
    user = "root"
    password = "123456"
    database = "spark"
    table = "student"
    # create the MySQL connection
    conn = pymysql.connect(host=host, port=port, user=user, password=password, database=database)
    cursor = conn.cursor()
    # execute the SQL query
    sql = "SELECT * FROM {}".format(table)
    cursor.execute(sql)
    # convert the query result into an RDD
    rdd = sc.parallelize(cursor.fetchall())
    rdd.foreach(print)
    sc.stop()
Create a new code file SparkWriteMySQL.py with the following content:
from pyspark import SparkConf, SparkContext
import pymysql

if __name__ == '__main__':
    conf = SparkConf().setAppName("MySQL RDD Example")
    sc = SparkContext(conf=conf)
    rddData = sc.parallelize([(3, "Rongcheng", "M", 26), (4, "Guanhua", "M", 27)])

    def insertIntoMySQL(record):
        host = "localhost"
        port = 3306
        user = "root"
        password = "123456"
        database = "spark"
        table = "student"
        conn = pymysql.connect(host=host, port=port, user=user, password=password, database=database)
        cursor = conn.cursor()
        sql = "INSERT INTO {} (id, name, gender, age) VALUES (%d, '%s', '%s', %d)".format(table)
        value = (record[0], record[1], record[2], record[3])
        cursor.execute(sql % value)
        conn.commit()
        cursor.close()
        conn.close()

    rddData.foreach(insertIntoMySQL)
    sc.stop()
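Opening a new connection for every record is acceptable for small data sets. A common refinement (a sketch, not taken from the textbook) is foreachPartition(), which lets all records in a partition share a single connection; pymysql's parameterized execute() is used here instead of string formatting:
def insertPartition(records):
    # one connection and one cursor serve the whole partition
    conn = pymysql.connect(host="localhost", port=3306, user="root",
                           password="123456", database="spark")
    cursor = conn.cursor()
    sql = "INSERT INTO student (id, name, gender, age) VALUES (%s, %s, %s, %s)"
    for record in records:
        cursor.execute(sql, record)
    conn.commit()
    cursor.close()
    conn.close()

rddData.foreachPartition(insertPartition)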
The following is a sample file, file0.txt:
1,1768,50,155
2,1218,600,211
3,2239,788,242
4,3101,28,599
5,4899,290,129
6,3110,54,1201
7,4436,259,877
8,2369,7890,27
The code file "/usr/local/spark/mycode/rdd/TopN.py" that implements the functionality described above contains the following:
#!/usr/bin/env python3
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local").setAppName("ReadHBase")
sc = SparkContext(conf = conf)
lines = sc.textFile("file:///usr/local/spark/mycode/rdd/file0.txt")
result1 = lines.filter(lambda line:(len(line.strip()) > 0) and (len(line.split(","))== 4))
result2 = result1.map(lambda x:x.split(",")[2])
result3 = result2.map(lambda x:(int(x),""))
result4 = result3.repartition(1)
result5 = result4.sortByKey(False)
result6 = result5.map(lambda x:x[0])
result7 = result6.take(5)
for a in result7:
    print(a)
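For comparison, the same five values can also be obtained with the top() action, which returns the n largest elements in descending order, making the (key, "") wrapping and the repartitioning unnecessary (a minimal sketch reusing result2 from above):
top5 = result2.map(lambda x: int(x)).top(5)
for a in top5:
    print(a)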
Following the approach above, the program can be written as follows:
# coding:utf8
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setMaster("local[*]").setAppName("My App")
    sc = SparkContext(conf = conf)

    # 1. Load the raw data: timestamp, province, city, user, product
    sourceRDD = sc.textFile("file:///home/hadoop/sales.txt")
    dataRDD = sourceRDD.filter(lambda line : len(line.split(",")) == 5)

    # 2. Restructure the raw data:
    #    from (timestamp, province, city, user, product)
    #    to ((province, product), 1)
    def myMapFunction1(line):
        datas = line.split(",")
        return ((datas[1], datas[4]), 1)

    mapRDD = dataRDD.map(myMapFunction1)

    # 3. Group and aggregate the data,
    #    producing results of the form ((province, product), sum), where sum is the total count
    reduceRDD = mapRDD.reduceByKey(lambda a,b:a+b)

    # 4. Restructure the aggregated results:
    #    from ((province, product), sum) to (province, (product, sum))
    def myMapFunction2(pair):
        x = pair[0]  # the (province, product) pair
        y = pair[1]  # the sum
        return (x[0], (x[1],y))

    newMapRDD = reduceRDD.map(myMapFunction2)

    # 5. Group by province,
    #    producing data of the form (province, ((product1, sum1), (product2, sum2), ...))
    groupRDD = newMapRDD.groupByKey()

    # 6. Sort each group in descending order and take the top 3
    resultRDD = groupRDD.mapValues(lambda items: sorted(list(items), key=lambda tup: tup[1], reverse=True)[:3])

    # 7. Print the results to the console
    resultRDD.foreach(print)
    sc.stop()
The code file "/usr/local/spark/mycode/rdd/FileSort.py" that implements the functionality described above contains the following:
#!/usr/bin/env python3
from pyspark import SparkConf, SparkContext

index = 0

def getindex():
    global index
    index += 1
    return index

def main():
    conf = SparkConf().setMaster("local[1]").setAppName("FileSort")
    sc = SparkContext(conf = conf)
    lines = sc.textFile("file:///usr/local/spark/mycode/rdd/filesort/file*.txt")
    index = 0
    result1 = lines.filter(lambda line:(len(line.strip()) > 0))
    result2 = result1.map(lambda x:(int(x.strip()),""))
    result3 = result2.repartition(1)
    result4 = result3.sortByKey(True)
    result5 = result4.map(lambda x:x[0])
    result6 = result5.map(lambda x:(getindex(),x))
    result6.foreach(print)
    result6.saveAsTextFile("file:///usr/local/spark/mycode/rdd/filesort/sortresult")

if __name__ == '__main__':
    main()
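The getindex() counter relies on all data being in a single partition. An alternative sketch uses zipWithIndex(), which assigns a sequential index starting from 0 without any shared counter:
result6 = result5.zipWithIndex().map(lambda x: (x[1] + 1, x[0]))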
The following is the complete code file "/usr/local/spark/mycode/rdd/SecondarySortApp.py" that implements secondary sorting:
#!/usr/bin/env python3
from operator import gt
from pyspark import SparkContext, SparkConf

class SecondarySortKey():
    def __init__(self, k):
        self.column1 = k[0]
        self.column2 = k[1]

    def __gt__(self, other):
        if other.column1 == self.column1:
            return gt(self.column2, other.column2)
        else:
            return gt(self.column1, other.column1)

def main():
    conf = SparkConf().setAppName('spark_sort').setMaster('local[1]')
    sc = SparkContext(conf=conf)
    file = "file:///usr/local/spark/mycode/rdd/secondarysort/file4.txt"
    rdd1 = sc.textFile(file)
    rdd2 = rdd1.filter(lambda x:(len(x.strip()) > 0))
    rdd3 = rdd2.map(lambda x:((int(x.split(" ")[0]),int(x.split(" ")[1])),x))
    rdd4 = rdd3.map(lambda x: (SecondarySortKey(x[0]),x[1]))
    rdd5 = rdd4.sortByKey(False)
    rdd6 = rdd5.map(lambda x:x[1])
    rdd6.foreach(print)

if __name__ == '__main__':
    main()
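Because both fields are integers, the same descending secondary sort can also be expressed without a custom key class, by sorting directly on the (column1, column2) tuple, which Python compares element by element; a minimal sketch reusing rdd1 and rdd2 from main() above:
rdd3 = rdd2.map(lambda x: ((int(x.split(" ")[0]), int(x.split(" ")[1])), x))
rdd4 = rdd3.sortByKey(False)
rdd5 = rdd4.map(lambda x: x[1])
rdd5.foreach(print)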