一,Web-Ui操作Kafka
先安装 Kafka-ui
docker run -d \
--name kafka-ui \
-p 8080:8080 \
-e DYNAMIC_CONFIG_ENABLED=true \
-e KAFKA_CLUSTERS_0_NAME=local \
-e KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9092 \
provectuslabs/kafka-ui:latest
1.1 创建Topic

-
Topic Name(主题名称)
- 含义:Topic 的名字,用于生产和消费消息时指定。
- 要求:只能包含字母、数字、下划线
_、点.、连字符-。 - 建议:用有业务含义的名称,如
order-events、user-logs。
-
Number of Partitions(分区数)
- 含义:Topic 被拆分成多少个分区,用于水平扩展吞吐量。
- 作用:
- 分区数越多,并行消费和处理能力越强。
- 但过多会增加文件句柄和 leader 选举开销。
- 建议:
- 小规模(测试):1–3 个分区。
- 生产环境(中等流量):6–12 个分区。
- 根据期望的 吞吐量 / 消费者数量 来定。
-
Cleanup Policy(清理策略)
-
含义:决定消息何时被删除或压缩。
选项 含义 适用场景 Delete 直接删除旧数据 默认推荐。适合日志、监控数据,删了就没了。 Compact 压缩(保留每个 Key 的最新一条) 适合数据库变更日志。比如你改了 10 次用户名,Kafka 只保留最后一次改名的结果。 Compact, Delete 先压缩,再按时间/大小删除 特殊场景,一般不用勾选。 -
建议:绝大多数场景用 Delete。
-
-
Min In Sync Replicas(最小同步副本数)
- 含义:生产者需要等待至少多少个 同步中的副本 确认写入,才算成功。
- 作用:控制数据可靠性与可用性的平衡。
- 建议:
- 如果副本数(replication factor)是 3,一般设为 2。
- 设为 1:更快但可能丢数据。
- 设为 3:可靠性高但可能降低可用性。
-
Time to retain data (in ms)(数据保留时间,毫秒)
- 含义:消息保留多久后删除(基于时间)。
- 单位:毫秒。
- 常见值:
- 7 天 =
604800000 - 24 小时 =
86400000 - 1 小时 =
3600000
- 7 天 =
- 建议:根据数据有效期设置,不设则用 broker 默认值。
-
Max size on disk in GB(磁盘最大保留大小,GB)
- 含义:该 Topic 所有分区总数据量超过此值时,删除旧消息。
- 建议:
- 不设置(或设为 -1):仅按时间保留。
- 若磁盘有限,可设置如
10、50。
-
Maximum message size(最大消息大小)
- 含义:Topic 允许的 单条消息最大字节数。
- 默认:通常为 1 MB(
1048576字节)。 - 建议:
- 普通业务保持默认。
- 大文件 / 大数据场景可调大(如 10 MB)。
-
Custom parameters(自定义参数)
- 用于添加 Kafka 支持的 其他 Topic 级别配置(如
compression.type、unclean.leader.election.enable)。 - 不建议新手使用,除非你明确知道需要改某个高级参数。
- 用于添加 Kafka 支持的 其他 Topic 级别配置(如
1.2 发送消息
进入 topic

发送消息

-
Partition(分区)
选项 含义 Partition #0 指定发送到分区 0 Partition #1 发送到分区 1 ... 以此类推 没有指定(留空) Kafka 会根据 Key 的哈希自动选择分区 建议:不熟悉时选择 Partition #0 或留空让 Kafka 自动分配。
-
Key Serde(Key 序列化器)
选项 含义 适合的 Key 格式 String 字符串序列化 "user123"、"order_001"JSON JSON 序列化 {"id": 123}Int32 整数序列化 123 -
Value Serde(Value 序列化器)
同上,用于消息的内容部分。
-
Keep contents(保留内容)
- 含义:勾选后,发送成功后不清空输入框,方便连续发送多条相似消息
- 建议:测试时可以勾上
-
Key(消息键)
- 作用:
- 相同 Key 的消息会进入 同一个分区
- 如果启用了 Compact(压缩),会按 Key 保留最新消息
- 示例:
"user_123"或"order_001"
-
Value(消息值)
-
这是你真正要发送的内容
-
示例:
"hello kafka"或{"name":"张三","age":25}
-
-
Headers(消息头)
-
作用:附加的 键值对元数据,不影响消息内容
-
用途:传递 trace_id、content_type、认证信息等
-
示例:
Header Key Header Value trace_idabc-123-xyzsourcekafka-ui
-
如下图,消息发送成功!

二,命令行操作Kafka
2.1 Topic
-
查看所有 topic
kafka-topics --list --bootstrap-server localhost:9092参数说明
参数 含义 示例 --bootstrap-serverKafka broker 的地址和端口 localhost:9092--topic要操作的 topic 名称 --topic my-topic--list列出所有 topic 名称 -
查看 topic 详情
kafka-topics --describe \ --bootstrap-server localhost:9092 \ --topic user-topic参数说明
参数 含义 说明 --describe查看详情 查看详细信息(分区分布、leader 是谁等) 输出会显示:分区数、副本数、每个分区的 leader 和副本分布。
输出示例
Topic: user-topic TopicId: cnz-J97jRca99hql6W7tJA PartitionCount: 1 ReplicationFactor: 1 Configs: min.insync.replicas=1,cleanup.policy=delete,flush.ms=1000,segment.bytes=1073741824,retention.ms=604800000,flush.messages=10000,max.message.bytes=10485760 Topic: user-topic Partition: 0 Leader: 1 Replicas: 1 Isr: 1 -
创建 topic
kafka-topics --create \ --bootstrap-server localhost:9092 \ --topic cli-test \ --partitions 3 \ --replication-factor 1参数说明
参数 含义 说明 --create创建操作 告诉 Kafka 我要建一个新 topic --partitions分区数 数字越大,并行处理能力越强 --replication-factor副本数 1 = 不备份,3 = 每个分区存 3 份(需要 3 台 broker) -
修改 topic(比如增加分区)
kafka-topics --alter \ --bootstrap-server localhost:9092 \ --topic cli-test \ --partitions 6参数 含义 --alter修改现有 topic(只能增加分区,不能减少) -
删除 topic
kafka-topics --delete \ --bootstrap-server localhost:9092 \ --topic cli-test参数 含义 --delete删除这个 topic
2.2 Message
-
基础发送
kafka-console-producer \ --bootstrap-server localhost:9092 \ --topic cli-test -
带 Key 发送
kafka-console-producer \ --bootstrap-server localhost:9092 \ --topic cli-test \ --property parse.key=true \ --property key.separator=:参数 含义 --property parse.key=true告诉 Kafka 我要手动指定消息的 Key --property key.separator=Key 和 Value 之间的分隔符(这里用冒号) 示例输入:
user123:Hello World- Key =
user123 - Value =
Hello World
- Key =
-
带 Header 发送
kafka kafka-console-producer \ --bootstrap-server localhost:9092 \ --topic cli-test \ --header "trace-id:001" \ --header "source:cli"参数 含义 --header "trace-id:001"指定header内容 -
指定超时时间
kafka-console-producer \ --bootstrap-server localhost:9092 \ --topic cli-test \ --request-timeout-ms 5000参数 含义 说明 --request-timeout-ms请求超时时间(毫秒) 超过此时间未收到确认,认为发送失败 -
指定确认机制
kafka-console-producer \ --bootstrap-server localhost:9092 \ --topic cli-test \ --request-required-acks -1参数 含义 说明 --request-required-acks确认机制 0=不等待确认,1=等待leader确认,-1=等待所有副本确认(等同于all) -
同步发送
kafka-console-producer \ --bootstrap-server localhost:9092 \ --topic cli-test \ --sync参数 含义 说明 --sync同步发送模式 每条消息发送完等待确认后才发下一条,吞吐量低但更可靠 -
指定批次大小
kafka-console-producer \ --bootstrap-server localhost:9092 \ --topic cli-test \ --max-partition-memory-bytes 32768 \ --timeout 100参数 含义 说明 --max-partition-memory-bytes每个分区的批次大小(字节) 默认16384,等同于 batch.size--timeout延迟发送时间(毫秒) 默认1000,等同于 linger.ms -
指定缓冲区大小
kafka-console-producer \ --bootstrap-server localhost:9092 \ --topic cli-test \ --max-memory-bytes 67108864参数 含义 说明 --max-memory-bytes生产者缓冲区总大小(字节) 默认33554432(32MB),等同于 buffer.memory -
指定重试次数和间隔
kafka-console-producer \ --bootstrap-server localhost:9092 \ --topic cli-test \ --message-send-max-retries 5 \ --retry-backoff-ms 500参数 含义 说明 --message-send-max-retries重试次数 默认3,等同于 retries--retry-backoff-ms重试间隔(毫秒) 默认100 -
通过属性方式设置参数
kafka-console-producer \
--bootstrap-server localhost:9092 \
--topic cli-test \
--producer-property acks=-1 \
--producer-property compression.type=gzip \
--producer-property batch.size=32768
| 参数 | 含义 | 说明 |
|---|---|---|
--producer-property | 生产者属性 | 格式key=value,可多次使用 |
-
使用配置文件发送消息
kafka-console-producer \ --bootstrap-server localhost:9092 \ --topic cli-test \ --producer.config producer.properties参数说明
参数 含义 说明 --producer.configproducer配置文件路径 可以配置acks、压缩等参数 producer.properties示例内容acks=all compression.type=gzip batch.size=16384 linger.ms=5 request.timeout.ms=30000 -
带压缩发送
kafka-console-producer \ --bootstrap-server localhost:9092 \ --topic cli-test \ --compression-codec gzip参数说明
参数 含义 说明 --compression-codec压缩算法 可选: gzip、snappy、lz4、zstd
3.3 Consumer
-
基础消费
kafka-console-consumer \ --bootstrap-server localhost:9092 \ --topic cli-test参数说明
参数 含义 说明 --bootstrap-serverKafka broker地址端口 与producer保持一致 --topic要消费的topic名称 必须是已存在的topic 注意:不加
--from-beginning只会消费启动后产生的新消息 -
从头消费
kafka-console-consumer \ --bootstrap-server localhost:9092 \ --topic cli-test \ --from-beginning参数说明
参数 含义 说明 --from-beginning从最早的消息开始消费 等同于 --offset earliest -
显示 Key 和 Value
kafka-console-consumer \ --bootstrap-server localhost:9092 \ --topic cli-test \ --property print.key=true \ --property print.value=true \ --property key.separator=":"参数说明
参数 含义 说明 --property print.key=true打印消息的Key 需要消息有Key才能看到 --property print.value=true打印消息的Value 默认就是true,可省略 --property key.separator=":"Key和Value的分隔符 默认是 \t(制表符)输出示例
user123:Hello World user456:Hello Kafka -
显示 Header
kafka-console-consumer \ --bootstrap-server localhost:9092 \ --topic cli-test \ --property print.headers=true参数说明
参数 含义 说明 --property print.headers=true打印消息的Header 会显示所有header键值对 输出示例
trace-id:001,source:cli Hello World -
显示时间戳
kafka-console-consumer \ --bootstrap-server localhost:9092 \ --topic cli-test \ --property print.timestamp=true参数说明
参数 含义 说明 --property print.timestamp=true打印消息时间戳 显示消息的创建时间 -
组合显示多个属性
kafka-console-consumer \ --bootstrap-server localhost:9092 \ --topic cli-test \ --property print.key=true \ --property print.value=true \ --property print.headers=true \ --property print.timestamp=true \ --property key.separator=":" \ --property headers.separator=","参数说明
参数 含义 说明 --property headers.separatorHeader之间的分隔符 默认是 , -
指定消费者组
kafka-console-consumer \ --bootstrap-server localhost:9092 \ --topic cli-test \ --group my-consumer-group参数说明
参数 含义 说明 --group消费者组ID 同组消费者共同消费topic,消息不重复 -
指定分区消费
kafka-console-consumer \ --bootstrap-server localhost:9092 \ --topic cli-test \ --partition 0 \ --offset 10参数说明
参数 含义 说明 --partition指定分区编号 从0开始,必须配合 --offset或--from-beginning--offset指定起始偏移量 数字表示从第几条开始, earliest或latest -
限制消费消息数量
kafka-console-consumer \ --bootstrap-server localhost:9092 \ --topic cli-test \ --max-messages 5 \ --from-beginning参数说明
参数 含义 说明 --max-messages最大消费消息数 消费完N条后自动退出 -
指定偏移量重置策略
kafka-console-consumer \ --bootstrap-server localhost:9092 \ --topic cli-test \ --group my-group \ --offset earliest参数说明
参数 含义 说明 --offset偏移量重置策略 earliest=最早,latest=最新(默认)注意:
--offset只在没有已提交偏移量时生效 -
指定超时退出
kafka-console-consumer \ --bootstrap-server localhost:9092 \ --topic cli-test \ --timeout-ms 5000参数说明
参数 含义 说明 --timeout-ms超时时间(毫秒) 如果超过此时间没有新消息,则退出 -
不自动提交偏移量
kafka-console-consumer \ --bootstrap-server localhost:9092 \ --topic cli-test \ --group my-group \ --enable-autocommit false参数说明
参数 含义 说明 --enable-autocommit是否自动提交偏移量 默认true,设为false需要手动管理偏移量 -
指定消费间隔
kafka-console-consumer \ --bootstrap-server localhost:9092 \ --topic cli-test \ --group my-group \ --autocommit-interval 5000参数说明
参数 含义 说明 --autocommit-interval自动提交间隔(毫秒) 默认5000(5秒) -
指定 Session 超时
kafka-console-consumer \ --bootstrap-server localhost:9092 \ --topic cli-test \ --group my-group \ --session-timeout 30000参数说明
参数 含义 说明 --session-timeout会话超时时间(毫秒) 默认10000,如果消费者超过此时间未发心跳,认为挂掉 -
指定心跳间隔
kafka-console-consumer \ --bootstrap-server localhost:9092 \ --topic cli-test \ --group my-group \ --heartbeat-interval 3000参数说明
参数 含义 说明 --heartbeat-interval心跳间隔(毫秒) 默认3000,应小于session-timeout的1/3 -
使用配置文件
kafka-console-consumer \ --bootstrap-server localhost:9092 \ --topic cli-test \ --consumer.config consumer.properties参数说明
参数 含义 说明 --consumer.configconsumer配置文件路径 可以配置多个参数 consumer.properties示例:properties
group.id=my-group auto.offset.reset=earliest enable.auto.commit=true auto.commit.interval.ms=5000 session.timeout.ms=30000 heartbeat.interval.ms=3000 max.poll.records=500 -
通过属性方式设置参数
kafka-console-consumer \ --bootstrap-server localhost:9092 \ --topic cli-test \ --consumer-property group.id=my-group \ --consumer-property auto.offset.reset=earliest参数说明
参数 含义 说明 --consumer-property消费者属性 格式 key=value,可多次使用 -
指定 Key 和 Value 反序列化器
kafka-console-consumer \ --bootstrap-server localhost:9092 \ --topic cli-test \ --key-deserializer org.apache.kafka.common.serialization.StringDeserializer \ --value-deserializer org.apache.kafka.common.serialization.StringDeserializer参数说明
参数 含义 说明 --key-deserializerKey反序列化器类 默认StringDeserializer --value-deserializerValue反序列化器类 默认StringDeserializer -
消费多个 Topic
kafka-console-consumer \ --bootstrap-server localhost:9092 \ --topic topic1,topic2,topic3 \ --group my-group参数 含义 说明 --topic多个topic用逗号分隔 同时消费多个topic的消息 -
使用正则表达式匹配 Topic
kafka-console-consumer \ --bootstrap-server localhost:9092 \ --topic "test-.*" \ --group my-group \ --regex参数说明
参数 含义 说明 --regex启用正则表达式匹配 topic参数会被当作正则表达式处理
常用参数速查表
| 参数 | 默认值 | 说明 |
|---|---|---|
--bootstrap-server | 无 | 必填,Kafka集群地址 |
--topic | 无 | 必填,要消费的topic |
--group | 无 | 消费者组ID |
--from-beginning | false | 是否从头消费 |
--offset | latest | 偏移量重置策略(earliest/latest) |
--max-messages | 无 | 最大消费消息数 |
--timeout-ms | 无 | 超时后退出 |
--partition | 无 | 指定分区 |
--enable-autocommit | true | 是否自动提交偏移量 |
--autocommit-interval | 5000 | 自动提交间隔(毫秒) |
--session-timeout | 10000 | 会话超时时间(毫秒) |
--heartbeat-interval | 3000 | 心跳间隔(毫秒) |
--property print.key | false | 是否打印Key |
--property print.value | true | 是否打印Value |
--property print.headers | false | 是否打印Header |
--property print.timestamp | false | 是否打印时间戳 |
--property key.separator | \t | Key和Value的分隔符 |
--consumer.config | 无 | 配置文件路径 |
--consumer-property | 无 | 动态设置属性 |
3.4 Consumer Group
-
查看所有消费者组
kafka-consumer-groups \ --bootstrap-server localhost:9092 \ --list参数说明
参数 含义 说明 --bootstrap-serverKafka broker地址端口 与producer保持一致 --list列出所有消费者组 只显示组名,不含详情 输出示例
my-consumer-group console-consumer-12345 test-group user-group -
查看消费者组详情
kafka-consumer-groups \ --bootstrap-server localhost:9092 \ --describe \ --group my-consumer-group参数说明
参数 含义 说明 --describe查看组详细信息 显示每个分区的消费进度 --group指定消费者组名 必须配合 --describe使用输出示例
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID my-consumer-group cli-test 0 15 20 5 consumer-my-consumer-group-1-xxx /127.0.0.1 consumer-my-consumer-group-1 my-consumer-group cli-test 1 8 8 0 consumer-my-consumer-group-1-xxx /127.0.0.1 consumer-my-consumer-group-1 my-consumer-group cli-test 2 22 30 8 consumer-my-consumer-group-2-xxx /127.0.0.1 consumer-my-consumer-group-2输出字段说明
字段 含义 说明 GROUP消费者组名 TOPICtopic名称 PARTITION分区编号 CURRENT-OFFSET当前消费到的位置 已经消费了多少条 LOG-END-OFFSET分区最新消息的位置 总共有多少条消息 LAG消费延迟 LOG-END-OFFSET - CURRENT-OFFSET CONSUMER-ID消费者ID 哪个消费者在消费 HOST消费者主机 消费者运行的主机地址 CLIENT-ID客户端ID 消费者的客户端标识 -
查看指定 Topic 的消费详情
kafka-consumer-groups \ --bootstrap-server localhost:9092 \ --describe \ --group my-consumer-group \ --topic cli-test参数说明
参数 含义 说明 --topic指定topic名称 只显示该topic的消费情况 -
查看指定分区的消费详情
kafka-consumer-groups \ --bootstrap-server localhost:9092 \ --describe \ --group my-consumer-group \ --members \ --verbose参数说明
参数 含义 说明 --members显示消费者组成员 列出组内所有消费者 --verbose详细模式 同时显示每个消费者分配的分区 输出示例
GROUP CONSUMER-ID HOST CLIENT-ID #PARTITIONS my-consumer-group consumer-my-consumer-group-1-xxx /127.0.0.1 consumer-my-consumer-group-1 2 my-consumer-group consumer-my-consumer-group-2-xxx /127.0.0.1 consumer-my-consumer-group-2 1 GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID my-consumer-group cli-test 0 15 20 5 consumer-my-consumer-group-1-xxx my-consumer-group cli-test 1 8 8 0 consumer-my-consumer-group-1-xxx my-consumer-group cli-test 2 22 30 8 consumer-my-consumer-group-2-xxx -
查看消费者组状态
kafka-consumer-groups \ --bootstrap-server localhost:9092 \ --describe \ --group my-consumer-group \ --state参数说明
参数 含义 说明 --state显示消费者组状态 显示组的状态(Stable/Empty/PreparingRebalance等) 输出示例
GROUP COORDINATOR (ID) ASSIGNMENT-STRATEGY STATE #MEMBERS my-consumer-group broker1:9092 (0) range Stable 2状态说明
状态 含义 Stable稳定状态,消费者组正常工作 Empty组内没有活跃的消费者 PreparingRebalance准备重平衡 CompletingRebalance正在完成重平衡 Dead组已死亡 -
重置消费者组偏移量(到最早)
kafka-consumer-groups \ --bootstrap-server localhost:9092 \ --group my-consumer-group \ --topic cli-test \ --reset-offsets \ --to-earliest \ --execute参数说明
参数 含义 说明 --reset-offsets执行偏移量重置 必须配合 --execute使用--to-earliest重置到最早的消息 从头开始消费 --execute执行重置 不加则只是预览,不会真正执行 预览示例(不实际执行)
kafka-consumer-groups --bootstrap-server localhost:9092 \ --group my-group --topic cli-test \ --reset-offsets --to-earliest # 输出预览结果 GROUP TOPIC PARTITION NEW-OFFSET my-group cli-test 0 0 my-group cli-test 1 0 my-group cli-test 2 0 -
重置消费者组偏移量(到最新)
kafka-consumer-groups \ --bootstrap-server localhost:9092 \ --group my-consumer-group \ --topic cli-test \ --reset-offsets \ --to-latest \ --execute参数 含义 说明 --to-latest重置到最新的消息 只消费新消息 -
重置消费者组偏移量(到指定偏移量)
kafka-consumer-groups \ --bootstrap-server localhost:9092 \ --group my-consumer-group \ --topic cli-test:0,1,2 \ --reset-offsets \ --to-offset 100 \ --execute参数说明
参数 含义 说明 --to-offset重置到指定偏移量 例如 --to-offset 100topic:0,1,2指定topic和分区 格式 topic名称:分区号列表 -
重置消费者组偏移量(向前 / 向后移动)
# 向后移动5条(重新消费最近5条) kafka-consumer-groups \ --bootstrap-server localhost:9092 \ --group my-consumer-group \ --topic cli-test \ --reset-offsets \ --shift-by -5 \ --execute # 向前移动10条(跳过10条消息) kafka-consumer-groups \ --bootstrap-server localhost:9092 \ --group my-consumer-group \ --topic cli-test \ --reset-offsets \ --shift-by 10 \ --execute参数说明
参数 含义 说明 --shift-by偏移量移动值 正数向前(跳过),负数向后(重复消费) -
重置消费者组偏移量(到指定时间前)
kafka-consumer-groups \ --bootstrap-server localhost:9092 \ --group my-consumer-group \ --topic cli-test \ --reset-offsets \ --to-datetime 2024-01-01T00:00:00.000 \ --execute参数说明
参数 含义 说明 --to-datetime重置到指定时间 格式 YYYY-MM-DDTHH:mm:SS.sss -
重置消费者组偏移量(根据时间戳文件)
# 先创建时间戳文件 echo "cli-test:0:1704067200000" > offsets.txt echo "cli-test:1:1704067200000" >> offsets.txt # 执行重置 kafka-consumer-groups \ --bootstrap-server localhost:9092 \ --group my-consumer-group \ --reset-offsets \ --from-file offsets.txt \ --execute参数说明
参数 含义 说明 --from-file从文件读取重置配置 文件格式 topic:分区:时间戳 -
删除消费者组
kafka-consumer-groups \ --bootstrap-server localhost:9092 \ --delete \ --group my-consumer-group参数说明
参数 含义 说明 --delete删除消费者组 组必须没有活跃的消费者,且状态为空 -
删除多个消费者组
kafka-consumer-groups \ --bootstrap-server localhost:9092 \ --delete \ --group group1,group2,group3参数 含义 说明 --group多个组用逗号分隔 一次删除多个消费者组 -
导出消费者组偏移量
kafka-consumer-groups \ --bootstrap-server localhost:9092 \ --describe \ --group my-consumer-group \ --all-topics \ > offsets-backup.txt参数 含义 说明 --all-topics显示所有topic 导出所有topic的偏移量信息 -
跳过消费者组(导出偏移量到文件用于迁移)
# 导出当前偏移量 kafka-consumer-groups --bootstrap-server localhost:9092 \ --group my-group \ --describe --all-topics \ | awk '{print $2","$4}' > offsets.csv -
查看消费者组协调器
kafka-consumer-groups \ --bootstrap-server localhost:9092 \ --describe \ --group my-consumer-group \ --offsets参数 含义 说明 --offsets显示偏移量信息 显示每个分区的详细偏移量
重置偏移量参数速查表
| 参数 | 含义 | 示例 |
|---|---|---|
--to-earliest | 重置到最早 | 从头消费所有消息 |
--to-latest | 重置到最新 | 只消费新消息 |
--to-offset <N> | 重置到指定偏移量 | --to-offset 100 |
--shift-by <N> | 向前/向后移动N条 | --shift-by -5(重复消费5条) |
--to-datetime | 重置到指定时间 | --to-datetime 2024-01-01T00:00:00.000 |
--from-file | 从文件读取 | --from-file offsets.txt |
Kafka基础操作
本文采用 CC BY-NC-SA 4.0 许可协议,转载请注明出处。