Chapter 4 Code - Flink Programming Fundamentals (Scala Edition), by Lin Ziyu


The printed rendering of the code in Lin Ziyu and Tao Jiping's textbook Flink Programming Fundamentals (Scala Edition) (textbook website) may make the code hard to read. To help readers understand the code correctly, or copy it directly for hands-on exercises, all of the book's companion code is provided here.

Chapter 4 Setting Up and Using Flink

cd  /home/hadoop
sudo  tar  -zxvf  ~/Downloads/flink-1.11.2-bin-scala_2.12.tgz  -C  /usr/local/
cd  /usr/local
sudo  mv  ./flink-1.11.2  ./flink
sudo  chown  -R  hadoop:hadoop  ./flink  # hadoop is the username currently logged in to the Linux system
vim ~/.bashrc
export FLINK_HOME=/usr/local/flink
export PATH=$FLINK_HOME/bin:$PATH
source ~/.bashrc
cd /usr/local/flink
./bin/start-cluster.sh
jps
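If the local cluster started successfully, jps should list a JobManager and a TaskManager process in addition to Jps itself; in Flink 1.11 the standalone processes are named roughly as follows (the process IDs below are placeholders and will differ):

12345 StandaloneSessionClusterEntrypoint
12346 TaskManagerRunner
12400 Jps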
cd /usr/local/flink/bin
./flink run /usr/local/flink/examples/batch/WordCount.jar
cd  /usr/local/hadoop
./sbin/start-dfs.sh
jps
./sbin/stop-dfs.sh
cd  /usr/local/flink
./bin/start-scala-shell.sh local
scala> 8*2+5
scala> :quit
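The Flink Scala shell pre-binds a batch execution environment named benv (and a streaming one named senv), so a small job can be tried interactively before quitting. A minimal sketch of a batch word count typed at the shell prompt (the input strings are made up for illustration):

scala> benv.fromElements("hello flink", "hello world").flatMap(_.split(" ")).map((_, 1)).groupBy(0).sum(1).print()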
https://downloads.apache.org/maven/maven-3/3.6.3/binaries/apache-maven-3.6.3-bin.zip
sudo unzip ~/Downloads/apache-maven-3.6.3-bin.zip -d /usr/local
cd /usr/local
sudo mv apache-maven-3.6.3/ ./maven
sudo chown -R hadoop ./maven
cd /usr/local/maven/conf
vim settings.xml
<settings xmlns="http://maven.apache.org/SETTINGS/1.0.0"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/SETTINGS/1.0.0
                      http://maven.apache.org/xsd/settings-1.0.0.xsd">
  <mirrors>
    <mirror>
      <id>aliyun-public</id>
      <mirrorOf>*</mirrorOf>
      <name>Aliyun public repository</name>
      <url>https://maven.aliyun.com/repository/public</url>
    </mirror>
    <mirror>
      <id>aliyun-google</id>
      <mirrorOf>*</mirrorOf>
      <name>Aliyun Google repository</name>
      <url>https://maven.aliyun.com/repository/google</url>
    </mirror>
    <mirror>
      <id>aliyun-apache-snapshots</id>
      <mirrorOf>*</mirrorOf>
      <name>Aliyun Apache snapshots repository</name>
      <url>https://maven.aliyun.com/repository/apache-snapshots</url>
    </mirror>
    <mirror>
      <id>aliyun-spring</id>
      <mirrorOf>*</mirrorOf>
      <name>Aliyun Spring repository</name>
      <url>https://maven.aliyun.com/repository/spring</url>
    </mirror>
    <mirror>
      <id>aliyun-spring-plugin</id>
      <mirrorOf>*</mirrorOf>
      <name>Aliyun Spring plugin repository</name>
      <url>https://maven.aliyun.com/repository/spring-plugin</url>
    </mirror>
  </mirrors>
</settings>
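After saving settings.xml, a quick sanity check is to print Maven's version; if the installation above succeeded, this should report Apache Maven 3.6.3 along with the local Java and OS details:

/usr/local/maven/bin/mvn -v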
cd ~ # enter the user home directory
mkdir -p ./flinkapp/src/main/scala
package cn.edu.xmu.dblab

import org.apache.flink.api.scala._

object WordCount {
  def main(args: Array[String]): Unit = {

    //Step 1: set up the execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment

    //Step 2: create the data source
    val text = env.fromElements(
      "hello, world!",
      "hello, world!",
      "hello, world!")

    //Step 3: specify transformations on the dataset
    val counts = text.flatMap { _.toLowerCase.split(" ") }
      .map { (_, 1) }
      .groupBy(0)
      .sum(1)

    //Step 4: print the result
    counts.print()
  }
}
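For reference, this program should print one (word, count) pair per distinct token. Because the text is split only on spaces, the comma stays attached to the first token, so the output (whose order may vary) looks roughly like:

(hello,,3)
(world!,3)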
<project>
    <groupId>cn.edu.xmu.dblab</groupId>
    <artifactId>wordcount</artifactId>
    <modelVersion>4.0.0</modelVersion>
    <name>WordCount</name>
    <packaging>jar</packaging>
    <version>1.0</version>
    <repositories>
        <repository>
            <id>alimaven</id>
            <name>aliyun maven</name>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
    </repositories>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.12</artifactId>
            <version>1.11.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>1.11.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.11.2</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.4.6</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
cd ~/flinkapp
find .
cd ~/flinkapp    # be sure to make this directory the current directory
/usr/local/maven/bin/mvn package
cd ~/flinkapp
/usr/local/flink/bin/flink run --class cn.edu.xmu.dblab.WordCount ./target/wordcount-1.0.jar
cd ~  # enter the user home directory
mkdir -p ./flinkapp2/src/main/scala
package cn.edu.xmu.dblab

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time

object StreamWordCount{
  def main(args: Array[String]): Unit = {

    //Step 1: set up the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //Step 2: create the data source
    val source = env.socketTextStream("localhost", 9999, '\n')

    //Step 3: specify the transformation logic on the stream
    val dataStream = source.flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(0)
      .timeWindow(Time.seconds(2), Time.seconds(2))
      .sum(1)

    //Step 4: specify where the results are emitted
    dataStream.print()

    //Step 5: name the job and trigger the streaming computation
    env.execute("Flink Streaming Word Count")
  }
}
cd ~/flinkapp2
find .
cd ~/flinkapp2    # be sure to make this directory the current directory
/usr/local/maven/bin/mvn package
nc  -lk  9999
cd ~/flinkapp2
/usr/local/flink/bin/flink run --class cn.edu.xmu.dblab.StreamWordCount ./target/wordcount-1.0.jar
cd /usr/local/flink/log
tail -f flink*.out
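As a rough sketch of the expected interaction (exact output depends on timing and parallelism): typing a line such as hello flink hello into the nc terminal should, once the 2-second window fires, append counts to the TaskManager's .out file, with each line possibly prefixed by the printing subtask's index, e.g.:

2> (hello,2)
1> (flink,1)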
cd ~  # enter the user home directory
sudo tar -zxvf ~/Downloads/ideaIC-2020.2.3.tar.gz -C /usr/local  # extract the archive
cd /usr/local
sudo mv ./idea-IC-202.7660.26 ./idea   # rename the directory for convenience
sudo chown -R hadoop ./idea   # grant the current Linux user hadoop ownership of the idea directory
cd /usr/local/idea
./bin/idea.sh
package cn.edu.xmu.dblab

import org.apache.flink.api.scala._

object WordCount {
  def main(args: Array[String]): Unit = {

    //Step 1: set up the execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment

    //Step 2: create the data source
    val text = env.fromElements(
      "hello, world!",
      "hello, world!",
      "hello, world!")

    //Step 3: specify transformations on the dataset
    val counts = text.flatMap { _.toLowerCase.split(" ") }
      .map { (_, 1) }
      .groupBy(0)
      .sum(1)

    //Step 4: print the result
    counts.print()
  }
}
<project>
    <groupId>dblab</groupId>
    <artifactId>WordCount</artifactId>
    <modelVersion>4.0.0</modelVersion>
    <name>WordCount</name>
    <packaging>jar</packaging>
    <version>1.0</version>
    <repositories>
        <repository>
            <id>alimaven</id>
            <name>aliyun maven</name>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
    </repositories>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.12</artifactId>
            <version>1.11.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>1.11.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.11.2</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.4.6</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
cd ~/flinkapp
/usr/local/flink/bin/flink run --class cn.edu.xmu.dblab.WordCount ./target/WordCount-1.0.jar
sudo vim /etc/hostname
sudo vim /etc/hosts
192.168.1.101   Master
192.168.1.102   Slave1
192.168.1.103   Slave2
ping Master -c 3   # ping only 3 times and then stop; otherwise press Ctrl+C to interrupt the ping command
ping Slave1 -c 3
cd /usr/lib
sudo mkdir jvm # create the /usr/lib/jvm directory to hold the JDK files
cd ~ # enter the home directory of the hadoop user
cd Downloads
sudo tar -zxvf ./jdk-8u162-linux-x64.tar.gz -C /usr/lib/jvm
vim ~/.bashrc
export JAVA_HOME=/usr/lib/jvm/jdk1.8.0_162
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
export PATH=${JAVA_HOME}/bin:$PATH
source ~/.bashrc
java -version
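If the JDK was installed and the environment variables were configured correctly, the version check should print something like the following (build numbers may differ slightly):

java version "1.8.0_162"
Java(TM) SE Runtime Environment (build 1.8.0_162-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.162-b12, mixed mode)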
sudo apt-get install openssh-server
ssh localhost
exit
cd ~/.ssh              # if this directory does not exist, run ssh localhost once first
rm ./id_rsa*           # remove previously generated keys (if any exist)
ssh-keygen -t rsa       # press Enter at every prompt after running this command
cat ./id_rsa.pub >> ./authorized_keys
scp ~/.ssh/id_rsa.pub hadoop@Slave1:/home/hadoop/
scp ~/.ssh/id_rsa.pub hadoop@Slave2:/home/hadoop/
mkdir ~/.ssh       # create this directory if it does not exist; skip this command otherwise
cat ~/id_rsa.pub >> ~/.ssh/authorized_keys
rm ~/id_rsa.pub    # the copied public key can be deleted after use
ssh Slave1
ssh Slave2
sudo  tar  -zxf  ~/Downloads/flink-1.11.2-bin-scala_2.12.tgz  -C  /usr/local/
cd  /usr/local
sudo  mv  ./flink-1.11.2  ./flink
sudo  chown  -R  hadoop:hadoop  ./flink  # hadoop is the username currently logged in to the Linux system
vim  ~/.bashrc
export FLINK_HOME=/usr/local/flink
export PATH=$FLINK_HOME/bin:$PATH
source  ~/.bashrc
cd  /usr/local/flink/conf
vim flink-conf.yaml
jobmanager.rpc.address: Master
taskmanager.tmp.dirs: /usr/local/flink/tmp
cd  /usr/local/flink/conf
vim masters
Master:8081
cd  /usr/local/flink/conf
vim workers
Master
Slave1
Slave2
cd  /usr/local/
tar  -zcf  ~/flink.master.tar.gz  ./flink
cd  ~
scp  ./flink.master.tar.gz  Slave1:/home/hadoop
scp  ./flink.master.tar.gz  Slave2:/home/hadoop
sudo  rm  -rf  /usr/local/flink/
sudo  tar  -zxf  ~/flink.master.tar.gz  -C  /usr/local
sudo  chown  -R  hadoop  /usr/local/flink
cd /usr/local/flink
sudo mkdir tmp
sudo chmod -R 755 ./tmp
cd  /usr/local/flink/
./bin/start-cluster.sh
jps
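Besides checking the processes with jps on each node, the cluster's web dashboard should now be reachable at the address configured in the masters file; a quick probe from the command line (assuming the default REST port of 8081):

curl http://Master:8081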
cd /usr/local/flink/bin
./flink run /usr/local/flink/examples/batch/WordCount.jar
cd /usr/local/flink
./bin/stop-cluster.sh