Spark上机练习题:统计微博信息

大数据技术原理与应用

本部分Spark上机练习题,是与林子雨编著《Spark编程基础》教程(官网)配套的题目,在学习完《Spark编程基础》第5章RDD编程的内容以后,可以顺利完成本题目。

【题目】Spark上机练习题:统计微博信息


假设有一个数据文件,它包含如下格式的两列数据:

11111111    12743457
11111111    16386587
11111111    19764388
11111111    12364375
11111111    13426275
11111111    12356363
11111111    13256236
11111111    10000032
11111111    10000001
11111111    10000001
11111111    10000001

其中,第1列和第2列都是表示用户ID,表中的数据是表示第1列的用户关注了第2列用户。
(1)在spark-shell交互式环境中执行,请统计出一共有多少个不同的ID;
(2)在spark-shell交互式环境中执行,统计出一共有多少个不同的(ID,ID)对;
(3)在spark-shell交互式环境中执行,统计出每个用户的粉丝数量;
(4)使用Scala语言编写独立应用程序,统计出每个用户的粉丝数量,并且把统计结果写入到HDFS文件中。

参考答案

1.假设有一个数据文件,它包含如下格式的两列数据:

11111111    12743457
11111111    16386587
11111111    19764388
11111111    12364375
11111111    13426275
11111111    12356363
11111111    13256236
11111111    10000032
11111111    10000001
11111111    10000001
11111111    10000001
11111112  12743457

其中,第1列和第2列都是表示用户ID,表中的数据是表示第1列的用户关注了第2列用户。
(1)请统计出一共有多少个不同的ID;
登录Linux系统,打开一个命令行终端(可以使用快捷键CTRL+ALT+T),进入Linux Shell环境。假设读者已经把Spark安装到了“/usr/local/spark”目录,并且读者的所有练习代码都被保存在“/usr/local/spark/mycode/exercise”目录下(如果不存在该目录,请使用mkdir命令自行创建)。下面请使用如下命令创建一个blog子目录,用来保存本练习题目的各种文件:

cd /usr/local/spark/mycode/exercise
mkdir blog

然后,在blog目录下新建一个文本文件bloginfo.txt,在该文件中录入如下数据:

11111111    12743457
11111111    16386587
11111111    19764388
11111111    12364375
11111111    13426275
11111111    12356363
11111111    13256236
11111111    10000032
11111111    10000001
11111111    10000001
11111111    10000001
11111112  12743457

然后,启动进入spark-shell交互式环境:

cd /usr/local/spark
./bin/spark-shell

然后,在spark-shell交互式环境中,执行如下命令统计出一共有多少个不同的ID:

scala> val lines = sc.textFile("/usr/local/spark/mycode/exercise/blog/bloginfo.txt")
scala> val totalUserNum = lines.flatMap(line=>line.split("\t")).map(userid=>(userid,1)).reduceByKey(_+_)
scala> totalUserNum.count()

上面代码中, lines.flatMap(line=>line.split(“\t”))操作会把lines中的所有RDD元素(每个RDD元素都是一行文本,类似“11111111 12743457”)都进行拆分,比如把“11111111 12743457”拆分成“11111111″和”12743457”。最终得到的新的RDD,每个RDD元素都是类似“11111111 “和”12743457”这样的单个字符串。然后,map(userid=>(userid,1))操作会把每个RDD元素转换成键值对形式,比如,把“11111111″和”12743457”分别转换成(“11111111”,1)和(“12743457”,1)。最后,reduceByKey(+)操作就会把按照key来对所有键值对进行归约计算,得到的RDD中的每个元素的key都是不同的,因此, totalUserNum.count()操作得到的是不同的key的数量,也就代表了不同用户的数量。

(2)统计出一共有多少个不同的(ID,ID)对;
在spark-shell交互式环境中,执行如下命令统计出一共有多少个不同的(ID,ID)对:

scala> val lines = sc.textFile("/usr/local/spark/mycode/exercise/blog/bloginfo.txt")
scala> val pairNum = lines.map(id => (id, 1)).reduceByKey(_ + _).count()

(3)统计出每个用户的粉丝数量。
bloginfo.txt中,每行数据的含义是第1列用户关注第2列用户,所以,每当用户u有一个粉丝时,就会在bloginfo.txt中存在一条表示关注信息的记录,所以,只要统计出第2列中有多少条记录包含用户u,就可以知道用户u一共有多少个粉丝。
在spark-shell交互式环境中,执行如下命令统计出一共有多少个不同的(ID,ID)对:

scala> val lines = sc.textFile("/usr/local/spark/mycode/exercise/bloginfo.txt")
scala> val totalFansNum = lines.map(line => line.split("\t")(1)).map(id => (id, 1)).reduceByKey(_ + _)
scala> totalFansNum.collect()

(4)使用Scala语言编写独立应用程序,统计出每个用户的粉丝数量,并且把统计结果写入到HDFS文件中。
使用如下命令,创建代码文件目录:

cd /usr/local/spark/mycode/exercise/blog
mkdir -p src/main/scala

然后,使用vim编辑器创建一个代码文件CountFansNum.scala,命令如下:

cd /usr/local/spark/mycode/exercise/blog/src/main/scala
vim CountFansNum.scala

在CountFansNum.scala中输入如下代码:

//CountFansNum.scala
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object CountFansNum{
    def main(args:Array[String]){
        if (args.length<2){
            println("Usage: CountFansNum inputfile outputfile")
            System.exit(1)
}
        val conf = new SparkConf().setAppName("CountFansNum").setMaster("local")
        val sc = new SparkContext(conf)
        val lines = sc.textFile(args(0)).filter(line=>line.trim().length>0)
    val totalFansNum = lines.map(line=>line.split(" ")(1)).map(id=>(id,1)).reduceByKey(_+_) 
    totalFansNum.saveAsTextFile(args(1))    
}
}

然后,保存文件并退出vim编辑器。
下面,需要在“/usr/local/spark/mycode/exercise/blog”目录下新建一个 simple.sbt文件,用来支持sbt打包编译,命令如下:

cd /usr/local/spark/mycode/exercise/blog
vim simple.sbt

然后,在simple.sbt文件中输入如下内容:

name := "Simple Project"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0"

下面,使用sbt编译打包工具对代码文件进行编译打包,命令如下:

cd /usr/local/spark/mycode/exercise/blog
/usr/local/sbt/sbt package

然后,新建一个终端,在里面输入如下命令启动Hadoop:

cd /usr/local/hadoop
./sbin/start-dfs.sh

下面,可以运行程序,对输入数据bloginfo.txt进行统计,并把结果输出到分布式文件系统HDFS中,具体命令如下:

cd /usr/local/spark/mycode/exercise/blog
/usr/local/spark/bin/spark-submit  \
>--class "CountFansNum"  \
> /usr/local/spark/mycode/exercise/blog/target/scala-2.11/simple-project_2.11-1.0.jar  \
> file:///usr/local/spark/mycode/exercise/blog/bloginfo.txt   \
> hdfs://localhost:9000/user/hadoop/CountFansNumResult.txt

然后,可以采用如下命令查看HDFS中的CountFansNumResult.txt文件内容:

cd /usr/local/hadoop
./bin/hdfs dfs -cat /user/hadoop/CountFansNumResult.txt/*