林子雨、陶继平编著《Flink编程基础(Scala版)》(教材官网)教材中的代码,在纸质教材中的印刷效果,可能会影响读者对代码的理解,为了方便读者正确理解代码或者直接拷贝代码用于上机实验,这里提供全书配套的所有代码。
查看教材所有章节的代码
第4章 Flink环境搭建和使用方法
- cd /home/hadoop
- sudo tar -zxvf ~/Downloads/flink-1.11.2-bin-scala_2.12.tgz -C /usr/local/
- cd /usr/local
- sudo mv ./flink-1.11.2 ./flink
- sudo chown -R hadoop:hadoop ./flink # hadoop是当前登录Linux系统的用户名
- vim ~/.bashrc
export FLINK_HOME=/usr/local/flink
export PATH=$FLINK_HOME/bin:$PATH
- source ~/.bashrc
- cd /usr/local/flink
- ./bin/start-cluster.sh
- jps
- cd /usr/local/flink/bin
- ./flink run /usr/local/flink/examples/batch/WordCount.jar
- cd /usr/local/hadoop
- ./sbin/start-dfs.sh
- jps
- ./sbin/stop-dfs.sh
- cd /usr/local/flink
- ./bin/start-scala-shell.sh local
- scala> 8*2+5
- scala>:quit
https://downloads.apache.org/maven/maven-3/3.6.3/binaries/apache-maven-3.6.3-bin.zip
- sudo unzip ~/Downloads/apache-maven-3.6.3-bin.zip -d /usr/local
- cd /usr/local
- sudo mv apache-maven-3.6.3/ ./maven
- sudo chown -R hadoop ./maven
- cd /usr/local/maven/conf
- vim settings.xml
- <settings xmlns="http://maven.apache.org/SETTINGS/1.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/SETTINGS/1.0.0
- http://maven.apache.org/xsd/settings-1.0.0.xsd">
- <mirrors>
- <mirror>
- <id>aliyunmaven</id>
- <mirrorOf>*</mirrorOf>
- <name>阿里云公共仓库</name>
- <url>https://maven.aliyun.com/repository/public</url>
- </mirror>
- <mirror>
- <id>aliyunmaven</id>
- <mirrorOf>*</mirrorOf>
- <name>阿里云谷歌仓库</name>
- <url>https://maven.aliyun.com/repository/google</url>
- </mirror>
- <mirror>
- <id>aliyunmaven</id>
- <mirrorOf>*</mirrorOf>
- <name>阿里云阿帕奇仓库</name>
- <url>https://maven.aliyun.com/repository/apache-snapshots</url>
- </mirror>
- <mirror>
- <id>aliyunmaven</id>
- <mirrorOf>*</mirrorOf>
- <name>阿里云spring仓库</name>
- <url>https://maven.aliyun.com/repository/spring</url>
- </mirror>
- <mirror>
- <id>aliyunmaven</id>
- <mirrorOf>*</mirrorOf>
- <name>阿里云spring插件仓库</name>
- <url>https://maven.aliyun.com/repository/spring-plugin</url>
- </mirror>
- </mirrors>
- </settings>
- cd ~ #进入用户主目录
- mkdir -p ./flinkapp/src/main/scala
- package cn.edu.xmu.dblab
- import org.apache.flink.api.scala._
- // Batch WordCount: counts word occurrences in a small in-memory dataset
- // using the Flink DataSet (batch) API.
- object WordCount {
- def main(args: Array[String]): Unit = {
- //Step 1: obtain the batch execution environment
- val env = ExecutionEnvironment.getExecutionEnvironment
- //Step 2: create the data source — three identical in-memory lines
- val text = env.fromElements(
- "hello, world!",
- "hello, world!",
- "hello, world!")
- //Step 3: transformations — lower-case each line, split on single spaces
- //(punctuation stays attached to tokens, e.g. "hello," and "world!"),
- //then count occurrences per word: group by field 0, sum field 1
- val counts = text.flatMap { _.toLowerCase.split(" ") }
- .map { (_, 1) }
- .groupBy(0)
- .sum(1)
- // Step 4: print the (word, count) results to stdout
- counts.print()
- }
- }
- <project>
- <groupId>cn.edu.xmu.dblab</groupId>
- <artifactId>wordcount</artifactId>
- <modelVersion>4.0.0</modelVersion>
- <name>WordCount</name>
- <packaging>jar</packaging>
- <version>1.0</version>
- <repositories>
- <repository>
- <id>alimaven</id>
- <name>aliyun maven</name>
- <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
- </repository>
- </repositories>
- <dependencies>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-scala_2.12</artifactId>
- <version>1.11.2</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-scala_2.12</artifactId>
- <version>1.11.2</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-clients_2.12</artifactId>
- <version>1.11.2</version>
- </dependency>
- </dependencies>
- <build>
- <plugins>
- <plugin>
- <groupId>net.alchim31.maven</groupId>
- <artifactId>scala-maven-plugin</artifactId>
- <version>3.4.6</version>
- <executions>
- <execution>
- <goals>
- <goal>compile</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-assembly-plugin</artifactId>
- <version>3.0.0</version>
- <configuration>
- <descriptorRefs>
- <descriptorRef>jar-with-dependencies</descriptorRef>
- </descriptorRefs>
- </configuration>
- <executions>
- <execution>
- <id>make-assembly</id>
- <phase>package</phase>
- <goals>
- <goal>single</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
- </project>
- cd ~/flinkapp
- find .
- cd ~/flinkapp #一定把这个目录设置为当前目录
- /usr/local/maven/bin/mvn package
- cd ~/flinkapp
- /usr/local/flink/bin/flink run --class cn.edu.xmu.dblab.WordCount ./target/wordcount-1.0.jar
- cd ~ #进入用户主目录
- mkdir -p ./flinkapp2/src/main/scala
- package cn.edu.xmu.dblab
- import org.apache.flink.streaming.api.scala._
- import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
- import org.apache.flink.streaming.api.windowing.time.Time
- // Streaming WordCount: reads lines from a local socket (fed by `nc -lk 9999`)
- // and prints windowed word counts using the DataStream API.
- object StreamWordCount{
- def main(args: Array[String]): Unit = {
- //Step 1: obtain the streaming execution environment
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- //Step 2: create the data source — text lines from localhost:9999,
- //records delimited by '\n'
- val source = env.socketTextStream("localhost",9999,'\n')
- //Step 3: transformation logic — split on spaces, key by the word
- //(positional key 0), window, and sum the counts (field 1).
- //timeWindow(size, slide) with size == slide == 2s behaves like a
- //2-second tumbling window.
- val dataStream = source.flatMap(_.split(" "))
- .map((_,1))
- .keyBy(0)
- .timeWindow(Time.seconds(2),Time.seconds(2))
- .sum(1)
- //Step 4: specify where results go — print to task stdout
- //(appears in flink*.out under the log directory)
- dataStream.print()
- //Step 5: name the job and trigger the streaming execution
- env.execute("Flink Streaming Word Count")
- }
- }
- cd ~/flinkapp2
- find .
- cd ~/flinkapp2 #一定把这个目录设置为当前目录
- /usr/local/maven/bin/mvn package
- nc -lk 9999
- cd ~/flinkapp2
- /usr/local/flink/bin/flink run --class cn.edu.xmu.dblab.StreamWordCount ./target/wordcount-1.0.jar
- cd /usr/local/flink/log
- tail -f flink*.out
- cd ~ #进入用户主目录
- sudo tar -zxvf /home/hadoop/download/ideaIC-2020.2.3.tar.gz -C /usr/local #解压文件
- cd /usr/local
- sudo mv ./idea-IC-202.7660.26 ./idea #重命名,方便操作(ideaIC-2020.2.3 解压后的目录名为 idea-IC-202.7660.26)
- sudo chown -R hadoop ./idea #为当前Linux用户hadoop赋予针对idea目录的权限
- cd /usr/local/idea
- ./bin/idea.sh
- package cn.edu.xmu.dblab
- import org.apache.flink.api.scala._
- // Batch WordCount (IDEA project variant): identical logic to the
- // Maven-built example — counts words in an in-memory DataSet.
- object WordCount {
- def main(args: Array[String]): Unit = {
- //Step 1: obtain the batch execution environment
- val env = ExecutionEnvironment.getExecutionEnvironment
- //Step 2: create the data source from three in-memory strings
- val text = env.fromElements(
- "hello, world!",
- "hello, world!",
- "hello, world!")
- //Step 3: transformations — lower-case, split on single spaces,
- //then group by the word (field 0) and sum the counts (field 1)
- val counts = text.flatMap { _.toLowerCase.split(" ") }
- .map { (_, 1) }
- .groupBy(0)
- .sum(1)
- // Step 4: print the (word, count) results to stdout
- counts.print()
- }
- }
- <project>
- <groupId>dblab</groupId>
- <artifactId>WordCount</artifactId>
- <modelVersion>4.0.0</modelVersion>
- <name>WordCount</name>
- <packaging>jar</packaging>
- <version>1.0</version>
- <repositories>
- <repository>
- <id>alimaven</id>
- <name>aliyun maven</name>
- <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
- </repository>
- </repositories>
- <dependencies>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-scala_2.12</artifactId>
- <version>1.11.2</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-scala_2.12</artifactId>
- <version>1.11.2</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-clients_2.12</artifactId>
- <version>1.11.2</version>
- </dependency>
- </dependencies>
- <build>
- <plugins>
- <plugin>
- <groupId>net.alchim31.maven</groupId>
- <artifactId>scala-maven-plugin</artifactId>
- <version>3.4.6</version>
- <executions>
- <execution>
- <goals>
- <goal>compile</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-assembly-plugin</artifactId>
- <version>3.0.0</version>
- <configuration>
- <descriptorRefs>
- <descriptorRef>jar-with-dependencies</descriptorRef>
- </descriptorRefs>
- </configuration>
- <executions>
- <execution>
- <id>make-assembly</id>
- <phase>package</phase>
- <goals>
- <goal>single</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
- </project>
- cd ~/flinkapp
- /usr/local/flink/bin/flink run --class cn.edu.xmu.dblab.WordCount ./target/WordCount-1.0.jar
- sudo vim /etc/hostname
- sudo vim /etc/hosts
192.168.1.101 Master
192.168.1.102 Slave1
192.168.1.103 Slave2
- ping Master -c 3 #只ping 3次就会停止,否则要按Ctrl+c中断ping命令
- ping Slave1 -c 3
- cd /usr/lib
- sudo mkdir jvm #创建/usr/lib/jvm目录用来存放JDK文件
- cd ~ #进入hadoop用户的主目录
- cd Downloads
- sudo tar -zxvf ./jdk-8u162-linux-x64.tar.gz -C /usr/lib/jvm
- vim ~/.bashrc
export JAVA_HOME=/usr/lib/jvm/jdk1.8.0_162
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
export PATH=${JAVA_HOME}/bin:$PATH
- source ~/.bashrc
- java -version
- sudo apt-get install openssh-server
- ssh localhost
- exit
- cd ~/.ssh # 如果没有该目录,先执行一次ssh localhost
- rm ./id_rsa* # 删除之前生成的公匙(如果已经存在)
- ssh-keygen -t rsa # 执行该命令后,遇到提示信息,一直按回车就可以
- cat ./id_rsa.pub >> ./authorized_keys
- scp ~/.ssh/id_rsa.pub hadoop@Slave1:/home/hadoop/
- scp ~/.ssh/id_rsa.pub hadoop@Slave2:/home/hadoop/
- mkdir ~/.ssh # 如果不存在该文件夹需先创建,若已存在,则忽略本命令
- cat ~/id_rsa.pub >> ~/.ssh/authorized_keys
- rm ~/id_rsa.pub # 用完以后就可以删掉
- ssh Slave1
- ssh Slave2
- sudo tar -zxf ~/Downloads/flink-1.11.2-bin-scala_2.12.tgz -C /usr/local/
- cd /usr/local
- sudo mv ./flink-1.11.2 ./flink
- sudo chown -R hadoop:hadoop ./flink # hadoop是当前登录Linux系统的用户名
- vim ~/.bashrc
export FLINK_HOME=/usr/local/flink
export PATH=$FLINK_HOME/bin:$PATH
- source ~/.bashrc
- cd /usr/local/flink/conf
- vim flink-conf.yaml
jobmanager.rpc.address: Master
taskmanager.tmp.dirs: /usr/local/flink/tmp
- cd /usr/local/flink/conf
- vim masters
Master:8081
- cd /usr/local/flink/conf
- vim workers
Master
Slave1
Slave2
- cd /usr/local/
- tar -zcf ~/flink.master.tar.gz ./flink
- cd ~
- scp ./flink.master.tar.gz Slave1:/home/hadoop
- scp ./flink.master.tar.gz Slave2:/home/hadoop
- sudo rm -rf /usr/local/flink/
- sudo tar -zxf ~/flink.master.tar.gz -C /usr/local
- sudo chown -R hadoop /usr/local/flink
- cd /usr/local/flink
- sudo mkdir tmp
- sudo chmod -R 755 ./tmp
- cd /usr/local/flink/
- ./bin/start-cluster.sh
- jps
- cd /usr/local/flink/bin
- ./flink run /usr/local/flink/examples/batch/WordCount.jar
- cd /usr/local/flink
- ./bin/stop-cluster.sh