作者:厦门大学大数据课程虚拟教研室 林徐
一、环境配置
本文在Windows下配置Spark访问Hive。如需在Linux上配置,请对应Linux上同样的目录即可。
1.检查PySpark环境正常运行;检查Hive环境正常运行;启动Hive元数据服务
hive –service metastore
2.先将%HIVE_HOME%\conf\hive-site.xml拷贝到%SPARK_HOME%\conf。此步骤是为了Spark能读取Hive相应的配置;
3.再将%HIVE_HOME%\lib下的MySQL连接驱动的Jar包(mysql-connector-java-5.1.36-bin.jar)拷贝到%SPARK_HOME%\jars目录下。Jar包的版本与MySQL数据库的版本配套。此步骤是为了Spark能够访问Hive的元数据库。
此时,正常启动PySpark交互程序,可在交互模式下正常访问Hive了。进入交互环境,在提示符后直接输入以下代码:
spark.sql(‘show tables’).show()
正常执行后,应该能够看到default库中的表。这时,可以配置Python IDE的开发环境了
4.检查pycharm或其他IDE中的PySpark的开发环境正常;
5.再将%HIVE_HOME%\lib下的MySQL连接驱动的Jar包(mysql-connector-java-5.1.36-bin.jar)拷贝到%PYTHONDIR%\Lib\site-packages\pyspark\jars目录下。Jar包的版本与MySQL数据库的版本配套。此步骤是为了在IDE环境中能够访问Hive的元数据库;
6.添加环境变量SPARK_CONF_DIR,变量值为%SPARK_HOME%\conf。此步骤是为了在IDE中运行Spark程序时,能够读取Spark配置目录下的相应配置信息;
此时,正常启动pycharm。可在IDE环境下正常访问Hive了。在pycharm的工程中新建一个Python文件,输入以下代码:
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName('sparkhive').master('local[*]').enableHiveSupport().getOrCreate()
spark.sql('show tables').show()
正常执行后,应该能够看到default库中的表。
二、读写Hive数据源
从Spark2.0开始,引入SparkSession 作为 DataSet 和 DataFrame API 的切入点,SparkSession封装了SparkConf、SparkContext 和 SQLContext。为了向后兼容,SQLContext 和 HiveContext也被保存下来。在实际写程序时,只需要定义一个SparkSession对象就可以了。不用使用SQLContext 和 HiveContext。
1.SQLContext 和 HiveContext方式读写Hive数据
(1)读取数据
from pyspark.sql import HiveContext
from pyspark import SparkConf,SparkContext
conf=SparkConf().setMaster("local").setAppName("sparkhive")
sc=SparkContext(conf=conf)
hive_context = HiveContext(sc)
stocks_df = hive_context.sql("SELECT * FROM stocks")
stocks_df.show(10)
(2)写入数据
from pyspark.sql import HiveContext
from pyspark.sql.types import StructType, StructField, StringType, FloatType, IntegerType
from pyspark import SparkConf,SparkContext
conf=SparkConf().setMaster("local").setAppName("sparkhive")
sc=SparkContext(conf=conf)
# 定义DataFrame的结构(与stocks表的结构一致)
schema = StructType([
StructField("exchange_e", StringType(), True),
StructField("symbol", StringType(), True),
StructField("ymd", StringType(), True),
StructField("price_open", FloatType(), True),
StructField("price_high", FloatType(), True),
StructField("price_low", FloatType(), True),
StructField("price_close", FloatType(), True),
StructField("volume", IntegerType(), True),
StructField("price_adj_close", FloatType(), True)
])
hive_context = HiveContext(sc)
# 创建DataFrame
new_data = [("BJSSE","AAPL", "2023-07-04", 150.0, 155.0, 148.0, 152.0, 1000000, 151.0),
("SSE", "GOOG","2023-07-04", 2600.0, 2650.0, 2590.0, 2630.0, 500000, 2620.0)]
df_to_write = hive_context.createDataFrame(new_data, schema=schema)
# 注册为临时表以便进行后续操作
df_to_write.registerTempTable("temp_stocks")
# 将临时表中的数据插入到stocks表
hive_context.sql('''
INSERT INTO TABLE stocks
SELECT * FROM temp_stocks
''')
hive_context.sql("select * from stocks where exchange_e='SSE'").show()
此方法的读写操作也可以参看Spark2.1.0入门:连接Hive读写数据(DataFrame)(Python版)_厦大数据库实验室博客 https://dblab.xmu.edu.cn/blog/1729/
2.SparkSession方式读取Hive数据
在Spark中,使用SparkSession(从Spark 2.0开始)可以方便地读取和写入Hive表。以下是如何在Python中使用PySpark进行操作的例子:
(1)读取数据
from pyspark.sql import SparkSession
# 初始化SparkSession并启用Hive支持
spark = SparkSession.builder\
.appName("StocksDataWriteExample")\
.enableHiveSupport()\
.getOrCreate()
# 读取并显示stocks表的数据
spark.sql("SELECT * FROM stocks").show(10)
(2)写入数据
from pyspark.sql import SparkSession
# 初始化SparkSession并启用Hive支持
spark = SparkSession.builder \
.appName("StocksDataWriteExample") \
.enableHiveSupport() \
.getOrCreate()
# 定义数据和列结构(与stocks表结构一致)
columns = ["exchange_e", "symbol", "ymd", "price_open", "price_high", "price_low", "price_close", "volume", "price_adj_close"]
new_data = [("BJSSE","AAPL", "2023-07-04", 150.0, 155.0, 148.0, 152.0, 1000000, 151.0),
("SSE", "GOOG","2023-07-04", 2600.0, 2650.0, 2590.0, 2630.0, 500000, 2620.0)]
# 创建DataFrame
df_to_write = spark.createDataFrame(new_data, schema=columns)
# 写入数据到stocks表,这里假设mode为'append'(追加模式)
df_to_write.write \
.mode('append') \
.format('Hive') \
.saveAsTable('default.stocks')
(3)要注意的问题
Hive 3.0以后,默认建立的表是ORC格式的(不用在hive-site.xml中开启行级事务支持)。即可以支持INSERT,DELETE和UPDATE行级事务操作。但如果是在Hive交互命令行创建的表,在spark程序看来都是HiveFileFormat格式的表。因此,上面的代码中采用.format('Hive')。Spark会匹配相应的schema。要回避这个问题,也可以采用以下代码,即从一个临时表向目标表追加数据的方法。
创建一个与stocks表结构相同的临时表
df_to_write.createOrReplaceTempView("temp_stocks")
使用Hive SQL语句将临时表数据插入到stocks表
spark.sql("""
INSERT INTO TABLE default.stocks
SELECT * FROM temp_stocks
""")
spark.sql('select * from stocks limit 10').show()