Human Obesity Data Analysis Based on Python and Flink


[Copyright Notice] All rights reserved. Reproduction and commercial use are strictly prohibited; violations will be prosecuted.
Author: 许雅萍, 2023 graduate student, Department of Computer Science and Technology, Xiamen University
Advisor: 林子雨, PhD, Associate Professor, Database Laboratory, Xiamen University
Date: June 2024
Companion textbook: Flink Programming Fundamentals (Java Edition), by 林子雨 (visit the textbook website)
Related cases: Flink Big Data Processing and Analysis Case Collection
This case study uses the obesity_level.csv dataset from the Kaggle website, which provides comprehensive information about individuals. The data is cleaned with pandas and saved to the HDFS distributed file system; a Flink program written in Python is then used to analyze the data; finally, the results are visualized with Python, plotly, and matplotlib.
Dataset and code download: download the code and dataset for this case from Baidu Netdisk (extraction code: ziyu).

I. Experimental Environment

(1) Linux: Ubuntu 20.04.1
(2) Python: 3.10
(3) Hadoop: 3.3.5
(4) Flink: 1.17.0
(5) JDK: 1.8.0_402
(6) Development tool: Visual Studio Code
(7) Python packages: pandas, pyflink, numpy, plotly, matplotlib

II. Dataset

1. Dataset description

The obesity_level.csv dataset used in this experiment comes from the Kaggle website; the download URL is given at the link.
This Kaggle dataset provides comprehensive information about individuals, including key attributes such as gender, age, height, weight, family history of overweight, eating habits, physical activity, mode of transportation, and the corresponding obesity level. The dataset is in CSV format and contains 20,758 records, each with the following fields: id, Gender, Age, Height, Weight, family_history_with_overweight, FAVC (frequent consumption of high-caloric food), FCVC (frequency of vegetable consumption), NCP (number of main meals), CAEC (consumption of food between meals), SMOKE, CH2O (daily water intake), FAF (physical activity frequency), TUE (time using electronic devices), CALC (alcohol consumption), MTRANS (mode of transportation), and Obesity (obesity level).

2. Data cleaning and preprocessing

(1) Deduplication
Remove duplicate rows:

duplicate_rows = data.duplicated().sum()
data.drop_duplicates(inplace=True)


The output shows no duplicate rows, completing the redundancy check.
(2) Drop rows with missing values
Check whether any column contains NaN values; if any are found, drop the rows containing them.

# check which columns contain NaN values
nan_col1 = data.isnull().any()
data = data.dropna()
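Beyond the yes/no check above, it can also help to count NaN values per column before dropping rows. This toy sketch (made-up values; only the column names are borrowed from the dataset) illustrates the pattern:

```python
import numpy as np
import pandas as pd

# Toy frame with made-up values; Age/Weight mirror two dataset columns.
df = pd.DataFrame({"Age": [21.0, np.nan, 34.0], "Weight": [64.5, 70.0, np.nan]})
print(df.isnull().sum())   # NaN count per column: Age 1, Weight 1
cleaned = df.dropna()      # keep only fully populated rows
print(len(cleaned))        # only the fully populated row survives
```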


(3) Data type conversion
Convert the Age column from floating point to integer, which better matches the real-world notion of age.

data[['Age']] = data[['Age']].astype(int)
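One detail worth noting (an observation about pandas behavior, not part of the original script): astype(int) truncates toward zero rather than rounding, so 20.9 becomes 20. If rounding were preferred, round() could be applied first. A minimal illustration:

```python
import pandas as pd

df = pd.DataFrame({"Age": [20.9, 35.2]})
truncated = df[["Age"]].astype(int)        # truncates: 20.9 -> 20
rounded = df[["Age"]].round().astype(int)  # rounds first: 20.9 -> 21
print(truncated["Age"].tolist())  # [20, 35]
print(rounded["Age"].tolist())    # [21, 35]
```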

(4) Unifying data formats
Make the CAEC column (consumption of food between meals) consistent: replace the value 0 with the string "Never", so that it is semantically consistent with the remaining values [Sometimes, Frequently, Always].

data['CAEC']=data['CAEC'].replace('0','Never')


Similarly, make the CALC column (alcohol consumption) consistent: replace the value 0 with the string "Never", in line with the remaining values [Sometimes, Frequently].

data['CALC']=data['CALC'].replace('0','Never')
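A quick way to confirm both replacements took effect is value_counts(). Note that replace('0', 'Never') matches the string "0"; if the column had been read as numeric, replace(0, 'Never') would be needed instead. A toy check:

```python
import pandas as pd

# Toy stand-in for the CALC column (made-up values).
s = pd.Series(["Sometimes", "0", "Frequently", "0", "Always"])
s = s.replace("0", "Never")
print(s.value_counts())  # "0" should no longer appear among the categories
```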


(5) Outlier filtering
Remove rows with implausible values: number of main meals per day (NCP) <= 0, or daily water intake (CH2O) <= 0.

row_indexs = data[(data["NCP"]<=0)| (data["CH2O"]<=0)].index
data.drop(row_indexs,inplace=True)

The output shows that no rows contain such outliers.

3. Storing the dataset in the HDFS distributed file system

(1) Start HDFS

./sbin/start-dfs.sh

(2) Create a directory named data under the current user's home directory on HDFS to hold the data

./bin/hdfs dfs -mkdir -p /user/greench/data

(3) Upload the preprocessed dataset to the data directory on HDFS

./bin/hdfs dfs -put /home/greench/Desktop/big_data/final/input/preprocessed_obesity_level.csv /user/greench/data

Check that the uploaded file is present in the data directory:

./bin/hdfs dfs -ls /user/greench/data

III. Processing and Analyzing the Dataset with Flink

1. Loading data from HDFS

# define the field names and types
field_names = ["id", "Gender", "Age","Height","Weight","family_history_with_overweight",
                "FAVC","FCVC","NCP","CAEC","SMOKE","CH2O",
                "FAF","TUE","CALC","MTRANS","Obesity"]
field_types = [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.BIGINT(),DataTypes.DOUBLE(),DataTypes.DOUBLE(),DataTypes.BIGINT(),
                DataTypes.BIGINT(),DataTypes.DOUBLE(),DataTypes.DOUBLE(),DataTypes.STRING(),DataTypes.BIGINT(),DataTypes.DOUBLE(),
                DataTypes.DOUBLE(),DataTypes.DOUBLE(),DataTypes.STRING(),DataTypes.STRING(),DataTypes.STRING()]

# create a TableSource
input_path = "hdfs://localhost:9000/user/greench/data/preprocessed_obesity_level.csv"

2. Data analysis

First, create a Table environment. Based on the column names and data types defined for each field, the CSV file on HDFS is turned into a corresponding Flink table, which is registered in the current Table environment under the name csvTable. All subsequent analysis is performed as SQL queries against this table.

# Create a TableEnvironment
flink_stream_env = StreamExecutionEnvironment.get_execution_environment()
flink_stream_settings = EnvironmentSettings.in_batch_mode()
flink_stream_table_env = StreamTableEnvironment.create(flink_stream_env, environment_settings=flink_stream_settings)

# define the field names and types
field_names = ["id", "Gender", "Age","Height","Weight","family_history_with_overweight",
                "FAVC","FCVC","NCP","CAEC","SMOKE","CH2O",
                "FAF","TUE","CALC","MTRANS","Obesity"]
field_types = [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.BIGINT(),DataTypes.DOUBLE(),DataTypes.DOUBLE(),DataTypes.BIGINT(),
                DataTypes.BIGINT(),DataTypes.DOUBLE(),DataTypes.DOUBLE(),DataTypes.STRING(),DataTypes.BIGINT(),DataTypes.DOUBLE(),
                DataTypes.DOUBLE(),DataTypes.DOUBLE(),DataTypes.STRING(),DataTypes.STRING(),DataTypes.STRING()]
# create a TableSource
input_path = "hdfs://localhost:9000/user/greench/data/preprocessed_obesity_level.csv"
csv_source = CsvTableSource(input_path, field_names, field_types)
# Register a TableSource
flink_stream_table_env.register_table_source("csvTable", csv_source)
tab=flink_stream_table_env.from_path('csvTable')

(1) Compute the distributions of gender, age, height, weight, and obesity degree across all respondents in the dataset.

# Q1: Gender distribution----------------------------
result = flink_stream_table_env.sql_query("select Gender as Gender,COUNT(*) as CNT from csvTable group by Gender")
output_path="/home/greench/Desktop/big_data/final/output/Q1_Gender_dist.csv"
flink_stream_table_env.create_temporary_table(
            'Q1_Gender',
            TableDescriptor.for_connector('filesystem')
                .schema(Schema.new_builder()
                        .column('Gender', DataTypes.STRING())
                        .column('CNT', DataTypes.BIGINT())
                        .build())
                .option('path', output_path)
                .format('csv')
                .build())
result.execute().print()
result.execute_insert('Q1_Gender').wait()
## Q1: Age distribution ------------------------------
result = flink_stream_table_env.sql_query(
"select (CASE \
                                        WHEN Age between 0 and 20 then '[0,20]' \
                                        WHEN Age between 20 and 30 then '[20,30]' \
                                        WHEN Age between 30 and 40 then '[30,40]' \
                                        WHEN Age between 40 and 50 then '[40,50]' \
                                        WHEN Age between 50 and 60 then '[50,60]' \
                                        ELSE '>60' \
                                    END) as Age_range, \
                                    count(*) as CNT \
                                from csvTable \
                                group by (CASE \
                                        WHEN Age between 0 and 20 then '[0,20]' \
                                        WHEN Age between 20 and 30 then '[20,30]' \
                                        WHEN Age between 30 and 40 then '[30,40]' \
                                        WHEN Age between 40 and 50 then '[40,50]' \
                                        WHEN Age between 50 and 60 then '[50,60]' \
                                        ELSE '>60' \
                                    END)\
                                order by Age_range desc")
output_path="/home/greench/Desktop/big_data/final/output/Q1_Age_dist.csv"
flink_stream_table_env.create_temporary_table(
            'Q1_Age',
            TableDescriptor.for_connector('filesystem')
                .schema(Schema.new_builder()
                        .column('Age_Range', DataTypes.STRING())
                        .column('CNT', DataTypes.BIGINT())
                        .build())
                .option('path', output_path)
                .format('csv')
                .build())
result.execute().print()
result.execute_insert('Q1_Age').wait()
# Q1: Obesity degree distribution
result = flink_stream_table_env.sql_query("select Obesity as Obesity ,COUNT(*) as CNT from csvTable group by Obesity")
output_path="/home/greench/Desktop/big_data/final/output/Q1_Obesity_dist.csv"
flink_stream_table_env.create_temporary_table(
            'Q1_Obesity',
            TableDescriptor.for_connector('filesystem')
                .schema(Schema.new_builder()
                        .column('Obesity', DataTypes.STRING())
                        .column('CNT', DataTypes.BIGINT())
                        .build())
                .option('path', output_path)
                .format('csv')
                .build())
result.execute().print()
result.execute_insert('Q1_Obesity').wait()
## Q1: Height distribution ------------------------------
result = flink_stream_table_env.sql_query(
"select (CASE \
                                        WHEN Height between 1.4 and 1.5 then '[1.40m,1.50m]' \
                                        WHEN Height between 1.5 and 1.6 then '[1.50m,1.60m]' \
                                        WHEN Height between 1.6 and 1.7 then '[1.60m,1.70m]' \
                                        WHEN Height between 1.7 and 1.8 then '[1.70m,1.80m]' \
                                        WHEN Height between 1.8 and 1.9 then '[1.80m,1.90m]' \
                                        WHEN Height between 1.9 and 2.0 then '[1.90m,2.00m]' \
                                    END) as Height_range, \
                                    count(*) as CNT \
                                from csvTable \
                                group by (CASE \
                                        WHEN Height between 1.4 and 1.5 then '[1.40m,1.50m]' \
                                        WHEN Height between 1.5 and 1.6 then '[1.50m,1.60m]' \
                                        WHEN Height between 1.6 and 1.7 then '[1.60m,1.70m]' \
                                        WHEN Height between 1.7 and 1.8 then '[1.70m,1.80m]' \
                                        WHEN Height between 1.8 and 1.9 then '[1.80m,1.90m]' \
                                        WHEN Height between 1.9 and 2.0 then '[1.90m,2.00m]' \
                                    END)\
                                order by Height_range asc")
output_path="/home/greench/Desktop/big_data/final/output/Q1_Height_dist.csv"
flink_stream_table_env.create_temporary_table(
            'Q1_Height',
            TableDescriptor.for_connector('filesystem')
                .schema(Schema.new_builder()
                        .column('Height_Range', DataTypes.STRING())
                        .column('CNT', DataTypes.BIGINT())
                        .build())
                .option('path', output_path)
                .format('csv')
                .build())
result.execute().print()
result.execute_insert('Q1_Height').wait()
## Q1: Weight distribution ------------------------------
result = flink_stream_table_env.sql_query(
"select (CASE \
                                        WHEN Weight between 35 and 55 then '[35,55]' \
                                        WHEN Weight between 55 and 75 then '[55,75]' \
                                        WHEN Weight between 75 and 95 then '[75,95]' \
                                        WHEN Weight between 95 and 115 then '[95,115]' \
                                        WHEN Weight between 115 and 135 then '[115,135]' \
                                        WHEN Weight between 135 and 155 then '[135,155]' \
                                        WHEN Weight between 155 and 175 then '[155,175]' \
                                    END) as Weight_range, \
                                    count(*) as CNT \
                                from csvTable \
                                group by (CASE \
                                        WHEN Weight between 35 and 55 then '[35,55]' \
                                        WHEN Weight between 55 and 75 then '[55,75]' \
                                        WHEN Weight between 75 and 95 then '[75,95]' \
                                        WHEN Weight between 95 and 115 then '[95,115]' \
                                        WHEN Weight between 115 and 135 then '[115,135]' \
                                        WHEN Weight between 135 and 155 then '[135,155]' \
                                        WHEN Weight between 155 and 175 then '[155,175]' \
                                    END)\
                                order by Weight_range asc")
output_path="/home/greench/Desktop/big_data/final/output/Q1_Weight_dist.csv"
flink_stream_table_env.create_temporary_table(
            'Q1_Weight',
            TableDescriptor.for_connector('filesystem')
                .schema(Schema.new_builder()
                        .column('Weight_Range', DataTypes.STRING())
                        .column('CNT', DataTypes.BIGINT())
                        .build())
                .option('path', output_path)
                .format('csv')
                .build())
result.execute().print()
result.execute_insert('Q1_Weight').wait()

(2) For each obesity degree, compute the average daily water intake, average weekly physical activity frequency, average time spent using electronic devices, and average frequency of vegetable consumption.

## Q2: average CH2O (daily water intake) by obesity degree -------------------------------------
result = flink_stream_table_env.sql_query(
"select Obesity, round(avg(CH2O),5) as CH2O_average FROM csvTable group by Obesity order by CH2O_average desc")
output_path="/home/greench/Desktop/big_data/final/output/Q2_CH2O_avg.csv"
flink_stream_table_env.create_temporary_table(
            'Q2_CH2O',
            TableDescriptor.for_connector('filesystem')
                .schema(Schema.new_builder()
                        .column('Obesity', DataTypes.STRING())
                        .column('CH2O_average', DataTypes.DOUBLE())
                        .build())
                .option('path', output_path)
                .format('csv')
                .build())
result.execute().print()
result.execute_insert('Q2_CH2O').wait()

## Q2: average FAF (physical activity frequency) by obesity degree -------------------------------------
result = flink_stream_table_env.sql_query(
"select Obesity, round(avg(FAF),5) as FAF_average FROM csvTable group by Obesity order by FAF_average desc")
output_path="/home/greench/Desktop/big_data/final/output/Q2_FAF_avg.csv"
flink_stream_table_env.create_temporary_table(
            'Q2_FAF',
            TableDescriptor.for_connector('filesystem')
                .schema(Schema.new_builder()
                        .column('Obesity', DataTypes.STRING())
                        .column('FAF_average', DataTypes.DOUBLE())
                        .build())
                .option('path', output_path)
                .format('csv')
                .build())
result.execute().print()
result.execute_insert('Q2_FAF').wait()
## Q2: average TUE (time using electronic devices) by obesity degree -------------------------------------
result = flink_stream_table_env.sql_query(
"select Obesity, round(avg(TUE),5) as TUE_average FROM csvTable group by Obesity order by TUE_average desc")
output_path="/home/greench/Desktop/big_data/final/output/Q2_TUE_avg.csv"
flink_stream_table_env.create_temporary_table(
            'Q2_TUE',
            TableDescriptor.for_connector('filesystem')
                .schema(Schema.new_builder()
                        .column('Obesity', DataTypes.STRING())
                        .column('TUE_average', DataTypes.DOUBLE())
                        .build())
                .option('path', output_path)
                .format('csv')
                .build())
result.execute().print()
result.execute_insert('Q2_TUE').wait()
## Q2: average FCVC (frequency of vegetable consumption) by obesity degree -------------------------------------
result = flink_stream_table_env.sql_query(
"select Obesity, round(avg(FCVC),5) as average FROM csvTable group by Obesity order by average desc")
output_path="/home/greench/Desktop/big_data/final/output/Q2_FCVC_avg.csv"
flink_stream_table_env.create_temporary_table(
            'Q2_FCVC',
            TableDescriptor.for_connector('filesystem')
                .schema(Schema.new_builder()
                        .column('Obesity', DataTypes.STRING())
                        .column('average', DataTypes.DOUBLE())
                        .build())
                .option('path', output_path)
                .format('csv')
                .build())
result.execute().print()
result.execute_insert('Q2_FCVC').wait()
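The Flink averages above can be sanity checked locally with a pandas groupby on the cleaned data. This cross-check is not part of the original scripts, and the values below are made up:

```python
import pandas as pd

# Toy stand-in for preprocessed_obesity_level.csv (made-up values).
df = pd.DataFrame({
    "Obesity": ["Normal_Weight", "Obesity_Type_I", "Normal_Weight"],
    "CH2O": [2.0, 2.5, 1.0],
})
# Mirrors: select Obesity, round(avg(CH2O),5) ... group by Obesity
avg = df.groupby("Obesity")["CH2O"].mean().round(5)
print(avg)  # Normal_Weight 1.5, Obesity_Type_I 2.5
```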

(3) For each obesity type, compute the proportions of people with and without a family history of obesity.

### Q3-3: distribution of obesity types among people with a family history of obesity --------------------
result = flink_stream_table_env.sql_query(
"select Obesity, COUNT(*) as CNT,  round(1.0*count(*)/SUM(COUNT(*)) OVER(),3) as percentage\
 from csvTable\
 where family_history_with_overweight= 1\
 group by Obesity\
 order by percentage desc")
output_path="/home/greench/Desktop/big_data/final/output/Q3_HasFamily.csv"
flink_stream_table_env.create_temporary_table(
            'Q33_hasFamily',
            TableDescriptor.for_connector('filesystem')
                .schema(Schema.new_builder()
                        .column('Obesity', DataTypes.STRING())
                        .column('CNT', DataTypes.BIGINT())
                        .column('percentage', DataTypes.DOUBLE())
                        .build())
                .option('path', output_path)
                .format('csv')
                .build())
result.execute().print()
result.execute_insert('Q33_hasFamily').wait()
### Q3-2: distribution of obesity types among people without a family history of obesity -------------------------
result = flink_stream_table_env.sql_query(
"select Obesity, COUNT(*) as CNT,  round(1.0*count(*)/SUM(COUNT(*)) OVER(),3) as percentage\
 from csvTable\
 where family_history_with_overweight= 0\
 group by Obesity\
 order by percentage desc")
output_path="/home/greench/Desktop/big_data/final/output/Q3_NoFamily.csv"
flink_stream_table_env.create_temporary_table(
            'Q32_noFamily',
            TableDescriptor.for_connector('filesystem')
                .schema(Schema.new_builder()
                        .column('Obesity', DataTypes.STRING())
                        .column('CNT', DataTypes.BIGINT())
                        .column('percentage', DataTypes.DOUBLE())
                        .build())
                .option('path', output_path)
                .format('csv')
                .build())
result.execute().print()
result.execute_insert('Q32_noFamily').wait()
### Q3-1: obesity type counts by family history of obesity -------------------------------------
result = flink_stream_table_env.sql_query(
"select Obesity,family_history_with_overweight,COUNT(Obesity) as CNT from csvTable group by Obesity, family_history_with_overweight order by Obesity, family_history_with_overweight desc")
output_path="/home/greench/Desktop/big_data/final/output/Q3_Family.csv"
flink_stream_table_env.create_temporary_table(
            'Q31_Family',
            TableDescriptor.for_connector('filesystem')
                .schema(Schema.new_builder()
                        .column('Obesity', DataTypes.STRING())
                        .column('family_history_with_overweight', DataTypes.BIGINT())
                        .column('CNT', DataTypes.BIGINT())
                        .build())
                .option('path', output_path)
                .format('csv')
                .build())
result.execute().print()
result.execute_insert('Q31_Family').wait()
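An equivalent local cross-check of the Q3 percentages can be done with pd.crosstab and column-wise normalization (a sketch with made-up counts, not part of the author's pipeline):

```python
import pandas as pd

# Toy respondents: obesity type and family-history flag (made-up data).
df = pd.DataFrame({
    "Obesity": ["Obesity_Type_I", "Normal_Weight",
                "Obesity_Type_I", "Normal_Weight"],
    "family_history_with_overweight": [1, 1, 1, 0],
})
# Share of each obesity type within each family-history group.
pct = pd.crosstab(df["Obesity"], df["family_history_with_overweight"],
                  normalize="columns").round(3)
print(pct)
```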

(4) Count the number of people of each obesity type among groups with different FAVC values (whether high-caloric food is frequently consumed).

### Q4: obesity type counts by FAVC (frequent consumption of high-caloric food) -----------------------------------------
result = flink_stream_table_env.sql_query(
"select FAVC, Obesity, COUNT(Obesity) as CNT \
from csvTable \
group by FAVC, Obesity \
order by FAVC, Obesity desc")
output_path="/home/greench/Desktop/big_data/final/output/Q4_FAVC.csv"
flink_stream_table_env.create_temporary_table(
            'Q4_FAVC',
            TableDescriptor.for_connector('filesystem')
                .schema(Schema.new_builder()
                        .column('FAVC', DataTypes.BIGINT())
                        .column('Obesity', DataTypes.STRING())
                        .column('CNT', DataTypes.BIGINT())
                        .build())
                .option('path', output_path)
                .format('csv')
                .build())
result.execute().print()
result.execute_insert('Q4_FAVC').wait()

IV. Visualization of Results

The results are visualized with Python, plotly, and matplotlib.pyplot.
(1) The gender distribution of all respondents is shown below; the male-to-female ratio is nearly 1:1, i.e. well balanced.

The age distribution of all respondents is shown below; ages are concentrated below thirty, so the respondents are mostly young adults.

The obesity degree distribution of all respondents is shown below. The population is spread fairly evenly across obesity levels, which range from insufficient weight and normal weight through several overweight levels to several obesity levels, so the coverage of obesity types is broad.

The height distribution of all respondents is shown below; heights between 1.60 m and 1.80 m are the most common, which matches the height distribution of real-world populations.

The weight distribution of all respondents is shown below. Respondents span a wide range of weights; very light and very heavy people are relatively rare, while the middle weight ranges dominate, consistent with real-world weight patterns.

(2) The average daily water intake per obesity degree is shown below. The top three average intakes all belong to groups with some degree of obesity, while the insufficient-weight and normal-weight groups have the lowest averages; this hints that higher water intake may be associated with obesity.

The average weekly physical activity frequency per obesity degree is shown below. The two highest averages belong to the insufficient-weight and normal-weight groups, suggesting that lack of exercise may be one factor contributing to obesity.

The average time spent using electronic devices per obesity degree is shown below. The insufficient-weight group spends the most time on electronic devices, while the two overweight levels rank in the middle; device usage time is therefore probably not a key factor in obesity.

The average frequency of vegetable consumption per obesity degree is shown below. The two overweight groups rank in the bottom three; judged by this average, overweight people eat vegetables less frequently than normal-weight people.

(3) The distribution of obesity types among people with a family history of obesity is shown below; insufficient-weight and normal-weight people are relatively rare in this group.

Among people without a family history of obesity, by contrast, insufficient-weight and normal-weight people are the most numerous.

Combining the two charts above: insufficient-weight and normal-weight people are relatively rare among those with a family history of obesity, and most numerous among those without one. This suggests that people with a family history of obesity may be more prone to obesity than those without.
The chart below shows, for each obesity type, the numbers of people with and without a family history of obesity; among people in the Obesity or Overweight categories, those with a family history make up the majority.

(4) The number of people of each obesity type among groups with different FAVC values (frequency of consuming high-caloric food) is shown below. Most people habitually consume high-caloric food; in particular, among the Obesity types, the proportion who frequently consume high-caloric food is far higher than the proportion who rarely do.

Complete visualization code:

import pandas as pd
import plotly.express as px
import matplotlib
import matplotlib.pyplot as plt
import numpy as np

### Q1 draw the Gender distribution -------------------------------
GenderNum = pd.read_csv("input/Q1_Gender_dist.csv", header=None)
GenderNum.columns = ["Gender", "Num"]
 
 
fig3 = px.pie(GenderNum,
              values = "Num",
              names = "Gender", 
              title = "Gender distribution of respondents"
             )
fig3.show()
### Q1 draw the Obesity degree distribution -------------------------------
ObesityNum = pd.read_csv("input/Q1_Obesity_dist.csv", header=None)
ObesityNum.columns = ["Obesity", "Num"]
 
 
fig4 = px.pie(ObesityNum,
              values = "Num",
              names = "Obesity", 
              title = "obesity_level distribution of respondents"
             )
fig4.show()
### Q1 draw the age distribution -----------------------------
peopleNum = pd.read_csv("input/Q1_Age_dist.csv", header=None)
peopleNum.columns = ["Age_range", "Num"]
 
fig1 = px.bar(peopleNum,
              x = "Age_range",
              y = "Num", 
              title = "Age distribution of respondents",
              color='Num'
             )
fig1.show()
 
fig2 = px.pie(peopleNum,
              values = "Num",
              names = "Age_range", 
              title = "Age distribution of respondents"
             )
fig2.show()
### Q1 draw the Height distribution -------------------------------
HeightNum = pd.read_csv("input/Q1_Height_dist.csv", header=None)
HeightNum.columns = ["Height", "Num"]
 
 
fig5 = px.pie(HeightNum,
              values = "Num",
              names = "Height", 
              title = "Height (m) distribution of respondents"
             )
fig5.show()
### Q1 draw the Weight distribution -------------------------------
WeightNum = pd.read_csv("input/Q1_Weight_dist.csv", header=None)
WeightNum.columns = ["Weight", "Num"]
 
 
fig6 = px.pie(WeightNum,
              values = "Num",
              names = "Weight", 
              title = "Weight (kg) distribution of respondents"
             )
fig6.show()
### Q2:draw the CH2O_avg -----------------------------
CH2O_AVG = pd.read_csv("input/Q2_CH2O_avg.csv", header=None)
CH2O_AVG.columns = ["Obesity_degree", "Avg"]
 
fig_Q2_CH2O = px.bar(CH2O_AVG,
              x = "Obesity_degree",
              y = "Avg", 
              title = "Average daily water intake by obesity degree",
              color='Avg'
             )
fig_Q2_CH2O.show()
### Q2 draw the FAF_avg -----------------------------
FAF_AVG = pd.read_csv("input/Q2_FAF_avg.csv", header=None)
FAF_AVG.columns = ["Obesity_degree", "Avg"]
 
fig_Q2_FAF = px.bar(FAF_AVG,
              x = "Obesity_degree",
              y = "Avg", 
              title = "Average physical activity frequency by obesity degree",
              color='Avg'
             )
fig_Q2_FAF.show()
### draw the TUE_avg -----------------------------
TUE_AVG = pd.read_csv("input/Q2_TUE_avg.csv", header=None)
TUE_AVG.columns = ["Obesity_degree", "Avg"]
 
fig_Q2_TUE = px.bar(TUE_AVG,
              x = "Obesity_degree",
              y = "Avg", 
              title = "Average time using electronic devices by obesity degree",
              color='Avg'
             )
fig_Q2_TUE.show()
### Q2 draw the FCVC_avg -----------------------------
FCVC_AVG = pd.read_csv("input/Q2_FCVC_avg.csv", header=None)
FCVC_AVG.columns = ["Obesity_degree", "Avg"]
 
fig_Q2_FCVC = px.bar(FCVC_AVG,
              x = "Obesity_degree",
              y = "Avg", 
              title = "Average vegetable consumption frequency by obesity degree",
              color='Avg'
             )
fig_Q2_FCVC.show()
## Q3: draw the has family with obesity history the dist of different obesity degree -----------------------------
HasFamily_dist = pd.read_csv("input/Q3_HasFamily.csv", header=None)
HasFamily_dist.columns = ["Obesity_degree", "Num", "Percentage"]
 
fig_Q3_HasFamily = px.pie(HasFamily_dist,
              names = "Obesity_degree",
              values = "Num", 
              title = "Distribution of obesity degrees among people with a family history of obesity"
            #   color='Num'
             )
fig_Q3_HasFamily.show()
## Q3: draw the not has family with obesity history the dist of different obesity degree -----------------------------
NoFamily_dist = pd.read_csv("input/Q3_NoFamily.csv", header=None)
NoFamily_dist.columns = ["Obesity_degree", "Num", "Percentage"]
 
fig_Q3_NoFamily = px.bar(NoFamily_dist,
              x = "Obesity_degree",
              y = "Num", 
              title = "Distribution of obesity degrees among people without a family history of obesity",
              color='Num'
             )
fig_Q3_NoFamily.show()
### Q3:统计各种肥胖类型中,有家族肥胖历史和没有家族肥胖历史人群的分布情况-------------------------
data = pd.read_csv("input/Q3_Family.csv", header=None)
data = np.array(data)
labels =['0rmal_Weight','Insufficient_Weight','Obesity_Type_I','Obesity_Type_II','Obesity_Type_III','Overweight_Level_I','Overweight_Level_II']
col2 = data[:, 2]
has_family=[]
no_family=[]
cnt=0
for i in col2:
    cnt=cnt+1
    if cnt%2==0:
        no_family.append(i)
    else:
        has_family.append(i)
x = np.arange(len(labels))  # label positions
width = 0.2  # bar width; adjust as needed
fig, ax = plt.subplots(figsize=(28,12))
rects1 = ax.bar(x - width/2, has_family, width, label='Has famliy obesity history')
rects2 = ax.bar(x + width/2, no_family, width,label='NO famliy obesity history')
# add the y-axis label, x-axis label, and title
ax.set_ylabel('People Num', fontsize=20)
ax.set_xlabel('Obesity Degree', fontsize=20)
ax.set_title('Comparison of people with and without family history of obesity among people with different types of obesity',size=20)
ax.set_xticks(x)
ax.set_xticklabels(labels)
ax.tick_params(labelsize=20)
plt.rcParams['font.size'] = 20
plt.xlabel('Obesity Degree',fontsize=25)
plt.ylabel('People Num',fontsize=25)
ax.legend()
def autolabel(rects):
    """Attach a text label above each bar in *rects*, showing its height."""
    for rect in rects:
        height = rect.get_height()
        ax.annotate('{}'.format(height),
                    xy=(rect.get_x() + rect.get_width() / 2, height),
                    xytext=(0, 3),  # 3-point vertical offset
                    textcoords="offset points",
                    ha='center', va='bottom')
autolabel(rects1)
autolabel(rects2)
# fig.savefig('family.png')
plt.show()
### Q4: obesity type counts by FAVC (frequency of consuming high-caloric food) ----------------------------
data = pd.read_csv("input/Q4_FAVC.csv", header=None)
data = np.array(data)
labels = ['No', 'Yes']  # use a list (not a set) so tick order is deterministic
cols =['Overweight_Level_II','Overweight_Level_I','Obesity_Type_III','Obesity_Type_II','Obesity_Type_I','Insufficient_Weight','0rmal_Weight']
col2 = data[:, 2]
ylist = [[] for i in range(7)]
# Rows are ordered by FAVC asc then Obesity desc, so row i and row i+7
# hold the FAVC=0 and FAVC=1 counts for the same obesity type.
for i in range(0, 7):
    ylist[i].append(col2[i])
    ylist[i].append(col2[i + 7])
x = np.arange(len(labels))  # label positions
width = 0.1  # bar width; adjust as needed
fig, ax = plt.subplots(figsize=(28,12))
rects1 = ax.bar(x - width*3.5, ylist[0], width, label=cols[0])
rects2 = ax.bar(x - width*2.5, ylist[1], width,label=cols[1])
rects3 = ax.bar(x - width*1.5, ylist[2], width,label=cols[2])
rects4 = ax.bar(x - width*0.5, ylist[3], width,label=cols[3])
rects5 = ax.bar(x + width*0.5, ylist[4], width,label=cols[4])
rects6 = ax.bar(x + width*1.5, ylist[5], width,label=cols[5])
rects7 = ax.bar(x + width*2.5, ylist[6], width,label=cols[6])
ax.set_title('Relationship between food consumption frequency and obesity levels',size=20)
ax.set_xticks(x)
ax.set_xticklabels(labels)
ax.tick_params(labelsize=20)
plt.rcParams['font.size'] = 20
plt.xlabel('Whether Frequently consuming of high-caloric food',fontsize=25)
plt.ylabel('People Num',fontsize=25)
ax.legend()
def autolabel(rects):
    """Attach a text label above each bar in *rects*, showing its height."""
    for rect in rects:
        height = rect.get_height()
        ax.annotate('{}'.format(height),
                    xy=(rect.get_x() + rect.get_width() / 2, height),
                    xytext=(0, 3),  # 3-point vertical offset
                    textcoords="offset points",
                    ha='center', va='bottom')
autolabel(rects1)
autolabel(rects2)
autolabel(rects3)
autolabel(rects4)
autolabel(rects5)
autolabel(rects6)
autolabel(rects7)
# fig.savefig('FAVC.png')
plt.show()
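The reshaping loops above rely on the CSV rows arriving in exactly the order produced by the SQL ORDER BY clauses. A less order-sensitive alternative (a sketch, not the original code) is to pivot the (FAVC, Obesity, CNT) triples with pandas:

```python
import pandas as pd

# Toy Q4 output rows: (FAVC, Obesity, CNT), in arbitrary order (made up).
df = pd.DataFrame({
    "FAVC": [0, 1, 0, 1],
    "Obesity": ["Normal_Weight", "Normal_Weight",
                "Obesity_Type_I", "Obesity_Type_I"],
    "CNT": [5, 7, 2, 9],
})
# One row per FAVC value, one column per obesity type.
wide = df.pivot(index="FAVC", columns="Obesity", values="CNT")
print(wide)
```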

Appendix:

1. Data_Cleaning.py, the complete data cleaning code:

## Preprocess the data with pandas: select fields, convert formats, and drop nulls and duplicates
import pandas as pd 
import numpy as np
import matplotlib.pyplot as plt
# Read the raw dataset---------------------------------------
input_path='input/obesity_level.csv'
data=pd.read_csv(input_path)
### 1. Deduplication-------------------------------------------
# count duplicate rows
duplicate_rows = data.duplicated().sum()
# drop duplicate rows
data.drop_duplicates(inplace=True)
# drop an unused column
data=data.drop(['SCC'],axis=1)
### 2. Check for missing values------------------------------------
# flag rows containing NaN
nan_col1 = data.isnull().any(axis=1)
# drop rows with missing values
data = data.dropna()
### 3. Data type conversion------------------------------------
# convert the Age column from float to int
data[['Age']] = data[['Age']].astype(int)
### 4. Unify data formats------------------------------------
# 4.1 make the CAEC (food between meals) column consistent: 0 -> Never
data['CAEC']=data['CAEC'].replace('0','Never')
# 4.2 make the CALC (alcohol consumption) column consistent: 0 -> Never
data['CALC']=data['CALC'].replace('0','Never')
### 5. Outlier filtering------------------------------------
# find rows with outliers
row_indexs = data[(data["NCP"]<=0)| (data["CH2O"]<=0)].index
# drop rows with outliers
data.drop(row_indexs,inplace=True)
### Write the preprocessed data to a new csv file, ready for upload to HDFS for further analysis--------------
print(data.info())
output_path='input/preprocessed_obesity_level.csv'
data.to_csv(output_path,index=False,header=None)

2. Obesity_Analyze.py, the Flink data processing and analysis code:

## Runs successfully; results are correct
import argparse
import logging
import sys

from pyflink.common import Row
from pyflink.table import (EnvironmentSettings, TableEnvironment, TableDescriptor, Schema,
                           DataTypes, FormatDescriptor)
from pyflink.table.expressions import lit, col
from pyflink.table.udf import udtf
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, CsvTableSource, CsvTableSink
# Create a TableEnvironment
flink_stream_env = StreamExecutionEnvironment.get_execution_environment()
flink_stream_settings = EnvironmentSettings.in_batch_mode()
flink_stream_table_env = StreamTableEnvironment.create(flink_stream_env, environment_settings=flink_stream_settings)
# define the field names and types
field_names = ["id", "Gender", "Age","Height","Weight","family_history_with_overweight",
                "FAVC","FCVC","NCP","CAEC","SMOKE","CH2O",
                "FAF","TUE","CALC","MTRANS","Obesity"]
field_types = [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.BIGINT(),DataTypes.DOUBLE(),DataTypes.DOUBLE(),DataTypes.BIGINT(),
                DataTypes.BIGINT(),DataTypes.DOUBLE(),DataTypes.DOUBLE(),DataTypes.STRING(),DataTypes.BIGINT(),DataTypes.DOUBLE(),
                DataTypes.DOUBLE(),DataTypes.DOUBLE(),DataTypes.STRING(),DataTypes.STRING(),DataTypes.STRING()]
# create a TableSource
input_path = "hdfs://localhost:9000/user/greench/data/preprocessed_obesity_level.csv"
csv_source = CsvTableSource(input_path, field_names, field_types)
# Register a TableSource
flink_stream_table_env.register_table_source("csvTable", csv_source)
tab=flink_stream_table_env.from_path('csvTable')
# Q1: Gender distribution----------------------------
result = flink_stream_table_env.sql_query("select Gender as Gender,COUNT(*) as CNT from csvTable group by Gender")
output_path="/home/greench/Desktop/big_data/final/output/Q1_Gender_dist.csv"
flink_stream_table_env.create_temporary_table(
            'Q1_Gender',
            TableDescriptor.for_connector('filesystem')
                .schema(Schema.new_builder()
                        .column('Gender', DataTypes.STRING())
                        .column('CNT', DataTypes.BIGINT())
                        .build())
                .option('path', output_path)
                .format('csv')
                .build())
result.execute().print()
result.execute_insert('Q1_Gender').wait()
## Q1: Age distribution ------------------------------
result = flink_stream_table_env.sql_query(
"select (CASE \
                                        WHEN Age between 0 and 20 then '[0,20]' \
                                        WHEN Age between 20 and 30 then '[20,30]' \
                                        WHEN Age between 30 and 40 then '[30,40]' \
                                        WHEN Age between 40 and 50 then '[40,50]' \
                                        WHEN Age between 50 and 60 then '[50,60]' \
                                        ELSE '>60' \
                                    END) as Age_range, \
                                    count(*) as CNT \
                                from csvTable \
                                group by (CASE \
                                        WHEN Age between 0 and 20 then '[0,20]' \
                                        WHEN Age between 20 and 30 then '[20,30]' \
                                        WHEN Age between 30 and 40 then '[30,40]' \
                                        WHEN Age between 40 and 50 then '[40,50]' \
                                        WHEN Age between 50 and 60 then '[50,60]' \
                                        ELSE '>60' \
                                    END)\
                                order by Age_range desc")
output_path="/home/greench/Desktop/big_data/final/output/Q1_Age_dist.csv"
flink_stream_table_env.create_temporary_table(
            'Q1_Age',
            TableDescriptor.for_connector('filesystem')
                .schema(Schema.new_builder()
                        .column('Age_Range', DataTypes.STRING())
                        .column('CNT', DataTypes.BIGINT())
                        .build())
                .option('path', output_path)
                .format('csv')
                .build())
result.execute().print()
result.execute_insert('Q1_Age').wait()
# Q1: Obesity degree distribution
result = flink_stream_table_env.sql_query("select Obesity as Obesity ,COUNT(*) as CNT from csvTable group by Obesity")
output_path="/home/greench/Desktop/big_data/final/output/Q1_Obesity_dist.csv"
flink_stream_table_env.create_temporary_table(
            'Q1_Obesity',
            TableDescriptor.for_connector('filesystem')
                .schema(Schema.new_builder()
                        .column('Obesity', DataTypes.STRING())
                        .column('CNT', DataTypes.BIGINT())
                        .build())
                .option('path', output_path)
                .format('csv')
                .build())
result.execute().print()
result.execute_insert('Q1_Obesity').wait()
## Q1: Height distribution ------------------------------
result = flink_stream_table_env.sql_query(
"select (CASE \
                                        WHEN Height between 1.4 and 1.5 then '[1.40m,1.50m]' \
                                        WHEN Height between 1.5 and 1.6 then '[1.50m,1.60m]' \
                                        WHEN Height between 1.6 and 1.7 then '[1.60m,1.70m]' \
                                        WHEN Height between 1.7 and 1.8 then '[1.70m,1.80m]' \
                                        WHEN Height between 1.8 and 1.9 then '[1.80m,1.90m]' \
                                        WHEN Height between 1.9 and 2.0 then '[1.90m,2.00m]' \
                                    END) as Height_range, \
                                    count(*) as CNT \
                                from csvTable \
                                group by (CASE \
                                        WHEN Height between 1.4 and 1.5 then '[1.40m,1.50m]' \
                                        WHEN Height between 1.5 and 1.6 then '[1.50m,1.60m]' \
                                        WHEN Height between 1.6 and 1.7 then '[1.60m,1.70m]' \
                                        WHEN Height between 1.7 and 1.8 then '[1.70m,1.80m]' \
                                        WHEN Height between 1.8 and 1.9 then '[1.80m,1.90m]' \
                                        WHEN Height between 1.9 and 2.0 then '[1.90m,2.00m]' \
                                    END)\
                                order by Height_range asc")
output_path="/home/greench/Desktop/big_data/final/output/Q1_Height_dist.csv"
flink_stream_table_env.create_temporary_table(
            'Q1_Height',
            TableDescriptor.for_connector('filesystem')
                .schema(Schema.new_builder()
                        .column('Height_Range', DataTypes.STRING())
                        .column('CNT', DataTypes.BIGINT())
                        .build())
                .option('path', output_path)
                .format('csv')
                .build())
result.execute().print()
result.execute_insert('Q1_Height').wait()
## Q1: Weight distribution ------------------------------
result = flink_stream_table_env.sql_query(
"select (CASE \
                                        WHEN Weight between 35 and 55 then '[35,55]' \
                                        WHEN Weight between 55 and 75 then '[55,75]' \
                                        WHEN Weight between 75 and 95 then '[75,95]' \
                                        WHEN Weight between 95 and 115 then '[95,115]' \
                                        WHEN Weight between 115 and 135 then '[115,135]' \
                                        WHEN Weight between 135 and 155 then '[135,155]' \
                                        WHEN Weight between 155 and 175 then '[155,175]' \
                                    END) as Weight_range, \
                                    count(*) as CNT \
                                from csvTable \
                                group by (CASE \
                                        WHEN Weight between 35 and 55 then '[35,55]' \
                                        WHEN Weight between 55 and 75 then '[55,75]' \
                                        WHEN Weight between 75 and 95 then '[75,95]' \
                                        WHEN Weight between 95 and 115 then '[95,115]' \
                                        WHEN Weight between 115 and 135 then '[115,135]' \
                                        WHEN Weight between 135 and 155 then '[135,155]' \
                                        WHEN Weight between 155 and 175 then '[155,175]' \
                                    END)\
                                order by Weight_range asc")
output_path="/home/greench/Desktop/big_data/final/output/Q1_Weight_dist.csv"
flink_stream_table_env.create_temporary_table(
            'Q1_Weight',
            TableDescriptor.for_connector('filesystem')
                .schema(Schema.new_builder()
                        .column('Weight_Range', DataTypes.STRING())
                        .column('CNT', DataTypes.BIGINT())
                        .build())
                .option('path', output_path)
                .format('csv')
                .build())
result.execute().print()
result.execute_insert('Q1_Weight').wait()
## Q2-1 Obesity cause analysis: average CH2O (daily water intake) -------------------------------------
result = flink_stream_table_env.sql_query(
"select Obesity, round(avg(CH2O),5) as CH2O_average FROM csvTable group by Obesity order by CH2O_average desc")
output_path="/home/greench/Desktop/big_data/final/output/Q2_CH2O_avg.csv"
flink_stream_table_env.create_temporary_table(
            'Q2_CH2O',
            TableDescriptor.for_connector('filesystem')
                .schema(Schema.new_builder()
                        .column('Obesity', DataTypes.STRING())
                        .column('CH2O_average', DataTypes.DOUBLE())
                        .build())
                .option('path', output_path)
                .format('csv')
                .build())
result.execute().print()
result.execute_insert('Q2_CH2O').wait()
## Q2-2 Obesity cause analysis: average FAF (physical activity frequency) -------------------------------------
result = flink_stream_table_env.sql_query(
"select Obesity, round(avg(FAF),5) as FAF_average FROM csvTable group by Obesity order by FAF_average desc")
output_path="/home/greench/Desktop/big_data/final/output/Q2_FAF_avg.csv"
flink_stream_table_env.create_temporary_table(
            'Q2_FAF',
            TableDescriptor.for_connector('filesystem')
                .schema(Schema.new_builder()
                        .column('Obesity', DataTypes.STRING())
                        .column('FAF_average', DataTypes.DOUBLE())
                        .build())
                .option('path', output_path)
                .format('csv')
                .build())
result.execute().print()
result.execute_insert('Q2_FAF').wait()
## Q2-3 Obesity cause analysis: average TUE (time spent on electronic devices) -------------------------------------
result = flink_stream_table_env.sql_query(
"select Obesity, round(avg(TUE),5) as TUE_average FROM csvTable group by Obesity order by TUE_average desc")
output_path="/home/greench/Desktop/big_data/final/output/Q2_TUE_avg.csv"
flink_stream_table_env.create_temporary_table(
            'Q2_TUE',
            TableDescriptor.for_connector('filesystem')
                .schema(Schema.new_builder()
                        .column('Obesity', DataTypes.STRING())
                        .column('TUE_average', DataTypes.DOUBLE())
                        .build())
                .option('path', output_path)
                .format('csv')
                .build())
result.execute().print()
result.execute_insert('Q2_TUE').wait()
## Q2-4 Obesity cause analysis: average FCVC (vegetable consumption frequency) -------------------------------------
result = flink_stream_table_env.sql_query(
"select Obesity, round(avg(FCVC),5) as average FROM csvTable group by Obesity order by average desc")
output_path="/home/greench/Desktop/big_data/final/output/Q2_FCVC_avg.csv"
flink_stream_table_env.create_temporary_table(
            'Q2_FCVC',
            TableDescriptor.for_connector('filesystem')
                .schema(Schema.new_builder()
                        .column('Obesity', DataTypes.STRING())
                        .column('average', DataTypes.DOUBLE())
                        .build())
                .option('path', output_path)
                .format('csv')
                .build())
result.execute().print()
result.execute_insert('Q2_FCVC').wait()
### Q3-3 Obesity cause analysis: share of each obesity level among people with a family history of overweight --------------------
result = flink_stream_table_env.sql_query(
"select Obesity, COUNT(*) as CNT,  round(1.0*count(*)/SUM(COUNT(*)) OVER(),3) as percentage\
 from csvTable\
 where family_history_with_overweight= 1\
 group by Obesity\
 order by percentage desc")
output_path="/home/greench/Desktop/big_data/final/output/Q3_HasFamily.csv"
flink_stream_table_env.create_temporary_table(
            'Q33_hasFamily',
            TableDescriptor.for_connector('filesystem')
                .schema(Schema.new_builder()
                        .column('Obesity', DataTypes.STRING())
                        .column('CNT', DataTypes.BIGINT())
                        .column('percentage', DataTypes.DOUBLE())
                        .build())
                .option('path', output_path)
                .format('csv')
                .build())
result.execute().print()
result.execute_insert('Q33_hasFamily').wait()
### Q3-2 Obesity cause analysis: share of each obesity level among people without a family history of overweight -------------------------
result = flink_stream_table_env.sql_query(
"select Obesity, COUNT(*) as CNT,  round(1.0*count(*)/SUM(COUNT(*)) OVER(),3) as percentage\
 from csvTable\
 where family_history_with_overweight= 0\
 group by Obesity\
 order by percentage desc")
output_path="/home/greench/Desktop/big_data/final/output/Q3_NoFamily.csv"
flink_stream_table_env.create_temporary_table(
            'Q32_noFamily',
            TableDescriptor.for_connector('filesystem')
                .schema(Schema.new_builder()
                        .column('Obesity', DataTypes.STRING())
                        .column('CNT', DataTypes.BIGINT())
                        .column('percentage', DataTypes.DOUBLE())
                        .build())
                .option('path', output_path)
                .format('csv')
                .build())
result.execute().print()
result.execute_insert('Q32_noFamily').wait()
### Q3-1 Obesity cause analysis: family history of overweight -------------------------------------
result = flink_stream_table_env.sql_query(
"select Obesity,family_history_with_overweight,COUNT(Obesity) as CNT from csvTable group by Obesity, family_history_with_overweight order by Obesity, family_history_with_overweight desc")
output_path="/home/greench/Desktop/big_data/final/output/Q3_Family.csv"
flink_stream_table_env.create_temporary_table(
            'Q31_Family',
            TableDescriptor.for_connector('filesystem')
                .schema(Schema.new_builder()
                        .column('Obesity', DataTypes.STRING())
                        .column('family_history_with_overweight', DataTypes.BIGINT())
                        .column('CNT', DataTypes.BIGINT())
                        .build())
                .option('path', output_path)
                .format('csv')
                .build())
result.execute().print()
result.execute_insert('Q31_Family').wait()
### Q4 Obesity cause analysis: FAVC (frequent consumption of high-caloric food) -----------------------------------------
result = flink_stream_table_env.sql_query(
"select FAVC, Obesity, COUNT(Obesity) as CNT \
from csvTable \
group by FAVC, Obesity \
order by FAVC, Obesity desc")
output_path="/home/greench/Desktop/big_data/final/output/Q4_FAVC.csv"
flink_stream_table_env.create_temporary_table(
            'Q4_FAVC',
            TableDescriptor.for_connector('filesystem')
                .schema(Schema.new_builder()
                        .column('FAVC', DataTypes.BIGINT())
                        .column('Obesity', DataTypes.STRING())
                        .column('CNT', DataTypes.BIGINT())
                        .build())
                .option('path', output_path)
                .format('csv')
                .build())
result.execute().print()
result.execute_insert('Q4_FAVC').wait()
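The Q3 queries above lean on `SUM(COUNT(*)) OVER()`, a window aggregate that divides each group's count by the total row count of the filtered subset. A small pandas cross-check (a sketch over a toy frame; only the column names are taken from the dataset) mirrors what the Flink SQL computes:

```python
import pandas as pd

def obesity_share(df, has_history):
    """Share of each obesity level within one family-history subset,
    mirroring round(1.0*COUNT(*)/SUM(COUNT(*)) OVER(), 3) in the Flink SQL."""
    subset = df[df["family_history_with_overweight"] == has_history]
    counts = subset.groupby("Obesity").size()
    return (counts / counts.sum()).round(3).sort_values(ascending=False)
```

Running this over the cleaned CSV should reproduce the `percentage` column of the Q3 outputs up to rounding.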

3. Draw_Picture.py — visualization code for the analysis results:

import pandas as pd
import plotly.express as px
import matplotlib
import matplotlib.pyplot as plt
import numpy as np

### Q1 draw the Gender distribution -------------------------------
GenderNum = pd.read_csv("input/Q1_Gender_dist.csv", header=None)
GenderNum.columns = ["Gender", "Num"]
 
 
fig3 = px.pie(GenderNum,
              values = "Num",
              names = "Gender", 
              title = "收集人群的性别分布情况"
             )
fig3.show()
### Q1 draw the Obesity degree distribution -------------------------------
ObesityNum = pd.read_csv("input/Q1_Obesity_dist.csv", header=None)
ObesityNum.columns = ["Obesity", "Num"]
 
 
fig4 = px.pie(ObesityNum,
              values = "Num",
              names = "Obesity", 
              title = "收集人群的obesity_level分布情况"
             )
fig4.show()
### Q1 draw the age distribution -----------------------------
peopleNum = pd.read_csv("input/Q1_Age_dist.csv", header=None)
peopleNum.columns = ["Age_range", "Num"]
 
fig1 = px.bar(peopleNum,
              x = "Age_range",
              y = "Num", 
              title = "收集人群的年龄分布情况",
              color='Num'
             )
fig1.show()
 
fig2 = px.pie(peopleNum,
              values = "Num",
              names = "Age_range", 
              title = "收集人群的年龄分布情况"
             )
fig2.show()
### Q1 draw the Height distribution -------------------------------
HeightNum = pd.read_csv("input/Q1_Height_dist.csv", header=None)
HeightNum.columns = ["Height", "Num"]
 
 
fig5 = px.pie(HeightNum,
              values = "Num",
              names = "Height", 
              title = "收集人群的Height(M)分布情况"
             )
fig5.show()
### Q1 draw the Weight distribution -------------------------------
WeightNum = pd.read_csv("input/Q1_Weight_dist.csv", header=None)
WeightNum.columns = ["Weight", "Num"]
 
 
fig6 = px.pie(WeightNum,
              values = "Num",
              names = "Weight", 
              title = "收集人群的Weight(Kg)分布情况"
             )
fig6.show()
### Q2:draw the CH2O_avg -----------------------------
CH2O_AVG = pd.read_csv("input/Q2_CH2O_avg.csv", header=None)
CH2O_AVG.columns = ["Obesity_degree", "Avg"]
 
fig_Q2_CH2O = px.bar(CH2O_AVG,
              x = "Obesity_degree",
              y = "Avg", 
              title = "不同Obesity_degree人群的平均每日耗水量",
              color='Avg'
             )
fig_Q2_CH2O.show()
### Q2 draw the FAF_avg -----------------------------
FAF_AVG = pd.read_csv("input/Q2_FAF_avg.csv", header=None)
FAF_AVG.columns = ["Obesity_degree", "Avg"]
 
fig_Q2_FAF = px.bar(FAF_AVG,
              x = "Obesity_degree",
              y = "Avg", 
              title = "不同Obesity_degree人群的平均运动频率",
              color='Avg'
             )
fig_Q2_FAF.show()
### Q2 draw the TUE_avg -----------------------------
TUE_AVG = pd.read_csv("input/Q2_TUE_avg.csv", header=None)
TUE_AVG.columns = ["Obesity_degree", "Avg"]
 
fig_Q2_TUE = px.bar(TUE_AVG,
              x = "Obesity_degree",
              y = "Avg", 
              title = "不同Obesity_degree人群的平均使用电子设备的时间",
              color='Avg'
             )
fig_Q2_TUE.show()
### Q2 draw the FCVC_avg -----------------------------
FCVC_AVG = pd.read_csv("input/Q2_FCVC_avg.csv", header=None)
FCVC_AVG.columns = ["Obesity_degree", "Avg"]
 
fig_Q2_FCVC = px.bar(FCVC_AVG,
              x = "Obesity_degree",
              y = "Avg", 
              title = "不同Obesity_degree人群的平均食用蔬菜频次",
              color='Avg'
             )
fig_Q2_FCVC.show()
## Q3: obesity-degree distribution among people with a family obesity history -----------------------------
HasFamily_dist = pd.read_csv("input/Q3_HasFamily.csv", header=None)
HasFamily_dist.columns = ["Obesity_degree", "Num", "Percentage"]
 
fig_Q3_HasFamily = px.pie(HasFamily_dist,
              names = "Obesity_degree",
              values = "Num", 
              title = "有家族肥胖历史的人群中,不同Obesity_degree人群的分布"
            #   color='Num'
             )
fig_Q3_HasFamily.show()
## Q3: obesity-degree distribution among people without a family obesity history -----------------------------
NoFamily_dist = pd.read_csv("input/Q3_NoFamily.csv", header=None)
NoFamily_dist.columns = ["Obesity_degree", "Num", "Percentage"]
 
fig_Q3_NoFamily = px.bar(NoFamily_dist,
              x = "Obesity_degree",
              y = "Num", 
              title = "没有家族肥胖历史的人群中,不同Obesity_degree人群的分布",
              color='Num'
             )
fig_Q3_NoFamily.show()
### Q3: distribution of people with and without a family history of obesity within each obesity level -------------------------
data = pd.read_csv("input/Q3_Family.csv", header=None)
data = np.array(data)
labels =['0rmal_Weight','Insufficient_Weight','Obesity_Type_I','Obesity_Type_II','Obesity_Type_III','Overweight_Level_I','Overweight_Level_II']  # '0rmal_Weight' is the literal label as it appears in the dataset
col2 = data[:, 2]
has_family=[]
no_family=[]
cnt=0
for i in col2:
    cnt=cnt+1
    if cnt%2==0:
        no_family.append(i)
    else:
        has_family.append(i)
x = np.arange(len(labels))  # label positions
width = 0.2  # bar width; adjust to taste
fig, ax = plt.subplots(figsize=(28,12))
rects1 = ax.bar(x - width/2, has_family, width, label='Has family obesity history')
rects2 = ax.bar(x + width/2, no_family, width, label='No family obesity history')
# the x/y axis labels are set once below via plt.xlabel/plt.ylabel
ax.set_title('Comparison of people with and without family history of obesity among people with different types of obesity',size=20)
ax.set_xticks(x)
ax.set_xticklabels(labels)
ax.tick_params(labelsize=20)
plt.rcParams['font.size'] = 20
plt.xlabel('Obesity Degree',fontsize=25)
plt.ylabel('People Num',fontsize=25)
ax.legend()
def autolabel(rects):
    """Attach a text label above each bar in *rects*, showing its height."""
    for rect in rects:
        height = rect.get_height()
        ax.annotate('{}'.format(height),
                    xy=(rect.get_x() + rect.get_width() / 2, height),
                    xytext=(0, 3),  # 3-point vertical offset
                    textcoords="offset points",
                    ha='center', va='bottom')
autolabel(rects1)
autolabel(rects2)
# fig.savefig('family.png')
plt.show()
### Q4: counts of each obesity level among people who do and do not frequently consume high-caloric food (FAVC) ----------------------------
data = pd.read_csv("input/Q4_FAVC.csv", header=None)
data = np.array(data)
labels = ['No', 'Yes']  # FAVC = 0 (No) comes first in the query's ordering; a list keeps the tick order deterministic
cols =['Overweight_Level_II','Overweight_Level_I','Obesity_Type_III','Obesity_Type_II','Obesity_Type_I','Insufficient_Weight','0rmal_Weight']
col2 = data[:, 2]
ylist = [[] for i in range(7)] 
for i in range(0,7):
    ylist[i].append(col2[i])
    ylist[i].append(col2[i+7])
    print(ylist[i])
x = np.arange(len(labels))  # label positions
width = 0.1  # bar width; adjust to taste
fig, ax = plt.subplots(figsize=(28,12))
rects1 = ax.bar(x - width*3.5, ylist[0], width, label=cols[0])
rects2 = ax.bar(x - width*2.5, ylist[1], width,label=cols[1])
rects3 = ax.bar(x - width*1.5, ylist[2], width,label=cols[2])
rects4 = ax.bar(x - width*0.5, ylist[3], width,label=cols[3])
rects5 = ax.bar(x + width*0.5, ylist[4], width,label=cols[4])
rects6 = ax.bar(x + width*1.5, ylist[5], width,label=cols[5])
rects7 = ax.bar(x + width*2.5, ylist[6], width,label=cols[6])
ax.set_title('Relationship between food consumption frequency and obesity levels',size=20)
ax.set_xticks(x)
ax.set_xticklabels(labels)
ax.tick_params(labelsize=20)
plt.rcParams['font.size'] = 20
plt.xlabel('Frequent consumption of high-caloric food',fontsize=25)
plt.ylabel('People Num',fontsize=25)
ax.legend()
# autolabel() was already defined for the Q3 figure above and is reused here
autolabel(rects1)
autolabel(rects2)
autolabel(rects3)
autolabel(rects4)
autolabel(rects5)
autolabel(rects6)
autolabel(rects7)
# fig.savefig('FAVC.png')
plt.show()
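One practical note before running Draw_Picture.py: Flink's filesystem connector typically writes each sink `path` as a directory containing one or more part files rather than a single CSV, so the Flink outputs may need to be merged (and copied into the script's `input/` directory) before pandas can read them as plain files. A minimal sketch, assuming the connector's default `part-*` file naming:

```python
import glob
import os

def merge_part_files(sink_dir, merged_path):
    """Concatenate the part files under a Flink filesystem-sink directory
    into a single CSV that pandas can read directly."""
    parts = sorted(glob.glob(os.path.join(sink_dir, "part-*")))
    with open(merged_path, "w", encoding="utf-8") as out:
        for part in parts:
            with open(part, "r", encoding="utf-8") as f:
                out.write(f.read())
    return len(parts)

# e.g. merge_part_files('output/Q1_Gender_dist.csv', 'input/Q1_Gender_dist.csv')
```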