第13章的代码-林子雨编著《大数据基础编程、实验和案例教程(第3版)》教材

大数据学习路线图

林子雨编著《大数据基础编程、实验和案例教程(第3版)》(教材官网)教材中的命令行和代码,在纸质教材中的印刷效果不是很好,可能会影响读者对命令行和代码的理解,为了方便读者正确理解命令行和代码或者直接拷贝命令行和代码用于上机实验,这里提供全书配套的所有命令行和代码。
查看教材所有章节的代码

第13章 大数据课程综合实验案例

(温馨提示:代码框上方的复制代码按钮,也就是“两张A4纸图标”,用鼠标点击复制代码按钮,就可以把代码框中的代码复制到粘贴板,粘贴到其他地方。但是,有的浏览器可能不支持该功能)

教材第182页

cd /usr/local
sudo mkdir bigdatacase
#这里会提示你输入当前用户(本教程是hadoop用户名)的密码
#下面给hadoop用户赋予针对bigdatacase目录的各种操作权限
sudo chown -R hadoop:hadoop ./bigdatacase
cd bigdatacase
#下面创建一个dataset目录,用于保存数据集
mkdir dataset
#下面就可以解压缩user.zip文件
cd ~  //表示进入hadoop用户的目录
cd Downloads
unzip user.zip -d /usr/local/bigdatacase/dataset
cd /usr/local/bigdatacase/dataset
ls
head -5 raw_user.csv

教材第183页

cd /usr/local/bigdatacase/dataset
#下面删除raw_user中的第1行
sed -i '1d' raw_user.csv
#上面的1d表示删除第1行,同理,3d表示删除第3行,nd表示删除第n行
#下面删除small_user中的第1行
sed -i '1d' small_user.csv
#下面再用head命令去查看文件的前5行记录,就看不到字段名称这一行了
head -5 raw_user.csv
head -5 small_user.csv
cd /usr/local/bigdatacase/dataset
vim pre_deal.sh
#!/bin/bash
#下面设置输入文件,把用户执行pre_deal.sh命令时提供的第一个参数作为输入文件名称
infile=$1
#下面设置输出文件,把用户执行pre_deal.sh命令时提供的第二个参数作为输出文件名称
outfile=$2
#注意,最后的$infile> $outfile必须跟在}’这两个字符的后面
awk -F "," 'BEGIN{
srand();
        id=0;
        Province[0]="山东";Province[1]="山西";Province[2]="河南";Province[3]="河北";Province[4]="陕西";Province[5]="内蒙古";Province[6]="上海市";
        Province[7]="北京市";Province[8]="重庆市";Province[9]="天津市";Province[10]="福建";Province[11]="广东";Province[12]="广西";Province[13]="云南"; 
        Province[14]="浙江";Province[15]="贵州";Province[16]="新疆";Province[17]="西藏";Province[18]="江西";Province[19]="湖南";Province[20]="湖北";
        Province[21]="黑龙江";Province[22]="吉林";Province[23]="辽宁"; Province[24]="江苏";Province[25]="甘肃";Province[26]="青海";Province[27]="四川";
        Province[28]="安徽"; Province[29]="宁夏";Province[30]="海南";Province[31]="香港";Province[32]="澳门";Province[33]="台湾";
    }
    {
        id=id+1;
        value=int(rand()*34);       
        print id"\t"$1"\t"$2"\t"$3"\t"$5"\t"substr($6,1,10)"\t"Province[value]
    }' $infile> $outfile

教材第185页

cd /usr/local/bigdatacase/dataset
bash ./pre_deal.sh small_user.csv user_table.txt
head -10 user_table.txt
cd /usr/local/hadoop
./sbin/start-dfs.sh
jps

教材第186页

cd /usr/local/hadoop
./bin/hdfs dfs -mkdir -p /bigdatacase/dataset
cd /usr/local/hadoop
./bin/hdfs dfs -put /usr/local/bigdatacase/dataset/user_table.txt /bigdatacase/dataset
cd /usr/local/hadoop
./bin/hdfs dfs -cat /bigdatacase/dataset/user_table.txt | head -10
service mysql start  #可以在Linux的任何目录下执行该命令
cd /usr/local/hive
./bin/hive   #启动Hive

教材第187页

hive> create database dblab;
hive> use dblab;

教材第208页

hive>  CREATE EXTERNAL TABLE dblab.bigdata_user(id INT,uid STRING,item_id STRING,behavior_type INT,item_category STRING,visit_date DATE,province STRING) COMMENT 'Welcome to xmudblab!' ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS TEXTFILE LOCATION '/bigdatacase/dataset';
hive> use dblab; //使用dblab数据库
hive> show tables; //显示数据库中所有表
hive> show create table bigdata_user; //查看bigdata_user表的各种属性;
hive> desc bigdata_user;
hive> select * from bigdata_user limit 10;
hive> select behavior_type from bigdata_user limit 10;

教材第188页

hive> select behavior_type from bigdata_user limit 10; #查看前10位用户对商品的行为
hive> select visit_date, item_category from bigdata_user limit 20;
hive> select e.bh, e.it from (select behavior_type as bh, item_category as it from bigdata_user) as e  limit 20;
hive> select count(*) from bigdata_user;
hive> select count(distinct uid) from bigdata_user;

教材第189页

hive>select count(*) from (select uid,item_id,behavior_type,item_category,visit_date,province from bigdata_user group by uid,item_id,behavior_type,item_category,visit_date,province       having count(*)=1)a;
hive> select count(*) from bigdata_user where behavior_type='1' and visit_date<'2014-12-13' and visit_date>'2014-12-10';
hive> select count(distinct uid), day(visit_date) from bigdata_user where behavior_type='4' group by day(visit_date);

教材第190页

hive> select count(*) from bigdata_user where province='江西' and visit_date='2014-12-12' and behavior_type='4';
hive> select count(*) from bigdata_user where visit_date='2014-12-11'and behavior_type='4';#查询有多少用户在2014-12-11购买了商品
hive> select count(*) from bigdata_user where visit_date ='2014-12-11';#查询有多少用户在2014-12-11点击了该店
hive> select count(*) from bigdata_user where uid=10001082 and visit_date='2014-12-12';#查询用户10001082在2014-12-12点击网站的次数
hive> select count(*) from bigdata_user where visit_date='2014-12-12';#查询所有用户在这一天点击该网站的次数

教材第191页

hive> select uid from bigdata_user where behavior_type='4' and visit_date='2014-12-12' group by uid having count(behavior_type='4')>5;#查询某一天在该网站购买商品超过5次的用户id
hive> create table scan(province STRING,scan INT) COMMENT 'This is the search of bigdataday' ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS TEXTFILE;#创建新的数据表进行存储
hive> insert overwrite table scan select province,count(behavior_type) from bigdata_user where behavior_type='1' group by province;#导入数据
hive> select * from scan;#显示结果
hive> create table dblab.user_action(id STRING,uid STRING, item_id STRING, behavior_type STRING, item_category STRING, visit_date DATE, province STRING) COMMENT 'Welcome to XMU dblab! ' ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS TEXTFILE;

教材第192页

cd /usr/local/hadoop
./bin/hdfs dfs -ls /user/hive/warehouse/dblab.db/
hive> INSERT OVERWRITE TABLE dblab.user_action select * from dblab.bigdata_user;
hive> select * from user_action limit 10;

教材第193页

mysql -u root -p

教材第218页

mysql> show databases; #显示所有数据库
mysql> create database dblab; #创建dblab数据库
mysql> use dblab; #使用数据库
mysql>show variables like "char%";

教材第194页

mysql> CREATE TABLE dblab.user_action (id varchar(50),uid varchar(50),item_id varchar(50),behavior_type varchar(10),item_category varchar(50), visit_date DATE,province varchar(20)) ENGINE=InnoDB DEFAULT CHARSET=utf8;
mysql> exit;

        hadoop.proxyuser.hadoop.hosts
        *


        hadoop.proxyuser.hadoop.groups
        *
cd /usr/local/hive
./bin/hive --service hiveserver2 -hiveconf hive.server2.thrift.port=10000

教材第195页

sudo netstat -anp|grep 10000

```java
import java.sql.*;
import java.sql.SQLException;

public class HivetoMySQL {
private static String driverName = "org.apache.hive.jdbc.HiveDriver";
private static String driverName_mysql = "com.mysql.jdbc.Driver";
public static void main(String[] args) throws SQLException {
try {
Class.forName(driverName);
}catch (ClassNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
System.exit(1);
}
Connection con1 = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "hive", "hive");//后两个参数是用户名密码

    if(con1 == null)
        System.out.println("连接失败");
    else {
        Statement stmt = con1.createStatement();
        String sql = "select * from dblab.user_action";
        System.out.println("Running: " + sql);
        ResultSet res = stmt.executeQuery(sql);

        //InsertToMysql
        try {
            Class.forName(driverName_mysql);
            Connection con2 = DriverManager.getConnection("jdbc:mysql://localhost:3306/dblab?useUnicode=true&characterEncoding=utf8&useSSL=false","root","123456");
            String sql2 = "insert into user_action(id,uid,item_id,behavior_type,item_category,visit_date,province) values (?,?,?,?,?,?,?)";
            PreparedStatement ps = con2.prepareStatement(sql2);
            while (res.next()) {
                ps.setString(1,res.getString(1));
                ps.setString(2,res.getString(2));
                ps.setString(3,res.getString(3));
                ps.setString(4,res.getString(4));
                ps.setString(5,res.getString(5));
                ps.setDate(6,res.getDate(6));
                ps.setString(7,res.getString(7));
                ps.executeUpdate();
            }
            ps.close();
            con2.close();
            res.close();
            stmt.close();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }
    con1.close();
}

}

### 教材第197页
```bash
mysql -u root -p
mysql> use dblab;
mysql> select * from user_action limit 10;
cd /usr/local/hadoop
./sbin/start-all.sh
cd /usr/local/hbase
./bin/start-hbase.sh

教材第198页

cd /usr/local/bigdatacase/dataset
/usr/local/hadoop/bin/hdfs dfs -get /user/hive/warehouse/dblab.db/user_action .
 #将HDFS上的user_action数据复制到本地当前目录,注意'.'表示当前目录
cat ./user_action/* | head -10   #查看前10行数据
cat ./user_action/00000* > user_action.output #将00000*文件复制一份重命名为user_action.output,*表示通配符
head user_action.output  #查看user_action.output前10行
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.List; 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes; 
public class ImportHBase extends Thread {
    public Configuration config;
    public Connection conn;
    public Table table;
    public Admin admin;
    public ImportHBase() {
        config = HBaseConfiguration.create();
//      config.set("hbase.master", "master:60000");
//      config.set("hbase.zookeeper.quorum", "master");
        try {
            conn = ConnectionFactory.createConnection(config);
            admin = conn.getAdmin();
            table = conn.getTable(TableName.valueOf("user_action"));
        } catch (IOException e) {
            e.printStackTrace();
        }
    } 
    public static void main(String[] args) throws Exception {
        if (args.length == 0) {       //第一个参数是该jar所使用的类,第二个参数是数据集所存放的路径
            throw new Exception("You must set input path!");
        }
        String fileName = args[args.length-1];  //输入的文件路径是最后一个参数
        ImportHBase test = new ImportHBase();
        test.importLocalFileToHBase(fileName);
    }
    public void importLocalFileToHBase(String fileName) {
        long st = System.currentTimeMillis();
        BufferedReader br = null;
        try {
            br = new BufferedReader(new InputStreamReader(new FileInputStream(
                    fileName)));
            String line = null;
            int count = 0;
            while ((line = br.readLine()) != null) {
                count++;
                put(line);
                if (count % 10000 == 0)
                    System.out.println(count);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally { 
            if (br != null) {
                try {
                    br.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            try {
                table.close(); // must close the client
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        long en2 = System.currentTimeMillis();
        System.out.println("Total Time: " + (en2 - st) + " ms");
    }
    @SuppressWarnings("deprecation")
    public void put(String line) throws IOException {
        String[] arr = line.split("\t", -1);
        String[] column = {"id","uid","item_id","behavior_type","item_category","date","province"};

        if (arr.length == 7) {
            Put put = new Put(Bytes.toBytes(arr[0]));// rowkey
            for(int i=1;i<arr.length;i++){
                put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes(column[i]),Bytes.toBytes(arr[i]));
            }
            table.put(put); // put to server
        }
    }
    public void get(String rowkey, String columnFamily, String column,
            int versions) throws IOException {
        long st = System.currentTimeMillis();
        Get get = new Get(Bytes.toBytes(rowkey));
        get.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column));
        Scan scanner = new Scan(get);
        scanner.readVersions(versions);
        ResultScanner rsScanner = table.getScanner(scanner);
        for (Result result : rsScanner) {
            final List<Cell> list = result.listCells();
            for (final Cell kv : list) {
                System.out.println(Bytes.toStringBinary(kv.getValueArray()) + "\t"
                        + kv.getTimestamp()); // mid + time
            }
        }
        rsScanner.close();
        long en2 = System.currentTimeMillis();
        System.out.println("Total Time: " + (en2 - st) + " ms");
    }
}

教材第200页

hbase> truncate 'user_action'
hbase> create 'user_action',{NAME=>'f1',VERSIONS=>5}
$ cd /usr/local/bigdatacase/hbase
$ /usr/local/hadoop/bin/hadoop jar /usr/local/bigdatacase/hbase/ImportHBase.jar ImportHBase /usr/local/bigdatacase/dataset/user_action.output

教材第201页

habse> scan 'user_action',{LIMIT=>10}  #只查询前面10行

教材第202页

cd ~/anaconda3/bin
pip install matplotlib
pip install pandas
pip install pymysql
pip install seaborn
~/anaconda3/lib/python3.7/site-packages/matplotlib/mpl-data/fonts/ttf
font.serif          : DejaVu Serif, Bitstream Vera Serif, Computer Modern Roman, New Century Schoolbook, Century Schoolbook L, Utopia, ITC Bookman, Bookman, Nimbus Roman No9 L, Times New Roman, Times, Palatino, Charter, serif, simhei
rm -rf ~/.cache/matplotlib
rm -rf ~/.matplotlib

教材第203页

plt.rcParams['font.sans-serif'] = ['SimHei'] # 用来正常显示中文标签SimHei
plt.rcParams['axes.unicode_minus'] = False # 用来正常显示负号
cd ~
vim visualization.py
import pandas as pd
import pymysql
import matplotlib.pyplot as plt
# 建立数据库连接
db_connection = pymysql.connect(
    host="localhost",  # 修改为你的数据库主机地址
    user="root", # 修改为你的数据库用户名
    password="123456", # 修改为你的数据库密码
    database="dblab" # 读取步骤三中已经存入的数据库
)
# 从数据库读取数据
query = "SELECT * FROM user_action;" # 定义一个查询,查询user_action表
data = pd.read_sql_query(query, db_connection) # 执行查询并加载结果至data
# 关闭数据库连接
db_connection.close()
 cd ~
python3 visualization.py

教材第204页

# 将behavior_type字段转换为数值型
behavior_mapping = {'1': '浏览', '2': '收藏', '3': '加购物车', '4': '购买'}
data['behavior_type'] = data['behavior_type'].map(behavior_mapping)

# 统计不同行为类型的数量
behavior_counts = data['behavior_type'].value_counts()

# 创建一个饼图来显示行为类型比例
plt.figure(figsize=(8, 6))
# 设置中文字体为黑体,解决负号显示问题
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
# 绘制饼图
plt.pie(behavior_counts, labels=behavior_counts.index, autopct='%1.1f%%', startangle=140)
plt.title('用户行为类型分布')
plt.savefig('商品行为分析.png',dpi=500)
cd ~
python3 visualization.py

教材第205页

import matplotlib.dates as mdates
# 将日期列转换为日期类型
data['visit_date'] = pd.to_datetime(data['visit_date'])

# 按照日期和行为类型分组,计算数量
daily_behavior_counts = data.groupby(['visit_date', 'behavior_type']).size().unstack()

# 绘制每天不同行为类型的数量变化趋势
plt.figure(figsize=(10, 6))

# 绘制图表
daily_behavior_counts.plot()
ax = plt.gca()  # 表明设置图片的各个轴
ax.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d'))  # 设置横坐标标签日期格式
plt.title('每日用户行为趋势图')
plt.xlabel('日期')
plt.ylabel('行为数量')
plt.legend(title='行为')
plt.savefig('消费者行为趋势分析.png',dpi=500)

教材第206页

# 按照地区和行为类型分组,计算数量
region_behavior_counts = data.groupby(['province', 'behavior_type']).size().unstack()

# 分为两个子图,一张显示浏览行为,一张显示其他行为
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 12))
region_behavior_counts['浏览'].plot(kind='bar', stacked=True, ax=ax1)
region_behavior_counts.drop(columns='浏览').plot(kind='bar', stacked=True, ax=ax2)

# 设置子图标题、横纵坐标标签等
ax1.set_title('不同地区用户浏览行为分布')
ax1.set_xlabel('地区')
ax1.set_ylabel('数量')
ax1.legend(title='行为类型')

ax2.set_title('不同地区用户其他行为分布')
ax2.set_xlabel('地区')
ax2.set_ylabel('数量')
ax2.legend(title='行为类型')

plt.tight_layout()  # 确保子图布局紧凑
plt.savefig('地区用户行为分布.png', dpi=500)

教材第208页

# 筛选出购买行为的数据
purchased_data = data[data["behavior_type"] == '购买']
# 统计每个商品的购买数量
top_10_items = purchased_data["item_id"].value_counts().head(10)

# 可视化销量前10的商品
plt.figure(figsize=(10, 6))
top_10_items.plot(kind="bar")
plt.title("销量前10的商品")
plt.xlabel("商品ID")
plt.ylabel("销量")
plt.xticks(rotation=45)
plt.savefig('销量前10的商品.png', dpi=500)

教材第209页

# 筛选出购买行为的数据
purchased_data = data[data["behavior_type"] == '购买']
# 按地区和商品ID分组,计算销量
region_item_sales=purchased_data.groupby(['province','item_id']).size().reset_index(name='sales')
# 筛选销量前十的商品
top_10_items_sales = region_item_sales[region_item_sales['item_id'].isin(top_10_items.index)]
# 绘制地区与销量前十商品的关联分析
plt.figure(figsize=(12, 6))
plt.scatter(top_10_items_sales['province'],top_10_items_sales['item_id'], s=top_10_items_sales['sales']*5, alpha=0.5)
plt.title('地区与销量前十商品关联分析')
plt.xlabel('地区')
plt.ylabel('商品ID')
plt.xticks(rotation=45)
plt.savefig('地区与商品销量关联分析.png', dpi=500)

教材第210页

import seaborn as sns

# 筛选出购买行为的数据
purchased_data = data[data["behavior_type"] == '购买']

# 按地区和商品ID分组,计算销量
region_item_sales=purchased_data.groupby(['province','item_id']).size().reset_index(name='sales')

# 筛选销量前十的商品
top_10_items_sales = region_item_sales[region_item_sales['item_id'].isin(top_10_items.index)]

# 构建透视表以便绘制热力图
pivot_table = top_10_items_sales.pivot_table(index='province', columns='item_id', values='sales', aggfunc='sum', fill_value=0)

# 绘制地区与销量前十商品的关联分析热力图
plt.figure(figsize=(12, 8))
sns.heatmap(pivot_table, cmap='YlGnBu', annot=True, fmt='d', linewidths=.5)
plt.title('地区与销量前十商品关联分析热力图')
plt.xlabel('商品ID')
plt.ylabel('地区')
plt.savefig('地区与商品销量关联热力图.png', dpi=500)

教材第211页

# 计算每天的平均活跃用户数量
daily_active_users = data.groupby('visit_date')['uid'].nunique()

# 绘制每天的平均活跃用户数量趋势
plt.figure(figsize=(10, 6))
daily_active_users.plot()
ax = plt.gca()  # 表明设置图片的各个轴,plt.gcf()表示图片本身
ax.xaxis.set_major_formatter(mdate.DateFormatter('%Y-%m-%d'))  # 横坐标标签显示的日期格式
plt.title('活跃用户数量趋势')
plt.xlabel('日期')
plt.ylabel('活跃用户数量')
plt.xticks(rotation=45)
plt.savefig('活跃用户数量趋势.png', dpi=500)