PySpark 读写Hive数据源

大数据学习路线图

作者:厦门大学大数据课程虚拟教研室 林徐

一、环境配置

本文在Windows下配置Spark访问Hive。如需在Linux上配置,请对应Linux上同样的目录即可。
1.检查PySpark环境正常运行;检查Hive环境正常运行;启动Hive元数据服务

hive –service metastore

2.先将%HIVE_HOME%\conf\hive-site.xml拷贝到%SPARK_HOME%\conf。此步骤是为了Spark能读取Hive相应的配置;
3.再将%HIVE_HOME%\lib下的MySQL连接驱动的Jar包(mysql-connector-java-5.1.36-bin.jar)拷贝到%SPARK_HOME%\jars目录下。Jar包的版本与MySQL数据库的版本配套。此步骤是为了Spark能够访问Hive的元数据库。
此时,正常启动PySpark交互程序,可在交互模式下正常访问Hive了。进入交互环境,在提示符后直接输入以下代码:

spark.sql(‘show tables’).show()

正常执行后,应该能够看到default库中的表。这时,可以配置Python IDE的开发环境了

4.检查pycharm或其他IDE中的PySpark的开发环境正常;
5.再将%HIVE_HOME%\lib下的MySQL连接驱动的Jar包(mysql-connector-java-5.1.36-bin.jar)拷贝到%PYTHONDIR%\Lib\site-packages\pyspark\jars目录下。Jar包的版本与MySQL数据库的版本配套。此步骤是为了在IDE环境中能够访问Hive的元数据库;
6.添加环境变量SPARK_CONF_DIR,变量值为%SPARK_HOME%\conf。此步骤是为了在IDE中运行Spark程序时,能够读取Spark配置目录下的相应配置信息;
此时,正常启动pycharm。可在IDE环境下正常访问Hive了。在pycharm的工程中新建一个Python文件,输入以下代码:

from pyspark.sql import SparkSession
spark=SparkSession.builder.appName('sparkhive').master('local[*]').enableHiveSupport().getOrCreate()
spark.sql('show tables').show()

正常执行后,应该能够看到default库中的表。

二、读写Hive数据源

从Spark2.0开始,引入SparkSession 作为 DataSet 和 DataFrame API 的切入点,SparkSession封装了SparkConf、SparkContext 和 SQLContext。为了向后兼容,SQLContext 和 HiveContext也被保存下来。在实际写程序时,只需要定义一个SparkSession对象就可以了。不用使用SQLContext 和 HiveContext。

1.SQLContext 和 HiveContext方式读写Hive数据

(1)读取数据

from pyspark.sql import HiveContext
from pyspark import SparkConf,SparkContext
conf=SparkConf().setMaster("local").setAppName("sparkhive")
sc=SparkContext(conf=conf)
hive_context = HiveContext(sc)
stocks_df = hive_context.sql("SELECT * FROM stocks")
stocks_df.show(10)


(2)写入数据

from pyspark.sql import HiveContext
from pyspark.sql.types import StructType, StructField, StringType, FloatType, IntegerType
from pyspark import SparkConf,SparkContext
conf=SparkConf().setMaster("local").setAppName("sparkhive")
sc=SparkContext(conf=conf)
# 定义DataFrame的结构(与stocks表的结构一致)
schema = StructType([
    StructField("exchange_e", StringType(), True),
    StructField("symbol", StringType(), True),
    StructField("ymd", StringType(), True),
    StructField("price_open", FloatType(), True),
    StructField("price_high", FloatType(), True),
    StructField("price_low", FloatType(), True),
    StructField("price_close", FloatType(), True),
    StructField("volume", IntegerType(), True),
    StructField("price_adj_close", FloatType(), True)
])
hive_context = HiveContext(sc)
# 创建DataFrame
new_data = [("BJSSE","AAPL", "2023-07-04", 150.0, 155.0, 148.0, 152.0, 1000000, 151.0),
            ("SSE", "GOOG","2023-07-04", 2600.0, 2650.0, 2590.0, 2630.0, 500000, 2620.0)]
df_to_write = hive_context.createDataFrame(new_data, schema=schema)
# 注册为临时表以便进行后续操作
df_to_write.registerTempTable("temp_stocks")
# 将临时表中的数据插入到stocks表
hive_context.sql('''
    INSERT INTO TABLE stocks
    SELECT * FROM temp_stocks
''')
hive_context.sql("select * from stocks where exchange_e='SSE'").show()

此方法的读写操作也可以参看Spark2.1.0入门:连接Hive读写数据(DataFrame)(Python版)_厦大数据库实验室博客 https://dblab.xmu.edu.cn/blog/1729/

2.SparkSession方式读取Hive数据

在Spark中,使用SparkSession(从Spark 2.0开始)可以方便地读取和写入Hive表。以下是如何在Python中使用PySpark进行操作的例子:
(1)读取数据

from pyspark.sql import SparkSession

# 初始化SparkSession并启用Hive支持
spark = SparkSession.builder\
    .appName("StocksDataWriteExample")\
    .enableHiveSupport()\
    .getOrCreate()
# 读取并显示stocks表的数据
spark.sql("SELECT * FROM stocks").show(10)

(2)写入数据

from pyspark.sql import SparkSession

# 初始化SparkSession并启用Hive支持
spark = SparkSession.builder \
        .appName("StocksDataWriteExample") \
        .enableHiveSupport() \
        .getOrCreate()

# 定义数据和列结构(与stocks表结构一致)
columns = ["exchange_e", "symbol", "ymd", "price_open", "price_high", "price_low", "price_close", "volume", "price_adj_close"]
new_data = [("BJSSE","AAPL", "2023-07-04", 150.0, 155.0, 148.0, 152.0, 1000000, 151.0),
            ("SSE", "GOOG","2023-07-04", 2600.0, 2650.0, 2590.0, 2630.0, 500000, 2620.0)]
# 创建DataFrame
df_to_write = spark.createDataFrame(new_data, schema=columns)
# 写入数据到stocks表,这里假设mode为'append'(追加模式)
df_to_write.write \
.mode('append') \
.format('Hive') \
.saveAsTable('default.stocks')

(3)要注意的问题
Hive 3.0以后,默认建立的表是ORC格式的(不用在hive-site.xml中开启行级事务支持)。即可以支持INSERT,DELETE和UPDATE行级事务操作。但如果是在Hive交互命令行创建的表,在spark程序看来都是HiveFileFormat格式的表。因此,上面的代码中采用.format('Hive')。Spark会匹配相应的schema。要回避这个问题,也可以采用以下代码,即从一个临时表向目标表追加数据的方法。
创建一个与stocks表结构相同的临时表

df_to_write.createOrReplaceTempView("temp_stocks")

使用Hive SQL语句将临时表数据插入到stocks表

spark.sql("""
    INSERT INTO TABLE default.stocks
    SELECT * FROM temp_stocks
""")
spark.sql('select * from stocks limit 10').show()