¶先决条件
在部署Flink之前,请确认集群的每个节点都符合以下条件:
- 已安装Java 1.8.x或以上版本(推荐1.8版本)
- 节点两两之间可以SSH免密码登录
- 已部署Hadoop(如果只是部署Standalone Cluster则不需要Hadoop)
如果你已经按照Hadoop集群搭建教程成功建立了Hadoop集群,那么以上条件均已满足。
¶下载Flink二进制文件
在Flink的下载页面中有多个版本可以选择,因为之前选择了Hadoop 2.7.7版本,所以这里选择与之对应的 Apache Flink 1.9.2 for Scala 2.12版本,Scala版本选择最新的2.12。
1 | cd /opt |
¶配置Flink
注意:如果只需要部署Flink on YARN,那么可以跳过这小节,因为YARN会帮你打理好一切。
1 | cd /opt/flink-1.9.2/conf |
¶flink-conf.yaml
将jobmanager.rpc.address
指向master节点,其它配置可以按照机器实际硬件情况填写,此处使用默认值。
1 | # The host/IP of JobManager |
¶slaves
向slaves文件写入slave节点的host/IP地址
1 | huawei-02 |
¶将配置好的Flink分发到其它节点
1 | scp -r /opt/flink-1.9.2 huawei-02:/opt/flink-1.9.2 |
¶以Standalone模式启动Flink
1 | cd $FLINK_HOME |
然后可以在huawei-01:8081
查看Flink集群的运行情况
./examples
路径下游许多打包好的实例程序,可以用于验证Flink集群是否正常运行。
1 | ./bin/flink run ./exmaple/batch/WordCount.jar |
上面的命令会向 Flink 集群提交一个 wordcount 任务,这个示例程序可以指定输入和输出路径,这里没有指定,因此输入文件为程序自带的一小段文本,结果直接输出在屏幕上。 如果 Flink 集群工作正常,应该会在屏幕上输出以下结果:
1 | 省略前面的输出 |
¶以Flink on YARN模式启动
把 Flink 运行在 YARN 上有两种方式,第一种方式是建立一个长期运行的 Flink YARN Session,然后向这个 Session 提交 Flink Job,多个任务同时运行时会共享资源。第二种方式是为单个任务启动一个 Flink 集群,这个任务会独占 Flink 集群的所有资源,任务结束即代表集群被回收。
另外,Flink on YARN 模式需要系统中设置了 YARN_CONF_DIR 或 HADOOP_CONF_DIR 环境变量,如果未设置,请在 ~/.profile
中加入以下内容,然后使用 source ~/.profile
命令使修改立即生效。
1 | 在这条命令前定义HADOOP_HOME环境变量 |
¶Flink YARN Session
由于新版本的Flink已经不包含Hadoop依赖项,需要自己添加依赖。首先下载Pre-bundled Hadoop 2.7.5 包。
1 | cd cd $FLINK_HOME/lib |
使用下列命令来启动一个拥有 2 个 TaskManager 的 Flink 集群,每个 TaskManager 有 2 GB 内存,2 个 slot。
1 | ./bin/YARN-session.sh -n 2 -jm 1024m -tm 2048 |
完整的参数列表如下:
1 | Usage: |
启动 YARN Session 以后会输出 JobManager 的 Web Interface 地址,打开以后是这样的:
仔细一看,Task Managers,Task Slots 怎么都是 0 呢?难道是哪里出了问题?其实并没有问题,从某个版本开始 Flink 允许动态分配资源,在没有任务的时候不分配 TaskManager。接下来我们就提交一个任务试试。
因为启动 YARN Session 以后 Flink Client 会一直在前台运行,所以先用 Ctrl + Z
快捷键把 Client 转到后台,然后再提交任务。
1 | ./bin/flink run ./examples/batch/WordCount.jar |
在任务运行期间观察 Web Interface,会发现 Task Managers 变为 1,Task Slots 变为 2 ,与启动集群时指定的参数不符,这是因为 YARN 集群中只有两个 NodeManager,huawei-02 和 huawei-03,其中一个作为 JobManager,因此只剩一个节点可以作为 TaskManager。
任务的运行结果和 Standalone 模式下完全一样。
¶Single Flink job on YARN
下面这条命令会为 wordcount 任务启动一个独占的 Flink 集群,任务结束集群即被回收。其中 -m 选项指定 Flink 集群的启动模式,-yn 选项指定 TaskManager 的数目。
1 | ./bin/flink run -m YARN-cluster -yn 2 ./examples/batch/WordCount.jar |
任务的运行结果和 Standalone 模式下完全一样。