Spark大数据分析案例之平均心率检测

大数据学习路线图

案例介绍

平均心率检测案例。本案例以实验室之前发布的另一篇博客文章《Spark+Kafka构建实时分析Dashboard案例介绍》为基础,涉及模拟数据生成,数据预处理、消息队列发送和接收消息、数据实时处理、数据实时推送和实时展示等数据处理全流程,所涉及的各种典型操作涵盖Linux、Spark、Kafka、JAVA、MySQL、Ajax、Html、Css、Js、Maven等系统和软件的安装和使用方法。通过本案例,将有助于综合运用大数据课程知识以及各种工具软件,实现数据全流程操作。同时在此感谢张少坤、吴维奇和喻小丽等三位同学在创作本案例中的贡献。

实验环境准备

任务清单

  1. JDK安装
  2. Hadoop安装
  3. Maven安装
  4. Spark安装
  5. scala安装
  6. Kafka安装
  7. MySQL安装
    ### 实验系统和软件要求
    Ubuntu: 16.04
    Java : 1.8及以上版本
    Scala: 2.11.8
    Spark: 2.1.0
    kafka: 0.10.1.0
    Maven: 3.5.0
    MySQL: 5.7.18

系统和软件的安装

JDK安装和Hadoop安装

请参照本教程Hadoop安装教程_单机/伪分布式配置_Hadoop2.6.0/Ubuntu14.04,(ps:请将hadoop改为实验要求的版本号,java的版本号改为8)

Maven安装,Spark安装,scala安装

请参照本教程Spark2.1.0入门:Spark的安装和使用,本套教程涵盖了三种软件的安装
#### Kafka安装
请参照本教程Kafka的安装和简单实例测试

MySQL安装

Ubuntu安装MySQL及常用操作

构建工程架构

spark-homework该链接为案例的代码,下载后输入以下命令。如果出现无法链接到该网页,可以在导航栏里输入pan.baidu.com/s/1ciylS6域名。

  1. cd ~/下载
  2. sudo mkdir -p /usr/local/spark/sparkcode
  3. sudo tar -zxvf spark-homework.tar.gz -C /usr/local/spark/sparkcode
  4. cd /usr/local/spark/sparkcode
  5. chown hadoop:hadoop spark-homework
  6. sudo chmod 777 ./spark-homework
Shell

我们将工程目录打包至/usr/local/spark/sparkcode下,方便操作,以下是工程目录:

1. common目录存放的是整个项目所公有的类。其中程序文件夹中包含beans, config, db和utils。Beans中存放实体类,config中存放相关配置的类,db中配置数据库的连接等,utils中存放产生Json的工具类等。
2. config目录存放的是整个项目的配置参数,比如kafka生产者的地址和端口号、数据库的配置,如账户名,密码等。Web浏览器访问的地址和端口号等。
3. producer目录存放的是Kafka生产者相关内容。
4. streaming目录存放的是对kafka数据操作并写入数据库的相关文件;
5. web目录存放的含有对http响应的相关处理和资源文件。如js,css,html文件。
注:代码解释只是部分主要功能代码。
数据相关
本案例采用的数据集是由应用程序producer随机产生的。该数据集表示的正常人的心跳速率。下面列出产生的数据格式定义:
1. name | 姓名
2. rate | 心跳率
3. dt | 产生数据的时间

这个案例实时检测平均心率,因此针对每条记录,我们只需要获取name和rate即可,然后发送给Kafka,接下来Spark Streaming再接收进行处理,将其写入MySQL数据库。Web通过间隔若干时间查询某个时间段内的心跳,并对其进行可视化。

数据预处理
本案例通过producer产生数据,并将产生的数据发给kafka,接着可以写如下的Java代码,代码文件名为Run.java,本段代码在/usr/local/spark/sparkcode/spark-homework/producer/src/main/java/com/test目录下,读者可以根据后面的步骤直接运行,不需要拷贝到编译器下:

  1. package com.test;
  2. import com.test.beans.RecordBean;
  3. import com.test.config.ConfigurationFactory;
  4. import com.test.config.objects.Config;
  5. import com.test.producer.Producer;
  6. import com.test.utils.JsonUtils;
  7. import org.apache.log4j.Logger;
  8. import java.util.Random;
  9. public class Run {
  10. private static final Logger LOGGER = Logger.getLogger(Run.class);
  11. private static final Random RANDOM = new Random();
  12. private static final Config CONFIG = ConfigurationFactory.load();
  13. public static void main(String[] args) {
  14. final Producer producer = new Producer();
  15. // catches ctrl+c action
  16. Runtime.getRuntime().addShutdownHook(new Thread(() -> {
  17. LOGGER.info("Generator application stopping ...");
  18. producer.close();
  19. }));
  20. // sleep time as milliseconds for each step
  21. int sleep = RANDOM.nextInt(CONFIG.getGenerator().getRandomRange()) + 10;
  22. RecordBean record;
  23. while (true) {
  24. try {
  25. record = generate();
  26. producer.produce(record.getType().name(), JsonUtils.serialize(record));
  27. Thread.sleep(sleep);
  28. } catch (Throwable t) {
  29. LOGGER.error(t.getMessage(), t);
  30. }
  31. }
  32. }
  33.  
  34. private static RecordBean generate() {
  35. RecordBean data = new RecordBean();
  36. data.setType(RecordBean.Types.fromNumeric(RANDOM.nextInt(3)));
  37. data.setValue((RANDOM.nextFloat() * 140)+60);
  38. return data;
  39. }
  40. }
Java

上述代码很简单,首先通过随机产生RecordBean实体,然后通过Json工具进行序列化,接着每隔sleep秒发送给kafka。
Streaming操作Kafka

通过streaming操作kafka获取数据,并将数据写入MySQL数据库,代码为App.scala。ps:该代码位于/usr/local/spark/sparkcode/spark-homework/streaming/src/main/scala/com/test下。

  1. package com.test
  2. import java.io.IOException
  3. import java.sql.Timestamp
  4. import java.util.Properties
  5. import com.test.beans.RecordBean
  6. import com.test.config.ConfigurationFactory
  7. import com.test.utils.JsonUtils
  8. import org.apache.kafka.clients.consumer.ConsumerRecord
  9. import org.apache.kafka.common.serialization.StringDeserializer
  10. import org.apache.log4j.Logger
  11. import org.apache.spark.rdd.RDD
  12. import org.apache.spark.sql.types._
  13. import org.apache.spark.sql.{Row, SQLContext, SaveMode, SparkSession}
  14. import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
  15. import org.apache.spark.streaming.kafka010.KafkaUtils
  16. import org.apache.spark.streaming.kafka010.LocationStrategies.PreferBrokers
  17. import org.apache.spark.streaming.{Seconds, StreamingContext}
  18. object App {
  19. private[this] lazy val logger = Logger.getLogger(getClass)
  20. private[this] val config = ConfigurationFactory.load()
  21.  
  22. def jsonDecode(text: String): RecordBean = {
  23. try {
  24. JsonUtils.deserialize(text, classOf[RecordBean])
  25. } catch {
  26. case e: IOException =>
  27. logger.error(e.getMessage, e)
  28. null
  29. }
  30. }
  31. def main(args: Array[String]): Unit = {
  32. val spark = SparkSession.builder
  33. .appName("spark-kafka-streaming-example")
  34. .master("local[*]")
  35. .getOrCreate
  36. val streaming = new StreamingContext(spark.sparkContext, Seconds(config.getStreaming.getWindow))
  37. val servers = config.getProducer.getHosts.toArray.mkString(",")
  38. val params = Map[String, Object](
  39. "bootstrap.servers" -> servers,
  40. "key.deserializer" -> classOf[StringDeserializer],
  41. "value.deserializer" -> classOf[StringDeserializer],
  42. "auto.offset.reset" -> "latest",
  43. "group.id" -> "dashboard",
  44. "enable.auto.commit" -> (false: java.lang.Boolean)
  45. )
  46.  
  47. val topics = Array(config.getProducer.getTopic)
  48.  
  49. val stream = KafkaUtils.createDirectStream[String, String](
  50. streaming, PreferBrokers, Subscribe[String, String](topics, params))
  51.  
  52. val schema = StructType(
  53. StructField("name", StringType) ::
  54. StructField("rate", FloatType) ::
  55. StructField("dt", TimestampType) :: Nil
  56. )
  57. val host = config.getStreaming.getDb.getHost
  58. val db = config.getStreaming.getDb.getDb
  59. val url = s"jdbc:mysql://$host/$db"
  60. val table = config.getStreaming.getDb.getTable
  61. val props = new Properties
  62. props.setProperty("driver", "com.mysql.jdbc.Driver")
  63. props.setProperty("user", config.getStreaming.getDb.getUser)
  64. props.setProperty("password", config.getStreaming.getDb.getPass)
  65.  
  66. type Record = ConsumerRecord[String, String]
  67. stream.foreachRDD((rdd: RDD[Record]) => {
  68. // convert string to PoJo and generate rows as tuple group
  69. val pairs = rdd
  70. .map(row => (row.timestamp(), jsonDecode(row.value())))
  71. .map(row => (row._2.getType.name(), (1, row._2.getValue, row._1)))
  72.  
  73. val flatten = pairs
  74. .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2, (y._3 + x._3) / 2))
  75. .map(f => Row.fromSeq(Seq(f._1, f._2._2 / f._2._1, new Timestamp(f._2._3))))
  76.  
  77. val sql = new SQLContext(flatten.sparkContext)
  78.  
  79. sql.createDataFrame(flatten, schema)
  80. .repartition(1)![](https://dblab.xmu.edu.cn/blog/wp-content/uploads/2017/07/修改密码.png "")
  81. .write
  82. .mode(SaveMode.Append)
  83. .jdbc(url, table, props)
  84. })
  85. // create streaming context and submit streaming jobs
  86. streaming.start()
  87. // wait to killing signals etc.
  88. streaming.awaitTermination()
  89. }
  90. }
scala

编程实现

通过streaming操作kafka获取数据后,将数据写入MySQL数据库。我们可以使用如下代码创建数据库和表。

  1. service mysql start
  2. mysql -u root -p
Shell
  1. create database if not exits dashboash_test;
  2. use dashboash_test;
  3. create TABLE IF NOT EXISTS events(
  4. name VARCHAR(24) NOT NULL DEFAULT ' ',
  5. rate FLOAT DEFAULT NULL,
  6. dt DATETIME NOT NULL,
  7. PRIMARY KEY (name,dt)
  8. )
  9. ENGINE=InnoDB DEFAULT CHARSET=utf8=utf8_bin;
mysql

再次进入/usr/local/spark/sparkcode/spark-homework/config目录下,对common.conf文件的user和pass选项进行修改。

  1. cd /usr/local/spark/sparkcode/spark-homework/config
  2. sudo vim ./common.conf
Shell


编译运行Spark项目
输入以下代码

  1. cd spark-homework
  2. sudo /usr/local/maven/bin/mvn package -DskipTests
Shell

如果如下图所示,则说明成功。

开启kafka:

  1. cd /usr/local/kafka
  2. bin/zookeeper-server-start.sh config/zookeeper.properties & bin/kafka-server-start.sh config/server.properties
Shell

出现以下图片则运行成功

开启spark streaming 服务并且它会从kafka主题中处理数据到MySQL。重新开启一个终端,前面的终端千万不要关闭。

  1. cd /usr/local/spark/sparkcode/spark-homework
  2. java -Dconfig=./config/common.conf -jar streaming/target/spark-streaming-0.1.jar
Shell

开启kafka producer,并且它会将事件写入kafka主题中。重新开启一个终端,前面的终端千万不要关闭。

  1. cd /usr/local/spark/sparkcode/spark-homework/
  2. java -Dconfig=./config/common.conf -jar producer/target/kafka-producer-0.1.jar
Shell

成功后如下图所示:

开启web服务器,如此可以观察dashboard。重新开启一个终端,前面的终端千万不要关闭。输入以下代码

  1. cd /usr/local/spark/sparkcode/spark-homework
  2. java -Dconfig=./config/common.conf -jar web/target/web-0.1.jar
Shell

开启成功后如下图所示:

最后在浏览器中输入http://localhost:8080/进行浏览结果。三种颜色分别代表三个人,他们在不同时刻的平均心率。