Spark2.1.0入门:通过JDBC连接数据库(DataFrame)(Python版)

大数据学习路线图

【版权声明】博客内容由厦门大学数据库实验室拥有版权,未经允许,请勿转载!
返回Spark教程首页
推荐纸质教材:林子雨、郑海山、赖永炫编著《Spark编程基础(Python版)》

这里以关系数据库MySQL为例。首先,请参考厦门大学数据库实验室博客教程(Ubuntu安装MySQL),在Linux系统中安装好MySQL数据库。这里假设你已经成功安装了MySQL数据库。下面我们要新建一个测试Spark程序的数据库,数据库名称是“spark”,表的名称是“student”。

请执行下面命令在Linux中启动MySQL数据库,并完成数据库和表的创建,以及样例数据的录入:

  1. service mysql start
  2. mysql -u root -p
  3. //屏幕会提示你输入密码
Shell 命令

输入密码后,你就可以进入“mysql>”命令提示符状态,然后就可以输入下面的SQL语句完成数据库和表的创建:

  1. mysql> create database spark;
  2. mysql> use spark;
  3. mysql> create table student (id int(4), name char(20), gender char(4), age int(4));
  4. mysql> alter table student change id id int auto_increment primary key;
  5. mysql> insert into student values(1,'Xueqian','F',23);
  6. mysql> insert into student values(2,'Weiliang','M',24);
  7. mysql> select * from student;
mysql

上面已经创建好了我们所需要的MySQL数据库和表,下面我们编写Spark应用程序连接MySQL数据库并且读写数据。

Spark支持通过JDBC方式连接到其他数据库获取数据生成DataFrame。

首先,请进入Linux系统(本教程统一使用hadoop用户名登录),打开火狐(FireFox)浏览器,下载一个MySQL的JDBC驱动(下载)。在火狐浏览器中下载时,一般默认保存在hadoop用户的当前工作目录的“下载”目录下,所以,可以打开一个终端界面,输入下面命令查看:

  1. cd ~
  2. cd 下载
Shell 命令

就可以看到刚才下载到的MySQL的JDBC驱动程序,文件名称为mysql-connector-java-5.1.40.tar.gz(你下载的版本可能和这个不同)。现在,使用下面命令,把该驱动程序拷贝到spark的安装目录下:

  1. sudo tar -zxf ~/下载/mysql-connector-java-5.1.40.tar.gz -C /usr/local/spark/jars
  2. cd /usr/local/spark/jars
  3. ls
Shell 命令

这时就可以在/usr/local/spark/jars目录下看到这个驱动程序文件所在的文件夹mysql-connector-java-5.1.40,进入这个文件夹,就可以看到驱动程序文件mysql-connector-java-5.1.40-bin.jar。
请输入下面命令启动已经安装在Linux系统中的mysql数据库(如果前面已经启动了MySQL数据库,这里就不用重复启动了)。

  1. service mysql start
Shell 命令

下面,我们要启动一个pyspark,而且启动的时候,要附加一些参数。启动pyspark时,必须指定mysql连接驱动jar包。

  1. cd /usr/local/spark
  2. ./bin/pyspark \
  3. --jars /usr/local/spark/jars/mysql-connector-java-5.1.40/mysql-connector-java-5.1.40-bin.jar \
  4. --driver-class-path /usr/local/spark/jars/mysql-connector-java-5.1.40/mysql-connector-java-5.1.40-bin.jar
Shell 命令

上面的命令行中,在一行的末尾加入斜杠\,是为了告诉spark-shell,命令还没有结束。

启动进入pyspark以后,可以执行以下命令连接数据库,读取数据,并显示:

  1. >>> jdbcDF = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost:3306/spark").option("driver","com.mysql.jdbc.Driver").option("dbtable", "student").option("user", "root").option("password", "hadoop").load()
  2. >>> jdbcDF.show()
Python

下面我们再来看一下如何往MySQL中写入数据。
为了看到MySQL数据库在Spark程序执行前后发生的变化,我们先在Linux系统中新建一个终端,使用下面命令查看一下MySQL数据库中的数据库spark中的表student的内容:

  1. service mysql start //如果前面已经启动MySQL数据库,这里不用再执行这条命令
  2. mysql -u root -p
Shell 命令

执行上述命令后,屏幕上会提示你输入MySQL数据库密码,我们这里给数据库设置的用户是root,密码是hadoop。
因为之前我们已经在MySQL数据库中创建了一个名称为spark的数据库,并创建了一个名称为student的表,现在查看一下:

  1. mysql> use spark;
  2. Database changed
  3.  
  4. mysql> select * from student;
  5. //上面命令执行后返回下面结果
  6. +------+----------+--------+------+
  7. | id | name | gender | age |
  8. +------+----------+--------+------+
  9. | 1 | Xueqian | F | 23 |
  10. | 2 | Weiliang | M | 24 |
  11. +------+----------+--------+------+
  12. 2 rows in set (0.00 sec)
mysql

现在我们开始在pyspark中编写程序,往spark.student表中插入两条记录。
下面,我们要启动一个pyspark,而且启动的时候,要附加一些参数。启动pyspark时,必须指定mysql连接驱动jar包(如果你前面已经采用下面方式启动了pyspark,就不需要重复启动了):

  1. cd /usr/local/spark
  2. ./bin/pyspark \
  3. --jars /usr/local/spark/jars/mysql-connector-java-5.1.40/mysql-connector-java-5.1.40-bin.jar \
  4. --driver-class-path /usr/local/spark/jars/mysql-connector-java-5.1.40/mysql-connector-java-5.1.40-bin.jar
Shell 命令

上面的命令行中,在一行的末尾加入斜杠\,是为了告诉spark-shell,命令还没有结束。

启动进入pyspark以后,可以执行以下命令连接数据库,写入数据,程序如下(你可以把下面程序一条条拷贝到pyspark中执行)

  1. >>> from pyspark.sql.types import Row
  2. >>> from pyspark.sql.types import StructType
  3. >>> from pyspark.sql.types import StructField
  4. >>> from pyspark.sql.types import StringType
  5. >>> from pyspark.sql.types import IntegerType
  6. >>> studentRDD = spark.sparkContext.parallelize(["3 Rongcheng M 26","4 Guanhua M 27"]).map(lambda line : line.split(" "))
  7. //下面要设置模式信息
  8. >>> schema = StructType([StructField("name", StringType(), True),StructField("gender", StringType(), True),StructField("age",IntegerType(), True)])
  9. >>> rowRDD = studentRDD.map(lambda p : Row(p[1].strip(), p[2].strip(),int(p[3])))
  10. //建立起Row对象和模式之间的对应关系,也就是把数据和模式对应起来
  11. >>> studentDF = spark.createDataFrame(rowRDD, schema)
  12. >>> prop = {}
  13. >>> prop['user'] = 'root'
  14. >>> prop['password'] = 'hadoop'
  15. >>> prop['driver'] = "com.mysql.jdbc.Driver"
  16. >>> studentDF.write.jdbc("jdbc:mysql://localhost:3306/spark",'student','append', prop)
Python

在pyspark中执行完上述程序后,我们可以看一下效果,看看MySQL数据库中的spark.student表发生了什么变化。请在刚才的另外一个窗口的MySQL命令提示符下面继续输入下面命令:

mysql> select * from student;
+------+-----------+--------+------+
| id   | name      | gender | age  |
+------+-----------+--------+------+
|    1 | Xueqian   | F      |   23 |
|    2 | Weiliang  | M      |   24 |
|    3 | Rongcheng | M      |   26 |
|    4 | Guanhua   | M      |   27 |
+------+-----------+--------+------+
4 rows in set (0.00 sec)