在Docker跑通Flink分布式版本的WordCount

前言

前文我们介绍了,使用Docker快速部署Flink分布式集群,这一把我们研究一下怎么自己撸一个WordCount上去跑起来。

官网例子的问题

大家发现我的风格或多或少是因为引导大家怎么去入门到熟悉的过程,所以我希望传递给大家一些学习的办法。我是比较大家直接看官网源码的例子的,而且官网刻意给出来本身也就是这个作用,WordCount非常重要,这是一个麻雀虽小五脏俱全的例子,这个例子熟悉了,其实就是入门了。熟悉的第一步特征就是可以试着自己跑起来,第二步就是去改写了,前面我们已经运行过了,这一把我们试着去改一改。

是这样,官网的例子呢,默认是帮你跑批模式,里面是做了一个WordCountData静态类作为批的输入,然后就跑完就结束了。实时程序码,我们还是希望源源不断的输入来一个计算一个嘛,那才有感觉。第二点就是里面倒是有输入一个,计算一个的例子,但是这个例子是需要基于Socket通信的输入,就是其他额外的动作比较多,在容器中操作起来会麻烦一些,也是搞得很容易放弃的,比较难受。

所以我们基于这种程序做一些改造,我们希望程序自己去生成数据,源源不断产生就行,然后后面可以简单的帮我做计算,这样给我们一种丝滑体验。

flink工程构建

命令行构建maven工程

 mvn --settings /Users/zhuxuemin/.m2/settings_aliyun.xml archetype:generate -DgroupId=com.blog.flink -DartifactId=docker-app-flink -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false

导入idea,然后改吧改吧,测试代码什么的直接删除。注意仓库调整好,还是用阿里巴巴的就行

 alimaven aliyun maven http://maven.aliyun.com/nexus/content/repositories/central/ central

我们来开始定义pom.xml,这里注意了,我们里面用啥版本,和我们容器上一致就行,低版本虽然也是可以兼容的,但是代码风格有差别的,直接干到最新就行,学习嘛。去哪里可以抄呢,还记得我们的命令行上面其实是带的。

那java版本是多少呢,自然是看文档啦,官网文档在这里

我们一看很清楚的

Java8目前是标注了过时了, 是建议搞到11

Java11呢,明显字那么多,就是当前支持的啦

Java17呢,实验性支持。

所以有的老铁说那我直接干到最新行不行,现在JDK都干到了21去了,太新太旧肯定都不合适的,官网都有的

使是所以我们的版本就定了基调了,

  11 11 1.18.1 2.12 

那引入哪些包呢,这里其实去源码里面抄就行的,就是对应WordCount里面就有,不过记得把源码搞到对应的版本。

怎么操作呢,从github上面拉取源码之后一般是master分支,然后我们找到远程的大版本分支就行

 git branch -a
 
 remotes/origin/HEAD -> origin/master
 remotes/origin/blink
 ...这里好多分支
 remotes/origin/release-1.16
 remotes/origin/release-1.17
 remotes/origin/release-1.18. --这个就是我们要的

找到1.18分支,然后

git checkout release-1.18

然后找到对应的文件,就可以开始抄了

这里是要啥补啥,不断测试的时候就删减

   org.apache.flink flink-java ${flink.version} provided    org.apache.flink flink-streaming-java ${flink.version} provided   org.apache.flink flink-clients ${flink.version} provided   org.apache.flink flink-connector-datagen ${flink.version}  

WordCount编写

其实整个代码就是需要改造输入源,因为我不需要做输入来源的选择,所以对参数判断的逻辑就干掉了。

总共三个类,第一个是WordSource,我准备了一些句子,算是做数据准备

public class WordSource { static String[]  messages=new String[]{ "How are you?",
            "Nice to meet you.",
            "How is it going?",
            "How's everything with you?",
            "Hi!Are you having fun",
            "Nice to see you again.",
            "How have you been?"
    };
    public static int length(){ return messages.length;
    }
}

第二个就是分词器,就是把上面的每一句话打散成一个个单词,从源码里直接Copy:

public  final class Tokenizer
        implements FlatMapFunction> { @Override
    public void flatMap(String value, Collector> out) { // normalize and split the line
        String[] tokens = value.toLowerCase().split("\\W+");
        // emit the pairs
        for (String token : tokens) { if (token.length() > 0) { out.collect(new Tuple2<>(token, 1));
            }
        }
    }
}

第三个类是主类,首先我们改写了源码中的数据源,通过DataGeneratorSource自动生成的方式,随机从我们WordSource里面的消息获取句子,每次取一句,源头就构建好了

第二步就是计算,WordCount的核心逻辑了

后面就run起来,没啥逻辑了,改写之后的代码比较清爽,是不是感觉更加适合入门了。

public class WordCount { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
        //自定义发射数据源
        DataGeneratorSource wordSource=new DataGeneratorSource(new GeneratorFunction() { Random random=new Random();
            public String map(Long aLong) throws Exception { return WordSource.messages[random.nextInt(WordSource.length())];
            }
        },Long.MAX_VALUE, RateLimiterStrategy.perSecond(10), Types.STRING);
        //计算
        env.fromSource(wordSource, WatermarkStrategy.noWatermarks(),"wordMessageSource")
        .flatMap(new Tokenizer()).name("tokenizer")
                .keyBy((KeySelector, String>) value -> value.f0)
                .sum(1).name("counter")
                .print().name("print-sink");
        env.execute();
    }
}

这种程序是本机就可以运行的,先确保本机的输出符合预期:

到Docker里面的Flink集群跑起来

注意了,发布之前其实需要打包的,部署失败的时候其实是有日志提示的,按照下面这个方式去找到错误信息。

这种错误是不是感觉特亲切。

Caused by: java.lang.NoClassDefFoundError: org/apache/flink/connector/datagen/source/GeneratorFunction
	at java.lang.Class.getDeclaredMethods0(Native Method) ~[?:?]
	at java.lang.Class.privateGetDeclaredMethods(Unknown Source) ~[?:?]
	at java.lang.Class.getMethodsRecursive(Unknown Source) ~[?:?]
	at java.lang.Class.getMethod0(Unknown Source) ~[?:?]
	at java.lang.Class.getMethod(Unknown Source) ~[?:?]
	at org.apache.flink.client.program.PackagedProgram.hasMainMethod(PackagedProgram.java:307) ~[flink-dist-1.18.1.jar:1.18.1]
	... 54 more

那如何打包呢,当然直接百度一下没问题的,当然我还是会建议直接去源码中去拿,源码中其实是一个程序里面包含了很多个示例程序,相当于支持了一个应用工程中多个打包程序的定制输出,这其实是直接在我们实际开发中也是碰到的。

这样子的话,我们有个参考,定义一份属于我们的打包文件,我把关键部分放出来,附上我的注释,这样子方便大家对着改

  org.apache.maven.plugins maven-shade-plugin 3.2.4   WordCount  package  shade     false  WordCount    org.apache.flink:flink-connector-datagen  org/apache/flink/connector/datagen/source/**     

打包:运行

-DskipTests=true package

那个最丰满的,名字又是那么显眼的,一下子就认出来了。

接下来我们部署上去,首先还是从flink的UI上面部署,

上传之后就看到了一直运行的job了。

以命令行方式提交

这里先给一个小妙招,我们在Docker里面运行的时候需要每次找到对应的容器ID再执行代码比较麻烦,我们可以通过一些查询条件去获取到jobmanager对应的容器ID,再执行里面的flink命令,那么就变成下面这样子,比如说我要知道flink运行了哪些作业,执行下面的命令:

 docker exec -it $(docker ps --filter name=jobmanager --format={{.ID}}) flink list

可以看到我们刚才提交的命令

Waiting for response...
------------------ Running/Restarting Jobs -------------------
03.03.2024 07:50:15 : e356b3168e95ec2716c0830380f6c57b : Flink Streaming Job (RUNNING)
--------------------------------------------------------------

由于我们对jobmanager的容器id下面会重复使用,而且比较长,我们定义一个变量代替,这样就方便使用

FLINK_JOBMANAGER=$(docker ps --filter name=jobmanager --format={{.ID}})

可以输出这个结果:

echo $FLINK_JOBMANAGER
cd422928babc

接下来,我们先拆解动作,我们首先把宿主机器上面的jar包复制到我们jobmanager容器中:

 docker cp /Users/zhuxuemin/docker-app-flink/target/WordCount.jar $FLINK_JOBMANAGER:/opt/WordCount.jar

可以验证查看

docker exec $FLINK_JOBMANAGER ls /opt
flink
java
WordCount.jar

下一步,我们可以在容器内部实施提交了,还比较丝滑

docker exec -it $FLINK_JOBMANAGER flink \
 run -c com.blog.flink.WordCount \
 /opt/WordCount.jar

其实这样就可以了。