大数据原理与应用 第十六章 Spark 学习指南

大数据学习路线图


点击这里观看厦门大学林子雨老师主讲《大数据技术原理与应用》授课视频
【版权声明:本指南为厦门大学林子雨编著的《大数据技术原理与应用》教材配套学习资料,版权所有,转载请注明出处,请勿用于商业用途】

注:第十六章Spark,本章为2016年新增章节,不在2015年8月1日由人民邮电出版社出版发行的《大数据技术原理与应用》中,会被放入到教材的下一个版本中。

Apache Spark 是一个新兴的大数据处理通用引擎,提供了分布式的内存抽象。Spark 最大的特点就是快,可比 Hadoop MapReduce 的处理速度快 100 倍。本指南将介绍 Spark 的安装与基本使用。请务必仔细阅读完厦门大学林子雨编著的《大数据技术原理与应用》第16章节(点击这里下载第十六章Spark的pdf电子书),再结合本指南进行学习。

一、安装 Spark

访问Spark官方下载地址,按照如下图下载。
1
该部分介绍了单机中 Spark 的安装。我们选择Spark 1.6.2版本教学。该教程的具体运行环境如下:

  • Hadoop 2.6.0以上
  • Java JDK 1.7以上
  • Spark 1.6.2

安装Hadoop

Spark的安装过程较为简单,在已安装好 Hadoop 的前提下,经过简单配置即可使用。
如果仍没有安装Hadoop,请访问Hadoop安装教程_单机/伪分布式配置_Hadoop2.6.0/Ubuntu14.04,依照教程学习安装即可。

安装JAVA JDK

安装Hadoop的过程就已经要求安装JAVA JDK了。如果没有,请参考Hadoop安装教程_单机/伪分布式配置_Hadoop2.6.0/Ubuntu14.04进行安装配置。

安装Spark

  1. sudo tar -zxf ~/下载/spark-1.6.2-bin-without-hadoop.tgz -C /usr/local/
  2. cd /usr/local
  3. sudo mv ./spark-1.6.2-bin-without-hadoop/ ./spark
  4. sudo chown -R hadoop:hadoop ./spark # 此处的 hadoop 为你的用户名
Shell 命令

安装后,还需要修改Spark的配置文件spark-env.sh

  1. cd /usr/local/spark
  2. cp ./conf/spark-env.sh.template ./conf/spark-env.sh
Shell 命令

编辑spark-env.sh文件(vim ./conf/spark-env.sh),在第一行添加以下配置信息:

export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath)

配置完成后就可以直接使用,不需要像Hadoop运行启动命令。
通过运行Spark自带的示例,验证Spark是否安装成功。

  1. cd /usr/local/spark
  2. bin/run-example SparkPi
Shell 命令

执行时会输出非常多的运行信息,输出结果不容易找到,可以通过 grep 命令进行过滤(命令中的 2>&1 可以将所有的信息都输出到 stdout 中,否则由于输出日志的性质,还是会输出到屏幕中):

  1. bin/run-example SparkPi 2>&1 | grep "Pi is"
Shell 命令

这里涉及到Linux Shell中管道的知识,详情可以参考Linux Shell中的管道命令
过滤后的运行结果如下图示,可以得到π 的 5 位小数近似值:

二、使用 Spark Shell 编写代码

学习Spark程序开发,建议首先通过spark-shell交互式学习,加深Spark程序开发的理解。
该部分介绍了 Spark Shell 的基本使用。Spark shell 提供了简单的方式来学习 API,也提供了交互的方式来分析数据。

Spark Shell 支持 Scala 和 Python,该部分教程选择使用 Scala 来进行介绍。

启动Spark Shell

  1. bin/spark-shell
Shell 命令

启动spark-shell后,会自动创建名为sc的spark context对象和名为sqlContext的sql context对象,如图:
2

加载text文件

spark创建sc,可以加载本地文件和HDFS文件创建RDD。这里用Spark自带的本地文件README.md文件测试。

  1. val textFile = sc.textFile("file:///usr/local/spark/README.md")
scala

加载HDFS文件和本地文件都是使用textFile,区别是添加前缀(hdfs://和file://)进行标识。

简单RDD操作

  1. //获取RDD文件textFile的第一行内容
  2. textFile.first()
  3. //获取RDD文件textFile所有项的计数
  4. textFile.count()
  5. //抽取含有“Spark”的行,返回一个新的RDD
  6. val lineWithSpark = textFile.filter(line => line.contains("Spark"))
  7. //统计新的RDD的行数
  8. lineWithSpark.count()
scala

可以通过组合RDD操作进行组合,可以实现简易MapReduce操作

  1. //找出文本中每行的最多单词数
  2. textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
scala

更多RDD的操作,请访问Spark官方文档RDD操作

退出Spark Shell

输入exit,即可退出spark shell

  1. :quit
scala

三、独立应用程序编程

接着我们通过一个简单的应用程序 SimpleApp 来演示如何通过 Spark API 编写一个独立应用程序。使用 Scala 编写的程序需要使用 sbt 进行编译打包,相应的,Java 程序使用 Maven 编译打包,而 Python 程序通过 spark-submit 直接提交。

Scala独立应用编程

  1. 安装sbt
    sbt是一款Spark用来对scala编写程序进行打包的工具,这里简单介绍sbt的安装过程,感兴趣的读者可以参考官网资料了解更多关于sbt的内容。
    Spark 中没有自带 sbt,这里直接给出sbt-launch.jar的下载地址,直接点击下载即可。
    我们选择安装在 /usr/local/sbt 中:

    1. sudo mkdir /usr/local/sbt
    2. sudo chown -R hadoop /usr/local/sbt # 此处的 hadoop 为你的用户名
    3. cd /usr/local/sbt
    Shell 命令

    下载后,执行如下命令拷贝至 /usr/local/sbt 中:

    1. cp ~/下载/sbt-launch.jar .
    Shell 命令

    接着在 /usr/local/sbt 中创建 sbt 脚本(vim ./sbt),添加如下内容:

    1. #!/bin/bash
    2. SBT_OPTS="-Xms512M -Xmx1536M -Xss1M -XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=256M"
    3. java $SBT_OPTS -jar `dirname $0`/sbt-launch.jar "$@"
    Shell 命令

    保存后,为 ./sbt 脚本增加可执行权限:

    1. chmod u+x ./sbt
    Shell 命令

    最后运行如下命令,检验 sbt 是否可用(请确保电脑处于联网状态,首次运行会处于 “Getting org.scala-sbt sbt 0.13.11 ...” 的下载状态,请耐心等待。笔者等待了 7 分钟才出现第一条下载提示):

    1. ./sbt sbt-version
    Shell 命令

    只要能得到如下图的版本信息就没问题:

    4

  2. Scala应用程序代码
    在终端中执行如下命令创建一个文件夹 sparkapp 作为应用程序根目录:

    1. cd ~ # 进入用户主文件夹
    2. mkdir ./sparkapp # 创建应用程序根目录
    3. mkdir -p ./sparkapp/src/main/scala # 创建所需的文件夹结构
    Shell 命令

    在 ./sparkapp/src/main/scala 下建立一个名为 SimpleApp.scala 的文件(vim ./sparkapp/src/main/scala/SimpleApp.scala),添加代码如下:

    1. /* SimpleApp.scala */
    2. import org.apache.spark.SparkContext
    3. import org.apache.spark.SparkContext._
    4. import org.apache.spark.SparkConf
    5.  
    6. object SimpleApp {
    7. def main(args: Array[String]) {
    8. val logFile = "file:///usr/local/spark/README.md" // Should be some file on your system
    9. val conf = new SparkConf().setAppName("Simple Application")
    10. val sc = new SparkContext(conf)
    11. val logData = sc.textFile(logFile, 2).cache()
    12. val numAs = logData.filter(line => line.contains("a")).count()
    13. val numBs = logData.filter(line => line.contains("b")).count()
    14. println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
    15. }
    16. }
    scala

    该程序计算 /usr/local/spark/README 文件中包含 "a" 的行数 和包含 "b" 的行数。代码第8行的 /usr/local/spark 为 Spark 的安装目录,如果不是该目录请自行修改。不同于 Spark shell,独立应用程序需要通过 val sc = new SparkContext(conf) 初始化 SparkContext,SparkContext 的参数 SparkConf 包含了应用程序的信息。

    该程序依赖 Spark API,因此我们需要通过 sbt 进行编译打包。 ./sparkapp 中新建文件 simple.sbt(vim ./sparkapp/simple.sbt),添加内容如下,声明该独立应用程序的信息以及与 Spark 的依赖关系:

    name := "Simple Project"
    
    version := "1.0"
    
    scalaVersion := "2.10.5"
    
    libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.2"
    

    文件 simple.sbt 需要指明 Spark 和 Scala 的版本。在上面的配置信息中,scalaVersion用来指定scala的版本,sparkcore用来指定spark的版本,这两个版本信息都可以在之前的启动 Spark shell 的过程中,从屏幕的显示信息中找到。下面就是笔者在启动过程当中,看到的相关版本信息(备注:屏幕显示信息会很长,需要往回滚动屏幕仔细寻找信息)。

    3

  3. 使用 sbt 打包 Scala 程序
    为保证 sbt 能正常运行,先执行如下命令检查整个应用程序的文件结构:

    1. cd ~/sparkapp
    2. find .
    Shell 命令

    文件结构应如下图所示:

    SimpleApp的文件结构

    接着,我们就可以通过如下代码将整个应用程序打包成 JAR(首次运行同样需要下载依赖包 ):

    1. /usr/local/sbt/sbt package
    Shell 命令

    打包成功的话,会输出如下图内容:

    SimpleApp的文件结构

    生成的 jar 包的位置为 ~/sparkapp/target/scala-2.10/simple-project_2.10-1.0.jar。

  4. 通过 spark-submit 运行程序
    最后,我们就可以将生成的 jar 包通过 spark-submit 提交到 Spark 中运行了,命令如下:

    1. /usr/local/spark/bin/spark-submit --class "SimpleApp" ~/sparkapp/target/scala-2.10/simple-project_2.10-1.0.jar
    2. # 上面命令执行后会输出太多信息,可以不使用上面命令,而使用下面命令查看想要的结果
    3. /usr/local/spark/bin/spark-submit --class "SimpleApp" ~/sparkapp/target/scala-2.10/simple-project_2.10-1.0.jar 2>&1 | grep "Lines with a:"
    Shell 命令

    最终得到的结果如下:

    Lines with a: 58, Lines with b: 26
    

自此,你就完成了你的第一个 Spark 应用程序了。

Java独立应用编程

  1. 安装maven
    ubuntu中没有自带安装maven,需要手动安装maven。可以访问maven官方下载自己下载。这里直接给出apache-maven-3.3.9-bin.zip的下载地址,直接点击下载即可。
    选择安装在/usr/local/maven中:

    1. sudo unzip ~/下载/apache-maven-3.3.9-bin.zip -d /usr/local
    2. cd /usr/local
    3. sudo mv apache-maven-3.3.9/ ./maven
    4. sudo chown -R hadoop ./maven
    Shell 命令
  2. Java应用程序代码
    在终端执行如下命令创建一个文件夹sparkapp2作为应用程序根目录

    1. cd ~ #进入用户主文件夹
    2. mkdir -p ./sparkapp2/src/main/java
    Shell 命令

    在 ./sparkapp2/src/main/java 下建立一个名为 SimpleApp.java 的文件(vim ./sparkapp2/src/main/java/SimpleApp.java),添加代码如下:

    1. /*** SimpleApp.java ***/
    2. import org.apache.spark.api.java.*;
    3. import org.apache.spark.api.java.function.Function;
    4.  
    5. public class SimpleApp {
    6. public static void main(String[] args) {
    7. String logFile = "file:///usr/local/spark/README.md"; // Should be some file on your system
    8. JavaSparkContext sc = new JavaSparkContext("local", "Simple App",
    9. "file:///usr/local/spark/", new String[]{"target/simple-project-1.0.jar"});
    10. JavaRDD<String> logData = sc.textFile(logFile).cache();
    11.  
    12. long numAs = logData.filter(new Function<String, Boolean>() {
    13. public Boolean call(String s) { return s.contains("a"); }
    14. }).count();
    15.  
    16. long numBs = logData.filter(new Function<String, Boolean>() {
    17. public Boolean call(String s) { return s.contains("b"); }
    18. }).count();
    19.  
    20. System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);
    21. }
    22. }
    Java

    该程序依赖Spark Java API,因此我们需要通过Maven进行编译打包。在./sparkapp2中新建文件pom.xml(vim ./sparkapp2/pom.xml),添加内容如下,声明该独立应用程序的信息以及与Spark的依赖关系:

    <project>
        <groupId>edu.berkeley</groupId>
        <artifactId>simple-project</artifactId>
        <modelVersion>4.0.0</modelVersion>
        <name>Simple Project</name>
        <packaging>jar</packaging>
        <version>1.0</version>
        <repositories>
            <repository>
                <id>Akka repository</id>
                <url>http://repo.akka.io/releases</url>
            </repository>
        </repositories>
        <dependencies>
            <dependency> <!-- Spark dependency -->
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.11</artifactId>
                <version>2.0.0-preview</version>
            </dependency>
        </dependencies>
    </project>
    

    关于Spark dependency的依赖关系,可以访问The Central Repository。搜索spark-core可以找到相关依赖关系信息。

  3. 使用maven打包java程序
    为了保证maven能够正常运行,先执行如下命令检查整个应用程序的文件结构:

    1. cd ~/sparkapp2
    2. find
    Shell 命令

    文件结构如下图:

    接着,我们可以通过如下代码将这整个应用程序打包成Jar(注意:电脑需要保持连接网络的状态,而且首次运行同样下载依赖包,同样消耗几分钟的时间):

    1. /usr/local/maven/bin/mvn package
    Shell 命令

    如出现下图,说明生成Jar包成功:

  4. 通过spark-submit 运行程序
    最后,可以通过将生成的jar包通过spark-submit提交到Spark中运行,如下命令:

    1. /usr/local/spark/bin/spark-submit --class "SimpleApp" ~/sparkapp2/target/simple-project-1.0.jar
    2. # 上面命令执行后会输出太多信息,可以不使用上面命令,而使用下面命令查看想要的结果
    3. /usr/local/spark/bin/spark-submit --class "SimpleApp" ~/sparkapp2/target/simple-project-1.0.jar 2>&1 | grep "Lines with a"
    Shell 命令

    最后得到的结果如下:

    Lines with a: 58, Lines with b: 26
    

扩展阅读

如果想了解更多有关Spark的学习内容,请访问Spark快速入门指南 – Spark安装与基础使用,主要对官方入门教程进行了翻译,可以帮助读者快速入门Spark。