Environment Setup

Preface

This post documents my process of learning Kafka. The system environment used is Ubuntu.

(1) Installing Java

Before setting up Kafka, configure the Java environment by running the following commands:

sudo apt-get update
sudo apt-get install default-jdk

The first command updates the local package index on Ubuntu: it downloads the latest package information from the configured repositories and refreshes the local database, so the system knows which packages are available to install or upgrade and what their latest versions are. Run it before installing or updating software, since it ensures you get current versions and up-to-date dependency information.
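
Once the installation finishes, you can confirm the JDK is available (the exact version reported depends on what default-jdk resolves to on your release):

java -version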

(2) Installing Kafka

Downloading directly from the official site is slow, so the Tsinghua mirror is used instead. The version downloaded here is 3.7.0; pick a different link if you want another version. Note that the URL must point at the actual archive (kafka_2.13-3.7.0.tgz), not just the release directory:

wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/3.7.0/kafka_2.13-3.7.0.tgz

After the download finishes, extract the archive and change into the extracted directory:

tar -xzf kafka_2.13-3.7.0.tgz 
cd kafka_2.13-3.7.0/

(3) Single-Node Test

1. Start the ZooKeeper service

If your Kafka version is fairly new, it no longer supports creating topics through the zookeeper option; in recent versions kafka-topics.sh no longer talks to ZooKeeper directly, and topics are managed through the Kafka brokers themselves. In that case you can skip this step entirely, since ZooKeeper does not need to be started. If your version is older, start ZooKeeper with the following command:

./bin/zookeeper-server-start.sh config/zookeeper.properties
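
If you are unsure which Kafka version you are running, recent versions of the tooling can report it directly (run from the Kafka root directory, as with the other commands in this post):

./bin/kafka-topics.sh --version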

2. Start Kafka

Start Kafka with the following command:

./bin/kafka-server-start.sh config/server.properties

3. Create a topic

Create a topic with the following command:

./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic ldy
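
On newer versions, where the --zookeeper option is no longer available, the equivalent command goes through the broker instead:

./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic ldy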

Check the details of the topic with the following command:

./bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic ldy
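
To see every topic on the broker rather than one topic's details, --list can be used:

./bin/kafka-topics.sh --list --bootstrap-server localhost:9092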

4. Create a producer

Create a console producer with the following command:

./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic ldy

5. Create a consumer

Create a console consumer with the following command:

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic ldy

6. Message test

Type messages into the producer console, and the corresponding messages appear in the consumer console.
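
By default the console consumer only shows messages produced after it was started; if it was started late, it can replay the topic from the beginning with --from-beginning:

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic ldy --from-beginning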

(4) Single-Node, Multi-Broker Test

1. Edit the configuration files

In the ./config directory under the Kafka root, make two copies of the server.properties file:

cp server.properties server1.properties
cp server.properties server2.properties

Then open server1.properties and server2.properties and set the following values in each file:

server1.properties:

broker.id=11
listeners=PLAINTEXT://:9092
log.dirs=/tmp/kafka/logs-1

server2.properties:

broker.id=12
listeners=PLAINTEXT://:9093
log.dirs=/tmp/kafka/logs-2

Save the files and exit once the edits are done.
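
If you prefer to make these edits non-interactively, here is a rough sed sketch, run from the Kafka root directory; it assumes the copied files still contain the stock broker.id, commented-out listeners, and log.dirs lines from the default server.properties, so adjust the patterns if yours differ:

# server1.properties
sed -i 's/^broker.id=.*/broker.id=11/' config/server1.properties
sed -i 's|^#\?listeners=.*|listeners=PLAINTEXT://:9092|' config/server1.properties
sed -i 's|^log.dirs=.*|log.dirs=/tmp/kafka/logs-1|' config/server1.properties
# server2.properties
sed -i 's/^broker.id=.*/broker.id=12/' config/server2.properties
sed -i 's|^#\?listeners=.*|listeners=PLAINTEXT://:9093|' config/server2.properties
sed -i 's|^log.dirs=.*|log.dirs=/tmp/kafka/logs-2|' config/server2.properties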

2. Start the ZooKeeper service

Same as in the previous section.

3. Start the Kafka server processes

Start the server1 and server2 brokers with the following commands:

./bin/kafka-server-start.sh config/server1.properties
./bin/kafka-server-start.sh config/server2.properties
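
Note that kafka-server-start.sh runs in the foreground, so the two commands above need separate terminals; alternatively, the script's -daemon option starts each broker in the background:

./bin/kafka-server-start.sh -daemon config/server1.properties
./bin/kafka-server-start.sh -daemon config/server2.properties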

4. Create a topic

Create the topic with the following command; here it is given 3 partitions and a replication factor of 2:

./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 2 --partitions 3 --topic ldy

You can inspect the topic from either broker:

./bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic ldy
./bin/kafka-topics.sh --describe --bootstrap-server localhost:9093 --topic ldy

5. Create a producer and a consumer

Since there are multiple brokers, the producer is attached to broker 1 (port 9092) and the consumer to broker 2 (port 9093).

./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic ldy
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic ldy

6. Message test

Messages sent from the producer on broker 1 are received by the consumer on broker 2.

(5) KRaft Mode

1. Set up multiple virtual machines

To simulate multiple nodes, three virtual machines are used for this test: clone VM 1 from the existing environment, then copy it to create VMs 2 and 3.
After cloning, update each machine's settings. First change the hostname (run one of the following on each VM, so the three machines are named kraft1 through kraft3):

hostnamectl set-hostname kraft1
hostnamectl set-hostname kraft2
hostnamectl set-hostname kraft3

Then edit the hosts file on each machine:

sudo vi /etc/hosts

Add the following entries to /etc/hosts on every host (the kafka1-kafka3 aliases are what the Kafka configuration below refers to):

192.168.10.130 kafka1
192.168.10.131 kafka2
192.168.10.132 kafka3
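
After saving, it is worth checking that each machine can resolve and reach the others (the IPs above are examples; substitute your own):

ping -c 1 kafka1
ping -c 1 kafka2
ping -c 1 kafka3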

Next configure server.properties. On the kafka1 node, set the following in ./config/kraft/server.properties:

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

#
# This configuration file is intended for use in KRaft mode, where
# Apache ZooKeeper is not present. See config/kraft/README.md for details.
#

############################# Server Basics #############################

# The role of this server. Setting this puts us in KRaft mode
process.roles=broker,controller

# The node id associated with this instance's roles
node.id=1

# The connect string for the controller quorum
controller.quorum.voters=1@kafka1:19091,2@kafka2:19091,3@kafka3:19091

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://:9092,CONTROLLER://:19091
inter.broker.listener.name=PLAINTEXT

# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
advertised.listeners=PLAINTEXT://:9092

# Listener, host name, and port for the controller to advertise to the brokers. If
# this server is a controller, this listener must be configured.
controller.listener.names=CONTROLLER

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600


############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/tmp/kraft-combined-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
# 1. Durability: Unflushed data may be lost if you are not using replication.
# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=3000

The node.id on the three nodes is 1, 2, and 3 respectively; everything else in the file is identical.
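
Since only node.id changes on kafka2 and kafka3, one way to adjust the copied file on each of those nodes is a one-line sed (shown here as a sketch; run the matching line on the matching node):

# on kafka2
sed -i 's/^node.id=.*/node.id=2/' ./config/kraft/server.properties
# on kafka3
sed -i 's/^node.id=.*/node.id=3/' ./config/kraft/server.properties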

2. Generate a cluster ID

The whole cluster is identified by a single unique ID (a UUID). You can generate it with the kafka-storage tool that ships with Kafka, or with any other UUID generator. Run the following command to generate the ID and note it down (the value shown is the one generated for this walkthrough; yours will differ):

./bin/kafka-storage.sh random-uuid
gEiTGF_yQM-RZCU9tR9igg

3. Format the storage directories

Using the cluster UUID generated above, run the storage-format command on all three nodes:

./bin/kafka-storage.sh format -t gEiTGF_yQM-RZCU9tR9igg -c ./config/kraft/server.properties
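
The format step writes a meta.properties file into the configured log directory; checking it confirms that the cluster ID and node ID were recorded (the path follows the log.dirs value set above):

cat /tmp/kraft-combined-logs/meta.properties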

4. Start the nodes

Start the Kafka server on each node:

./bin/kafka-server-start.sh ./config/kraft/server.properties

5. Create a topic

Run the following command:

./bin/kafka-topics.sh --create --topic ldy --partitions 1 --replication-factor 3 --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092
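
To confirm the cluster is working, describe the topic through any of the three brokers; the output should show a leader plus replicas spread across the three node IDs:

./bin/kafka-topics.sh --describe --topic ldy --bootstrap-server kafka1:9092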