一、Flink的安装与配置

1.1 Flink的下载

1
cd ~/software
  • 下载
1
wget https://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.17.2/flink-1.17.2-bin-scala_2.12.tgz
  • 解压
1
tar -zxvf flink-1.17.2-bin-scala_2.12.tgz

二、Flink的集群配置

打开Flink/conf/flink-conf.yaml文件

1
vim ~/software/flink-1.17.2/conf/flink-conf.yaml 

配置以下内容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
jobmanager:
bind-host: 0.0.0.0
rpc:
address: Node01
port: 6123
memory:
process:
size: 1600m
execution:
failover-strategy: region

taskmanager:
bind-host: Node01
host: 0.0.0.0
numberOfTaskSlots: 1
memory:
process:
size: 1728m

parallelism:
default: 1

rest:
address: Node01
bind-address: 0.0.0.0

这些参数是 Flink 集群配置文件 conf.yaml 中的一些常见设置,控制着 Flink 集群的各个组件如何互相通信、资源配置、以及任务执行等。以下是对每个参数的详细解释:

jobmanager.rpc.address: Node01

  • 作用:指定 JobManager 的主机名或 IP 地址,TaskManager 和其他 Flink 组件需要通过这个地址与 JobManager 进行 RPC 通信。
  • 例子:Node01 指的是 JobManager 所在的机器或节点,或者是该节点的 IP 地址。

jobmanager.rpc.port: 6123

  • 作用:指定 JobManager 对外暴露的 RPC 服务端口,Flink 集群中的其他节点(TaskManager 或客户端)将通过这个端口与 JobManager 进行通信。
  • 默认值:6123,但可以根据需要更改为其他端口。

jobmanager.bind-host: 0.0.0.0

  • 作用:指定 JobManager 绑定的网络地址。如果设置为 0.0.0.0,JobManager 将绑定到所有可用的网络接口上,允许来自任何网络接口的连接。
  • 用法:通常用于多机部署,确保 JobManager 能监听所有可用的 IP 地址。

jobmanager.memory.process.size: 1600m

  • 作用:设置 JobManager 进程的总内存大小。Flink 的 JobManager 负责管理作业的执行和调度,因此它的内存配置非常重要。
  • 例子:1600m 指的是分配给 JobManager 的内存为 1600 MB。

taskmanager.bind-host: 0.0.0.0

  • 作用:指定 TaskManager 绑定的网络地址。如果设置为 0.0.0.0,TaskManager 将绑定到所有可用的网络接口上。
  • 用法:一般用于在多机环境中,确保 TaskManager 可以监听到所有的网络接口。

taskmanager.host: Node01

  • 作用:指定 TaskManager 所在的主机名或 IP 地址,其他 Flink 组件可以通过这个地址与 TaskManager 通信。
  • 例子:Node01 是 TaskManager 所在的机器的主机名或 IP 地址。

taskmanager.memory.process.size: 1728m

  • 作用:指定 TaskManager 进程的总内存大小。TaskManager 是执行任务的节点,内存设置直接影响到作业的执行性能。
  • 例子:1728m 指的是为 TaskManager 分配的内存为 1728 MB。

taskmanager.numberOfTaskSlots: 1

  • 作用:指定每个 TaskManager 可以提供的任务槽(Task Slot)数量。任务槽是 TaskManager 中执行任务的基本单位,每个任务槽可以执行一个并行任务。
  • 用法:如果设置为 1,表示每个 TaskManager 只能运行一个任务。如果需要更多的并行任务,可以增加此值。

parallelism.default: 1

  • 作用:设置 Flink 作业的默认并行度。如果作业没有明确指定并行度,将使用这个默认值。
  • 例子:1 表示作业默认以单线程执行,增加并行度值可以提高作业的执行速度。

jobmanager.execution.failover-strategy: region

  • 作用:指定 JobManager 的故障恢复策略。region 策略意味着在一个任务失败后,Flask 会尝试恢复整个任务区域(TaskManager 负责的任务),而不仅仅是单个任务。
  • 可选值:region 和 full。region 是针对单个 TaskManager 故障的恢复,full 则是完全恢复(包括整个作业的恢复)。

rest.address: Node01

  • 作用:指定 Flink REST 服务(包括 Web UI 和 REST API)所在的主机名或 IP 地址。客户端和浏览器通过这个地址访问 Flink 的 Web 界面。
  • 例子:Node01 表示 Flink 的 REST 服务将监听在 Node01 上。

rest.bind-address: 0.0.0.0

  • 作用:指定 REST 服务绑定的网络地址。如果设置为 0.0.0.0,REST 服务将绑定到所有可用的网络接口上。这样可以让任何网络接口都能访问 Flink 的 REST 服务和 Web UI。
  • 用法:与 rest.address 配合使用,确保 Web 界面和 API 能通过所有网络接口访问。

2.2 workers文件配置

workers文件指定集群中哪些机器充当TaskManager角色,即实际执行任务的工作节点。Flink 会根据该文件中的机器列表启动多个 TaskManager 进程,分配计算任务。

每一行应该是一个TaskManager节点的主机名或 IP 地址。每个TaskManager将在相应的主机上启动,负责处理数据流和执行计算任务。

首先打开workers文件

1
vim conf/workers 

变为以下内容

1
2
3
Node01
Node02
Node03

2.3 masters文件配置

masters文件指定集群中哪些机器充当JobManager角色,该文件的每一行应该是一个JobManager的主机名或 IP 地址。

Flink 集群可以配置多个 JobManager(在高可用模式下),但通常只有一个 JobManager 作为主节点,负责调度作业和管理任务的执行。

Node01作为主节点,打开masters文件

1
vim conf/masters

变为以下内容

1
Node01:8081

把Flink分发到各个节点上

1
xsync flink-1.17.2

到每个节点上,修改taskmanager.host为当前的节点

1
vim ~/software/flink-1.17.2/conf/flink- conf.yaml 

修改节点名称

1
2
taskmanager:
bind-host: Node02/Node03

2.6 配置环境变量

打开文件

1
sudo vim ~/.bashrc

为了将FLink任务部署在YARN,配置文件内容

1
2
3
4
5
export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop
export HADOOP_CLASSPATH=`hadoop classpath`

export FLINK_HOME=~/software/flink-1.17.2
export PATH=$PATH:$FLINK_HOME/bin:$FLINK_HOME/bin

刷新环境变量

1
source ~/.bashrc

分发环境变量

1
xsync ~/.bashrc

最后到每个节点刷新下环境变量

2.7 启动集群

输入指令启动集群

1
bin/start-cluster.sh

进入UI页面:http://node01:8081,显示成功
alt text

三、批处理和流处理

Apache Flink 是一个开源的分布式计算框架,同时支持批处理流处理,并在两者之上实现了统一编程模型。

3.1 Flink项目创建

创建Meaven项目,在pom.xml中导入所需要的依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>org.example</groupId>
<artifactId>FlinkTutorial</artifactId>
<version>1.0-SNAPSHOT</version>

<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.20.0</flink.version>
<scala.version>2.12</scala.version> <!-- 使用 Scala 2.12 -->
</properties>

<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
</project>

3.2 批处理

批处理通常用于处理静态、离线的数据集(有限数据),通常用于批量分析(如日志统计、ETL),相当于处理的数据是静态的。

3.2.1 创建测试文本

在Maven项目的根目录下创建input/words.txt文件,并输入以下内容

1
2
3
Forget about the ldyer.
Forget about me.
Forget about you.

3.2.2 批处理代码实现

建立com.ldyer.wc软件包,并建立BatchWordCountjava类,输入以下代码.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package com.ldyer.wc;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class BatchWordCount {

public static void main(String[] args) throws Exception {

// 1. 创建执行环境
// Flink 的执行环境是程序运行的上下文,用于配置和执行 Flink 程序。
// ExecutionEnvironment 是 Flink 批处理的入口,用于创建数据源和执行作业。
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// 2. 从文件读取数据
// DataSource 是 Flink 中的数据源操作,用于读取外部数据。
// 这里使用 readTextFile 方法从指定路径读取文本文件,按行读取,每行作为一个字符串元素。
DataSource<String> lineDS = env.readTextFile("input/words.txt");

// 3. 转换数据格式
// 使用 flatMap 方法将每行文本拆分为单词,并为每个单词分配一个计数(1L)。
// flatMap 是一个转换操作,用于将输入的元素转换为零个、一个或多个输出元素。
// 这里将每行文本按空格分割为单词,并将每个单词封装为 Tuple2<String, Long>,其中第一个字段是单词,第二个字段是计数。
FlatMapOperator<String, Tuple2<String, Long>> wordAndOne = lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
@Override
public void flatMap(String line, Collector<Tuple2<String, Long>> out) throws Exception {
// 按空格分割每行文本为单词数组
String[] words = line.split(" ");
// 遍历单词数组,为每个单词生成 Tuple2 对象
for (String word : words) {
out.collect(Tuple2.of(word, 1L)); // 输出单词和计数(1L)
}
}
});

// 4. 按照 word 进行分组
// 使用 groupBy 方法对数据进行分组,分组的依据是 Tuple2 的第一个字段(单词)。
// UnsortedGrouping 表示数据已经分组,但未排序。
UnsortedGrouping<Tuple2<String, Long>> wordAndOneUG = wordAndOne.groupBy(0);

// 5. 分组内聚合统计
// 对每个分组内的数据进行聚合操作,计算每个单词的总计数。
// 这里使用 sum 方法对 Tuple2 的第二个字段(计数)进行求和。
AggregateOperator<Tuple2<String, Long>> sum = wordAndOneUG.sum(1);

// 6. 打印结果
// 将聚合后的结果打印到控制台。
sum.print();
}
}

运行程序,则会输出以下内容

(about,3)
(ldyer.,1)
(you.,1)
(me.,1)
(,2)
(Forget,3)
(the,1)

3.3 流处理

3.3.1 流处理代码实现

建立StreamWordCount类,并输入以下代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package com.ldyer.wc;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class StreamWordCount {

public static void main(String[] args) throws Exception {

// 1. 创建流式执行环境
// StreamExecutionEnvironment 是 Flink 流处理的入口,用于配置和执行流式作业。
// getExecutionEnvironment() 方法会根据配置(如本地运行或集群运行)返回相应的执行环境。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 2. 读取文件
// DataStreamSource 是 Flink 中的流式数据源,用于读取外部数据。
// 这里使用 readTextFile 方法从指定路径读取文本文件,按行读取,每行作为一个字符串元素。
DataStreamSource<String> lineStream = env.readTextFile("input/words.txt");

// 3. 转换、分组、求和,得到统计结果
// 3.1 对输入的文本流进行转换,将每行文本拆分为单词,并为每个单词分配计数(1L)。
// 使用 flatMap 方法将每行文本转换为 Tuple2<String, Long>,其中第一个字段是单词,第二个字段是计数。
// flatMap 是一个转换操作,用于将输入的元素转换为零个、一个或多个输出元素。
SingleOutputStreamOperator<Tuple2<String, Long>> sum = lineStream.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
@Override
public void flatMap(String line, Collector<Tuple2<String, Long>> out) throws Exception {
// 按空格分割每行文本为单词数组
String[] words = line.split(" ");
// 遍历单词数组,为每个单词生成 Tuple2 对象
for (String word : words) {
out.collect(Tuple2.of(word, 1L)); // 输出单词和计数(1L)
}
}
})
// 3.2 按单词进行分组
// keyBy 方法用于对数据流进行分组,分组的依据是 Tuple2 的第一个字段(单词)。
// 这里使用 lambda 表达式指定分组键为 data.f0(即单词)。
.keyBy(data -> data.f0)
// 3.3 对每个分组内的数据进行聚合操作,计算每个单词的总计数。
// sum 方法对指定字段(这里是 Tuple2 的第二个字段,索引为 1)进行求和。
.sum(1);

// 4. 打印
// 将聚合后的结果打印到控制台。
// 这里使用 print 方法将结果输出到标准输出,方便调试和查看。
sum.print();

// 5. 执行
// 调用 execute 方法启动 Flink 流式作业。
// 这是 Flink 流处理程序的入口点,程序从这里开始执行。
env.execute("Stream Word Count");
}
}

运行成功,会输出以下内容:

1> (about,1)
6> (Forget,1)
7> (the,1)
10> (you.,1)
1> (about,2)
6> (Forget,2)
1> (about,3)
9> (,1)
8> (ldyer.,1)
9> (,2)
6> (Forget,3)
2> (me.,1)

3.2.4 Socket流处理实现

建立SocketStreamWordCount类,输入以下代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package com.ldyer.wc;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class SocketStreamWordCount {

public static void main(String[] args) throws Exception {

// 1. 创建流式执行环境
// StreamExecutionEnvironment 是 Flink 流处理的入口点,用于配置和执行流式作业。
// getExecutionEnvironment() 方法会根据配置(如本地运行或集群运行)返回相应的执行环境。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 2. 读取文本流
// 使用 socketTextStream 方法从指定的主机和端口读取文本数据。
// 这里,"Node01" 是发送端主机名,7777 是端口号。
// Flink 会从这个 Socket 地址接收文本数据,每行作为一个字符串元素。
DataStreamSource<String> lineStream = env.socketTextStream("Node01", 7777);

// 3. 转换、分组、求和,得到统计结果
SingleOutputStreamOperator<Tuple2<String, Long>> sum = lineStream
// 3.1 使用 flatMap 方法将每行文本拆分为单词,并为每个单词分配计数(1L)。
// flatMap 是一个转换操作,用于将输入的元素转换为零个、一个或多个输出元素。
// 这里使用 lambda 表达式实现 flatMap 功能:
// - 将每行文本按空格分割为单词数组。
// - 遍历单词数组,为每个单词生成 Tuple2<String, Long> 对象。
// - Tuple2 的第一个字段是单词,第二个字段是计数(1L)。
.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
String[] words = line.split(" "); // 按空格分割文本行
for (String word : words) {
out.collect(Tuple2.of(word, 1L)); // 输出单词和计数(1L)
}
})
// 3.2 指定返回类型
// returns 方法用于显式指定转换操作的返回类型。
// 这里指定返回类型为 Tuple2<String, Long>。
.returns(Types.TUPLE(Types.STRING, Types.LONG))
// 3.3 按单词进行分组
// keyBy 方法用于对数据流进行分组,分组的依据是 Tuple2 的第一个字段(单词)。
// 这里使用 lambda 表达式指定分组键为 data.f0(即单词)。
.keyBy(data -> data.f0)
// 3.4 对每个分组内的数据进行聚合操作,计算每个单词的总计数。
// sum 方法对指定字段(这里是 Tuple2 的第二个字段,索引为 1)进行求和。
.sum(1);

// 4. 打印
// 将聚合后的结果打印到控制台。
// 这里使用 print 方法将结果输出到标准输出,方便调试和查看。
sum.print();

// 5. 执行
// 调用 execute 方法启动 Flink 流式作业。
// 这是 Flink 流处理程序的入口点,程序从这里开始执行。
// 可以在 execute 方法中传入作业名称,便于在 Flink Web UI 中识别。
env.execute("Socket Stream Word Count");
}
}