Spark上机练习题:统计人口性别和身高

大数据学习路线图

本部分Spark上机练习题,是与林子雨编著《Spark编程基础》教程(官网)配套的题目,在学习完《Spark编程基础》第5章RDD编程的内容以后,可以顺利完成本题目。

【题目】Spark上机练习题:统计人口性别和身高


(1)请编写Spark应用程序,该程序可以在本地文件系统中生成一个数据文件peopleinfo.txt,该文件包含了序号、性别和身高三个列,形式如下:

1    F    170
2    M    178
3    M    174
4    F    165

(2)编写Spark应用程序,该程序对HDFS文件中的数据文件peopleinfo.txt进行统计,计算得到男性总数、女性总数、男性最高身高、女性最高身高、男性最低身高、女性最低身高。

参考答案

(1)请编写Spark应用程序,该程序可以在本地文件系统中生成一个数据文件peopleinfo.txt,该文件包含了序号、性别和身高三个列。
登录Linux系统,打开一个命令行终端(可以使用快捷键CTRL+ALT+T),进入Linux Shell环境。假设读者已经把Spark安装到了“/usr/local/spark”目录,并且读者的所有练习代码都被保存在“/usr/local/spark/mycode/exercise”目录下(如果不存在该目录,请使用mkdir命令自行创建)。下面请使用如下命令创建一个peopleinfo子目录,用来保存本练习题目的各种文件:

cd /usr/local/spark/mycode/exercise
mkdir peopleinfo

下面,请在peopleinfo目录下建立src/main/scala代码目录,命令如下:

cd /usr/local/spark/mycode/exercise/peopleinfo
mkdir -p src/main/scala

这个目录是专门用来保存scala代码文件的。下面创建一个代码文件GeneratePeopleInfoHDFS.scala,用来生成数据文件peopleinfo.txt,命令如下:

cd src/main/scala
vim GeneratePeopleInfoHDFS.scala

执行上述命令以后,会打开vim编辑器(如果不会使用vim编辑器,可以参考vim编辑器的使用方法),然后,可以在这个代码文件GeneratePeopleInfoHDFS.scala中输入如下代码:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import scala.util.Random

object GeneratePeopleInfoHDFS {
    def getRandomGender():String={
    val rand = new Random()
    val randNum = rand.nextInt(2)+1
    if (randNum % 2 == 0) {"M"} else {"F"}
}
    def main(args: Array[String]) {
        val outputFile =  "hdfs://localhost:9000/user/hadoop/peopleinfo.txt"
        //val outputFile1 = "file:///usr/local/spark/mycode/exercise/peopleage/peopleage1.txt"
    val conf = new SparkConf().setAppName("GeneratePeopleAgeHDFS").setMaster("local[2]")
        val sc = new SparkContext(conf)
            val rand = new Random()
        val array = new Array[String](1000)

        for (i<-1 to 1000){
        var height = rand.nextInt(230)
        if (height<50) {height = height + 50}
        var gender = getRandomGender()
        if (height <100 && gender == "M") {height = height+100}
        if (height <100 && gender == "F") {height = height+50}
                array(i-1)=i+" "+gender+" "+height
}
                val rdd = sc.parallelize(array)
        rdd.foreach(println)
        rdd.saveAsTextFile(outputFile)      
    }
}

然后,保存文件并退出vim编辑器。
下面,需要在“/usr/local/spark/mycode/exercise/peopleinfo”目录下新建一个 simple.sbt文件,用来支持sbt打包编译,命令如下:

cd /usr/local/spark/mycode/exercise/peopleinfo
vim simple.sbt

然后,在simple.sbt文件中输入如下内容:

name := "Simple Project"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0"

下面,使用sbt编译打包工具对代码文件进行编译打包,命令如下:

cd /usr/local/spark/mycode/exercise/peopleinfo
/usr/local/sbt/sbt package

打包成功以后,可以使用如下命令运行程序,生成数据文件:

cd /usr/local/spark/mycode/exercise/peopleinfo
/usr/local/spark/bin/spark-submit   \
> --class "GeneratePeopleInfoHDFS"  \
> /usr/local/spark/mycode/exercise/peopleinfo/target/scala-2.11/simple-project_2.11-1.0.jar

执行结束以后,可以看到,已经生成了数据文件peopleage.txt,这个文件是在HDFS中,可以使用如下命令查看文件内容:

cd /usr/local/hadoop
./bin/hdfs dfs -cat /user/hadoop/peopleinfo.txt/*

(2)编写Spark应用程序,该程序对HDFS文件中的数据文件peopleinfo.txt进行统计,计算得到男性总数、女性总数、男性最高身高、女性最高身高、男性最低身高、女性最低身高。

创建代码文件CountPeopleInfo.scala,命令如下:

cd /usr/local/spark/mycode/exercise/peopleinfo
cd src/main/scala
vim CountPeopleInfo.scala

在CountPeopleInfo.scala代码文件中输入如下代码:

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object CountPeopleInfo{
    def main(args:Array[String]){
        if (args.length<1){
            println("Usage: CountPeopleInfo inputdatafile")
            System.exit(1)
}
        val conf = new SparkConf().setAppName("CountPeopleInfo").setMaster("local")
        val sc = new SparkContext(conf)
        val lines = sc.textFile(args(0),3)
        val maleInfo = lines.filter(line=>line.contains("M")).map(line=>line.split(" ")).map(t=>(t(1)+" "+t(2)))
val femaleInfo = lines.filter(line=>line.contains("F")).map(line=>line.split(" ")).map(t=>(t(1)+" "+ t(2)))
    val maleHeightInfo = maleInfo.map(t=>t.split(" ")(1).toInt)
    val femaleHeightInfo = femaleInfo.map(t=>t.split(" ")(1).toInt)
    val lowestMale = maleHeightInfo.sortBy(x=>x,true).first()
    val lowestFemale = femaleHeightInfo.sortBy(p=>p,true).first()
    val highestMale = maleHeightInfo.sortBy(p=>p,false).first()
    val highestFemale = femaleHeightInfo.sortBy(p=>p,false).first()
    println("Number of Male:"+ maleInfo.count())
    println("Number of Female:"+femaleInfo.count())
    println("Lowest Male:"+lowestMale)
    println("Lowest Female:"+lowestFemale)
    println("HighestMale:"+highestMale)
    println("HighestFemale:"+highestFemale)
}
}

保存代码文件并退出vim编辑器。然后,执行如下命令对代码进行编译打包:

cd /usr/local/spark/mycode/exercise/peopleinfo
/usr/local/sbt/sbt package

打包成功以后,可以使用如下命令运行程序,得到统计结果:

cd /usr/local/spark/mycode/exercise/peopleinfo
/usr/local/spark/bin/spark-submit   \
> --class "CountPeopleInfo"  \
> /usr/local/spark/mycode/exercise/peopleage/target/scala-2.11/simple-project_2.11-1.0.jar  \
> hdfs://localhost:9000/user/hadoop/peopleinfo.txt

执行以后,就可以得到类似如下的统计结果:

Number of Male:524
Number of Female:476
Lowest Male:101
Lowest Female:100
HighestMale:228
HighestFemale:229