Code for Chapter 4 "Flink Environment Setup and Usage", from the textbook Flink Programming Fundamentals (Java Edition) by Lin Ziyu


Command lines and code from the textbook Flink Programming Fundamentals (Java Edition) by Lin Ziyu of Xiamen University. The textbook's official website provides the command lines and code for every chapter, which can be copied and pasted directly for execution.

cd  /home/hadoop
sudo  tar  -zxvf  ~/Downloads/flink-1.17.0-bin-scala_2.12.tgz  -C  /usr/local/
cd  /usr/local
sudo  mv  ./flink-1.17.0  ./flink
sudo  chown  -R  hadoop:hadoop  ./flink  # hadoop is the username currently logged into the Linux system
vim ~/.bashrc
export FLINK_HOME=/usr/local/flink
export PATH=$FLINK_HOME/bin:$PATH
source ~/.bashrc
cd /usr/local/flink/conf
vim log4j.properties
rootLogger.level = INFO
cd /usr/local/flink
./bin/start-cluster.sh
cd /usr/local/flink
./bin/flink run examples/streaming/WordCount.jar
tail log/flink-*-taskexecutor-*.out
cd /usr/local/flink
./bin/stop-cluster.sh
cd ~  # enter the user's home directory
sudo tar -zxvf /home/hadoop/download/ideaIC-2023.1.2.tar.gz -C /usr/local  # extract the archive
cd /usr/local
sudo mv ./idea-IC-231.9011.34 ./idea   # rename for convenience
sudo chown -R hadoop ./idea   # grant the current Linux user hadoop ownership of the idea directory
cd /usr/local/idea
./bin/idea.sh

WordCountData.java provides the raw data; its contents are as follows:

package cn.edu.xmu;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WordCountData {
    public static final String[] WORDS = new String[]{
            "To be, or not to be,--that is the question:--",
            "Whether \'tis nobler in the mind to suffer",
            "The slings and arrows of outrageous fortune",
            "Or to take arms against a sea of troubles,",
            "And by opposing end them?--To die,--to sleep,--",
            "No more; and by a sleep to say we end",
            "The heartache, and the thousand natural shocks",
            "That flesh is heir to,--\'tis a consummation",
            "Devoutly to be wish\'d. To die,--to sleep;--",
            "To sleep! perchance to dream:--ay, there\'s the rub;",
            "For in that sleep of death what dreams may come,",
            "When we have shuffled off this mortal coil,",
            "Must give us pause: there\'s the respect",
            "That makes calamity of so long life;",
            "For who would bear the whips and scorns of time,",
            "The oppressor\'s wrong, the proud man\'s contumely,",
            "The pangs of despis\'d love, the law\'s delay,",
            "The insolence of office, and the spurns",
            "That patient merit of the unworthy takes,",
            "When he himself might his quietus make",
            "With a bare bodkin? who would these fardels bear,",
            "To grunt and sweat under a weary life,",
            "But that the dread of something after death,--",
            "The undiscover\'d country, from whose bourn",
            "No traveller returns,--puzzles the will,",
            "And makes us rather bear those ills we have",
            "Than fly to others that we know not of?",
            "Thus conscience does make cowards of us all;",
            "And thus the native hue of resolution",
            "Is sicklied o\'er with the pale cast of thought;",
            "And enterprises of great pith and moment,",
            "With this regard, their currents turn awry,",
            "And lose the name of action.--Soft you now!",
            "The fair Ophelia!--Nymph, in thy orisons",
            "Be all my sins remember\'d."};
    public WordCountData() {
    }
    public static DataStream<String> getDefaultTextLineDataStream(StreamExecutionEnvironment env){
        return env.fromElements(WORDS);
    }
}

WordCountTokenizer.java splits sentences into words; its contents are as follows:

package cn.edu.xmu;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class WordCountTokenizer implements FlatMapFunction<String, Tuple2<String,Integer>>{
    public WordCountTokenizer(){}
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
        String[] tokens = value.toLowerCase().split("\\W+");
        int len = tokens.length;
        for(int i = 0; i<len;i++){
            String tmp = tokens[i];
            if(tmp.length()>0){
                out.collect(new Tuple2<String, Integer>(tmp,Integer.valueOf(1)));
            }
        }
    }
}
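The splitting rule used by the tokenizer can be tried out with plain Java, without any Flink dependency. The sketch below (the class name TokenizeDemo is just for illustration and not part of the textbook project) mirrors the core logic of WordCountTokenizer.flatMap:

```java
import java.util.ArrayList;
import java.util.List;

public class TokenizeDemo {
    // Same logic as WordCountTokenizer.flatMap: lowercase the line,
    // split on runs of non-word characters, and drop empty tokens.
    public static List<String> tokenize(String value) {
        List<String> out = new ArrayList<>();
        for (String token : value.toLowerCase().split("\\W+")) {
            if (token.length() > 0) {
                out.add(token);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // prints [to, be, or, not, to, be, that, is, the, question]
        System.out.println(tokenize("To be, or not to be,--that is the question:--"));
    }
}
```

Note that `\W+` treats punctuation such as `--` and `'` as separators, so "wish'd" becomes the two tokens "wish" and "d".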

WordCount.java provides the main method; its contents are as follows:

package cn.edu.xmu;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WordCount {
    public WordCount(){}
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        DataStream<String> text = WordCountData.getDefaultTextLineDataStream(env);
        DataStream<Tuple2<String, Integer>> counts = text.flatMap(new WordCountTokenizer())
                .keyBy(0)
                .sum(1);
        counts.print();
        env.execute();
    }
}
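The keyBy(0).sum(1) step groups the (word, 1) tuples by the word field and sums the counts per group. The same aggregation can be sketched in plain Java to show what the pipeline computes (WordCountSketch is a hypothetical name for illustration, not part of the textbook project):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class WordCountSketch {
    // For each line, emit (word, 1) pairs, then group by word and sum
    // the counts -- the same result the Flink keyBy(0).sum(1) produces.
    public static Map<String, Integer> count(String[] lines) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : lines) {
            for (String token : line.toLowerCase().split("\\W+")) {
                if (token.length() > 0) {
                    counts.merge(token, 1, Integer::sum);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        String[] lines = {"To be, or not to be", "that is the question"};
        // prints {to=2, be=2, or=1, not=1, that=1, is=1, the=1, question=1}
        System.out.println(count(lines));
    }
}
```

Unlike this sketch, the Flink version in BATCH runtime mode distributes the grouping across TaskManagers and only emits the final count for each word.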

Clear the contents of the project's pom.xml file and enter the following:

<project>
  <groupId>dblab</groupId>
  <artifactId>wordcount</artifactId>
  <modelVersion>4.0.0</modelVersion>
  <name>WordCount</name>
  <packaging>jar</packaging>
  <version>1.0</version>
  <repositories>
    <repository>
      <id>alimaven</id>
      <name>aliyun maven</name>
      <url>https://maven.aliyun.com/nexus/content/groups/public/</url>
    </repository>
  </repositories>
  <properties>
    <flink.version>1.17.0</flink.version>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients</artifactId>
      <version>${flink.version}</version>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>3.0.0</version>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
cd /usr/local/flink
./bin/flink run \
> --class cn.edu.xmu.WordCount \
> ~/IdeaProjects/WordCount/target/wordcount-1.0.jar
sudo vim /etc/hosts
192.168.91.128   hadoop01
192.168.91.129   hadoop02
192.168.91.130   hadoop03
ping hadoop01 -c 3   # ping only 3 times and then stop; otherwise press Ctrl+C to interrupt the ping command
ping hadoop02 -c 3
ping hadoop03 -c 3
sudo  tar  -zxf  ~/Downloads/flink-1.17.0-bin-scala_2.12.tgz  -C  /usr/local/
cd  /usr/local
sudo  mv  ./flink-1.17.0  ./flink
sudo  chown  -R  hadoop:hadoop  ./flink  # hadoop is the username currently logged into the Linux system
vim  ~/.bashrc
export FLINK_HOME=/usr/local/flink
export PATH=$FLINK_HOME/bin:$PATH
source  ~/.bashrc
cd /usr/local/flink/conf
vim log4j.properties
rootLogger.level = ERROR
cd  /usr/local/flink/conf
vim flink-conf.yaml
jobmanager.rpc.address: hadoop01
jobmanager.bind-host: 0.0.0.0
taskmanager.bind-host: 0.0.0.0
taskmanager.host: hadoop01
rest.address: hadoop01
rest.bind-address: 0.0.0.0
cd  /usr/local/flink/conf
vim masters
hadoop01:8081
cd  /usr/local/flink/conf
vim workers
hadoop02
hadoop03
cd  /usr/local/
tar  -zcf  ~/flink.master.tar.gz  ./flink
cd  ~
scp  ./flink.master.tar.gz  hadoop02:/home/hadoop
scp  ./flink.master.tar.gz  hadoop03:/home/hadoop
# Execute the following on hadoop02 and hadoop03:
sudo  rm  -rf  /usr/local/flink/
sudo  tar  -zxf  ~/flink.master.tar.gz  -C  /usr/local
sudo  chown  -R  hadoop  /usr/local/flink
# In /usr/local/flink/conf/flink-conf.yaml on hadoop02, set:
taskmanager.host: hadoop02
# In /usr/local/flink/conf/flink-conf.yaml on hadoop03, set:
taskmanager.host: hadoop03
cd  /usr/local/flink/
./bin/start-cluster.sh
cd /usr/local/flink
./bin/flink run /usr/local/flink/examples/batch/WordCount.jar
cd /usr/local/flink/bin
./flink run \
>  --jobmanager hadoop01:8081 \
>  --class cn.edu.xmu.WordCount \
>  ~/Downloads/wordcount-1.0.jar
cd /usr/local/flink
./bin/stop-cluster.sh
cd /usr/local/flink
./bin/standalone-job.sh start --job-classname cn.edu.xmu.WordCount
cd /usr/local/flink
./bin/taskmanager.sh start
cd /usr/local/flink
./bin/standalone-job.sh stop
cd /usr/local/flink
./bin/taskmanager.sh stop
vim ~/.bashrc
export HADOOP_HOME=/usr/local/hadoop
export PATH=$PATH:/usr/local/hadoop/bin:/usr/local/hadoop/sbin
export HADOOP_CLASSPATH=`hadoop classpath`
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
source ~/.bashrc
cd /usr/local/hadoop
./sbin/start-dfs.sh
./sbin/start-yarn.sh
cd /usr/local/hadoop
./sbin/stop-dfs.sh
./sbin/stop-yarn.sh
cd /usr/local/hadoop
./sbin/start-dfs.sh
./sbin/start-yarn.sh
cd /usr/local/flink
./bin/yarn-session.sh --help
cd /usr/local/flink
./bin/yarn-session.sh
cd /usr/local/flink
./bin/yarn-session.sh -d
cd /usr/local/flink
./bin/flink run --class cn.edu.xmu.WordCount ~/Downloads/wordcount-1.0.jar
yarn application -kill application_1691735595654_0004
cd /usr/local/hadoop
./sbin/start-dfs.sh
./sbin/start-yarn.sh
cd /usr/local/flink
./bin/flink run -t yarn-per-job -c cn.edu.xmu.WordCount ~/Downloads/wordcount-1.0.jar
# Add the following to /usr/local/flink/conf/flink-conf.yaml if needed:
classloader.check-leaked-classloader: false
cd /usr/local/hadoop
./sbin/start-dfs.sh
./sbin/start-yarn.sh
cd /usr/local/flink
./bin/flink run-application \
> -t yarn-application \
> -c cn.edu.xmu.WordCount \
> ~/Downloads/wordcount-1.0.jar
cd /usr/local/hadoop
./sbin/start-dfs.sh
./sbin/start-yarn.sh
cd /usr/local/hadoop
./bin/hdfs dfs -mkdir -p /logs/flink-job
# Add the following to /usr/local/flink/conf/flink-conf.yaml:
jobmanager.archive.fs.dir: hdfs://hadoop01:9000/logs/flink-job
historyserver.web.address: hadoop01
historyserver.web.port: 8082
historyserver.archive.fs.dir: hdfs://hadoop01:9000/logs/flink-job
historyserver.archive.fs.refresh-interval: 5000
cd /usr/local/flink
./bin/historyserver.sh start
cd /usr/local/flink
./bin/flink run-application \
> -t yarn-application \
> -c cn.edu.xmu.WordCount \
> ~/Downloads/wordcount-1.0.jar
cd /usr/local/flink
./bin/historyserver.sh stop