Command lines and code from the textbook 《Spark编程基础(Python版,第2版)》 (Spark Programming Basics, Python Edition, 2nd Edition), written by Lin Ziyu of Xiamen University (textbook official website)
This page provides the command lines and code for all chapters of the textbook; they can be copied and pasted directly for execution.
View all command lines and code from the textbook 《Spark编程基础(Python版,第2版)》
# coding:utf8
from pyspark.sql import SparkSession
if __name__ == '__main__':
    spark = SparkSession\
        .builder\
        .master("local[*]")\
        .appName("Simple Application")\
        .getOrCreate()
    logFile = "file:///usr/local/spark/README.md"
    logDF = spark.read.text(logFile)
    numAs = logDF.filter(logDF["value"].contains("a")).count()
    numBs = logDF.filter(logDF["value"].contains("b")).count()
    print('Lines with a: %s, Lines with b: %s' % (numAs, numBs))
    spark.stop()
# coding:utf8
from pyspark import SparkConf, SparkContext
if __name__ == '__main__':
    conf = SparkConf().setAppName("Simple App")
    sc = SparkContext(conf=conf)
    bookRDD = sc.parallelize([("spark", 2), ("hadoop", 6), ("hadoop", 4), ("spark", 6)])
    # Map each (book, amount) to (book, (amount, 1)), sum the amounts and the counts
    # per book with reduceByKey, then divide to get the average amount per book
    saleRDD = bookRDD.map(lambda x: (x[0], (x[1], 1))) \
        .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1])) \
        .map(lambda x: (x[0], x[1][0] / x[1][1]))
    saleRDD.foreach(print)
    sc.stop()
# coding:utf8
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg
if __name__ == '__main__':
    spark = SparkSession\
        .builder\
        .master("local[*]")\
        .appName("Simple Application")\
        .getOrCreate()
    bookDF = spark \
        .createDataFrame([("spark", 2), ("hadoop", 6), ("hadoop", 4), ("spark", 6)]) \
        .toDF("book", "amount")
    avgDF = bookDF.groupBy("book").agg(avg("amount"))
    avgDF.show()
    spark.stop()
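For the sample data used in the two average-computation programs above, the expected per-book averages are spark: (2+6)/2 = 4.0 and hadoop: (6+4)/2 = 5.0. As a quick sanity check, the same aggregation can be reproduced in plain Python (illustration only, no Spark involved):
from collections import defaultdict
data = [("spark", 2), ("hadoop", 6), ("hadoop", 4), ("spark", 6)]
totals = defaultdict(lambda: [0, 0])   # book -> [sum of amounts, count]
for book, amount in data:
    totals[book][0] += amount
    totals[book][1] += 1
for book, (s, c) in totals.items():
    print(book, s / c)   # expected: spark 4.0, hadoop 5.0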
>>> filePath = "file:///usr/local/spark/examples/src/main/resources/users.parquet"
>>> df = spark.read.format("parquet").load(filePath)
>>> df.show()
>>> filePath = "file:///usr/local/spark/examples/src/main/resources/users.parquet"
>>> df = spark.read.parquet(filePath)
>>> df.write.format("parquet").mode("overwrite").option("compression","snappy").
... save("file:///home/hadoop/otherusers")
>>> df.write.parquet("file:///home/hadoop/otherusers")
>>> filePath = "file:///usr/local/spark/examples/src/main/resources/people.json"
>>> df = spark.read.format("json").load(filePath)
>>> df.show()
>>> filePath = "file:///usr/local/spark/examples/src/main/resources/people.json"
>>> df = spark.read.json(filePath)
>>> df.write.json("file:///home/hadoop/otherpeople")
>>> filePath = "file:///usr/local/spark/examples/src/main/resources/people.csv"
>>> schema = "name STRING,age INT,job STRING"
>>> df = spark.read.format("csv").schema(schema).option("header","true").\
... option("sep",";").load(filePath)
>>> df.show()
>>> filePath = "file:///usr/local/spark/examples/src/main/resources/people.csv"
>>> schema = "name STRING,age INT,job STRING"
>>> df = spark.read.schema(schema).option("header","true").\
... option("sep",";").csv(filePath)
>>> df.write.csv("file:///home/hadoop/anotherpeople")
>>> filePath = "file:///usr/local/spark/examples/src/main/resources/people.txt"
>>> df = spark.read.format("text").load(filePath)
>>> df.show()
>>> filePath = "file:///usr/local/spark/examples/src/main/resources/people.txt"
>>> df = spark.read.text(filePath)
>>> df.write.text("file:///home/hadoop/newpeople")
>>> df.write.format("text").save("file:///home/hadoop/newpeople")
>>> df = spark.createDataFrame([\
("Xiaomei","Female","21"),\
("Xiaoming","Male","22"),\
("Xiaoxue","Female","23")]).\
toDF("name","sex","age")
>>> df.show()
>>> from pyspark.sql import SparkSession,Row
>>> from pyspark.sql.types import IntegerType, StringType, StructField, StructType
>>> schema = StructType([StructField("name", StringType(), True),\
StructField("age", IntegerType(), True),\
StructField("sex", StringType(), True)])
>>> data = []
>>> data.append(Row("Xiaomei", 21, "Female"))
>>> data.append(Row("Xiaoming", 22, "Male"))
>>> data.append(Row("Xiaoxue", 23, "Female"))
>>> df = spark.createDataFrame(data, schema)
>>> df.show()
>>> filePath = "file:///usr/local/spark/examples/src/main/resources/people.json"
>>> df = spark.read.json(filePath)
>>> filePath = "file:///usr/local/spark/examples/src/main/resources/people.json"
>>> df = spark.read.json(filePath)
>>> df.show()
>>> df.createTempView("people")
>>> spark.sql("SELECT * FROM people").show()
>>> spark.sql("SELECT name FROM people where age > 20").show()
>>> from pyspark.sql import SparkSession,Row
>>> from pyspark.sql.functions import from_unixtime
>>> from pyspark.sql.types import IntegerType, StringType, LongType, StructField, StructType
>>> schema = StructType([StructField("name", StringType(), True),\
... StructField("age", IntegerType(), True),\
... StructField("create_time",LongType(),True)])
>>> data = []
>>> data.append(Row("Xiaomei",21,1580432800))
>>> data.append(Row("Xiaoming",22,1580436400))
>>> data.append(Row("Xiaoxue",23,1580438800))
>>> df = spark.createDataFrame(data, schema)
>>> df.show()
>>> df.createTempView("user_info")
>>> spark.sql("SELECT name,age,from_unixtime(create_time,'yyyy-MM-dd HH:mm:ss') FROM user_info").show()
>>> spark.udf.register("toUpperCaseUDF", lambda column : column.upper())
<function <lambda> at 0x7f2a9c03f8b0>
>>> spark.sql("SELECT toUpperCaseUDF(name),age,from_unixtime(create_time,'yyyy-MM-dd HH:mm:ss') FROM user_info").show()
>>> from pyspark.sql import Row
>>> people = spark.sparkContext.\
... textFile("file:///usr/local/spark/examples/src/main/resources/people.txt").\
... map(lambda line: line.split(",")).\
... map(lambda p: Row(name=p[0], age=int(p[1])))
>>> schemaPeople = spark.createDataFrame(people)
# The DataFrame must be registered as a temporary view before it can be used in the query below
>>> schemaPeople.createOrReplaceTempView("people")
>>> personsDF = spark.sql("select name,age from people where age > 20")
# Each element of the DataFrame is a row with the fields name and age, whose values are accessed via p.name and p.age
>>> personsRDD=personsDF.rdd.map(lambda p:"Name: "+p.name+ ","+"Age: "+str(p.age))
>>> personsRDD.foreach(print)
Name: Michael,Age: 29
Name: Andy,Age: 30
>>> from pyspark.sql.types import *
>>> from pyspark.sql import Row
# Generate the "table header" (the schema)
>>> schemaString = "name age"
>>> fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split(" ")]
>>> schema = StructType(fields)
# Generate the "table records"
>>> lines = spark.sparkContext.\
... textFile("file:///usr/local/spark/examples/src/main/resources/people.txt")
>>> parts = lines.map(lambda x: x.split(","))
>>> people = parts.map(lambda p: Row(p[0], p[1].strip()))
# Combine the "table header" with the "table records"
>>> schemaPeople = spark.createDataFrame(people, schema)
# Register a temporary view for the query below
>>> schemaPeople.createOrReplaceTempView("people")
>>> results = spark.sql("SELECT name,age FROM people")
>>> results.show()
sudo service mysql start
mysql -u root -p   # you will be prompted for the password
mysql> create database spark;
mysql> use spark;
mysql> create table student (id int(4), name char(20), gender char(4), age int(4));
mysql> insert into student values(1,'Xueqian','F',23);
mysql> insert into student values(2,'Weiliang','M',24);
mysql> select * from student;
>>> jdbcDF = spark.read \
.format("jdbc") \
.option("driver","com.mysql.jdbc.Driver") \
.option("url", "jdbc:mysql://localhost:3306/spark?useSSL=false") \
.option("dbtable", "student") \
.option("user", "root") \
.option("password", "123456") \
.load()
>>> jdbcDF.show()
from pyspark.sql import SparkSession
if __name__ == '__main__':
    spark = SparkSession. \
        Builder(). \
        appName('SparkReadMySQL'). \
        master('local[*]'). \
        getOrCreate()
    jdbcDF = spark.read \
        .format("jdbc") \
        .option("driver", "com.mysql.jdbc.Driver") \
        .option("url", "jdbc:mysql://localhost:3306/spark?useSSL=false") \
        .option("dbtable", "student") \
        .option("user", "root") \
        .option("password", "123456") \
        .load()
    jdbcDF.show()
    spark.stop()
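Note that Spark must be able to find the MySQL JDBC driver on its classpath. One possible way to do this (a minimal sketch; the jar path below is hypothetical and should be replaced with the actual location of the MySQL Connector/J jar) is to point the session at the jar through the spark.jars and spark.driver.extraClassPath options:
from pyspark.sql import SparkSession
# Hypothetical location of the MySQL Connector/J jar
mysql_jar = "/home/hadoop/mysql-connector-java.jar"
spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("SparkReadMySQL") \
    .config("spark.jars", mysql_jar) \
    .config("spark.driver.extraClassPath", mysql_jar) \
    .getOrCreate()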
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
spark = SparkSession.builder.config(conf=SparkConf()).getOrCreate()
# Define the schema
schema = StructType([StructField("id", IntegerType(), True), \
                     StructField("name", StringType(), True), \
                     StructField("gender", StringType(), True), \
                     StructField("age", IntegerType(), True)])
# Prepare two records describing two students
studentRDD = spark \
    .sparkContext \
    .parallelize(["3 Rongcheng M 26", "4 Guanhua M 27"]) \
    .map(lambda x: x.split(" "))
# Create a Row object for each line of rowRDD
rowRDD = studentRDD.map(lambda p: Row(int(p[0].strip()), p[1].strip(), p[2].strip(), int(p[3].strip())))
# Combine the Row objects with the schema, i.e. attach the schema to the data
studentDF = spark.createDataFrame(rowRDD, schema)
# Write to the database
prop = {}
prop['user'] = 'root'
prop['password'] = '123456'
prop['driver'] = "com.mysql.jdbc.Driver"
studentDF.write.jdbc("jdbc:mysql://localhost:3306/spark?useSSL=false", 'student', 'append', prop)
mysql> select * from student;
conda activate pyspark
pip install pandas
>>> import numpy as np
>>> import pandas as pd
>>> from pandas import Series,DataFrame
>>> obj=Series([3,5,6,8,9,2])
>>> obj
>>> obj2=Series([3,5,6,8,9,2],index=['a','b','c','d','e','f'])
>>> obj2
>>> obj2['a']
>>> obj2[['b','d','f']]
>>> import numpy as np
>>> import pandas as pd
>>> from pandas import Series,DataFrame
>>> data = {'sno':['95001', '95002', '95003', '95004'],
'name':['Xiaoming','Zhangsan','Lisi','Wangwu'],
'sex':['M','F','F','M'],
'age':[22,25,24,23]}
>>> frame=DataFrame(data)
>>> frame
>>> frame=DataFrame(data,columns=['name','sno','sex','age'])
>>> frame
>>> frame=DataFrame(data,columns=['sno','name','sex','age','grade'],\
... index=['a','b','c','d'])
>>> frame
>>> frame['sno']
>>> frame.name
>>> frame.loc['b']
Code file PysparkPandasDataframe.py
# coding:utf8
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, StringType, StructField, StructType
import pandas as pd
if __name__ == '__main__':
    spark = SparkSession\
        .builder\
        .master("local[*]")\
        .appName("Simple Application")\
        .getOrCreate()
    # Define a pandas DataFrame
    pd_df = pd.DataFrame({
        'id': [1, 2, 3, 4, 5],
        'name': ['zhangsan', 'lisi', 'wangwu', 'maliu', 'liuqi'],
        'age': [24, 26, 22, 28, 24]
    })
    # Display the pandas DataFrame
    print(pd_df)
    # Define the schema of the PySpark DataFrame
    schema = StructType([StructField("id", IntegerType(), True), \
                         StructField("name", StringType(), True), \
                         StructField("age", IntegerType(), True)])
    # Convert the pandas DataFrame into a PySpark DataFrame
    spark_df = spark.createDataFrame(pd_df, schema)
    # Display the PySpark DataFrame
    spark_df.show()
    # Convert the PySpark DataFrame back into a pandas DataFrame
    pd_df2 = spark_df.toPandas()
    # Display the pandas DataFrame
    print(pd_df2)
Suppose there is a text file named people_data.txt with the following content:
1,zhangsan,22
2,lisi,26
3,wangwu,28
4,maliu,24
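If this file does not exist yet, it can be created with a few lines of plain Python (a minimal sketch; the local path /home/hadoop/people_data.txt matches the one read by PysparkPandasFunction.py below):
# Write the four sample records shown above to a local text file
lines = ["1,zhangsan,22", "2,lisi,26", "3,wangwu,28", "4,maliu,24"]
with open("/home/hadoop/people_data.txt", "w") as f:
    f.write("\n".join(lines) + "\n")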
pip install pyarrow
Code file PysparkPandasFunction.py
# coding:utf8
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import IntegerType, StringType, StructField, StructType, Row
import pandas as pd
if __name__ == '__main__':
    spark = SparkSession\
        .builder\
        .master("local[*]")\
        .appName("Simple Application")\
        .getOrCreate()
    peopleRDD = spark.sparkContext\
        .textFile("file:///home/hadoop/people_data.txt")\
        .map(lambda line: line.split(","))\
        .map(lambda p: Row(id=int(p[0]), name=p[1], age=int(p[2])))
    schema = StructType([StructField("id", IntegerType(), True), \
                         StructField("name", StringType(), True), \
                         StructField("age", IntegerType(), True)])
    peopleDF = peopleRDD.toDF(schema)
    # Registering a user-defined function takes two steps
    # Step 1: decorate the function with @F.pandas_udf
    @F.pandas_udf(IntegerType())
    # Define the user-defined function
    def myFunction(a: pd.Series) -> int:
        # a receives an entire column, so its type must be pd.Series
        # Compute the result from the received column
        result = a.sum()
        return result
    # Step 2: register the function with PySpark's register method
    sum_func = spark.udf.register("sum_func", myFunction)
    # Use the user-defined function with the DSL-style syntax
    sumDF = peopleDF.select(sum_func("age"))
    sumDF.show()
    # Use the user-defined function with the SQL-style syntax
    peopleDF.createOrReplaceTempView("people")
    sumDF2 = spark.sql("select sum_func(age) from people")
    sumDF2.show()
    spark.stop()
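For the four sample records in people_data.txt, sum_func(age) should return 22 + 26 + 28 + 24 = 100. The same aggregation can be checked with plain pandas (illustration only):
import pandas as pd
ages = pd.Series([22, 26, 28, 24])
print(ages.sum())   # 100, the value the pandas UDF computes over the age column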
There is a movie rating dataset with the following data format:
1::661::3::978302109
1::914::3::978301968
1::3408::4::978300275
Program MovieRating.py
from pyspark.sql.types import IntegerType, StringType, StructField, StructType
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
if __name__ == '__main__':
    spark = SparkSession\
        .builder\
        .master("local[*]")\
        .appName("Simple Application")\
        .getOrCreate()
    # 1. Read the dataset
    filePath = "file:///home/hadoop/ratings.dat"
    schema = StructType([StructField("user_id", StringType(), True),\
                         StructField("movie_id", StringType(), True), \
                         StructField("rating", IntegerType(), True), \
                         StructField("ts", StringType(), True)])
    ratingsDF = spark.read.format("csv").\
        option("sep", "::").\
        option("header", False).\
        option("encoding", "utf-8").\
        schema(schema).\
        load(filePath)
    print("1. The dataset is as follows:")
    ratingsDF.show()
    ratingsDF.createTempView("ratings")
    # 2. Compute the average rating given by each user
    print("2. Average rating given by each user")
    ratingsDF.groupby("user_id").\
        avg("rating").\
        withColumnRenamed("avg(rating)", "avg_rating").\
        withColumn("avg_rating", F.round("avg_rating", 3)).\
        orderBy("avg_rating", ascending=False).\
        show()
    # 3. Compute the average rating of each movie
    print("3. Average rating of each movie")
    ratingsDF.createTempView("movie_ratings")
    spark.sql("""
        SELECT movie_id, ROUND(AVG(rating),3) AS avg_rating FROM movie_ratings GROUP BY movie_id ORDER BY avg_rating DESC
    """).show()
    # 4. Count the movies with ratings above the overall average rating
    print("4. Count the movies with ratings above the overall average rating")
    movieCount = ratingsDF.where(ratingsDF["rating"] > ratingsDF.select(F.avg(ratingsDF["rating"])).first()["avg(rating)"]).count()
    print("Number of movies with ratings above the overall average:", movieCount)
    # 5. Find the user who rated high-score (above 3) movies most often, and report that user's average rating
    print("5. The user who rated high-score (above 3) movies most often, and that user's average rating")
    user_id = ratingsDF.where(ratingsDF["rating"] > 3).\
        groupBy("user_id").\
        count().\
        withColumnRenamed("count", "high_rating_count").\
        orderBy("high_rating_count", ascending=False).\
        limit(1).\
        first()["user_id"]
    ratingsDF.filter(ratingsDF["user_id"] == user_id).\
        select(F.round(F.avg("rating"), 3)).show()
    # 6. Compute each user's average, minimum and maximum rating
    print("6. Each user's average, minimum and maximum rating")
    ratingsDF.groupBy("user_id").\
        agg(
            F.round(F.avg("rating"), 3).alias("avg_rating"),
            F.round(F.min("rating"), 3).alias("min_rating"),
            F.round(F.max("rating"), 3).alias("max_rating")
        ).show()
    # 7. TOP10 movies by average rating among movies rated more than 100 times
    print("7. TOP10 movies by average rating among movies rated more than 100 times")
    ratingsDF.groupBy("movie_id").\
        agg(
            F.count("movie_id").alias("cnt"),
            F.round(F.avg("rating"), 3).alias("avg_rating")
        ).where("cnt>100").\
        orderBy("avg_rating", ascending=False).\
        limit(10).\
        show()