Code for Chapter 6 (Spark SQL) - 《Spark编程基础(Python版,第2版)》 by 林子雨


Command lines and code from the textbook 《Spark编程基础(Python版,第2版)》, written by 林子雨 of 厦门大学 (the textbook's official website provides the command lines and code for every chapter, which can be copied and pasted directly for execution).
View all command lines and code from 《Spark编程基础(Python版,第2版)》

# coding:utf8
from pyspark.sql import SparkSession
if __name__ == '__main__':
    spark = SparkSession\
        .builder\
        .master("local[*]")\
        .appName("Simple Application")\
        .getOrCreate()
    # Count the lines that contain "a" and the lines that contain "b" in the local README file
    logFile = "file:///usr/local/spark/README.md"
    logDF = spark.read.text(logFile)
    numAs = logDF.filter(logDF["value"].contains("a")).count()
    numBs = logDF.filter(logDF["value"].contains("b")).count()
    print('Lines with a: %s, Lines with b: %s' % (numAs, numBs))
    spark.stop()
# coding:utf8
from pyspark import SparkConf, SparkContext
if __name__ == '__main__':
    conf = SparkConf().setAppName("Simple App")
    sc = SparkContext(conf=conf)
    # Each element of bookRDD is a (book, amount) pair
    bookRDD = sc.parallelize([("spark", 2), ("hadoop", 6), ("hadoop", 4), ("spark", 6)])
    # Map to (book, (amount, 1)), sum amounts and counts per key, then divide to get the average amount per book
    saleRDD = bookRDD.map(lambda x: (x[0], (x[1], 1))) \
        .reduceByKey(lambda x, y : (x[0] + y[0], x[1] + y[1])) \
        .map(lambda x : (x[0], x[1][0] / x[1][1]))
    saleRDD.foreach(print)
    sc.stop()
# coding:utf8
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg
if __name__ == '__main__':
    spark = SparkSession\
        .builder\
        .master("local[*]")\
        .appName("Simple Application")\
        .getOrCreate()
    # Build a DataFrame of (book, amount) pairs
    bookDF = spark \
        .createDataFrame([("spark", 2), ("hadoop", 6), ("hadoop", 4), ("spark", 6)]) \
        .toDF("book", "amount")
    # Group by book and compute the average amount
    avgDF = bookDF.groupBy("book").agg(avg("amount"))
    avgDF.show()
    spark.stop()
>>> filePath = "file:///usr/local/spark/examples/src/main/resources/users.parquet"
>>> df = spark.read.format("parquet").load(filePath)
>>> df.show()
>>> filePath = "file:///usr/local/spark/examples/src/main/resources/users.parquet"
>>> df = spark.read.parquet(filePath)
>>> df.write.format("parquet").mode("overwrite").option("compression","snappy").
... save("file:///home/hadoop/otherusers")
>>> df.write.parquet("file:///home/hadoop/otherusers")
>>> filePath = "file:///usr/local/spark/examples/src/main/resources/people.json"
>>> df = spark.read.format("json").load(filePath)
>>> df.show()
>>> filePath = "file:///usr/local/spark/examples/src/main/resources/people.json"
>>> df = spark.read.json(filePath)
>>> df.write.json("file:///home/hadoop/otherpeople")
>>> filePath = "file:///usr/local/spark/examples/src/main/resources/people.csv"
>>> schema = "name STRING,age INT,job STRING"
>>> df = spark.read.format("csv").schema(schema).option("header","true").\
... option("sep",";").load(filePath)
>>> df.show()
>>> filePath = "file:///usr/local/spark/examples/src/main/resources/people.csv"
>>> schema = "name STRING,age INT,job STRING"
>>> df = spark.read.schema(schema).option("header","true").\
... option("sep",";").csv(filePath)
>>> df.write.csv("file:///home/hadoop/anotherpeople")
>>> filePath = "file:///usr/local/spark/examples/src/main/resources/people.txt"
>>> df = spark.read.format("text").load(filePath)
>>> df.show()
>>> filePath = "file:///usr/local/spark/examples/src/main/resources/people.txt"
>>> df = spark.read.text(filePath)
>>> df.write.text("file:///home/hadoop/newpeople")
>>> df.write.format("text").save("file:///home/hadoop/newpeople")
>>> df = spark.createDataFrame([\
...     ("Xiaomei","Female","21"),\
...     ("Xiaoming","Male","22"),\
...     ("Xiaoxue","Female","23")]).\
...     toDF("name","sex","age")
>>> df.show()
>>> from pyspark.sql import SparkSession,Row
>>> from pyspark.sql.types import IntegerType, StringType, StructField, StructType
>>> schema = StructType([StructField("name", StringType(), True),\
...                      StructField("age", IntegerType(), True),\
...                      StructField("sex", StringType(), True)])
>>> data = []
>>> data.append(Row("Xiaomei", 21, "Female"))
>>> data.append(Row("Xiaoming", 22, "Male"))
>>> data.append(Row("Xiaoxue", 23, "Female"))
>>> df = spark.createDataFrame(data, schema)
>>> df.show()
>>> filePath = "file:///usr/local/spark/examples/src/main/resources/people.json"
>>> df = spark.read.json(filePath)
>>> df.show()
>>> df.createTempView("people")
>>> spark.sql("SELECT * FROM people").show()
>>> spark.sql("SELECT name FROM people where age > 20").show()
>>> from pyspark.sql import SparkSession,Row
>>> from pyspark.sql.functions import from_unixtime
>>> from pyspark.sql.types import IntegerType, StringType, LongType, StructField, StructType
>>> schema = StructType([StructField("name", StringType(), True),\
...                          StructField("age", IntegerType(), True),\
...                          StructField("create_time",LongType(),True)])
>>> data = []
>>> data.append(Row("Xiaomei",21,1580432800))
>>> data.append(Row("Xiaoming",22,1580436400))
>>> data.append(Row("Xiaoxue",23,1580438800))
>>> df = spark.createDataFrame(data, schema)
>>> df.show()
>>> df.createTempView("user_info")
>>> spark.sql("SELECT name,age,from_unixtime(create_time,'yyyy-MM-dd HH:mm:ss') FROM user_info").show()
>>> spark.udf.register("toUpperCaseUDF", lambda column : column.upper())
<function <lambda> at 0x7f2a9c03f8b0>
>>> spark.sql("SELECT toUpperCaseUDF(name),age,from_unixtime(create_time,'yyyy-MM-dd HH:mm:ss') FROM user_info").show()
>>> from pyspark.sql import Row
>>> people = spark.sparkContext.\
... textFile("file:///usr/local/spark/examples/src/main/resources/people.txt").\
... map(lambda line: line.split(",")).\
... map(lambda p: Row(name=p[0], age=int(p[1])))
>>> schemaPeople = spark.createDataFrame(people)
# The DataFrame must be registered as a temporary view before it can be used in the query below
>>> schemaPeople.createOrReplaceTempView("people") 
>>> personsDF = spark.sql("select name,age from people where age > 20")
# Each element of the DataFrame is a row with two fields, name and age, accessed as p.name and p.age
>>> personsRDD=personsDF.rdd.map(lambda p:"Name: "+p.name+ ","+"Age: "+str(p.age))
>>> personsRDD.foreach(print)
Name: Michael,Age: 29
Name: Andy,Age: 30
>>> from pyspark.sql.types import *
>>> from pyspark.sql import Row
# Build the "schema" (table header)
>>> schemaString = "name age"
>>> fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split(" ")]
>>> schema = StructType(fields)
# Build the "records" of the table
>>> lines = spark.sparkContext.\
... textFile("file:///usr/local/spark/examples/src/main/resources/people.txt")
>>> parts = lines.map(lambda x: x.split(","))
>>> people = parts.map(lambda p: Row(p[0], p[1].strip()))
# Combine the "schema" and the "records"
>>> schemaPeople = spark.createDataFrame(people, schema)
# Register a temporary view for the query below
>>> schemaPeople.createOrReplaceTempView("people")
>>> results = spark.sql("SELECT name,age FROM people")
>>> results.show()
sudo service mysql start
mysql -u root -p  # you will be prompted for the password
mysql> create database spark;
mysql> use spark;
mysql> create table student (id int(4), name char(20), gender char(4), age int(4));
mysql> insert into student values(1,'Xueqian','F',23);
mysql> insert into student values(2,'Weiliang','M',24);
mysql> select * from student;
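Before running the JDBC examples below, the MySQL JDBC driver (MySQL Connector/J) must be on Spark's classpath. One possible way to do this, a sketch in which the jar path and file name are assumptions that depend on your installation, is to pass the jar via the --jars option when starting pyspark or spark-submit:

pyspark --jars /path/to/mysql-connector-java.jar                       # jar path is an assumption
spark-submit --jars /path/to/mysql-connector-java.jar your_script.py   # jar path and script name are assumptions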
>>> jdbcDF = spark.read \
...     .format("jdbc") \
...     .option("driver","com.mysql.jdbc.Driver") \
...     .option("url", "jdbc:mysql://localhost:3306/spark?useSSL=false") \
...     .option("dbtable", "student") \
...     .option("user", "root") \
...     .option("password", "123456") \
...     .load()
>>> jdbcDF.show()
from pyspark.sql import SparkSession
if __name__ == '__main__':
    spark = SparkSession \
        .builder \
        .appName('SparkReadMySQL') \
        .master('local[*]') \
        .getOrCreate()
    jdbcDF = spark.read \
        .format("jdbc") \
        .option("driver", "com.mysql.jdbc.Driver") \
        .option("url", "jdbc:mysql://localhost:3306/spark?useSSL=false") \
        .option("dbtable", "student") \
        .option("user", "root") \
        .option("password", "123456") \
        .load()
    jdbcDF.show()
    spark.stop()
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark import SparkContext,SparkConf
from pyspark.sql import SparkSession

spark = SparkSession.builder.config(conf = SparkConf()).getOrCreate()

# Define the schema information
schema = StructType([StructField("id", IntegerType(), True), \
           StructField("name", StringType(), True), \
           StructField("gender", StringType(), True), \
           StructField("age", IntegerType(), True)])

# Create two records representing two students
studentRDD = spark \
               .sparkContext \
               .parallelize(["3 Rongcheng M 26","4 Guanhua M 27"]) \
               .map(lambda x:x.split(" "))
 
# Create Row objects; each Row object becomes one row of rowRDD
rowRDD = studentRDD.map(lambda p:Row(int(p[0].strip()), p[1].strip(), p[2].strip(), int(p[3].strip())))
 
# Combine the Row objects with the schema, i.e., associate the data with the schema
studentDF = spark.createDataFrame(rowRDD, schema)
 
# Write to the database
prop = {}
prop['user'] = 'root'
prop['password'] = '123456'
prop['driver'] = "com.mysql.jdbc.Driver"
studentDF.write.jdbc("jdbc:mysql://localhost:3306/spark?useSSL=false",'student','append', prop)
mysql> select * from student;
conda activate pyspark
pip install pandas
>>> import numpy as np
>>> import pandas as pd
>>> from pandas import Series,DataFrame
>>> obj=Series([3,5,6,8,9,2])
>>> obj
>>> obj2=Series([3,5,6,8,9,2],index=['a','b','c','d','e','f'])
>>> obj2
>>> obj2['a']
>>> obj2[['b','d','f']]
>>> import numpy as np
>>> import pandas as pd
>>> from pandas import Series,DataFrame
>>> data = {'sno':['95001', '95002', '95003', '95004'],
...     'name':['Xiaoming','Zhangsan','Lisi','Wangwu'],
...     'sex':['M','F','F','M'],
...     'age':[22,25,24,23]}
>>> frame=DataFrame(data)
>>> frame
>>> frame=DataFrame(data,columns=['name','sno','sex','age'])
>>> frame
>>> frame=DataFrame(data,columns=['sno','name','sex','age','grade'],\
... index=['a','b','c','d'])
>>> frame
>>> frame['sno']
>>> frame.name
>>> frame.loc['b']

Code file PysparkPandasDataframe.py

# coding:utf8
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, StringType,StructField, StructType
import pandas as pd

if __name__ == '__main__':
    spark = SparkSession\
        .builder\
        .master("local[*]")\
        .appName("Simple Application")\
        .getOrCreate()
    # Define a pandas DataFrame
    pd_df = pd.DataFrame({
        'id':[1,2,3,4,5],
        'name':['zhangsan','lisi','wangwu','maliu','liuqi'],
        'age':[24,26,22,28,24]
    }
    )
    # Display the pandas DataFrame
    print(pd_df)
    # Define the schema of the PySpark DataFrame
    schema = StructType([StructField("id", IntegerType(), True), \
                         StructField("name", StringType(), True), \
                         StructField("age", IntegerType(), True)])
    # Convert the pandas DataFrame to a PySpark DataFrame
    spark_df=spark.createDataFrame(pd_df,schema)
    # Display the PySpark DataFrame
    spark_df.show()
    # Convert the PySpark DataFrame back to a pandas DataFrame
    pd_df2=spark_df.toPandas()
    # Display the pandas DataFrame
    print(pd_df2)

Suppose there is a text file people_data.txt with the following content:

1,zhangsan,22
2,lisi,26
3,wangwu,28
4,maliu,24
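
To run the program below locally, you can first create this file; a minimal sketch, assuming the path /home/hadoop/people_data.txt that the program reads:

# Write the sample records to the path assumed by PysparkPandasFunction.py
with open("/home/hadoop/people_data.txt", "w") as f:
    f.write("1,zhangsan,22\n2,lisi,26\n3,wangwu,28\n4,maliu,24\n")

In addition, pandas UDFs in PySpark rely on Apache Arrow, so the pyarrow package needs to be installed first: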
pip install pyarrow

Code file PysparkPandasFunction.py

# coding:utf8
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import IntegerType, StringType, StructField, StructType, Row
import pandas as pd

if __name__ == '__main__':
    spark = SparkSession\
        .builder\
        .master("local[*]")\
        .appName("Simple Application")\
        .getOrCreate()
    peopleRDD = spark.sparkContext\
        .textFile("file:///home/hadoop/people_data.txt")\
        .map(lambda line: line.split(","))\
        .map(lambda p: Row(id=int(p[0]), name=p[1], age=int(p[2])))
    schema = StructType([StructField("id", IntegerType(), True), \
                         StructField("name", StringType(), True), \
                         StructField("age", IntegerType(), True)])
    peopleDF = peopleRDD.toDF(schema)
    # Registering the custom function involves two steps
    # Step 1: use the @pandas_udf decorator to define the custom function
    @F.pandas_udf(IntegerType())
    def myFunction(a: pd.Series) -> int:
        # The parameter a receives an entire column of data, so its type is declared as pd.Series
        # Perform the computation on the received column
        result = a.sum()
        return result
    # Step 2: register the custom function with spark.udf.register
    sum_func = spark.udf.register("sum_func", myFunction)
    # Use the custom function with the DSL-style syntax
    sumDF = peopleDF.select(sum_func("age"))
    sumDF.show()
    # Use the custom function with the SQL-style syntax
    peopleDF.createOrReplaceTempView("people")
    sumDF2 = spark.sql("select sum_func(age) from people")
    sumDF2.show()
    spark.stop()

Suppose there is a movie rating dataset with the following format:

1::661::3::978302109
1::914::3::978301968
1::3408::4::978300275

Program MovieRating.py

from pyspark.sql.types import IntegerType, StringType, StructField, StructType
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

if __name__ == '__main__':
    spark = SparkSession\
        .builder\
        .master("local[*]")\
        .appName("Simple Application")\
        .getOrCreate()
    # 1. Read the dataset
    filePath = "file:///home/hadoop/ratings.dat"
    schema = StructType([StructField("user_id", StringType(), True),\
                         StructField("movie_id", StringType(), True), \
                         StructField("rating", IntegerType(), True), \
                         StructField("ts",StringType(),True)])
    ratingsDF = spark.read.format("csv").\
        option("sep","::").\
        option("header",False).\
        option("encoding","utf-8").\
        schema(schema).\
        load(filePath)

    print("1.数据集如下:")
    ratingsDF.show()
    ratingsDF.createTempView("ratings")

    # 2. Compute each user's average rating
    print("2. Each user's average rating")
    ratingsDF.groupby("user_id").\
        avg("rating").\
        withColumnRenamed("avg(rating)","avg_rating").\
        withColumn("avg_rating",F.round("avg_rating",3)).\
        orderBy("avg_rating",ascending=False).\
        show()

    # 3. Compute each movie's average rating
    print("3. Each movie's average rating")
    ratingsDF.createTempView("movie_ratings")
    spark.sql("""
        SELECT movie_id, ROUND(AVG(rating),3) AS avg_rating FROM movie_ratings GROUP BY movie_id ORDER BY avg_rating DESC
    """).show()

    # 4. Count the ratings that are higher than the overall average rating
    print("4. Count the ratings that are higher than the overall average rating")
    movieCount = ratingsDF.where(ratingsDF["rating"]>ratingsDF.select(F.avg(ratingsDF["rating"])).first()["avg(rating)"]).count()
    print("大于平均打分的电影的数量是:", movieCount)

    # 5. Among high ratings (rating > 3), find the user who rated the most times and show that user's average rating
    print("5. Among high ratings (rating > 3), find the user who rated the most times and show that user's average rating")
    user_id = ratingsDF.where(ratingsDF["rating"]>3).\
        groupBy("user_id").\
        count().\
        withColumnRenamed("count","high_rating_count"). \
        orderBy("high_rating_count", ascending=False).\
        limit(1).\
        first()["user_id"]
    ratingsDF.filter(ratingsDF["user_id"]==user_id).\
        select(F.round(F.avg("rating"),3)).show()

    # 6. For each user, compute the average, minimum and maximum rating
    print("6. For each user, compute the average, minimum and maximum rating")
    ratingsDF.groupBy("user_id").\
        agg(
            F.round(F.avg("rating"),3).alias("avg_rating"),
            F.round(F.min("rating"), 3).alias("min_rating"),
            F.round(F.max("rating"), 3).alias("max_rating")
        ).show()

    # 7. Top 10 movies by average rating among movies rated more than 100 times
    print("7. Top 10 movies by average rating among movies rated more than 100 times")
    ratingsDF.groupBy("movie_id").\
        agg(
            F.count("movie_id").alias("cnt"),
            F.round(F.avg("rating"),3).alias("avg_rating")
        ).where("cnt>100").\
        orderBy("avg_rating",ascending=False).\
        limit(10).\
        show()