一,Web-Ui操作Kafka

先安装 Kafka-ui

docker run -d \
  --name kafka-ui \
  -p 8080:8080 \
  -e DYNAMIC_CONFIG_ENABLED=true \
  -e KAFKA_CLUSTERS_0_NAME=local \
  -e KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9092 \
  provectuslabs/kafka-ui:latest

1.1 创建Topic

2953321-20260415204146493-1797905051.png

  1. Topic Name(主题名称)

    • 含义:Topic 的名字,用于生产和消费消息时指定。
    • 要求:只能包含字母、数字、下划线 _、点 .、连字符 -
    • 建议:用有业务含义的名称,如 order-eventsuser-logs
  2. Number of Partitions(分区数)

  • 含义:Topic 被拆分成多少个分区,用于水平扩展吞吐量。
  • 作用
    • 分区数越多,并行消费和处理能力越强。
    • 但过多会增加文件句柄和 leader 选举开销。
  • 建议
    • 小规模(测试):1–3 个分区。
    • 生产环境(中等流量):6–12 个分区。
    • 根据期望的 吞吐量 / 消费者数量 来定。
  1. Cleanup Policy(清理策略)

    • 含义:决定消息何时被删除或压缩。

      选项含义适用场景
      Delete直接删除旧数据默认推荐。适合日志、监控数据,删了就没了。
      Compact压缩(保留每个 Key 的最新一条)适合数据库变更日志。比如你改了 10 次用户名,Kafka 只保留最后一次改名的结果。
      Compact, Delete先压缩,再按时间/大小删除特殊场景,一般不用勾选。
    • 建议:绝大多数场景用 Delete

  2. Min In Sync Replicas(最小同步副本数)

    • 含义:生产者需要等待至少多少个 同步中的副本 确认写入,才算成功。
    • 作用:控制数据可靠性与可用性的平衡。
    • 建议
      • 如果副本数(replication factor)是 3,一般设为 2
      • 设为 1:更快但可能丢数据。
      • 设为 3:可靠性高但可能降低可用性。
  3. Time to retain data (in ms)(数据保留时间,毫秒)

    • 含义:消息保留多久后删除(基于时间)。
    • 单位:毫秒。
    • 常见值
      • 7 天 = 604800000
      • 24 小时 = 86400000
      • 1 小时 = 3600000
    • 建议:根据数据有效期设置,不设则用 broker 默认值。
  4. Max size on disk in GB(磁盘最大保留大小,GB)

    • 含义:该 Topic 所有分区总数据量超过此值时,删除旧消息。
    • 建议
      • 不设置(或设为 -1):仅按时间保留。
      • 若磁盘有限,可设置如 1050
  5. Maximum message size(最大消息大小)

  • 含义:Topic 允许的 单条消息最大字节数
  • 默认:通常为 1 MB(1048576 字节)。
  • 建议
    • 普通业务保持默认。
    • 大文件 / 大数据场景可调大(如 10 MB)。
  1. Custom parameters(自定义参数)

    • 用于添加 Kafka 支持的 其他 Topic 级别配置(如 compression.typeunclean.leader.election.enable)。
    • 不建议新手使用,除非你明确知道需要改某个高级参数。

1.2 发送消息

进入 topic

2953321-20260415204203201-1806693951.png

发送消息

2953321-20260415204212198-699011669.png

  1. Partition(分区)

    选项含义
    Partition #0指定发送到分区 0
    Partition #1发送到分区 1
    ...以此类推
    没有指定(留空)Kafka 会根据 Key 的哈希自动选择分区

    建议:不熟悉时选择 Partition #0 或留空让 Kafka 自动分配。

  2. Key Serde(Key 序列化器)

    选项含义适合的 Key 格式
    String字符串序列化"user123""order_001"
    JSONJSON 序列化{"id": 123}
    Int32整数序列化123
  3. Value Serde(Value 序列化器)

    同上,用于消息的内容部分。

  4. Keep contents(保留内容)

    • 含义:勾选后,发送成功后不清空输入框,方便连续发送多条相似消息
    • 建议:测试时可以勾上
  5. Key(消息键)

  • 作用:
    • 相同 Key 的消息会进入 同一个分区
    • 如果启用了 Compact(压缩),会按 Key 保留最新消息
  • 示例:"user_123""order_001"
  1. Value(消息值)

    • 这是你真正要发送的内容

    • 示例:"hello kafka"{"name":"张三","age":25}

  2. Headers(消息头)

    • 作用:附加的 键值对元数据,不影响消息内容

    • 用途:传递 trace_id、content_type、认证信息等

    • 示例:

      Header KeyHeader Value
      trace_idabc-123-xyz
      sourcekafka-ui

如下图,消息发送成功!

2953321-20260415204231049-803317522.png

二,命令行操作Kafka

2.1 Topic

  1. 查看所有 topic

    kafka-topics --list --bootstrap-server localhost:9092
    

    参数说明

    参数含义示例
    --bootstrap-serverKafka broker 的地址和端口localhost:9092
    --topic要操作的 topic 名称--topic my-topic
    --list列出所有 topic 名称
  2. 查看 topic 详情

    kafka-topics --describe \
      --bootstrap-server localhost:9092 \
      --topic user-topic
    

    参数说明

    参数含义说明
    --describe查看详情查看详细信息(分区分布、leader 是谁等)

    输出会显示:分区数、副本数、每个分区的 leader 和副本分布。

    输出示例

    Topic: user-topic       TopicId: cnz-J97jRca99hql6W7tJA PartitionCount: 1       ReplicationFactor: 1    Configs: min.insync.replicas=1,cleanup.policy=delete,flush.ms=1000,segment.bytes=1073741824,retention.ms=604800000,flush.messages=10000,max.message.bytes=10485760
            Topic: user-topic       Partition: 0    Leader: 1       Replicas: 1     Isr: 1
    
  3. 创建 topic

    kafka-topics --create \
      --bootstrap-server localhost:9092 \
      --topic cli-test \
      --partitions 3 \
      --replication-factor 1
    

    参数说明

    参数含义说明
    --create创建操作告诉 Kafka 我要建一个新 topic
    --partitions分区数数字越大,并行处理能力越强
    --replication-factor副本数1 = 不备份,3 = 每个分区存 3 份(需要 3 台 broker)
  4. 修改 topic(比如增加分区)

    kafka-topics --alter \
      --bootstrap-server localhost:9092 \
      --topic cli-test \
      --partitions 6
    
    参数含义
    --alter修改现有 topic(只能增加分区,不能减少)
  5. 删除 topic

    kafka-topics --delete \
      --bootstrap-server localhost:9092 \
      --topic cli-test
    
    参数含义
    --delete删除这个 topic

2.2 Message

  1. 基础发送

    kafka-console-producer \
      --bootstrap-server localhost:9092 \
      --topic cli-test
    
  2. 带 Key 发送

    kafka-console-producer \
      --bootstrap-server localhost:9092 \
      --topic cli-test \
      --property parse.key=true \
      --property key.separator=:
    
    参数含义
    --property parse.key=true告诉 Kafka 我要手动指定消息的 Key
    --property key.separator=Key 和 Value 之间的分隔符(这里用冒号)

    示例输入user123:Hello World

    • Key = user123
    • Value = Hello World
  3. 带 Header 发送

    kafka kafka-console-producer \
      --bootstrap-server localhost:9092 \
      --topic cli-test \
      --header "trace-id:001" \
      --header "source:cli"
    
    参数含义
    --header "trace-id:001"指定header内容
  4. 指定超时时间

    kafka-console-producer \
      --bootstrap-server localhost:9092 \
      --topic cli-test \
      --request-timeout-ms 5000
    
    参数含义说明
    --request-timeout-ms请求超时时间(毫秒)超过此时间未收到确认,认为发送失败
  5. 指定确认机制

    kafka-console-producer \
      --bootstrap-server localhost:9092 \
      --topic cli-test \
      --request-required-acks -1
    
    参数含义说明
    --request-required-acks确认机制0=不等待确认,1=等待leader确认,-1=等待所有副本确认(等同于all
  6. 同步发送

    kafka-console-producer \
      --bootstrap-server localhost:9092 \
      --topic cli-test \
      --sync
    
    参数含义说明
    --sync同步发送模式每条消息发送完等待确认后才发下一条,吞吐量低但更可靠
  7. 指定批次大小

    kafka-console-producer \
      --bootstrap-server localhost:9092 \
      --topic cli-test \
      --max-partition-memory-bytes 32768 \
      --timeout 100
    
    参数含义说明
    --max-partition-memory-bytes每个分区的批次大小(字节)默认16384,等同于batch.size
    --timeout延迟发送时间(毫秒)默认1000,等同于linger.ms
  8. 指定缓冲区大小

    kafka-console-producer \
      --bootstrap-server localhost:9092 \
      --topic cli-test \
      --max-memory-bytes 67108864
    
    参数含义说明
    --max-memory-bytes生产者缓冲区总大小(字节)默认33554432(32MB),等同于buffer.memory
  9. 指定重试次数和间隔

    kafka-console-producer \
      --bootstrap-server localhost:9092 \
      --topic cli-test \
      --message-send-max-retries 5 \
      --retry-backoff-ms 500
    
    参数含义说明
    --message-send-max-retries重试次数默认3,等同于retries
    --retry-backoff-ms重试间隔(毫秒)默认100
  10. 通过属性方式设置参数

kafka-console-producer \
  --bootstrap-server localhost:9092 \
  --topic cli-test \
  --producer-property acks=-1 \
  --producer-property compression.type=gzip \
  --producer-property batch.size=32768
参数含义说明
--producer-property生产者属性格式key=value,可多次使用
  1. 使用配置文件发送消息

    kafka-console-producer \
      --bootstrap-server localhost:9092 \
      --topic cli-test \
      --producer.config producer.properties
    

    参数说明

    参数含义说明
    --producer.configproducer配置文件路径可以配置acks、压缩等参数

    producer.properties示例内容

    acks=all
    compression.type=gzip
    batch.size=16384
    linger.ms=5
    request.timeout.ms=30000
    
  2. 带压缩发送

    kafka-console-producer \
      --bootstrap-server localhost:9092 \
      --topic cli-test \
      --compression-codec gzip
    

    参数说明

    参数含义说明
    --compression-codec压缩算法可选:gzipsnappylz4zstd

3.3 Consumer

  1. 基础消费

    kafka-console-consumer \
      --bootstrap-server localhost:9092 \
      --topic cli-test
    

    参数说明

    参数含义说明
    --bootstrap-serverKafka broker地址端口与producer保持一致
    --topic要消费的topic名称必须是已存在的topic

    注意:不加--from-beginning只会消费启动后产生的新消息

  2. 从头消费

    kafka-console-consumer \
      --bootstrap-server localhost:9092 \
      --topic cli-test \
      --from-beginning
    

    参数说明

    参数含义说明
    --from-beginning从最早的消息开始消费等同于--offset earliest
  3. 显示 Key 和 Value

    kafka-console-consumer \
      --bootstrap-server localhost:9092 \
      --topic cli-test \
      --property print.key=true \
      --property print.value=true \
      --property key.separator=":"
    

    参数说明

    参数含义说明
    --property print.key=true打印消息的Key需要消息有Key才能看到
    --property print.value=true打印消息的Value默认就是true,可省略
    --property key.separator=":"Key和Value的分隔符默认是\t(制表符)

    输出示例

    user123:Hello World
    user456:Hello Kafka
    
  4. 显示 Header

    kafka-console-consumer \
      --bootstrap-server localhost:9092 \
      --topic cli-test \
      --property print.headers=true
    

    参数说明

    参数含义说明
    --property print.headers=true打印消息的Header会显示所有header键值对

    输出示例

    trace-id:001,source:cli	Hello World
    
  5. 显示时间戳

    kafka-console-consumer \
      --bootstrap-server localhost:9092 \
      --topic cli-test \
      --property print.timestamp=true
    

    参数说明

    参数含义说明
    --property print.timestamp=true打印消息时间戳显示消息的创建时间
  6. 组合显示多个属性

    kafka-console-consumer \
      --bootstrap-server localhost:9092 \
      --topic cli-test \
      --property print.key=true \
      --property print.value=true \
      --property print.headers=true \
      --property print.timestamp=true \
      --property key.separator=":" \
      --property headers.separator=","
    

    参数说明

    参数含义说明
    --property headers.separatorHeader之间的分隔符默认是,
  7. 指定消费者组

    kafka-console-consumer \
      --bootstrap-server localhost:9092 \
      --topic cli-test \
      --group my-consumer-group
    

    参数说明

    参数含义说明
    --group消费者组ID同组消费者共同消费topic,消息不重复
  8. 指定分区消费

    kafka-console-consumer \
      --bootstrap-server localhost:9092 \
      --topic cli-test \
      --partition 0 \
      --offset 10
    

    参数说明

    参数含义说明
    --partition指定分区编号从0开始,必须配合--offset--from-beginning
    --offset指定起始偏移量数字表示从第几条开始,earliestlatest
  9. 限制消费消息数量

    kafka-console-consumer \
      --bootstrap-server localhost:9092 \
      --topic cli-test \
      --max-messages 5 \
      --from-beginning
    

    参数说明

    参数含义说明
    --max-messages最大消费消息数消费完N条后自动退出
  10. 指定偏移量重置策略

    kafka-console-consumer \
      --bootstrap-server localhost:9092 \
      --topic cli-test \
      --group my-group \
      --offset earliest
    

    参数说明

    参数含义说明
    --offset偏移量重置策略earliest=最早,latest=最新(默认)

    注意--offset只在没有已提交偏移量时生效

  11. 指定超时退出

    kafka-console-consumer \
      --bootstrap-server localhost:9092 \
      --topic cli-test \
      --timeout-ms 5000
    

    参数说明

    参数含义说明
    --timeout-ms超时时间(毫秒)如果超过此时间没有新消息,则退出
  12. 不自动提交偏移量

    kafka-console-consumer \
      --bootstrap-server localhost:9092 \
      --topic cli-test \
      --group my-group \
      --enable-autocommit false
    

    参数说明

    参数含义说明
    --enable-autocommit是否自动提交偏移量默认true,设为false需要手动管理偏移量
  13. 指定消费间隔

    kafka-console-consumer \
      --bootstrap-server localhost:9092 \
      --topic cli-test \
      --group my-group \
      --autocommit-interval 5000
    

    参数说明

    参数含义说明
    --autocommit-interval自动提交间隔(毫秒)默认5000(5秒)
  14. 指定 Session 超时

    kafka-console-consumer \
      --bootstrap-server localhost:9092 \
      --topic cli-test \
      --group my-group \
      --session-timeout 30000
    

    参数说明

    参数含义说明
    --session-timeout会话超时时间(毫秒)默认10000,如果消费者超过此时间未发心跳,认为挂掉
  15. 指定心跳间隔

    kafka-console-consumer \
      --bootstrap-server localhost:9092 \
      --topic cli-test \
      --group my-group \
      --heartbeat-interval 3000
    

    参数说明

    参数含义说明
    --heartbeat-interval心跳间隔(毫秒)默认3000,应小于session-timeout的1/3
  16. 使用配置文件

    kafka-console-consumer \
      --bootstrap-server localhost:9092 \
      --topic cli-test \
      --consumer.config consumer.properties
    

    参数说明

    参数含义说明
    --consumer.configconsumer配置文件路径可以配置多个参数

    consumer.properties 示例:

    properties

    group.id=my-group
    auto.offset.reset=earliest
    enable.auto.commit=true
    auto.commit.interval.ms=5000
    session.timeout.ms=30000
    heartbeat.interval.ms=3000
    max.poll.records=500
    
  17. 通过属性方式设置参数

    kafka-console-consumer \
      --bootstrap-server localhost:9092 \
      --topic cli-test \
      --consumer-property group.id=my-group \
      --consumer-property auto.offset.reset=earliest
    

    参数说明

    参数含义说明
    --consumer-property消费者属性格式key=value,可多次使用
  18. 指定 Key 和 Value 反序列化器

    kafka-console-consumer \
      --bootstrap-server localhost:9092 \
      --topic cli-test \
      --key-deserializer org.apache.kafka.common.serialization.StringDeserializer \
      --value-deserializer org.apache.kafka.common.serialization.StringDeserializer
    

    参数说明

    参数含义说明
    --key-deserializerKey反序列化器类默认StringDeserializer
    --value-deserializerValue反序列化器类默认StringDeserializer
  19. 消费多个 Topic

    kafka-console-consumer \
      --bootstrap-server localhost:9092 \
      --topic topic1,topic2,topic3 \
      --group my-group
    
    参数含义说明
    --topic多个topic用逗号分隔同时消费多个topic的消息
  20. 使用正则表达式匹配 Topic

    kafka-console-consumer \
      --bootstrap-server localhost:9092 \
      --topic "test-.*" \
      --group my-group \
      --regex
    

    参数说明

    参数含义说明
    --regex启用正则表达式匹配topic参数会被当作正则表达式处理

常用参数速查表

参数默认值说明
--bootstrap-server必填,Kafka集群地址
--topic必填,要消费的topic
--group消费者组ID
--from-beginningfalse是否从头消费
--offsetlatest偏移量重置策略(earliest/latest)
--max-messages最大消费消息数
--timeout-ms超时后退出
--partition指定分区
--enable-autocommittrue是否自动提交偏移量
--autocommit-interval5000自动提交间隔(毫秒)
--session-timeout10000会话超时时间(毫秒)
--heartbeat-interval3000心跳间隔(毫秒)
--property print.keyfalse是否打印Key
--property print.valuetrue是否打印Value
--property print.headersfalse是否打印Header
--property print.timestampfalse是否打印时间戳
--property key.separator\tKey和Value的分隔符
--consumer.config配置文件路径
--consumer-property动态设置属性

3.4 Consumer Group

  1. 查看所有消费者组

    kafka-consumer-groups \
      --bootstrap-server localhost:9092 \
      --list
    

    参数说明

    参数含义说明
    --bootstrap-serverKafka broker地址端口与producer保持一致
    --list列出所有消费者组只显示组名,不含详情

    输出示例

    my-consumer-group
    console-consumer-12345
    test-group
    user-group
    
  2. 查看消费者组详情

    kafka-consumer-groups \
      --bootstrap-server localhost:9092 \
      --describe \
      --group my-consumer-group
    

    参数说明

    参数含义说明
    --describe查看组详细信息显示每个分区的消费进度
    --group指定消费者组名必须配合--describe使用

    输出示例

    GROUP               TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG   CONSUMER-ID                                          HOST              CLIENT-ID
    my-consumer-group   cli-test        0          15              20              5     consumer-my-consumer-group-1-xxx /127.0.0.1 consumer-my-consumer-group-1
    my-consumer-group   cli-test        1          8               8               0     consumer-my-consumer-group-1-xxx /127.0.0.1 consumer-my-consumer-group-1
    my-consumer-group   cli-test        2          22              30              8     consumer-my-consumer-group-2-xxx /127.0.0.1 consumer-my-consumer-group-2
    

    输出字段说明

    字段含义说明
    GROUP消费者组名
    TOPICtopic名称
    PARTITION分区编号
    CURRENT-OFFSET当前消费到的位置已经消费了多少条
    LOG-END-OFFSET分区最新消息的位置总共有多少条消息
    LAG消费延迟LOG-END-OFFSET - CURRENT-OFFSET
    CONSUMER-ID消费者ID哪个消费者在消费
    HOST消费者主机消费者运行的主机地址
    CLIENT-ID客户端ID消费者的客户端标识
  3. 查看指定 Topic 的消费详情

    kafka-consumer-groups \
      --bootstrap-server localhost:9092 \
      --describe \
      --group my-consumer-group \
      --topic cli-test
    

    参数说明

    参数含义说明
    --topic指定topic名称只显示该topic的消费情况
  4. 查看指定分区的消费详情

    kafka-consumer-groups \
      --bootstrap-server localhost:9092 \
      --describe \
      --group my-consumer-group \
      --members \
      --verbose
    

    参数说明

    参数含义说明
    --members显示消费者组成员列出组内所有消费者
    --verbose详细模式同时显示每个消费者分配的分区

    输出示例

    GROUP               CONSUMER-ID                                          HOST              CLIENT-ID           #PARTITIONS
    my-consumer-group   consumer-my-consumer-group-1-xxx /127.0.0.1 consumer-my-consumer-group-1 2
    my-consumer-group   consumer-my-consumer-group-2-xxx /127.0.0.1 consumer-my-consumer-group-2 1
    
    GROUP               TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG   CONSUMER-ID
    my-consumer-group   cli-test        0          15              20              5     consumer-my-consumer-group-1-xxx
    my-consumer-group   cli-test        1          8               8               0     consumer-my-consumer-group-1-xxx
    my-consumer-group   cli-test        2          22              30              8     consumer-my-consumer-group-2-xxx
    
  5. 查看消费者组状态

    kafka-consumer-groups \
      --bootstrap-server localhost:9092 \
      --describe \
      --group my-consumer-group \
      --state
    

    参数说明

    参数含义说明
    --state显示消费者组状态显示组的状态(Stable/Empty/PreparingRebalance等)

    输出示例

    GROUP               COORDINATOR (ID)  ASSIGNMENT-STRATEGY  STATE           #MEMBERS
    my-consumer-group   broker1:9092 (0)  range               Stable          2
    

    状态说明

    状态含义
    Stable稳定状态,消费者组正常工作
    Empty组内没有活跃的消费者
    PreparingRebalance准备重平衡
    CompletingRebalance正在完成重平衡
    Dead组已死亡
  6. 重置消费者组偏移量(到最早)

    kafka-consumer-groups \
      --bootstrap-server localhost:9092 \
      --group my-consumer-group \
      --topic cli-test \
      --reset-offsets \
      --to-earliest \
      --execute
    

    参数说明

    参数含义说明
    --reset-offsets执行偏移量重置必须配合--execute使用
    --to-earliest重置到最早的消息从头开始消费
    --execute执行重置不加则只是预览,不会真正执行

    预览示例(不实际执行)

    kafka-consumer-groups --bootstrap-server localhost:9092 \
      --group my-group --topic cli-test \
      --reset-offsets --to-earliest
    
    # 输出预览结果
    GROUP                          TOPIC                          PARTITION  NEW-OFFSET     
    my-group                       cli-test                       0          0
    my-group                       cli-test                       1          0
    my-group                       cli-test                       2          0
    
  7. 重置消费者组偏移量(到最新)

    kafka-consumer-groups \
      --bootstrap-server localhost:9092 \
      --group my-consumer-group \
      --topic cli-test \
      --reset-offsets \
      --to-latest \
      --execute
    
    参数含义说明
    --to-latest重置到最新的消息只消费新消息
  8. 重置消费者组偏移量(到指定偏移量)

    kafka-consumer-groups \
      --bootstrap-server localhost:9092 \
      --group my-consumer-group \
      --topic cli-test:0,1,2 \
      --reset-offsets \
      --to-offset 100 \
      --execute
    

    参数说明

    参数含义说明
    --to-offset重置到指定偏移量例如--to-offset 100
    topic:0,1,2指定topic和分区格式topic名称:分区号列表
  9. 重置消费者组偏移量(向前 / 向后移动)

    # 向后移动5条(重新消费最近5条)
    kafka-consumer-groups \
      --bootstrap-server localhost:9092 \
      --group my-consumer-group \
      --topic cli-test \
      --reset-offsets \
      --shift-by -5 \
      --execute
    
    # 向前移动10条(跳过10条消息)
    kafka-consumer-groups \
      --bootstrap-server localhost:9092 \
      --group my-consumer-group \
      --topic cli-test \
      --reset-offsets \
      --shift-by 10 \
      --execute
    

    参数说明

    参数含义说明
    --shift-by偏移量移动值正数向前(跳过),负数向后(重复消费)
  10. 重置消费者组偏移量(到指定时间前)

    kafka-consumer-groups \
      --bootstrap-server localhost:9092 \
      --group my-consumer-group \
      --topic cli-test \
      --reset-offsets \
      --to-datetime 2024-01-01T00:00:00.000 \
      --execute
    

    参数说明

    参数含义说明
    --to-datetime重置到指定时间格式YYYY-MM-DDTHH:mm:SS.sss
  11. 重置消费者组偏移量(根据时间戳文件)

    # 先创建时间戳文件
    echo "cli-test:0:1704067200000" > offsets.txt
    echo "cli-test:1:1704067200000" >> offsets.txt
    
    # 执行重置
    kafka-consumer-groups \
      --bootstrap-server localhost:9092 \
      --group my-consumer-group \
      --reset-offsets \
      --from-file offsets.txt \
      --execute
    

    参数说明

    参数含义说明
    --from-file从文件读取重置配置文件格式topic:分区:时间戳
  12. 删除消费者组

    kafka-consumer-groups \
      --bootstrap-server localhost:9092 \
      --delete \
      --group my-consumer-group
    

    参数说明

    参数含义说明
    --delete删除消费者组组必须没有活跃的消费者,且状态为空
  13. 删除多个消费者组

    kafka-consumer-groups \
      --bootstrap-server localhost:9092 \
      --delete \
      --group group1,group2,group3
    
    参数含义说明
    --group多个组用逗号分隔一次删除多个消费者组
  14. 导出消费者组偏移量

    kafka-consumer-groups \
      --bootstrap-server localhost:9092 \
      --describe \
      --group my-consumer-group \
      --all-topics \
      > offsets-backup.txt
    
    参数含义说明
    --all-topics显示所有topic导出所有topic的偏移量信息
  15. 跳过消费者组(导出偏移量到文件用于迁移)

    # 导出当前偏移量
    kafka-consumer-groups --bootstrap-server localhost:9092 \
      --group my-group \
      --describe --all-topics \
      | awk '{print $2","$4}' > offsets.csv
    
  16. 查看消费者组协调器

    kafka-consumer-groups \
      --bootstrap-server localhost:9092 \
      --describe \
      --group my-consumer-group \
      --offsets
    
    参数含义说明
    --offsets显示偏移量信息显示每个分区的详细偏移量

重置偏移量参数速查表

参数含义示例
--to-earliest重置到最早从头消费所有消息
--to-latest重置到最新只消费新消息
--to-offset <N>重置到指定偏移量--to-offset 100
--shift-by <N>向前/向后移动N条--shift-by -5(重复消费5条)
--to-datetime重置到指定时间--to-datetime 2024-01-01T00:00:00.000
--from-file从文件读取--from-file offsets.txt