基于Flink的CVPR近五年论文数据的分析与处理

大数据学习路线图

【版权声明】版权所有,严禁转载,严禁用于商业用途,侵权必究。
作者:厦门大学信息学院2020级研究生 李毅男
指导老师:厦门大学数据库实验室 林子雨 博士/副教授
相关教材:林子雨、陶继平编著《Flink编程基础(Scala版)》(官网
相关案例:基于Scala语言的Flink数据处理分析案例集锦

本实验通过python爬取了CVPR近五年的论文数据,并通过Scala语言编写了Flink程序,最后使用python进行了数据可视化

一、实验环境

1. 数据分析环境

操作系统: Windows 10(用于数据分析),

IDE: IDEA 2021.1.1.x64

Flink: flink-1.12.4

Scala: scala-2.12.4

2. 数据获取与数据可视化环境

操作系统:Linux Ubuntu 16.04

IDE: Visual Studio Code

Anaconda3

python 3.6

Jupyter Notebook

安装完上述环境以后,为了支持Python可视化分析,还需要执行如下命令创建conda环境并安装numpy、matplotlib、wordcloud、pandas、beautifulsoup4等组件:

conda create -n BigData python=3.6
pip install numpy
pip install matplotlib
pip install wordcloud
pip install pandas
pip install beautifulsoup4
pip install requests

二、实验数据介绍

本次实验所采用的数据,从CVF官方网站(网址:http://openaccess.thecvf.com/)爬取,主要是最近5年(2016-2020)计算机视觉领域的顶会CVPR主会的文章数据,包括文章标题和全部作者。数据规模达到5164篇论文,8579名作者。本次大作业中主要计算分析了论文中的热点词汇和近五年的高产作者。

特别说明:实验所用数据均为网上爬取,没有得到官方授权使用,使用范围仅限本次实验使用,请勿用于商业用途

三、数据获取

可以通过百度云下载数据获取和处理的的源代码

链接:https://pan.baidu.com/s/13Odnp15ux-Zcyha5V39Nkg 
提取码:ziyu 

1. 数据爬取

CVPR的论文都存放在http://openaccess.thecvf.com/CVPR{年份}.py网页中,如http://openaccess.thecvf.com/CVPR2016.py

向该url发送get请求,将得到的response的content按utf-8解码使用BeautifulSoup进行HTML解析。在浏览器中按F12观察网页结构,发现其文章标题放在dt标签中,例:< dt class="ptitle">

通过Beautifulsoup的find_all函数,获得全部的文章标题。

在每个包含文章标题的dt标签中,还包含了指向这篇文章具体信息的网址,在a标签中,例如:
< a href="content_cvpr_2016/html/Hendricks_Deep_Compositional_Captioning_CVPR_2016_paper.html">Deep Compositional Captioning: Describing Novel Object Categories Without Paired Training Data< /a> 通过对这个文章的连接url发送get请求,并用Beautifulsoup进行解析,可以获得这篇文章的具体信息,包括了文章标题、摘要、作者名、以及这篇文章的pdf下载链接等,本次实验只爬取了文章作者名。

每次执行程序爬取指定某年的论文信息,此外,18年之后的论文,由于数目增加,官方将其按照开会日期存放,如18年的论文分别存放在了三个网页中:

urls=[

'https://openaccess.thecvf.com/CVPR2018?day=2018-06-19',

'https://openaccess.thecvf.com/CVPR2018?day=2018-06-20',

'https://openaccess.thecvf.com/CVPR2018?day=2018-06-21'

]

2. 数据存储与清洗

上一步获取的信息直接保存到list中,转为pandas的DataFrame然后直接存储为csv文件,表头为title和authors,之后将5年的5个csv文件连接到一起形成一个csv文件。但有一个问题,由于文章标题和作者名的字符串中会含有“,”而csv文件是以逗号来分割不同的列,所以,要替换作者名中的“,”为“;”,而对于文章标题,我删去了其中的引号和括号,为了统一词的形式,方便统计热点词。

3.数据获取源代码

爬虫代码:

import requests
from bs4 import BeautifulSoup
import pandas as pd
import numpy as np
headers = {'user-agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.131 Safari/537.36'}#创建头部信息
urls=[
    'https://openaccess.thecvf.com/CVPR2020?day=2020-06-16',
    'https://openaccess.thecvf.com/CVPR2020?day=2020-06-17',
    'https://openaccess.thecvf.com/CVPR2020?day=2020-06-18'
]
alllist=[]
dayCount = 0

for url in urls:
    r=requests.get(url,headers=headers)
    content=r.content.decode('utf-8')
    soup = BeautifulSoup(content, 'html.parser')
    dts=soup.find_all('dt',class_='ptitle')
    hts='http://openaccess.thecvf.com/'
    #数据爬取
    for i in range(len(dts)):
        title=dts[i].a.text.strip()
        print('这是第'+str(i+dayCount)+'篇文章:', title)
        href=hts+dts[i].a['href']
        r = requests.get(href, headers=headers)
        content = r.content.decode('utf-8')
        soup = BeautifulSoup(content, 'html.parser')
        #print(title,href)
        div_author=soup.find(name='div',attrs={"id":"authors"})
        authors = div_author.text.strip().split(';')[0]
        print('第'+str(i+dayCount)+'篇文章的作者:', authors)
        value=(title, authors)
        alllist.append(value)
    dayCount+=len(dts)

name = ['title', 'authors']
papers = pd.DataFrame(columns=name, data=alllist)
print(papers.head())
papers.to_csv('CVPR2020.csv', encoding='utf-8')

将代码中的urls中CVPR2020换为其他年份,并注意后面的日期。

csv整合代码:

import pandas as pd
f1 = pd.read_csv('CVPR2016.csv')
f2 = pd.read_csv('CVPR2017.csv')
f3 = pd.read_csv('CVPR2018.csv')
f4 = pd.read_csv('CVPR2019.csv')
f5 = pd.read_csv('CVPR2020.csv')
f1 = f1.iloc[:,-2:]
f2 = f2.iloc[:,-2:]
f3 = f3.iloc[:,-2:]
f4 = f4.iloc[:,-2:]
f5 = f5.iloc[:,-2:]
file = [f1,f2,f3,f4,f5]
final = pd.concat(file,ignore_index=True)
final.to_csv('CVPR_cat.csv')

csv 数据清洗代码:

import pandas as pd
import re

data = pd.read_csv('CVPR_cat.csv')
data[u'authors'] = data[u'authors'].astype(str)
data[u'authors'] = data[u'authors'].apply(lambda x:re.sub(',\s+',';',x))
data[u'title'] = data[u'title'].astype(str)
d = data[u'title'].apply(lambda x:re.sub(',',' ',x))
data[u'title'] = d.apply(lambda x:re.sub('\"\(\)',' ',x))
data = data.iloc[:,1:3]
data.to_csv('CVPR_cat.csv', index=True, encoding='utf-8')

四、数据分析

数据分析主要使用到了Flink的DataSet API,统计了论文列表中的热点词汇和高产作者并进行了降序排序。

1. 计算论文的热点词汇

思路:将每个文章标题进行分词,然后统计每个词的出现次数并排序。相关步骤如下;

1、创建批处理执行环境bEnv

2、使用readCsvFile[PapersLog]读取csv文件并保存在自定义case class PapersLog(Index:Int, title:String,authors:String)中

3、使用flatMap将每一条论文标题按照空格字符分割

4、使用filter判断是否为空,并滤去一些英文停用词如“a,an,the”

5、使用Map给每一条单词赋值1

6、使用groupBy去重并用sum计算每一条的和。

7、使用sortPartition(field=1,Order.DESCENDING)对数值列进行倒序排序

6、setParallelism(1)不设并行,保证全局顺序正确

7、最后将结果writeAsCsv写入到csv文件中

代码如下:

import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
import scala.io.Source
object compute_cvpr {
    def main(args: Array[String]): Unit = {
      val bEnv = ExecutionEnvironment.getExecutionEnvironment
      val filePath="F:\\pyProject\\spider\\CVPR_cat.csv"
      val csv = bEnv.readCsvFile[PapersLog](filePath,ignoreFirstLine = true)
      //println(csv.hashCode)
      val stopEnword = Source.fromFile("src/main/scala/stopwords.txt").getLines()
      val stopWordList = stopEnword.toList
      val counts = csv.flatMap(_.title.split(" "))
        .filter(_.nonEmpty)
        .filter(stopword(_, stopWordList))
        .map((_,1))
        .groupBy(0)
        .sum(1)
        .sortPartition(field = 1, Order.DESCENDING)
        .setParallelism(1)
      counts.writeAsCsv("src/main/scala/CVPR/hotWords16-20.csv").setParallelism(1)
      bEnv.execute("batch wordCount")
    }
    def stopword(string: String, stopWordList: List[String]):Boolean = {
      !stopWordList.contains(string.toLowerCase())
    }
    case class
    PapersLog(index:Int,title:String,authors:String)
}

2. 计算高产作者

思路:将作者列表进行分词,然后统计每个名字的出现次数并排序。相关步骤如下;

1、创建批处理执行环境bEnv

2、使用readCsvFile[PapersLog]读取csv文件并保存在自定义case class PapersLog(Index:Int, title:String, authors:String)中

3、使用flatMap将每一条作者列表按照”;”字符分割

4、使用filter判断是否为空。

5、使用Map给每一条单词赋值1

6、使用groupBy去重并用sum计算每一条的和。

7、使用sortPartition(field=1,Order.DESCENDING)对数值列进行倒序排序

6、setParallelism(1)不设并行,保证全局顺序正确

7、最后将结果writeAsCsv写入到csv文件中

具体代码如下:

import org.apache.flink.api.common.operators.Order

import org.apache.flink.api.scala.ExecutionEnvironment

import org.apache.flink.api.scala._

object compute_author {

 def main(args: Array[String]): Unit = {

  val bEnv = ExecutionEnvironment.getExecutionEnvironment

  val filePath="F:\\pyProject\\spider\\CVPR_cat.csv"

  val csv = bEnv.readCsvFile[PapersLog](filePath,ignoreFirstLine = true)

  csv.print()

  val counts = csv.flatMap(_.authors.split(";"))

   .filter(_.nonEmpty)

   .map((_,1))

   .groupBy(0)

   .sum(1)

   .sortPartition(field = 1, Order.DESCENDING)

   .setParallelism(1)

counts.writeAsCsv("src/main/scala/CVPR/authors_all_DES.csv").setParallelism(1)

  bEnv.execute("batch wordCount")

 }

 case class

 PapersLog(Index:Int, title:String,authors:String)

}

上述程序使用依赖pom.xml文件为:

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0"

     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

  <modelVersion>4.0.0</modelVersion>

  <groupId>org.example</groupId>

  <artifactId>Flink</artifactId>

  <version>1.0-SNAPSHOT</version>

  <properties>

    <maven.compiler.source>8</maven.compiler.source>

    <maven.compiler.target>8</maven.compiler.target>

    <flink.version>1.12.4</flink.version>

    <scala.version>2.12</scala.version>

  </properties>

  <dependencies>

    <dependency>

      <groupId>org.apache.flink</groupId>

      <artifactId>flink-scala_${scala.version}</artifactId>

      <version>${flink.version}</version>

      <scope>compile</scope>

    </dependency>

    <dependency>

      <groupId>org.apache.flink</groupId>

      <artifactId>flink-streaming-scala_${scala.version}</artifactId>

      <version>${flink.version}</version>

      <scope>compile</scope>

    </dependency>

  </dependencies>

</project>

五、数据可视化

​ 数据可视化使用python的matplotlib来画柱状图,用wordcloud画词云,使用pip安装在conda环境中

步骤如下:

1、使用pandas的read_csv方法读入保存文章标题的csv文件并用set_index转换为dict形式

2、设置词云的颜色

3、使用wordcloud的WordCloud画词云,字体为’TIMES.ttf’(新罗马体),背景颜色为白色,宽2000高1200,设置最多显示350个词,最大字体大小为500。

4、使用generate_from_frequencies(文章标题字典对象)生成词云。

5、使用matplotlib画图并保存

6、使用matplotlib从文章标题字典中读取前20高频词汇,画出柱状图并保存。

7、使用pandas的read_csv方法读入保存文章作者的csv文件并用set_index转换为dict形式

8、使用matplotlib从文章作者字典中读取前20高产作者,画出横向柱状图并保存。

词云图如下:

高频词柱状图如下:

高产作者条形图如下:

可视化代码如下:

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import wordcloud
import random
import matplotlib.font_manager as fm

hotWord_all = pd.read_csv('CVPR/hotWords16-20_des.csv',header=None, names=['word', 'num'])
hotWord_all.head()
hotWord_dict = hotWord_all.set_index(['word'])['num'].to_dict()
# 设置词云
wc = wordcloud.WordCloud(
    font_path='TIMES.TTF',
    background_color='white',
    width=2000,height=1200,
    max_words=350,
    max_font_size=500,)
# 生成词云
wc.generate_from_frequencies(hotWord_dict)
# 画词云并保存
plt.imshow(wc)
plt.axis('off')
plt.savefig('hotwordcloud.png',dpi=600)
plt.show()
# 画热点词前20柱状图
fig = plt.figure(figsize=(14,6))
YaHei = fm.FontProperties(fname='MSYH.TTC')
TNR = fm.FontProperties(fname='TIMES.TTF')
hotWordTop20 = []
hotWordTop20_num = []
stopword = ['via', 'Using']
for i, (word, num) in enumerate(hotWord_dict.items()):
    if word in stopword:
        continue
    plt.text(word, num+5, num, ha='center', va='bottom', fontsize=8, fontproperties=TNR)#+0.05 表示高于图0.0
    hotWordTop20.append(word)
    hotWordTop20_num.append(num)
    if len(hotWordTop20)==20:
        break
plt.bar(hotWordTop20,hotWordTop20_num,color='rgby')
plt.title('包含热点词汇的文章数',fontproperties=YaHei) # 标题
plt.xlabel('热点词汇',fontproperties=YaHei) # x轴标签
plt.xticks(rotation=-45)
plt.ylabel('文\n章\n数',fontproperties=YaHei,rotation='horizontal') # y轴标签
plt.savefig('top20Word.png',dpi=600)
# plt.tight_layout()
plt.show()
# 读文章作者csv
authors_all = pd.read_csv('CVPR/authors_all_DES.csv',header=None, names=['authors', 'num'])
authors_all.head()
authors_dict = authors_all.set_index(['authors'])['num'].to_dict()
fig2 = plt.figure(figsize=(12,8))
authorsTop20 = []
authorsTop20_num = []
# 画前20高产作者条形图
for i, (author, num) in enumerate(authors_dict.items()):
    plt.text(num+1,author, num, ha='center', va='bottom', fontsize=8, fontproperties=TNR)#+0.05 表示高于图0.0
    authorsTop20.append(author)
    authorsTop20_num.append(num)
    if i==19:
        break

plt.barh(authorsTop20[::-1],authorsTop20_num[::-1],color='rgby') # 从大到小画图
plt.title('高产作者',fontproperties=YaHei) # 标题
plt.ylabel('作\n者\n名',fontproperties=YaHei,rotation='horizontal') # y轴标签
plt.xlabel('文章数',fontproperties=YaHei) # x轴标签
plt.savefig('top10authors.png',dpi=600, bbox_inches="tight")
plt.tight_layout()
plt.show()