Chapter 4 Code - Flink Programming Basics (Scala Edition), by Lin Ziyu


The code in the textbook Flink Programming Basics (Scala Edition), by Lin Ziyu and Tao Jiping (see the textbook website), may be hard to read in its printed form. To help readers understand the code correctly, or copy it directly for hands-on exercises, the complete companion code for the book is provided here.

Chapter 4 Flink Environment Setup and Usage

cd /home/hadoop
sudo tar -zxvf ~/Downloads/flink-1.11.2-bin-scala_2.12.tgz -C /usr/local/
cd /usr/local
sudo mv ./flink-1.11.2 ./flink
sudo chown -R hadoop:hadoop ./flink # hadoop is the username currently logged in to the Linux system
Shell commands
vim ~/.bashrc
Shell commands
export FLINK_HOME=/usr/local/flink
export PATH=$FLINK_HOME/bin:$PATH
source ~/.bashrc
Shell commands
cd /usr/local/flink
./bin/start-cluster.sh
Shell commands
jps
Shell commands
cd /usr/local/flink/bin
./flink run /usr/local/flink/examples/batch/WordCount.jar
Shell commands
cd /usr/local/hadoop
./sbin/start-dfs.sh
Shell commands
jps
Shell commands
./sbin/stop-dfs.sh
Shell commands
cd /usr/local/flink
./bin/start-scala-shell.sh local
Shell commands
scala> 8*2+5
Scala
scala> :quit
Scala
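Besides evaluating plain Scala expressions, the Flink Scala shell pre-binds execution environments: benv for batch programs and senv for streaming programs. As a quick sketch (the sample strings are arbitrary), the book's word count can be tried interactively before building any project:

scala> val text = benv.fromElements("hello flink", "hello flink")
scala> val counts = text.flatMap { _.toLowerCase.split(" ") }.map { (_, 1) }.groupBy(0).sum(1)
scala> counts.print()
Scala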
https://downloads.apache.org/maven/maven-3/3.6.3/binaries/apache-maven-3.6.3-bin.zip
sudo unzip ~/Downloads/apache-maven-3.6.3-bin.zip -d /usr/local
cd /usr/local
sudo mv apache-maven-3.6.3/ ./maven
sudo chown -R hadoop ./maven
Shell commands
cd /usr/local/maven/conf
vim settings.xml
Shell commands
<settings xmlns="http://maven.apache.org/SETTINGS/1.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/SETTINGS/1.0.0
                              http://maven.apache.org/xsd/settings-1.0.0.xsd">
  <mirrors>
    <mirror>
      <id>aliyunmaven</id>
      <mirrorOf>*</mirrorOf>
      <name>Aliyun public repository</name>
      <url>https://maven.aliyun.com/repository/public</url>
    </mirror>
    <mirror>
      <id>aliyunmaven-google</id>
      <mirrorOf>*</mirrorOf>
      <name>Aliyun Google repository</name>
      <url>https://maven.aliyun.com/repository/google</url>
    </mirror>
    <mirror>
      <id>aliyunmaven-apache</id>
      <mirrorOf>*</mirrorOf>
      <name>Aliyun Apache snapshots repository</name>
      <url>https://maven.aliyun.com/repository/apache-snapshots</url>
    </mirror>
    <mirror>
      <id>aliyunmaven-spring</id>
      <mirrorOf>*</mirrorOf>
      <name>Aliyun Spring repository</name>
      <url>https://maven.aliyun.com/repository/spring</url>
    </mirror>
    <mirror>
      <id>aliyunmaven-spring-plugin</id>
      <mirrorOf>*</mirrorOf>
      <name>Aliyun Spring plugin repository</name>
      <url>https://maven.aliyun.com/repository/spring-plugin</url>
    </mirror>
  </mirrors>
</settings>
XML
cd ~ # change to the user's home directory
mkdir -p ./flinkapp/src/main/scala
Shell commands
package cn.edu.xmu.dblab

import org.apache.flink.api.scala._

object WordCount {
  def main(args: Array[String]): Unit = {

    // Step 1: set up the execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Step 2: create the data source
    val text = env.fromElements(
      "hello, world!",
      "hello, world!",
      "hello, world!")

    // Step 3: apply transformations to the data set
    val counts = text.flatMap { _.toLowerCase.split(" ") }
      .map { (_, 1) }
      .groupBy(0)
      .sum(1)

    // Step 4: print the result
    counts.print()
  }
}
Scala
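To send the result to a file instead of the console, the final counts.print() can be replaced by a file sink; unlike print(), a file sink does not trigger execution by itself, so an explicit env.execute() is then needed. A minimal sketch (the output path is only an example):

import org.apache.flink.core.fs.FileSystem.WriteMode

// Replace counts.print() with a text-file sink; overwrite results of earlier runs.
counts.writeAsText("file:///tmp/wordcount-result", WriteMode.OVERWRITE)
  .setParallelism(1) // a single output file rather than one file per parallel task
env.execute("Batch WordCount to file")
Scala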
<project>
  <groupId>cn.edu.xmu.dblab</groupId>
  <artifactId>wordcount</artifactId>
  <modelVersion>4.0.0</modelVersion>
  <name>WordCount</name>
  <packaging>jar</packaging>
  <version>1.0</version>
  <repositories>
    <repository>
      <id>alimaven</id>
      <name>aliyun maven</name>
      <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
    </repository>
  </repositories>
  <dependencies>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-scala_2.12</artifactId>
      <version>1.11.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-scala_2.12</artifactId>
      <version>1.11.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_2.12</artifactId>
      <version>1.11.2</version>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.4.6</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>3.0.0</version>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
XML
cd ~/flinkapp
find .
Shell commands
cd ~/flinkapp # make sure this directory is the current directory
/usr/local/maven/bin/mvn package
Shell commands
cd ~/flinkapp
/usr/local/flink/bin/flink run --class cn.edu.xmu.dblab.WordCount ./target/wordcount-1.0.jar
Shell commands
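The WordCount job just submitted hard-codes its sample input. For experimentation, an input path can instead be passed on the command line after the jar name; a minimal sketch of parsing it inside main() with Flink's ParameterTool (the --input flag name is only an illustrative convention, not something the jar above already supports):

import org.apache.flink.api.java.utils.ParameterTool

// Parse "--input <path>" style program arguments passed after the jar name.
val params = ParameterTool.fromArgs(args)

// Fall back to the hard-coded sample when no --input argument is given.
val text =
  if (params.has("input")) env.readTextFile(params.get("input"))
  else env.fromElements("hello, world!")
Scala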
cd ~ # change to the user's home directory
mkdir -p ./flinkapp2/src/main/scala
Shell commands
package cn.edu.xmu.dblab

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object StreamWordCount {
  def main(args: Array[String]): Unit = {

    // Step 1: set up the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Step 2: create the data source
    val source = env.socketTextStream("localhost", 9999, '\n')

    // Step 3: apply the transformation logic to the stream
    val dataStream = source.flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(0)
      .timeWindow(Time.seconds(2), Time.seconds(2))
      .sum(1)

    // Step 4: specify where the results are written
    dataStream.print()

    // Step 5: name the job and trigger the streaming computation
    env.execute("Flink Streaming Word Count")
  }
}
Scala
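In Flink 1.11, keying by tuple index, as in keyBy(0), still works but is deprecated in favor of a key-selector function; and because the window size and slide are both 2 seconds, the sliding window behaves as a tumbling one. The same pipeline in that style, as a sketch:

val dataStream = source.flatMap(_.split(" "))
  .map((_, 1))
  .keyBy(_._1)                 // key by the word itself (type-safe)
  .timeWindow(Time.seconds(2)) // tumbling 2-second window
  .sum(1)
Scala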
cd ~/flinkapp2
find .
Shell commands
cd ~/flinkapp2 # make sure this directory is the current directory
/usr/local/maven/bin/mvn package
Shell commands
nc -lk 9999
Shell commands
cd ~/flinkapp2
/usr/local/flink/bin/flink run --class cn.edu.xmu.dblab.StreamWordCount ./target/wordcount-1.0.jar
Shell commands
cd /usr/local/flink/log
tail -f flink*.out
Shell commands
cd ~ # change to the user's home directory
sudo tar -zxvf ~/Downloads/ideaIC-2020.2.3.tar.gz -C /usr/local # extract the archive
cd /usr/local
sudo mv ./idea-IC-202.7660.26 ./idea # rename for convenience
sudo chown -R hadoop ./idea # give the current Linux user hadoop ownership of the idea directory
Shell commands
cd /usr/local/idea
./bin/idea.sh
Shell commands
package cn.edu.xmu.dblab

import org.apache.flink.api.scala._

object WordCount {
  def main(args: Array[String]): Unit = {

    // Step 1: set up the execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Step 2: create the data source
    val text = env.fromElements(
      "hello, world!",
      "hello, world!",
      "hello, world!")

    // Step 3: apply transformations to the data set
    val counts = text.flatMap { _.toLowerCase.split(" ") }
      .map { (_, 1) }
      .groupBy(0)
      .sum(1)

    // Step 4: print the result
    counts.print()
  }
}
Scala
<project>
  <groupId>dblab</groupId>
  <artifactId>WordCount</artifactId>
  <modelVersion>4.0.0</modelVersion>
  <name>WordCount</name>
  <packaging>jar</packaging>
  <version>1.0</version>
  <repositories>
    <repository>
      <id>alimaven</id>
      <name>aliyun maven</name>
      <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
    </repository>
  </repositories>
  <dependencies>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-scala_2.12</artifactId>
      <version>1.11.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-scala_2.12</artifactId>
      <version>1.11.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_2.12</artifactId>
      <version>1.11.2</version>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.4.6</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>3.0.0</version>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
XML
cd ~/flinkapp
/usr/local/flink/bin/flink run --class cn.edu.xmu.dblab.WordCount ./target/WordCount-1.0.jar
Shell commands
sudo vim /etc/hostname
Shell commands
sudo vim /etc/hosts
Shell commands
192.168.1.101   Master
192.168.1.102   Slave1
192.168.1.103   Slave2
ping Master -c 3 # ping only 3 times and then stop; otherwise press Ctrl+C to interrupt the ping command
ping Slave1 -c 3
Shell commands
cd /usr/lib
sudo mkdir jvm # create the /usr/lib/jvm directory to hold the JDK files
Shell commands
cd ~ # change to the hadoop user's home directory
cd Downloads
sudo tar -zxvf ./jdk-8u162-linux-x64.tar.gz -C /usr/lib/jvm
Shell commands
vim ~/.bashrc
Shell commands
export JAVA_HOME=/usr/lib/jvm/jdk1.8.0_162
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
export PATH=${JAVA_HOME}/bin:$PATH
source ~/.bashrc
Shell commands
java -version
Shell commands
sudo apt-get install openssh-server
Shell commands
ssh localhost
Shell commands
exit
Shell commands
cd ~/.ssh # if this directory does not exist, run ssh localhost once first
rm ./id_rsa* # remove previously generated public keys (if any exist)
ssh-keygen -t rsa # after running this command, just press Enter at every prompt
cat ./id_rsa.pub >> ./authorized_keys
Shell commands
scp ~/.ssh/id_rsa.pub hadoop@Slave1:/home/hadoop/
scp ~/.ssh/id_rsa.pub hadoop@Slave2:/home/hadoop/
Shell commands
mkdir ~/.ssh # create this directory if it does not exist; skip this command if it already exists
cat ~/id_rsa.pub >> ~/.ssh/authorized_keys
rm ~/id_rsa.pub # can be deleted after use
Shell commands
ssh Slave1
ssh Slave2
Shell commands
sudo tar -zxf ~/Downloads/flink-1.11.2-bin-scala_2.12.tgz -C /usr/local/
cd /usr/local
sudo mv ./flink-1.11.2 ./flink
sudo chown -R hadoop:hadoop ./flink # hadoop is the username currently logged in to the Linux system
Shell commands
vim ~/.bashrc
Shell commands
export FLINK_HOME=/usr/local/flink
export PATH=$FLINK_HOME/bin:$PATH
source ~/.bashrc
Shell commands
cd /usr/local/flink/conf
vim flink-conf.yaml
Shell commands
jobmanager.rpc.address: Master
taskmanager.tmp.dirs: /usr/local/flink/tmp
cd /usr/local/flink/conf
vim masters
Shell commands
Master:8081
cd /usr/local/flink/conf
vim workers
Shell commands
Master
Slave1
Slave2
cd /usr/local/
tar -zcf ~/flink.master.tar.gz ./flink
cd ~
scp ./flink.master.tar.gz Slave1:/home/hadoop
scp ./flink.master.tar.gz Slave2:/home/hadoop
Shell commands
sudo rm -rf /usr/local/flink/
sudo tar -zxf ~/flink.master.tar.gz -C /usr/local
sudo chown -R hadoop /usr/local/flink
Shell commands
cd /usr/local/flink
sudo mkdir tmp
sudo chmod -R 755 ./tmp
Shell commands
cd /usr/local/flink/
./bin/start-cluster.sh
Shell commands
jps
Shell commands
cd /usr/local/flink/bin
./flink run /usr/local/flink/examples/batch/WordCount.jar
Shell commands
cd /usr/local/flink
./bin/stop-cluster.sh
Shell commands