Loading...
墨滴

程序员大魔王

2021/06/18  阅读:95  主题:锤子便签主题第2版

[Flink 实践篇-1] Flink 1.12.1 实现 WordCount 入门案例

[Flink 实践篇-1] Flink 1.12.1 实现 WordCount 入门案例

什么是 WordCount ?

wordcount 简单来讲就是单词计数,是一般大数据计算框架(Hadoop、Spark、Flink)的入门学习案例,相当于编程语言(Java、Python)中的 HelloWorld 案例,适合刚开始了解 Flink 作业提交流程的同学。

环境要求
  1. JDK 1.8 (必须)
~  $ java -version
java version "1.8.0_291"
Java(TM) SE Runtime Environment (build 1.8.0_291-b10)
Java HotSpot(TM) 64-Bit Server VM (build 25.291-b10, mixed mode)
  1. Flink 1.12.1(必须)
  • 采用 tar 包安装的方式,下载地址如下:

https://archive.apache.org/dist/flink/flink-1.12.1/

下载 Flink 安装包
下载 Flink 安装包
  • 解压安装包
~/flink-dev  $ tar -zxvf flink-1.12.1-bin-scala_2.11.tgz
~/flink-dev  $ ls -l
total 655424
drwxr-xr-x@ 13 it  staff        416  1 10 08:46 flink-1.12.1
-rw-r--r--@  1 it  staff  334271560  6 18 00:18 flink-1.12.1-bin-scala_2.11.tgz
  • 查看目录结构
Flink 目录结构
Flink 目录结构
  • 配置环境变量
~/flink-dev/flink-1.12.1  $ pwd
/xxx/flink-dev/flink-1.12.1
~/flink-dev/flink-1.12.1  $ vi ~/.zshrc
# 将下面两行添加到 上述文件中
# 因为我这里用的是 zsh ,根据自己的需要选择文件
# mac 默认是 .bash_profile 
export FLINK_HOME=/xxx/flink-dev/flink-1.12.1
export PATH=$PATH:$FLINK_HOME/bin
# 保存之后执行以下命令
source ~/.zshrc
  • 验证版本
~/flink-dev/flink-1.12.1  $ flink --version
Version: 1.12.1, Commit ID: dc404e2

如上显示,则表示安装成功。

启动集群

学过 一些 Java 的用户应该了解,启动集群脚本一般在 bin 目录下

bin 目录下的脚本
bin 目录下的脚本
  • 执行启动集群命令
~/flink-dev/flink-1.12.1/bin  $ ./start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host MacBook-Pro.lan.
Starting taskexecutor daemon on host MacBook-Pro.lan.

-- 访问 Flink webUI 界面 (localhost:8081)

如果输入地址后可以看到以下页面,表示集群启动成功。 输入地址

web UI
web UI
编写 wordcount 项目

Flink 本地启动了一个集群,接下来就是提交我们的任务,任务使用 Java 语言调用 Flink 的 api 来编写。

  • 创建项目 New Project
  • 将以下配置复制到 pom.xml 文件
<properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.12.1</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.11</scala.binary.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.15</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <scope>runtime</scope>
        </dependency>
    </dependencies>
  • coding

这里我们稍微改造一下,我们用 netcat 往某个端口中传输数据,这样就可以模拟源源不断的数据流,让 wordcount 程序监听这个端口并进行单词计数,并且让最终的结果输出到 log 中。代码如下:

package com.bruce.wordcount;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WordCount {

    static Logger logger = LoggerFactory.getLogger(WordCount.class);


    /**
     * String --> Tuple2<String, Integer>
     *
     * Implements the string tokenizer that splits sentences into words as a user-defined
     * FlatMapFunction. The function takes a line (String) and splits it into multiple pairs in the
     * form of "(word,1)" ({@code Tuple2<String, Integer>}).
     */

    public static final class Tokenizer
            implements FlatMapFunction<StringTuple2<StringInteger>> 
{

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // normalize and split the line
            String[] tokens = value.toLowerCase().split("\\W+");

            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        //参数解析
        if (args.length != 2) {
            logger.error("Usage: \n");
            logger.error("Please input host and port.");
            return;
        }
        String host = args[0];
        int port = Integer.parseInt(args[1]);

        // set up the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // source
        DataStream<String> source = env.addSource(new SocketTextStreamFunction(host, port, "\n"0)).name("Source");

        // transform
        DataStream<Tuple2<String, Integer>> counts =
                // split up the lines in pairs (2-tuples) containing: (word,1)
                source.flatMap(new Tokenizer())
                        // group by the tuple field "0" and sum up tuple field "1"
                        .keyBy(value -> value.f0)
                        .sum(1).name("Transform");

        // sink = log
        // 这里为了方便展示效果,将结果直接输出到 log
        counts.addSink(new WordCountSink()).name("Sink");

        // execute program
        env.execute("WordCount from socket by bruce.");
    }
}

改造的 Sink 代码:

package com.bruce.wordcount;


import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WordCountSink implements SinkFunction<Tuple2<StringInteger>> {
    private static final long serialVersionUID = 1L;
    private static final Logger logger = LoggerFactory.getLogger(WordCountSink.class);

    @Override
    public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
        logger.info("{ Word: \""+ value.f0 + "\", Cnt:" + value.f1 +"}");
    }
}

  • 打包

进入项目目录,使用 mvn 命令打包

mvn clean package -Dmaven.test.skip=true
打包
打包
  • 开启监听端口

提交任务前需要先开启监听端口,否则会报链接失败的错误,再开一个终端执行以下命令:

nc -l 10000

注意,这里窗口会阻塞

  • 提交任务

进入如下目录

~/flink-dev/flink-1.12.1/bin

执行提交命令 (jar 包路径最好用绝对路径)

flink run -c com.wordcount.WordCount /xxx/IdeaProjects/FlinkPractice/target/FlinkPractice-1.0-SNAPSHOT.jar localhost 10000
  • 查看任务

实时流处理任务肯定是一直在 running 的,因为需要处理源源不断的数据。 任务提交成功

  • 输入数据

在阻塞窗口中输入一些单词,回车就会被发送出去。 输入数据

  • 查看任务日志

Task Manager 里面查看日志,如图则表示统计成功。 查看日志

附录:完整代码已经放在 github

https://github.com/BruceWong96/Flink-Guide

程序员大魔王

2021/06/18  阅读:95  主题:锤子便签主题第2版

作者介绍

程序员大魔王

公众号:程序员大魔王