Spark大数据分析案例之平均心率检测(2022版)

大数据学习路线图

案例介绍

本案例以《Spark+Kafka构建实时分析Dashboard案例介绍》为基础,且在之前已有一个2017年的版本《Spark大数据分析案例之平均心率检测》,但由于该版所使用的Kafka、Spark版本较旧,现着手推出以Spark3.2.0、Kafka2.6.0为基础的实时分析Dashboard案例(即2022版)。

该案例涉及模拟数据生成,数据预处理、消息队列发送和接收消息、数据实时处理、数据实时推送和实时展示等数据处理全流程,所涉及的各种典型操作涵盖Linux、Spark、Kafka、JAVA、MySQL、Ajax、Html、Css、Js、Maven等系统和软件的安装和使用方法。通过本案例,将有助于综合运用大数据课程知识以及各种工具软件,实现数据全流程操作。
本教程具体运行环境如下:
* Ubuntu 18.04
* Hadoop 3.1.3
* JavaJDK 1.8
* Spark 3.2.0
* Maven 3.6.3
* MySQL 5.7.18
* Scala 2.12
* Kafka 2.6.0

一、环境安装

1.Ubuntu 18.04安装、Java安装和Hadoop安装

如果你的计算机上已经安装了Hadoop,本步骤可以略过。如果没有安装Hadoop,请访问Hadoop3.1.3安装教程_单机/伪分布式配置_Hadoop3.1.3/Ubuntu18.04(16.04),依照教程学习安装即可。注意,在这个Hadoop安装教程中,就包含了Ubuntu环境安装、Java的安装,所以,按照这个教程,就可以完成Ubuntu、JDK和Hadoop这三者的安装。

2.Spark安装、Maven安装、Scala安装

如果你的计算机上已经安装了Spark与Maven,本步骤可以略过。如果没有安装,请访问
Spark2.1.0入门:Spark的安装与使用,依照本教程学习安装即可。
注意:
1. 上述教程所安装版本为Spark2.1.0非本次案例所用版本,我们只需将Spark2.1.0换成Spark3.2.0即可,只需替换压缩包,安装方法不变。Spark3.2.0压缩包可以点击这里到百度云盘下载(提取码:ziyu)
2. Spark3.2.0自带Scala2.12,所以无需再安装Scala,Maven的安装位于上述教程的最后部分,请各位记得往下翻。

3.Kafka安装

如果你的计算机上已经安装了Kafka2.6.0,本步骤可以略过。如果安装的版本较老,请先删除旧版本sudo rm -rf /usr/local/kafka #你的Kafka路径,然后访问Kafka的安装和简单案例测试。注意上述教程所安装的Kafka版本非本案例所用版本,请访问Kafka2.6.0官方下载页面

4.MySQL安装

如果你的计算机上已经安装了MySQL,本步骤可以略过。如果没有安装,请访问Ubuntu安装MySQL及常用操作,注意,Ubuntu 18.04安装MySQL时,可能不需要你设置密码,,所以在登录数据库时会出现Mysql:ERROR 1698 (28000): Access denied for user 'root'@'localhost',点击此处查看解决方法。(ps:请记住你设置的密码)

二、构建工程架构

1.代码解析

点击这里从百度网盘下载spark-homework压缩包,这是本案例的代码。下载后输入下列命令。

cd ~/Downloads
sudo mkdir -p /usr/local/spark/sparkcode
sudo tar -zxvf spark-homework.tar.gz -C /usr/local/spark/sparkcode
cd /usr/local/spark/sparkcode
chown hadoop:hadoop spark-homework
sudo chmod 777 ./spark-homework

我们将工程目录打包至/usr/local/spark/sparkcode下,方便操作,以下是工程目录
工程结构说明:
1. common目录存放的是整个项目所公有的类。其中程序文件夹中包含beans, config, db和utils。Beans中存放实体类,config中存放相关配置的类,db中配置数据库的连接等,utils中存放产生Json的工具类等。
2. config目录存放的是整个项目的配置参数,比如kafka生产者的地址和端口号、数据库的配置,如账户名,密码等。Web浏览器访问的地址和端口号等。
3. producer目录存放的是Kafka生产者相关内容。
4. streaming目录存放的是对kafka数据操作并写入数据库的相关文件;
5. web目录存放的含有对http响应的相关处理和资源文件。如js,css,html文件。
注:代码解释只是部分主要功能代码。
数据相关
本案例采用的数据集是由应用程序producer随机产生的。该数据集表示的正常人的心跳速率。下面列出产生的数据格式定义:
1. name | 姓名
2. rate | 心跳率
3. dt | 产生数据的时间

这个案例实时检测平均心率,因此针对每条记录,我们只需要获取name和rate即可,然后发送给Kafka,接下来Spark Streaming再接收进行处理,将其写入MySQL数据库。Web通过间隔若干时间查询某个时间段内的心跳,并对其进行可视化。

数据预处理
本案例通过producer产生数据,并将产生的数据发给kafka,接着可以写如下的Java代码,代码文件名为Run.java,本段代码在/usr/local/spark/sparkcode/spark-homework/producer/src/main/java/com/test目录下,读者可以根据后面的步骤直接运行,不需要拷贝到编译器下:

package com.test;
import com.test.beans.RecordBean;
import com.test.config.ConfigurationFactory;
import com.test.config.objects.Config;
import com.test.producer.Producer;
import com.test.utils.JsonUtils;
import org.apache.log4j.Logger;
import java.util.Random;
public class Run {
    private static final Logger LOGGER = Logger.getLogger(Run.class);
    private static final Random RANDOM = new Random();
    private static final Config CONFIG = ConfigurationFactory.load();
    public static void main(String[] args) {
        final Producer producer = new Producer();
        // catches ctrl+c action
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            LOGGER.info("Generator application stopping ...");
            producer.close();
        }));
        // sleep time as milliseconds for each step
        int sleep = RANDOM.nextInt(CONFIG.getGenerator().getRandomRange()) + 10;
        RecordBean record;
        while (true) {
            try {
                record = generate();
                producer.produce(record.getType().name(), JsonUtils.serialize(record));
                Thread.sleep(sleep);
            } catch (Throwable t) {
                LOGGER.error(t.getMessage(), t);
            }
        }
    }
    private static RecordBean generate() {
        RecordBean data = new RecordBean();
        data.setType(RecordBean.Types.fromNumeric(RANDOM.nextInt(3)));
        data.setValue((RANDOM.nextFloat() * 140)+60);
        return data;
    }
}

上述代码很简单,首先通过随机产生RecordBean实体,然后通过Json工具进行序列化,接着每隔sleep秒发送给kafka。

Streaming操作Kafka
通过streaming操作kafka获取数据,并将数据写入MySQL数据库,代码为App.scala。ps:该代码位于/usr/local/spark/sparkcode/spark-homework/streaming/src/main/scala/com/test下。

package com.test
import java.io.IOException
import java.sql.Timestamp
import java.util.Properties
import com.test.beans.RecordBean
import com.test.config.ConfigurationFactory
import com.test.utils.JsonUtils
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.Logger
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SQLContext, SaveMode, SparkSession}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferBrokers
import org.apache.spark.streaming.{Seconds, StreamingContext}
object App {
  private[this] lazy val logger = Logger.getLogger(getClass)
  private[this] val config = ConfigurationFactory.load()

  def jsonDecode(text: String): RecordBean = {
    try {
      JsonUtils.deserialize(text, classOf[RecordBean])
    } catch {
      case e: IOException =>
        logger.error(e.getMessage, e)
        null
    }
  }
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("spark-kafka-streaming-example")
      .master("local[*]")
      .getOrCreate
    val streaming = new StreamingContext(spark.sparkContext, Seconds(config.getStreaming.getWindow))
    val servers = config.getProducer.getHosts.toArray.mkString(",")
    val params = Map[String, Object](
      "bootstrap.servers" -> servers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "auto.offset.reset" -> "latest",
      "group.id" -> "dashboard",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val topics = Array(config.getProducer.getTopic)

    val stream = KafkaUtils.createDirectStream[String, String](
      streaming, PreferBrokers, Subscribe[String, String](topics, params))

    val schema = StructType(
      StructField("name", StringType) ::
        StructField("rate", FloatType) ::
        StructField("dt", TimestampType) :: Nil
    )
    val host = config.getStreaming.getDb.getHost
    val db = config.getStreaming.getDb.getDb
    val url = s"jdbc:mysql://$host/$db"
    val table = config.getStreaming.getDb.getTable
    val props = new Properties
    props.setProperty("driver", "com.mysql.jdbc.Driver")
    props.setProperty("user", config.getStreaming.getDb.getUser)
    props.setProperty("password", config.getStreaming.getDb.getPass)

    type Record = ConsumerRecord[String, String]
    stream.foreachRDD((rdd: RDD[Record]) => {
      // convert string to PoJo and generate rows as tuple group
      val pairs = rdd
        .map(row => (row.timestamp(), jsonDecode(row.value())))
        .map(row => (row._2.getType.name(), (1, row._2.getValue, row._1)))

      val flatten = pairs
        .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2, (y._3 + x._3) / 2))
        .map(f => Row.fromSeq(Seq(f._1, f._2._2 / f._2._1, new Timestamp(f._2._3))))

      val sql = new SQLContext(flatten.sparkContext)

      sql.createDataFrame(flatten, schema)
        .repartition(1)![](https://dblab.xmu.edu.cn/blog/wp-content/uploads/2017/07/修改密码.png "")
        .write
        .mode(SaveMode.Append)
        .jdbc(url, table, props)
    })
    // create streaming context and submit streaming jobs
    streaming.start()
    // wait to killing signals etc.
    streaming.awaitTermination()
  }
}

Web读取MySQL数据并展示
MetricsResource.java文件负责连接数据库并从数据库中读取数据,该文件内容如下:

package com.test.http.resources;

import com.test.db.MySqlConnection;
import com.test.http.responses.MessageResponse;
import com.test.http.responses.MetricResponse;
import com.test.http.utils.ResponseUtils;
import com.test.http.utils.Types;

import org.apache.log4j.Logger;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Response;
import java.util.Calendar;
import java.util.TimeZone;
@Path("/metrics")
public class MetricsResource {
    private static final Logger LOGGER = Logger.getLogger(MetricsResource.class);

    @GET
    @Produces(Types.JSON)
    public Response getDashboardMetric(
            @QueryParam("start") long start,
            @QueryParam("end") long end,
            @QueryParam("delay") int millis) {
        Connection connection = null;
        PreparedStatement st = null;
        ResultSet rs = null;
        TimeZone.setDefault(TimeZone.getTimeZone("GMT+8"));

        try {
            if (start == 0 || end == 0) {
                final Timestamp now = new Timestamp(System.currentTimeMillis());

                end = now.getTime();
                start = end - millis;
            }

            long temp = new Timestamp(start).getTime();
            final Timestamp startDate = new Timestamp(temp);
            final Timestamp endDate = new Timestamp(temp+millis);


            connection = MySqlConnection.getConnection();

            String sql = "SELECT name, AVG(rate) AS rate, from_unixtime(AVG(unix_timestamp(dt))) AS dt " +
                    "FROM events WHERE dt BETWEEN '"+startDate.toString()+"' AND '"+endDate.toString()+"' GROUP BY name ORDER BY name ASC";

            st = connection.prepareStatement(sql);
            //TimeZone.setDefault(TimeZone.getTimeZone("GMT+8"));
            //st.setTimestamp(1, startDate,Calendar.getInstance(TimeZone.getTimeZone("GMT+8")));
            //st.setTimestamp(1, startDate);
            //st.setTimestamp(2, endDate);
            //st.setString(1,startDate.toString());
            //st.setString(2,endDate.toString());

            rs = st.executeQuery();

            final MetricResponse response = new MetricResponse();

            Timestamp lastTime = null;
            while (rs.next()) {
                MetricResponse.Metric metric = new MetricResponse.Metric();
                metric.setName(rs.getString("name"));
                metric.setRate(rs.getFloat("rate"));
                metric.setDate(rs.getTimestamp("dt").getTime() / 1000);
                response.addMetric(metric);

                if (lastTime == null || lastTime.getTime() < rs.getTimestamp("dt").getTime()) {
                    lastTime = rs.getTimestamp("dt");
                }
            }

            if (lastTime == null) {
                lastTime = endDate;
            }

            response.setLastTime(lastTime);
            return Response.ok().entity(ResponseUtils.toJson(response)).build();
        } catch (SQLException e) {
            LOGGER.error(e.getMessage(), e);

            return Response.status(500)
                    .entity(ResponseUtils.toJson(new MessageResponse(false, "connection error")))
                    .build();
        } finally {
            MySqlConnection.close(rs);
            MySqlConnection.close(st);
            MySqlConnection.close(connection);
        }
    }
}

上述文件通过定义字符串类型的sql的查询语句,从数据库中读取相关的数据。

main.html文件负责处理数据并展示效果,该文件中的代码内容如下:

<!doctype html>
<html>
<head>
    <meta charset="utf-8">
    <meta name="viewport"
          content="width=device-width, user-scalable=no, initial-scale=1.0, maximum-scale=1.0, minimum-scale=1.0">
    <meta http-equiv="X-UA-Compatible" content="ie=edge">
    <title>dashboard</title>

    <link href="/assets/css/bootstrap.min.css" rel="stylesheet">
    <link href="/assets/css/bootstrap-responsive.min.css" rel="stylesheet">
    <link href="/assets/css/epoch.min.css" rel="stylesheet">
</head>
<body>

<div class="navbar navbar-inverse navbar-fixed-top">
    <div class="navbar-inner">
        <div class="container">
            <button type="button" class="btn btn-navbar" data-toggle="collapse"
                    data-target=".nav-collapse">
                <span class="icon-bar"></span>
                <span class="icon-bar"></span>
                <span class="icon-bar"></span>
            </button>

            <a class="brand" href="/">Dashboard</a>

            <div class="nav-collapse collapse">
                <ul class="nav">
                    <li class="">
                        <a href="/">Home</a>
                    </li>
                    <li class="">
                        <a href="/install">Installation</a>
                    </li>
                    <li class="">
                        <a href="/architecture">Architecture</a>
                    </li>
                </ul>
            </div>
        </div>
    </div>

    <div class="container">
        <div class="row">
            <div class="span2"></div>

            <div class="span8">
                <div id="graph" class="epoch"></div>
                <div class="message"></div>
            </div>

            <div class="span2"></div>
        </div>
    </div>
</div>

</body>

<style>
    #graph {
        width: 100%;
        height: 400px;
        border: none;
        margin-top: 60px;
    }

    .message {
        color: darkred;
        font: 14px "Helvetica Neue", helvetica, arial, sans-serif;
    }
</style>

<script type="text/javascript" src="/assets/js/jquery.min.js"></script>
<script type="text/javascript" src="/assets/js/bootstrap.min.js"></script>
<script type="text/javascript" src="/assets/js/d3.min.js"></script>
<script type="text/javascript" src="/assets/js/epoch.min.js"></script>

<script>
    $(function () {
        var delay = 5000;
        var now = parseInt(new Date().getTime() / 1000);

        // default value
        var def = {time: now, y: 0};

        // epoch data definition
        var data = [
            {
                label: 'zsk',
                values: []
            },
            {
                label: 'wwq',
                values: []
            },
            {
                label: 'yxl',
                values: []
            }
        ];

        // epoch meta data
        var meta_data = {
            type: 'time.area',
            data: data,
            axes: ['left', 'right', 'bottom'],
            windowSize: 10,
            time: {time: 10, right: 5, left: 5},
            historySize: 240
        };

        // create epoch graph
        var graph = $('#graph').epoch(meta_data);

        var start = 0;
        var end = 0;

        function receive() {
            // use ajax to pull database records
            var meta = {
                url: "/metrics",
                timeout: delay,
                async: false,
                data: {start: start, end: end, delay: delay}
            };

            $.ajax(meta).done(push);
        }

        function push(results) {
            if (typeof results.lastTime === "undefined") {
                return;
            }

            if (start == 0) {
                start = results.lastTime;
            } else {
                start = end;
            }

            if (typeof results.metrics === "undefined") {
                return;
            }

            end = start + delay;

            var new_data = [];

            for (var i = 0; i < data.length; i++) {
                for (var j = 0; j < results.metrics.length; j++) {
                    if (results.metrics[j].name == data[i].label) {
                        // add new record to element group
                        // each element is specific market data
                        new_data[j] = {
                            time: results.metrics[j].date,
                            y: results.metrics[j].rate
                        };
                    }
                }

                // use default data if one of markets is missing
                // just for test data
                if (typeof new_data[i] == "undefined") {
                    new_data[i] = def;
                    new_data[i].time = parseInt(start / 1000);
                }
            }

            // send records to graph
            graph.push(new_data);
        }

        // runs receive method each {delay} milliseconds
        setInterval(receive, delay);

        // first execution
        receive();
    });
</script>
</html>

上述代码中function定义了一个数据结构用来存放从MySQL中读取的数据、实时更新数据并展示。

三、编程实现

1.数据库创建

通过streaming操作kafka获取数据后,将数据写入MySQL数据库。我们可以使用如下代码创建数据库和表。

service mysql start
mysql -u root -p
drop database dashboard_test;
create database dashboard_test;
use dashboard_test;
create TABLE IF NOT EXISTS events(
     name VARCHAR(24) NOT NULL DEFAULT  ' ',
     rate FLOAT DEFAULT NULL,
     dt DATETIME NOT NULL,
     PRIMARY KEY (name,dt)
     )ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

再次进入/usr/local/spark/sparkcode/spark-homework/config目录下,对common.conf文件的user和pass选项进行修改。

cd /usr/local/spark/sparkcode/spark-homework/config
sudo vim  ./common.conf


注意:此处的user一般为root,密码为你登录MySQL用户root的密码,我这里设置的时“123456”,请更改为你自己的密码,其他地方无需变化。

2.打包Spark项目

这里我们用maven打包编译spark-homework工程,输入下列代码

cd /usr/local/spark/sparkcode/spark-homework
/usr/local/maven/bin/mvn package -DskipTests

此处由于需要下载maven的包,所需时间较长,切记不可退出,在等待的时候,不妨站起身来活动一下筋骨^^
如果打包成功,则会出现下图提示

3.运行Spark项目

开启Kafka

cd /usr/local/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties & bin/kafka-server-start.sh config/server.properties

出现下图说明开启成功

开启Spark-Streaming
开启spark streaming 服务并且它会从kafka主题中处理数据到MySQL。重新开启一个终端,前面的终端千万不要关闭。

cd /usr/local/spark/sparkcode/spark-homework
java -Dconfig=./config/common.conf -jar streaming/target/spark-streaming-0.1.jar

若Spark-Streaming成功连接到MySQL,会出现下图

表示其已在正常工作
开启Kafka producer
开启kafka producer,并且它会将事件写入kafka主题中。重新开启一个终端,前面的终端千万不要关闭。

cd /usr/local/spark/sparkcode/spark-homework/
java -Dconfig=./config/common.conf -jar producer/target/kafka-producer-0.1.jar

成功后如图所示

开启Web服务器
开启web服务器,如此可以观察dashboard。重新开启一个终端,前面的终端千万不要关闭。输入以下代码

cd /usr/local/spark/sparkcode/spark-homework
java -Dconfig=./config/common.conf -jar web/target/web-0.1.jar 

开启成功后如下图所示:

最后在浏览器中输入http://localhost:8080/进行浏览结果。三种颜色分别代表三个人,他们在不同时刻的平均心率。

上述步骤均未出现错误,但是最后开启Web后,在浏览器中不能观察到图像。
问题解析:在网页读取数据库内容时,是用Ubuntu系统的时间去与数据库中的时间进行配对,在工程代码中我们使用的是GMT+8,即国区时间,但是安装Ubuntu18.04时,系统自动使用美区时间,导致系统时间与数据库时间不匹配。
解决方案:更改系统时间

将time zone改为shanghai,然后刷新网页。