借助于Arrow实现PySpark和Pandas之间的数据交换

大数据学习路线图

【版权声明】版权所有,严禁转载,严禁用于商业用途,侵权必究。
相关教材:林子雨、郑海山、赖永炫编著《Spark编程基础(Python版)》(访问教材官网
Apache Arrow是一个高效的列式数据格式,可以在PySpark中实现JVM和Python进程之间的数据交换。这对于使用Numpy和Pandas的Python用户来说,是可以带来很多好处的。不过,它的使用并不是自动发生的,而是需要经过一些安装和配置工作。

一、安装Apache PyArrow

在PySpark中使用Arrow,需要安装PyArrow。PyArrow目前(2020年6月)和Python 3.5, 3.6, 3.7以及3.8保持兼容。

(1)方法1:使用Conda安装PyArrow

可以执行如下命令,安装最新版本的PyArrow:

  1. conda install -c conda-forge pyarrow
Shell 命令

(2)方法2:使用pip安装PyArrow

可以执行如下命令,安装最新版本的PyArrow:

  1. pip install pyarrow
Shell 命令

二、Spark DataFrame和Pandas DataFrame之间的数据交换

当需要把Spark DataFrame转换成Pandas DataFrame时,可以调用toPandas();当需要从Pandas DataFrame创建Spark DataFrame时,可以采用createDataFrame(pandas_df)。但是,需要注意的是,在调用这些操作之前,需要首先把Spark的参数spark.sql.execution.arrow.enabled设置为true,因为这个参数在默认情况下是false。
下面是一个参考实例代码:

  1. import numpy as np
  2. import pandas as pd
  3.  
  4. # Enable Arrow-based columnar data transfers
  5. spark.conf.set("spark.sql.execution.arrow.enabled", "true")
  6.  
  7. # Generate a Pandas DataFrame
  8. pdf = pd.DataFrame(np.random.rand(100, 3))
  9.  
  10. # Create a Spark DataFrame from a Pandas DataFrame using Arrow
  11. df = spark.createDataFrame(pdf)
  12.  
  13. # Convert the Spark DataFrame back to a Pandas DataFrame using Arrow
  14. result_pdf = df.select("*").toPandas()
Python

下面是另外一段代码,也是实现了Spark DataFrame和Pandas DataFrame之间的数据交换,供参考:

  1. import pandas as pd
  2. from pyspark.sql import SparkSession
  3. spark= SparkSession\
  4. .builder \
  5. .appName("Dataframe") \
  6. .getOrCreate()
  7.  
  8. data=pd.DataFrame([[1,2],[3,4]],columns=['a','b'])
  9. data_values=data.values.tolist()
  10. data_coulumns=list(data.columns)
  11. df=spark.createDataFrame(data)
  12.  
  13. #将pandas.DataFrame转为spark.dataFrame
  14. spark_df = spark.createDataFrame(data_values, data_coulumns)
  15. print('spark.dataFram=',spark_df.show())
  16.  
  17. #将spark.dataFrame转为pandas.DataFrame
  18. Spark.conf.set(“spark.sql.execution.arrow.enabled”,”true")
  19. pandas_df = spark_df.toPandas()
  20. print('pandas.DataFrame=',pandas_df)
  21.  
  22. #将spark.dataFrame存入hive
  23. spark_df.createOrReplaceTempView('table_test')
  24. spark.sql(
  25. "create table tmp.table_test SELECT * FROM table_test"
  26. )
Python