一、Flink的安装与配置
1.1 Flink的下载
1
| wget https://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.17.2/flink-1.17.2-bin-scala_2.12.tgz
|
1
| tar -zxvf flink-1.17.2-bin-scala_2.12.tgz
|
二、Flink的集群配置
2.1 flink-conf.yaml文件配置
打开Flink/conf/flink-conf.yaml文件
1
| vim ~/software/flink-1.17.2/conf/flink-conf.yaml
|
配置以下内容
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| jobmanager: bind-host: 0.0.0.0 rpc: address: Node01 port: 6123 memory: process: size: 1600m execution: failover-strategy: region
taskmanager: bind-host: Node01 host: 0.0.0.0 numberOfTaskSlots: 1 memory: process: size: 1728m
parallelism: default: 1
rest: address: Node01 bind-address: 0.0.0.0
|
这些参数是 Flink 集群配置文件 conf.yaml 中的一些常见设置,控制着 Flink 集群的各个组件如何互相通信、资源配置、以及任务执行等。以下是对每个参数的详细解释:
jobmanager.rpc.address: Node01
- 作用:指定 JobManager 的主机名或 IP 地址,TaskManager 和其他 Flink 组件需要通过这个地址与 JobManager 进行 RPC 通信。
- 例子:Node01 指的是 JobManager 所在的机器或节点,或者是该节点的 IP 地址。
jobmanager.rpc.port: 6123
- 作用:指定 JobManager 对外暴露的 RPC 服务端口,Flink 集群中的其他节点(TaskManager 或客户端)将通过这个端口与 JobManager 进行通信。
- 默认值:6123,但可以根据需要更改为其他端口。
jobmanager.bind-host: 0.0.0.0
- 作用:指定 JobManager 绑定的网络地址。如果设置为 0.0.0.0,JobManager 将绑定到所有可用的网络接口上,允许来自任何网络接口的连接。
- 用法:通常用于多机部署,确保 JobManager 能监听所有可用的 IP 地址。
jobmanager.memory.process.size: 1600m
- 作用:设置 JobManager 进程的总内存大小。Flink 的 JobManager 负责管理作业的执行和调度,因此它的内存配置非常重要。
- 例子:1600m 指的是分配给 JobManager 的内存为 1600 MB。
taskmanager.bind-host: 0.0.0.0
- 作用:指定 TaskManager 绑定的网络地址。如果设置为 0.0.0.0,TaskManager 将绑定到所有可用的网络接口上。
- 用法:一般用于在多机环境中,确保 TaskManager 可以监听到所有的网络接口。
taskmanager.host: Node01
- 作用:指定 TaskManager 所在的主机名或 IP 地址,其他 Flink 组件可以通过这个地址与 TaskManager 通信。
- 例子:Node01 是 TaskManager 所在的机器的主机名或 IP 地址。
taskmanager.memory.process.size: 1728m
- 作用:指定 TaskManager 进程的总内存大小。TaskManager 是执行任务的节点,内存设置直接影响到作业的执行性能。
- 例子:1728m 指的是为 TaskManager 分配的内存为 1728 MB。
taskmanager.numberOfTaskSlots: 1
- 作用:指定每个 TaskManager 可以提供的任务槽(Task Slot)数量。任务槽是 TaskManager 中执行任务的基本单位,每个任务槽可以执行一个并行任务。
- 用法:如果设置为 1,表示每个 TaskManager 只能运行一个任务。如果需要更多的并行任务,可以增加此值。
parallelism.default: 1
- 作用:设置 Flink 作业的默认并行度。如果作业没有明确指定并行度,将使用这个默认值。
- 例子:1 表示作业默认以单线程执行,增加并行度值可以提高作业的执行速度。
jobmanager.execution.failover-strategy: region
- 作用:指定 JobManager 的故障恢复策略。region 策略意味着在一个任务失败后,Flask 会尝试恢复整个任务区域(TaskManager 负责的任务),而不仅仅是单个任务。
- 可选值:region 和 full。region 是针对单个 TaskManager 故障的恢复,full 则是完全恢复(包括整个作业的恢复)。
rest.address: Node01
- 作用:指定 Flink REST 服务(包括 Web UI 和 REST API)所在的主机名或 IP 地址。客户端和浏览器通过这个地址访问 Flink 的 Web 界面。
- 例子:Node01 表示 Flink 的 REST 服务将监听在 Node01 上。
rest.bind-address: 0.0.0.0
- 作用:指定 REST 服务绑定的网络地址。如果设置为 0.0.0.0,REST 服务将绑定到所有可用的网络接口上。这样可以让任何网络接口都能访问 Flink 的 REST 服务和 Web UI。
- 用法:与 rest.address 配合使用,确保 Web 界面和 API 能通过所有网络接口访问。
2.2 workers文件配置
workers文件指定集群中哪些机器充当TaskManager角色,即实际执行任务的工作节点。Flink 会根据该文件中的机器列表启动多个 TaskManager 进程,分配计算任务。
每一行应该是一个TaskManager节点的主机名或 IP 地址。每个TaskManager将在相应的主机上启动,负责处理数据流和执行计算任务。
首先打开workers文件
变为以下内容
2.3 masters文件配置
masters文件指定集群中哪些机器充当JobManager角色,该文件的每一行应该是一个JobManager的主机名或 IP 地址。
Flink 集群可以配置多个 JobManager(在高可用模式下),但通常只有一个 JobManager 作为主节点,负责调度作业和管理任务的执行。
以Node01作为主节点,打开masters文件
变为以下内容
2.4 分发Flink
把Flink分发到各个节点上
2.5 各节点上修改flink-conf.yaml
到每个节点上,修改taskmanager.host为当前的节点
1
| vim ~/software/flink-1.17.2/conf/flink- conf.yaml
|
修改节点名称
1 2
| taskmanager: bind-host: Node02/Node03
|
2.6 配置环境变量
打开文件
为了将FLink任务部署在YARN,配置文件内容
1 2 3 4 5
| export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop export HADOOP_CLASSPATH=`hadoop classpath`
export FLINK_HOME=~/software/flink-1.17.2 export PATH=$PATH:$FLINK_HOME/bin:$FLINK_HOME/bin
|
刷新环境变量
分发环境变量
最后到每个节点刷新下环境变量
2.7 启动集群
输入指令启动集群
进入UI页面:http://node01:8081,显示成功

三、批处理和流处理
Apache Flink 是一个开源的分布式计算框架,同时支持批处理和流处理,并在两者之上实现了统一编程模型。
3.1 Flink项目创建
创建Meaven项目,在pom.xml中导入所需要的依赖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId> <artifactId>FlinkTutorial</artifactId> <version>1.0-SNAPSHOT</version>
<properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <flink.version>1.20.0</flink.version> <scala.version>2.12</scala.version> </properties>
<dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> <version>${flink.version}</version> </dependency>
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>${flink.version}</version> </dependency> </dependencies> </project>
|
3.2 批处理
批处理通常用于处理静态、离线的数据集(有限数据),通常用于批量分析(如日志统计、ETL),相当于处理的数据是静态的。
3.2.1 创建测试文本
在Maven项目的根目录下创建input/words.txt文件,并输入以下内容
1 2 3
| Forget about the ldyer. Forget about me. Forget about you.
|
3.2.2 批处理代码实现
建立com.ldyer.wc软件包,并建立BatchWordCountjava类,输入以下代码.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
| package com.ldyer.wc;
import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.AggregateOperator; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.operators.FlatMapOperator; import org.apache.flink.api.java.operators.UnsortedGrouping; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector;
public class BatchWordCount {
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSource<String> lineDS = env.readTextFile("input/words.txt");
FlatMapOperator<String, Tuple2<String, Long>> wordAndOne = lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() { @Override public void flatMap(String line, Collector<Tuple2<String, Long>> out) throws Exception { String[] words = line.split(" "); for (String word : words) { out.collect(Tuple2.of(word, 1L)); } } });
UnsortedGrouping<Tuple2<String, Long>> wordAndOneUG = wordAndOne.groupBy(0);
AggregateOperator<Tuple2<String, Long>> sum = wordAndOneUG.sum(1);
sum.print(); } }
|
运行程序,则会输出以下内容
(about,3)
(ldyer.,1)
(you.,1)
(me.,1)
(,2)
(Forget,3)
(the,1)
3.3 流处理
3.3.1 流处理代码实现
建立StreamWordCount类,并输入以下代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
| package com.ldyer.wc;
import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector;
public class StreamWordCount {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> lineStream = env.readTextFile("input/words.txt");
SingleOutputStreamOperator<Tuple2<String, Long>> sum = lineStream.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() { @Override public void flatMap(String line, Collector<Tuple2<String, Long>> out) throws Exception { String[] words = line.split(" "); for (String word : words) { out.collect(Tuple2.of(word, 1L)); } } }) .keyBy(data -> data.f0) .sum(1);
sum.print();
env.execute("Stream Word Count"); } }
|
运行成功,会输出以下内容:
1> (about,1)
6> (Forget,1)
7> (the,1)
10> (you.,1)
1> (about,2)
6> (Forget,2)
1> (about,3)
9> (,1)
8> (ldyer.,1)
9> (,2)
6> (Forget,3)
2> (me.,1)
3.2.4 Socket流处理实现
建立SocketStreamWordCount类,输入以下代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
| package com.ldyer.wc;
import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector;
public class SocketStreamWordCount {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> lineStream = env.socketTextStream("Node01", 7777);
SingleOutputStreamOperator<Tuple2<String, Long>> sum = lineStream .flatMap((String line, Collector<Tuple2<String, Long>> out) -> { String[] words = line.split(" "); for (String word : words) { out.collect(Tuple2.of(word, 1L)); } }) .returns(Types.TUPLE(Types.STRING, Types.LONG)) .keyBy(data -> data.f0) .sum(1);
sum.print();
env.execute("Socket Stream Word Count"); } }
|