Spark上机练习题:统计人口平均年龄

2019年1月20日寒假大数据师资培训班

本部分Spark上机练习题,是与林子雨编著《Spark编程基础》教程(官网)配套的题目,在学习完《Spark编程基础》第5章RDD编程的内容以后,可以顺利完成本题目。

【题目】Spark上机练习题:统计人口平均年龄


(1)请编写Spark应用程序,该程序可以在本地文件系统中生成一个数据文件peopleage.txt,数据文件包含若干行(比如1000行,或者100万行等等)记录,每行记录只包含两列数据,第1列是序号,第2列是年龄。效果如下:

1    89
2    67
3    69
4    78

(2)请编写Spark应用程序,对本地文件系统中的数据文件peopleage.txt的数据进行处理,计算出所有人口的平均年龄。
(3)请编写Spark应用程序,该程序可以在分布式文件系统HDFS中生成一个数据文件peopleage.txt,数据文件包含若干行(比如1000行,或者100万行等等)记录,每行记录只包含两列数据,第1列是序号,第2列是年龄。效果如下:

1    89
2    67
3    69
4    78

(4)请编写Spark应用程序,对分布式文件系统HDFS中的数据文件peopleage.txt的数据进行处理,计算出所有人口的平均年龄。

参考答案

(1)请编写Spark应用程序,该程序可以在本地文件系统中生成一个数据文件peopleage.txt,数据文件包含若干行(比如1000行,或者100万行等等)记录,每行记录只包含两列数据,第1列是序号,第2列是年龄。

登录Linux系统,打开一个命令行终端(可以使用快捷键CTRL+ALT+T),进入Linux Shell环境。假设读者已经把Spark安装到了“/usr/local/spark”目录,并且读者的所有练习代码都被保存在“/usr/local/spark/mycode/exercise”目录下(如果不存在该目录,请使用mkdir命令自行创建)。下面请使用如下命令创建一个peopleage子目录,用来保存本练习题目的各种文件:

cd /usr/local/spark/mycode/exercise
mkdir peopleage

下面,请在peopleage目录下建立src/main/scala代码目录,命令如下:

cd /usr/local/spark/mycode/exercise/peopleage
mkdir -p src/main/scala

这个目录是专门用来保存scala代码文件的。下面创建一个代码文件GeneratePeopleAge.scala,用来生成数据文件peopleage.txt,命令如下:

cd src/main/scala
vim GeneratePeopleAge.scala

执行上述命令以后,会打开vim编辑器(如果不会使用vim编辑器,可以参考vim编辑器的使用方法),然后,可以在这个代码文件GeneratePeopleAge.scala中输入如下代码:

//代码文件GeneratePeopleAge.scala
import java.io.FileWriter
import java.io.File
import scala.util.Random

object GeneratePeopleAge{

    def main(args:Array[String]){
            val fileWriter = new FileWriter(new File("/usr/local/spark/mycode/exercise/peopleage/peopleage.txt"),false)
            val rand = new Random()
            for (i <- 1 to 1000){//这里是生成数据的行数
                fileWriter.write(i+" "+rand.nextInt(100))
                fileWriter.write(System.getProperty("line.separator"))
}
        fileWriter.flush()
        fileWriter.close()
}
}

然后,保存文件并退出vim编辑器。
下面,需要在“/usr/local/spark/mycode/exercise/peopleage”目录下新建一个 simple.sbt文件,用来支持sbt打包编译,命令如下:

cd /usr/local/spark/mycode/exercise/peopleage
vim simple.sbt

然后,在simple.sbt文件中输入如下内容:

name := "Simple Project"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0"

下面,使用sbt编译打包工具对代码文件进行编译打包,命令如下:

cd /usr/local/spark/mycode/exercise/peopleage
/usr/local/sbt/sbt package

打包成功以后,可以使用如下命令运行程序,生成数据文件:

cd /usr/local/spark/mycode/exercise/peopleage
/usr/local/spark/bin/spark-submit   \
> --class "GeneratePeopleAge"  \
> /usr/local/spark/mycode/exercise/peopleage/target/scala-2.11/simple-project_2.11-1.0.jar

执行结束以后,可以看到,已经生成了数据文件“/usr/local/spark/mycode/exercise/peopleage/peopleage.txt”。
可以使用如下命令查看文件内容:

cd /usr/local/spark/mycode/exercise/peopleage
cat peopleage.txt

(2)请编写Spark应用程序,对本地文件系统中的数据文件peopleage.txt的数据进行处理,计算出所有人口的平均年龄。

创建代码文件CountAvgAge.scala,命令如下:

cd /usr/local/spark/mycode/exercise/peopleage
cd src/main/scala
vim CountAvgAge.scala

在CountAvgAge.scala代码文件中输入如下代码:

//CountAvgAge.scala
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object CountAvgAge{
    def main(args:Array[String]){
        if (args.length<1){
            println("Usage: CountAvgAge inputdatafile")
            System.exit(1)
}
        val conf = new SparkConf().setAppName("Count Average Age")
        val sc = new SparkContext(conf)
        val lines = sc.textFile(args(0),3)
        val count = lines.count()
        val totalAge = lines.map(line=>line.split(" ")(1)).map(t=>t.trim.toInt).collect().reduce((a,b)=>a+b)
        println("Total Age is: "+totalAge+"; Number of People is:"+count)
        val avgAge : Double = totalAge.toDouble / count.toDouble
        println("Average Age is:"+avgAge)
}
}

保存代码文件并退出vim编辑器。然后,执行如下命令对代码进行编译打包:

cd /usr/local/spark/mycode/exercise/peopleage
/usr/local/sbt/sbt package

打包成功以后,可以使用如下命令运行程序,得到统计结果:

cd /usr/local/spark/mycode/exercise/peopleage
/usr/local/spark/bin/spark-submit   \
> --class "CountAvgAge"  \
> /usr/local/spark/mycode/exercise/peopleage/target/scala-2.11/simple-project_2.11-1.0.jar  \
> file:///usr/local/spark/mycode/exercise/peopleage/peopleage.txt

执行以后,就可以得到类似如下的统计结果:

Total Age is:48047; Number of People is:1000
Average Age is:48.047

(3)请编写Spark应用程序,该程序可以在分布式文件系统HDFS中生成一个数据文件peopleage.txt,数据文件包含若干行(比如1000行,或者100万行等等)记录,每行记录只包含两列数据,第1列是序号,第2列是年龄。
请新建一个终端(可以使用快捷方式CTRL+ALT+T),进入Linux Shell环境,然后,启动Hadoop,命令如下:

cd /usr/local/hadoop
./bin/start-dfs.sh

启动结束后,使用如下命令,查看是否启动成功:

jps

如果可以看到DataNode、NameNode和SecondaryNameNode三个进程,就说明启动成功了。
下面请在分布式文件系统中HDFS中查询一下是否存在“/user/hadoop”目录,可以使用如下命令:

cd  /usr/local/hadoop
./bin/hdfs dfs -ls /user/hadoop

如果该目录不存在,系统会提示你该目录不存在。如果已经存在,则不需要新建该目录,如果不存在,则可以使用如下命令创建该目录:

cd  /usr/local/hadoop
./bin/hdfs dfs -mkdir -p /user/hadoop

然后,就可以编写Spark程序,向HDFS中写入一个数据文件,命令如下:

cd /usr/local/spark/mycode/exercise/peopleage
cd src/main/scala
vim GeneratePeopleAgeHDFS.scala

在GeneratePeopleAgeHDFS.scala代码文件中输入下以下代码:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import scala.util.Random

object GeneratePeopleAgeHDFS {
    def main(args: Array[String]) {
        val outputFile =  "hdfs://localhost:9000/user/hadoop/peopleage.txt"
        //val outputFile1 = "file:///usr/local/spark/mycode/exercise/peopleage/peopleage1.txt"
    val conf = new SparkConf().setAppName("GeneratePeopleAgeHDFS").setMaster("local[2]")
        val sc = new SparkContext(conf)
            val rand = new Random()
        val array = new Array[String](1000)
        for (i<-1 to 1000){
            array(i-1)=i+" "+rand.nextInt(100)
}
                val rdd = sc.parallelize(array)
        rdd.foreach(println)
        rdd.saveAsTextFile(outputFile)      
    }
}

下面,使用sbt编译打包工具对代码文件进行编译打包,命令如下:

cd /usr/local/spark/mycode/exercise/peopleage
/usr/local/sbt/sbt package

打包成功以后,可以使用如下命令运行程序,生成数据文件:

cd /usr/local/spark/mycode/exercise/peopleage
/usr/local/spark/bin/spark-submit   \
> --class "GeneratePeopleAgeHDFS"  \
> /usr/local/spark/mycode/exercise/peopleage/target/scala-2.11/simple-project_2.11-1.0.jar

执行结束以后,可以看到,已经生成了数据文件peopleage.txt,这个文件是在HDFS中,可以使用如下命令查看文件内容:

cd /usr/local/hadoop
./bin/hdfs dfs -cat /user/hadoop/peopleage.txt/*

(4)请编写Spark应用程序,对分布式文件系统HDFS中的数据文件peopleage.txt的数据进行处理,计算出所有人口的平均年龄。
可以直接使用前面第(2)步已经写好的代码文件CountAvgAge.scala,由于此前已经编译打包了,所以,现在可以直接运行程序,可以使用如下命令运行程序,得到统计结果:

cd /usr/local/spark/mycode/exercise/peopleage
/usr/local/spark/bin/spark-submit   \
> --class "CountAvgAge"  \
> /usr/local/spark/mycode/exercise/peopleage/target/scala-2.11/simple-project_2.11-1.0.jar  \
> /user/hadoop/peopleage.txt

注意,上面文件的路径是“/user/hadoop/peopleage.txt”,就是表示访问的是HDFS文件,和下面的第二种路径格式是等价的:

cd /usr/local/spark/mycode/exercise/peopleage
/usr/local/spark/bin/spark-submit   \
> --class "CountAvgAge"  \
> /usr/local/spark/mycode/exercise/peopleage/target/scala-2.11/simple-project_2.11-1.0.jar  \
> hdfs://localhost:9000/user/hadoop/peopleage.txt

也和下面的第三种路径形式也是等价的:

cd /usr/local/spark/mycode/exercise/peopleage
/usr/local/spark/bin/spark-submit   \
> --class "CountAvgAge"  \
> /usr/local/spark/mycode/exercise/peopleage/target/scala-2.11/simple-project_2.11-1.0.jar  \
> peopleage.txt

执行以后,就可以得到类似如下的统计结果:

Total Age is:48047; Number of People is:1000
Average Age is:48.047