一,集群搭建

1.1 重要原则

  • Kafka节点只要注册到同一个Zookeeper上就代表它们是同一个集群的
  • Kafka通过broker.id来区分集群中的不同节点

1.2 规划

  • 简单起见,我们只使用一个VMWare虚拟机,所以各个broker实例需要设定不同端口号
  • Kafka程序不需要复制,对应各自不同的配置文件启动多个进程就能组成集群
  • Zookeeper还是使用原来的2181即可
端口号配置文件日志目录
实例017000/opt/k-cluster/server7000.properties/opt/k-cluster/log7000
实例028000/opt/k-cluster/server8000.properties/opt/k-cluster/log8000
实例039000/opt/k-cluster/server9000.properties/opt/k-cluster/log9000

1.3 具体操作

1.3.1 创建目录

mkdir -p /opt/k-cluster/log7000
mkdir -p /opt/k-cluster/log8000
mkdir -p /opt/k-cluster/log9000

1.3.2 复制配置文件

cp /opt/kafka_2.13-3.6.0/config/server.properties /opt/k-cluster/server7000.properties
cp /opt/kafka_2.13-3.6.0/config/server.properties /opt/k-cluster/server8000.properties
cp /opt/kafka_2.13-3.6.0/config/server.properties /opt/k-cluster/server9000.properties

1.3.3 修改配置文件

[1]7000

broker.id=1
listeners=PLAINTEXT://192.168.200.100:7000
advertised.listeners=PLAINTEXT://192.168.200.100:7000
log.dirs=/opt/k-cluster/log7000

[2]8000

broker.id=2
listeners=PLAINTEXT://192.168.200.100:8000
advertised.listeners=PLAINTEXT://192.168.200.100:8000
log.dirs=/opt/k-cluster/log8000

[3]9000

broker.id=3
listeners=PLAINTEXT://192.168.200.100:9000
advertised.listeners=PLAINTEXT://192.168.200.100:9000
log.dirs=/opt/k-cluster/log9000

1.4 启动集群各实例

注意:此前需要先启动 Zookeeper

kafka-server-start.sh -daemon /opt/k-cluster/server7000.properties
kafka-server-start.sh -daemon /opt/k-cluster/server8000.properties
kafka-server-start.sh -daemon /opt/k-cluster/server9000.properties

验证各个端口号:

lsof -i:2181
lsof -i:7000
lsof -i:8000
lsof -i:9000

如果因为内存不足而启动失败,可以修改对应启动脚本程序中的内存大小:

  • Zookeeper启动脚本程序:zookeeper-server-start.sh
  • Zookeeper中Kafka堆内存大小变量名称:KAFKA_HEAP_OPTS
  • Kafka启动脚本程序:kafka-server-start.sh
  • Kafka堆内存大小变量名称:KAFKA_HEAP_OPTS

1.5 停止集群

# 停止Kafka,无需指定端口号就能停止各个实例:
kafka-server-stop.sh
# 停止zk
zookeeper-server-stop.sh

二、使用集群

2.1 在集群上创建主题

kafka-topics.sh \
--bootstrap-server 192.168.200.100:7000,192.168.200.100:8000,192.168.200.100:9000 \
--create \
--partitions 3 \
--replication-factor 3 \
--topic my-cluster-topic

2.2 查看集群主题

kafka-topics.sh \
--bootstrap-server 192.168.200.100:7000 \
--describe --topic my-cluster-topic

2953321-20260421174022452-1286900429.png

2.3 集群消息发送

kafka-console-producer.sh \
--bootstrap-server 192.168.200.100:7000,192.168.200.100:8000,192.168.200.100:9000 \
--topic my-cluster-topic

2.4 集群消息消费

kafka-console-consumer.sh \
--bootstrap-server 192.168.200.100:7000,192.168.200.100:8000,192.168.200.100:9000  \
--from-beginning \
--topic my-cluster-topic

2.5 集群消息消费相关问题

问题描述

  • 通过集群接收消息时,接收不到

问题产生原因

多个 broker 实例部署在同一个虚拟机上

  • 192.168.200.100:7000
  • 192.168.200.100:8000
  • 192.168.200.100:9000

这只是我们在测试环境下,非正式的这么安排,实际开发中不会把集群的所有实例放在一个机器上

问题解决方案一

消费端接收消息时指定分区

kafka-console-consumer.sh \
--bootstrap-server 192.168.200.100:7000,192.168.200.100:8000,192.168.200.100:9000  \
--from-beginning \
--partition 0 \
--topic my-cluster-topic

kafka-console-consumer.sh \
--bootstrap-server 192.168.200.100:7000,192.168.200.100:8000,192.168.200.100:9000  \
--from-beginning \
--partition 1 \
--topic my-cluster-topic

kafka-console-consumer.sh \
--bootstrap-server 192.168.200.100:7000,192.168.200.100:8000,192.168.200.100:9000  \
--from-beginning \
--partition 2 \
--topic my-cluster-topic

问题解决方案二

  • 第一步:把 apache-zookeeper-3.9.1-bin.tar.gz 上传到 Linux 系统 /opt 目录下

  • 第二步:解压 apache-zookeeper-3.9.1-bin.tar.gz 文件

    cd /opt
    tar -zxvf apache-zookeeper-3.9.1-bin.tar.gz
    
  • 第三步:运行 zkCli.sh 脚本文件,登录到 Zookeeper 服务器

    /opt/apache-zookeeper-3.9.1-bin/bin/zkCli.sh
    
  • 第四步:删除 __consumer_offsets 主题

    deleteall /brokers/topics/__consumer_offsets
    
  • 第五步:退出 Zookeeper

    quit
    
  • 第六步:重启

    • 先关闭然后重新启动Zookeeper
    • 先关闭然后重新启动集群各实例