The code in 林子雨 and 陶继平's textbook 《Flink编程基础(Scala版)》 (see the textbook website) may be hard to read in its printed form, which can get in the way of understanding. To help readers follow the code correctly, or copy it directly for hands-on lab work, the complete companion code for the book is provided here.
View the code for all chapters of the textbook
Chapter 4: Setting Up and Using the Flink Environment
cd /home/hadoop
sudo tar -zxvf ~/Downloads/flink-1.11.2-bin-scala_2.12.tgz -C /usr/local/
cd /usr/local
sudo mv ./flink-1.11.2 ./flink
sudo chown -R hadoop:hadoop ./flink    # hadoop is the username currently logged in to the Linux system
vim ~/.bashrc
export FLINK_HOME=/usr/local/flink
export PATH=$FLINK_HOME/bin:$PATH
source ~/.bashrc
cd /usr/local/flink
./bin/start-cluster.sh
jps
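If the local cluster started successfully, jps should list one JobManager and one TaskManager process. A typical listing for Flink 1.11 in standalone mode looks roughly like this (process IDs will differ):
# Sample jps output (illustrative):
# 12345 StandaloneSessionClusterEntrypoint
# 12346 TaskManagerRunner
# 12347 Jps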
cd /usr/local/flink/bin
./flink run /usr/local/flink/examples/batch/WordCount.jar
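With no --input argument, the bundled example runs on a small built-in sample text and prints its word counts to the terminal, one pair per line:
# Output format (actual words and counts come from the built-in sample text):
# (word1,count1)
# (word2,count2)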
cd /usr/local/hadoop
./sbin/start-dfs.sh
jps
./sbin/stop-dfs.sh
cd /usr/local/flink
./bin/start-scala-shell.sh local
scala> 8*2+5
scala> :quit
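The Flink Scala shell also pre-binds execution environments: benv for batch jobs and senv for streaming jobs. As a quick sanity check (a minimal sketch; the element values are arbitrary), a small batch job can be run directly at the prompt:
scala> benv.fromElements(1, 2, 3).map(_ * 2).print()    // should print 2, 4 and 6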
https://downloads.apache.org/maven/maven-3/3.6.3/binaries/apache-maven-3.6.3-bin.zip
sudo unzip ~/Downloads/apache-maven-3.6.3-bin.zip -d /usr/local
cd /usr/local
sudo mv apache-maven-3.6.3/ ./maven
sudo chown -R hadoop ./maven
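As an optional check that Maven was unpacked correctly (using the full path, since mvn is not yet on PATH here):
/usr/local/maven/bin/mvn -v    # should report Apache Maven 3.6.3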
cd /usr/local/maven/conf
vim settings.xml
<settings xmlns="http://maven.apache.org/SETTINGS/1.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/SETTINGS/1.0.0
                              http://maven.apache.org/xsd/settings-1.0.0.xsd">
  <mirrors>
    <mirror>
      <id>aliyun-public</id>
      <mirrorOf>*</mirrorOf>
      <name>Aliyun public repository</name>
      <url>https://maven.aliyun.com/repository/public</url>
    </mirror>
    <mirror>
      <id>aliyun-google</id>
      <mirrorOf>*</mirrorOf>
      <name>Aliyun Google repository</name>
      <url>https://maven.aliyun.com/repository/google</url>
    </mirror>
    <mirror>
      <id>aliyun-apache-snapshots</id>
      <mirrorOf>*</mirrorOf>
      <name>Aliyun Apache snapshots repository</name>
      <url>https://maven.aliyun.com/repository/apache-snapshots</url>
    </mirror>
    <mirror>
      <id>aliyun-spring</id>
      <mirrorOf>*</mirrorOf>
      <name>Aliyun Spring repository</name>
      <url>https://maven.aliyun.com/repository/spring</url>
    </mirror>
    <mirror>
      <id>aliyun-spring-plugin</id>
      <mirrorOf>*</mirrorOf>
      <name>Aliyun Spring plugin repository</name>
      <url>https://maven.aliyun.com/repository/spring-plugin</url>
    </mirror>
  </mirrors>
</settings>
cd ~    # change to the user's home directory
mkdir -p ./flinkapp/src/main/scala
package cn.edu.xmu.dblab

import org.apache.flink.api.scala._

object WordCount {
  def main(args: Array[String]): Unit = {
    // Step 1: set up the execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    // Step 2: create the data source
    val text = env.fromElements(
      "hello, world!",
      "hello, world!",
      "hello, world!")
    // Step 3: apply transformations to the data set
    val counts = text.flatMap { _.toLowerCase.split(" ") }
      .map { (_, 1) }
      .groupBy(0)
      .sum(1)
    // Step 4: print the result
    counts.print()
  }
}
<project>
  <modelVersion>4.0.0</modelVersion>
  <groupId>cn.edu.xmu.dblab</groupId>
  <artifactId>wordcount</artifactId>
  <name>WordCount</name>
  <packaging>jar</packaging>
  <version>1.0</version>
  <repositories>
    <repository>
      <id>alimaven</id>
      <name>aliyun maven</name>
      <url>https://maven.aliyun.com/nexus/content/groups/public/</url>
    </repository>
  </repositories>
  <dependencies>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-scala_2.12</artifactId>
      <version>1.11.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-scala_2.12</artifactId>
      <version>1.11.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_2.12</artifactId>
      <version>1.11.2</version>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.4.6</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>3.0.0</version>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
cd ~/flinkapp
find .
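Assuming the source file was saved as WordCount.scala and the pom.xml sits at the project root, find should print a structure roughly like the following (order may vary):
.
./pom.xml
./src
./src/main
./src/main/scala
./src/main/scala/WordCount.scala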
cd ~/flinkapp    # make sure this directory is the current working directory
/usr/local/maven/bin/mvn package
cd ~/flinkapp
/usr/local/flink/bin/flink run --class cn.edu.xmu.dblab.WordCount ./target/wordcount-1.0.jar
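Since the program's three input lines are identical and the code splits on single spaces only, the job should print two tokens, each seen three times; note the trailing comma kept in "hello,":
# Expected output (order may vary):
# (hello,,3)
# (world!,3)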
cd ~    # change to the user's home directory
mkdir -p ./flinkapp2/src/main/scala
package cn.edu.xmu.dblab

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time

object StreamWordCount {
  def main(args: Array[String]): Unit = {
    // Step 1: set up the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Step 2: create the data source
    val source = env.socketTextStream("localhost", 9999, '\n')
    // Step 3: specify the transformation logic for the stream
    val dataStream = source.flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(0)
      .timeWindow(Time.seconds(2), Time.seconds(2))
      .sum(1)
    // Step 4: specify where to emit the results
    dataStream.print()
    // Step 5: give the job a name and trigger the streaming computation
    env.execute("Flink Streaming Word Count")
  }
}
cd ~/flinkapp2
find .
cd ~/flinkapp2    # make sure this directory is the current working directory
/usr/local/maven/bin/mvn package
nc -lk 9999
cd ~/flinkapp2
/usr/local/flink/bin/flink run --class cn.edu.xmu.dblab.StreamWordCount ./target/wordcount-1.0.jar
cd /usr/local/flink/log
tail -f flink*.out
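For a job submitted with flink run, dataStream.print() writes to a TaskManager's .out file rather than to the submitting terminal, which is why the results are read with tail here. Typing a few words into the nc session should produce output along these lines (counts depend on what falls into each 2-second window, and lines may be prefixed with a subtask index such as 1>):
# Typed into the nc terminal:
#   hello flink
# Appears in flink*.out shortly afterwards:
#   (hello,1)
#   (flink,1)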
cd ~    # change to the user's home directory
sudo tar -zxvf /home/hadoop/Downloads/ideaIC-2020.2.3.tar.gz -C /usr/local    # extract the archive
cd /usr/local
sudo mv ./idea-IC-202.7660.26 ./idea    # rename the directory for convenience
sudo chown -R hadoop ./idea    # give the current Linux user hadoop ownership of the idea directory
cd /usr/local/idea
./bin/idea.sh
package cn.edu.xmu.dblab

import org.apache.flink.api.scala._

object WordCount {
  def main(args: Array[String]): Unit = {
    // Step 1: set up the execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    // Step 2: create the data source
    val text = env.fromElements(
      "hello, world!",
      "hello, world!",
      "hello, world!")
    // Step 3: apply transformations to the data set
    val counts = text.flatMap { _.toLowerCase.split(" ") }
      .map { (_, 1) }
      .groupBy(0)
      .sum(1)
    // Step 4: print the result
    counts.print()
  }
}
<project>
  <modelVersion>4.0.0</modelVersion>
  <groupId>dblab</groupId>
  <artifactId>WordCount</artifactId>
  <name>WordCount</name>
  <packaging>jar</packaging>
  <version>1.0</version>
  <repositories>
    <repository>
      <id>alimaven</id>
      <name>aliyun maven</name>
      <url>https://maven.aliyun.com/nexus/content/groups/public/</url>
    </repository>
  </repositories>
  <dependencies>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-scala_2.12</artifactId>
      <version>1.11.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-scala_2.12</artifactId>
      <version>1.11.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_2.12</artifactId>
      <version>1.11.2</version>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.4.6</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>3.0.0</version>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
cd ~/flinkapp
/usr/local/flink/bin/flink run --class cn.edu.xmu.dblab.WordCount ./target/WordCount-1.0.jar
sudo vim /etc/hostname
sudo vim /etc/hosts
192.168.1.101 Master
192.168.1.102 Slave1
192.168.1.103 Slave2
ping Master -c 3    # ping only 3 times and then stop; otherwise press Ctrl+C to interrupt the ping command
ping Slave1 -c 3
cd /usr/lib
sudo mkdir jvm    # create the /usr/lib/jvm directory to hold the JDK files
cd ~    # enter the hadoop user's home directory
cd Downloads
sudo tar -zxvf ./jdk-8u162-linux-x64.tar.gz -C /usr/lib/jvm
vim ~/.bashrc
export JAVA_HOME=/usr/lib/jvm/jdk1.8.0_162
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
export PATH=${JAVA_HOME}/bin:$PATH
source ~/.bashrc
java -version
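If the JDK was unpacked and the environment variables took effect, the first line of the output should identify the installed version, roughly:
# java version "1.8.0_162"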
sudo apt-get install openssh-server
ssh localhost
exit
cd ~/.ssh    # if this directory does not exist, run ssh localhost once first
rm ./id_rsa*    # remove any previously generated keys (if they exist)
ssh-keygen -t rsa    # just press Enter at every prompt this command shows
cat ./id_rsa.pub >> ./authorized_keys
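A quick sanity check (not part of the original steps): reconnecting to localhost should now succeed without a password prompt.
ssh localhost    # should log in without asking for a password
exit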
scp ~/.ssh/id_rsa.pub hadoop@Slave1:/home/hadoop/
scp ~/.ssh/id_rsa.pub hadoop@Slave2:/home/hadoop/
mkdir ~/.ssh    # on each Slave node: create this directory if it does not exist; skip this command if it already exists
cat ~/id_rsa.pub >> ~/.ssh/authorized_keys
rm ~/id_rsa.pub    # the copy can be deleted once it has been appended
ssh Slave1
ssh Slave2
sudo tar -zxf ~/Downloads/flink-1.11.2-bin-scala_2.12.tgz -C /usr/local/
cd /usr/local
sudo mv ./flink-1.11.2 ./flink
sudo chown -R hadoop:hadoop ./flink    # hadoop is the username currently logged in to the Linux system
vim ~/.bashrc
export FLINK_HOME=/usr/local/flink
export PATH=$FLINK_HOME/bin:$PATH
source ~/.bashrc
cd /usr/local/flink/conf
vim flink-conf.yaml
jobmanager.rpc.address: Master
taskmanager.tmp.dirs: /usr/local/flink/tmp
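Note: taskmanager.tmp.dirs is a legacy key; recent Flink releases use io.tmp.dirs for the same purpose. If your Flink version warns that the key is deprecated, the equivalent setting would be:
io.tmp.dirs: /usr/local/flink/tmp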
cd /usr/local/flink/conf
vim masters
Master:8081
cd /usr/local/flink/conf
vim workers
Master
Slave1
Slave2
cd /usr/local/
tar -zcf ~/flink.master.tar.gz ./flink
cd ~
scp ./flink.master.tar.gz Slave1:/home/hadoop
scp ./flink.master.tar.gz Slave2:/home/hadoop
sudo rm -rf /usr/local/flink/    # run this and the following commands on each Slave node
sudo tar -zxf ~/flink.master.tar.gz -C /usr/local
sudo chown -R hadoop /usr/local/flink
cd /usr/local/flink
sudo mkdir tmp
sudo chmod -R 755 ./tmp
cd /usr/local/flink/
./bin/start-cluster.sh
jps
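On the Master node, jps should now show both StandaloneSessionClusterEntrypoint (the JobManager) and TaskManagerRunner, while on each Slave it should show only TaskManagerRunner. The cluster's web UI should also be reachable at the address configured in the masters file:
# Open in a browser from a machine that resolves the Master hostname:
# http://Master:8081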
cd /usr/local/flink/bin
./flink run /usr/local/flink/examples/batch/WordCount.jar
cd /usr/local/flink
./bin/stop-cluster.sh