1. Quick Start

Add the Maven dependency:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>4.2.0</version>
</dependency>
Producer
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class QuickStartProducer {
    public static void main(String[] args) {
        // 1. Configuration
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // 2. Create the producer
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // 3. Send messages
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> record = new ProducerRecord<>(
                        "cli-test", "key-" + i, "Hello Kafka " + i
                );
                producer.send(record, (metadata, exception) -> {
                    if (exception == null) {
                        System.out.printf("Sent: topic=%s, partition=%d, offset=%d%n",
                                metadata.topic(), metadata.partition(), metadata.offset());
                    } else {
                        exception.printStackTrace();
                    }
                });
            }
            // 4. Make sure all buffered messages go out
            producer.flush();
        }
    }
}
Consumer
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class QuickStartConsumer {
    public static void main(String[] args) {
        // 1. Configuration
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "quick-start-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest"); // consume from the beginning
        // 2. Create the consumer
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // 3. Subscribe to the topic
            consumer.subscribe(Arrays.asList("cli-test"));
            // 4. Poll and process messages
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Received: topic=%s, partition=%d, offset=%d, key=%s, value=%s%n",
                            record.topic(), record.partition(), record.offset(),
                            record.key(), record.value());
                }
            }
        }
    }
}
Create the topic

# Step 1: confirm Kafka is up
kafka-topics --list --bootstrap-server localhost:9092

# Step 2: create the topic (if it does not exist)
kafka-topics --create \
  --bootstrap-server localhost:9092 \
  --topic cli-test \
  --partitions 3 \
  --replication-factor 1
2. Topic APIs

2.1 Create a Topic

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.errors.TopicExistsException;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class TopicAdminAPI {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 1. Configure the AdminClient
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // 2. Create the AdminClient
        try (AdminClient admin = AdminClient.create(props)) {
            // 3. Create the topic: 3 partitions, replication factor 1
            NewTopic newTopic = new NewTopic("java-topic", 3, (short) 1);
            CreateTopicsResult result = admin.createTopics(Collections.singleton(newTopic));
            // Wait for creation to finish; treat an existing topic as a benign case
            try {
                result.all().get();
                System.out.println("Topic created successfully");
            } catch (ExecutionException e) {
                if (e.getCause() instanceof TopicExistsException) {
                    System.out.println("Topic already exists");
                } else {
                    throw e;
                }
            }
        }
    }
}
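NewTopic can also carry topic-level configuration in the create request. A minimal sketch, assuming the same AdminClient setup as above (the topic name and config values here are illustrative, not from the original note):

import org.apache.kafka.clients.admin.NewTopic;
import java.util.HashMap;
import java.util.Map;

public class TopicWithConfigs {
    // Builds a NewTopic that carries per-topic configs in the create request
    static NewTopic compactedTopic() {
        Map<String, String> configs = new HashMap<>();
        configs.put("cleanup.policy", "compact");  // compaction instead of time-based deletion
        configs.put("retention.ms", "604800000");  // 7 days (illustrative value)
        return new NewTopic("compacted-topic", 3, (short) 1).configs(configs);
    }
}

Passing the returned NewTopic to admin.createTopics() works exactly like in the example above.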
2.2 List Topics

import org.apache.kafka.clients.admin.*;
import java.util.Collection;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class ListTopicsAPI {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // List all topics
            ListTopicsResult topics = admin.listTopics();
            Collection<TopicListing> listings = topics.listings().get();
            for (TopicListing listing : listings) {
                System.out.printf("Topic: %s, isInternal: %s%n",
                        listing.name(), listing.isInternal());
            }
        }
    }
}
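listTopics() excludes internal topics such as __consumer_offsets by default. A small variation using ListTopicsOptions includes them:

import org.apache.kafka.clients.admin.*;
import java.util.Properties;

public class ListAllTopicsAPI {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // listInternal(true) also returns internal topics like __consumer_offsets
            ListTopicsOptions options = new ListTopicsOptions().listInternal(true);
            admin.listTopics(options).names().get().forEach(System.out::println);
        }
    }
}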
2.3 Describe a Topic

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.TopicPartitionInfo;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class DescribeTopicAPI {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // Describe the topic; topicNameValues() replaces the deprecated values()
            DescribeTopicsResult result = admin.describeTopics(Arrays.asList("java-topic"));
            TopicDescription description = result.topicNameValues().get("java-topic").get();
            System.out.println("Topic name: " + description.name());
            System.out.println("Partition count: " + description.partitions().size());
            for (TopicPartitionInfo partition : description.partitions()) {
                System.out.printf("Partition: %d, Leader: %s, Replicas: %s, ISR: %s%n",
                        partition.partition(),
                        partition.leader(),
                        partition.replicas(),
                        partition.isr());
            }
        }
    }
}
2.4 Alter a Topic (Add Partitions)

import org.apache.kafka.clients.admin.*;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class AlterTopicAPI {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // Increase the partition count (Kafka can only add partitions, never remove them)
            Map<String, NewPartitions> newPartitions = new HashMap<>();
            newPartitions.put("java-topic", NewPartitions.increaseTo(5));
            CreatePartitionsResult result = admin.createPartitions(newPartitions, new CreatePartitionsOptions());
            result.all().get();
            System.out.println("Partition count increased to 5");
        }
    }
}
2.5 Alter Topic Configuration

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.config.ConfigResource;
import java.util.*;
import java.util.concurrent.ExecutionException;

public class AlterTopicConfigAPI {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "java-topic");
            // Use AlterConfigOp with incrementalAlterConfigs (the newer API)
            AlterConfigOp retentionOp = new AlterConfigOp(
                    new ConfigEntry("retention.ms", "86400000"), // 1 day
                    AlterConfigOp.OpType.SET
            );
            Map<ConfigResource, Collection<AlterConfigOp>> configs = new HashMap<>();
            configs.put(resource, Collections.singletonList(retentionOp));
            AlterConfigsResult result = admin.incrementalAlterConfigs(configs, new AlterConfigsOptions());
            result.all().get();
            System.out.println("Configuration updated successfully");
        }
    }
}
2.6 Delete a Topic

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.TopicCollection;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class DeleteTopicAPI {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // Newer versions use TopicCollection.ofTopicNames()
            TopicCollection topicCollection = TopicCollection.ofTopicNames(Collections.singleton("java-topic"));
            DeleteTopicsResult result = admin.deleteTopics(topicCollection, new DeleteTopicsOptions());
            result.all().get();
            System.out.println("Topic deleted successfully");
        }
    }
}
3. Producer APIs

3.1 Basic Producer

import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class BasicProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            ProducerRecord<String, String> record = new ProducerRecord<>(
                    "cli-test", "key-001", "Hello Kafka Java"
            );
            // Asynchronous send (fire and forget)
            producer.send(record);
            producer.flush();
        }
    }
}
3.2 Synchronous Send

import org.apache.kafka.clients.producer.*;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class SyncProducer {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            ProducerRecord<String, String> record = new ProducerRecord<>(
                    "cli-test", "sync-key", "Sync message"
            );
            // Synchronous send: block until the result comes back
            RecordMetadata metadata = producer.send(record).get();
            System.out.println("Synchronous send succeeded");
            System.out.println("Partition: " + metadata.partition());
            System.out.println("Offset: " + metadata.offset());
        }
    }
}
3.3 Producer with Callback

import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class CallbackProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Optional settings
        props.put("acks", "all");
        props.put("retries", 3);
        props.put("batch.size", 16384);
        props.put("linger.ms", 5);
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            ProducerRecord<String, String> record = new ProducerRecord<>(
                    "cli-test", "key-callback", "Message with callback"
            );
            // Anonymous inner class style
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        System.out.println("Send succeeded!");
                        System.out.println("Topic: " + metadata.topic());
                        System.out.println("Partition: " + metadata.partition());
                        System.out.println("Offset: " + metadata.offset());
                    } else {
                        System.err.println("Send failed: " + exception.getMessage());
                    }
                }
            });
            // Lambda style (the same record is sent again just to show the syntax)
            producer.send(record, (metadata, exception) -> {
                if (exception == null) {
                    System.out.println("Succeeded: " + metadata.offset());
                } else {
                    exception.printStackTrace();
                }
            });
            producer.flush();
        }
    }
}
3.4 Send to a Specific Partition

import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class PartitionProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Option 1: specify the partition number directly
            ProducerRecord<String, String> record1 = new ProducerRecord<>(
                    "cli-test",                       // topic
                    2,                                // partition number (starting from 0)
                    "key-part",                       // key
                    "Message for a fixed partition"   // value
            );
            // Option 2: let the key's hash pick the partition (same key -> same partition)
            ProducerRecord<String, String> record2 = new ProducerRecord<>(
                    "cli-test",
                    "user-001",                       // identical keys land in the same partition
                    "User message"
            );
            // Option 3: a custom Partitioner (see the sketch below)
            producer.send(record1);
            producer.send(record2);
            producer.flush();
        }
    }
}
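The third option is a custom partitioning strategy plugged in through the Partitioner interface. A minimal sketch; the class name and its even/odd-key rule are invented for illustration, and the producer picks it up via props.put("partitioner.class", EvenOddPartitioner.class.getName()):

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;

public class EvenOddPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        String k = (key == null) ? "" : key.toString();
        // Hypothetical rule: keys ending in an even digit go to partition 0,
        // everything else is spread over the remaining partitions
        if (!k.isEmpty() && Character.isDigit(k.charAt(k.length() - 1))
                && (k.charAt(k.length() - 1) - '0') % 2 == 0) {
            return 0;
        }
        return numPartitions > 1 ? 1 + Math.abs(k.hashCode() % (numPartitions - 1)) : 0;
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}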
3.5 Send Messages with Headers

import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class HeaderProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            ProducerRecord<String, String> record = new ProducerRecord<>(
                    "cli-test",
                    "key-header",
                    "Message with headers"
            );
            // Attach headers
            record.headers()
                    .add("trace-id", "123456".getBytes())
                    .add("source", "java-api".getBytes())
                    .add("user-id", "user001".getBytes());
            producer.send(record, (metadata, exception) -> {
                if (exception == null) {
                    System.out.println("Message with headers sent successfully");
                }
            });
            producer.flush();
        }
    }
}
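On the consuming side, headers travel with every record. A minimal sketch of reading them back, meant to be called from the poll loop of any of the consumers in section 4 (the helper class is ours, not from the original note):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import java.nio.charset.StandardCharsets;

public class HeaderReader {
    static void printHeaders(ConsumerRecord<String, String> record) {
        // Iterate over all headers of the record
        for (Header header : record.headers()) {
            System.out.printf("header %s = %s%n",
                    header.key(), new String(header.value(), StandardCharsets.UTF_8));
        }
        // Or look up the last header with a given key (null if absent)
        Header traceId = record.headers().lastHeader("trace-id");
        if (traceId != null) {
            System.out.println("trace-id: " + new String(traceId.value(), StandardCharsets.UTF_8));
        }
    }
}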
3.6 Producer Configuration Parameters

Properties props = new Properties();
// Required settings
props.put("bootstrap.servers", "localhost:9092,localhost:9093");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// Reliability settings
props.put("acks", "all");                    // 0/1/all; all is the most reliable
props.put("retries", 3);                     // retry count
props.put("enable.idempotence", "true");     // idempotence, prevents duplicates
props.put("max.in.flight.requests.per.connection", 5); // max unacknowledged requests (must be <= 5 with idempotence)
// Performance settings
props.put("batch.size", 16384);              // batch size in bytes
props.put("linger.ms", 5);                   // how long to wait before sending a batch (ms)
props.put("buffer.memory", 33554432);        // buffer size (32 MB)
props.put("compression.type", "snappy");     // compression: gzip/snappy/lz4/zstd
props.put("max.request.size", 1048576);      // max request size (1 MB)
// Timeout settings
props.put("request.timeout.ms", 30000);      // request timeout
props.put("delivery.timeout.ms", 120000);    // overall delivery timeout
props.put("retry.backoff.ms", 100);          // interval between retries
4. Consumer APIs

4.1 Basic Consumer

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class BasicConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "basic-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("cli-test"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("topic=%s, partition=%d, offset=%d, key=%s, value=%s%n",
                            record.topic(), record.partition(), record.offset(),
                            record.key(), record.value());
                }
            }
        }
    }
}
4.2 Manual Offset Commit

import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class ManualCommitConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "manual-commit-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("enable.auto.commit", "false"); // disable auto-commit
        props.put("auto.offset.reset", "earliest");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("cli-test"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("Processing message: " + record.value());
                    // Business logic goes here...
                }
                // Manual synchronous commit
                consumer.commitSync();
                System.out.println("Offsets committed");
                // Or commit asynchronously (see the sketch below)
                // consumer.commitAsync();
            }
        }
    }
}
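commitAsync() is only mentioned in passing above. A common pattern is asynchronous commits in the hot loop plus one final synchronous commit on shutdown, so the last position is never lost. A minimal sketch (the group id and the bounded loop are just for illustration):

import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class AsyncCommitConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "async-commit-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("enable.auto.commit", "false");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("cli-test"));
        try {
            for (int i = 0; i < 100; i++) { // bounded loop so the example terminates
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                records.forEach(r -> System.out.println("processing: " + r.value()));
                // Non-blocking commit; failures are reported to the callback
                consumer.commitAsync((offsets, exception) -> {
                    if (exception != null) {
                        System.err.println("async commit failed: " + exception.getMessage());
                    }
                });
            }
        } finally {
            try {
                consumer.commitSync(); // one last blocking commit so the final offsets are durable
            } finally {
                consumer.close();
            }
        }
    }
}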
4.3 Consume from Specific Partitions

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class PartitionConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "partition-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("enable.auto.commit", "false");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Assign a specific partition (no group rebalancing)
            TopicPartition partition = new TopicPartition("cli-test", 0);
            consumer.assign(Arrays.asList(partition));
            // Start consuming from a specific offset
            consumer.seek(partition, 10); // start at offset 10
            // From the beginning:
            // consumer.seekToBeginning(Arrays.asList(partition));
            // From the end:
            // consumer.seekToEnd(Arrays.asList(partition));
            // (seeking by timestamp is sketched below)
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d, offset=%d, value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
                consumer.commitSync();
            }
        }
    }
}
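Besides seeking to a fixed offset, you can seek to a point in time: offsetsForTimes() maps a timestamp to the earliest offset whose record timestamp is equal or later. A sketch under the assumptions of the example above (the one-hour lookback is an arbitrary choice):

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import java.util.Collections;
import java.util.Map;

public class SeekByTimestamp {
    // Assumes the consumer has already been assigned tp, as in 4.3
    static void seekToOneHourAgo(KafkaConsumer<String, String> consumer, TopicPartition tp) {
        long oneHourAgo = System.currentTimeMillis() - 60 * 60 * 1000L;
        Map<TopicPartition, OffsetAndTimestamp> result =
                consumer.offsetsForTimes(Collections.singletonMap(tp, oneHourAgo));
        OffsetAndTimestamp ot = result.get(tp);
        if (ot != null) {
            consumer.seek(tp, ot.offset()); // earliest offset with timestamp >= oneHourAgo
        } else {
            consumer.seekToEnd(Collections.singletonList(tp)); // nothing at or after that time
        }
    }
}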
4.4 Limit the Number of Messages Consumed

import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;

public class LimitedConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "limited-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("max.poll.records", "10"); // at most 10 records per poll
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("cli-test"));
            AtomicInteger count = new AtomicInteger(0);
            int maxMessages = 50; // consume at most 50 messages
            while (count.get() < maxMessages) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    if (count.incrementAndGet() > maxMessages) {
                        break;
                    }
                    System.out.println("Consumed message " + count.get() + ": " + record.value());
                }
                // Note: commitSync() commits the whole polled batch,
                // including any records skipped by the break above
                consumer.commitSync();
            }
            System.out.println("Consumed " + maxMessages + " messages, exiting");
        }
    }
}
4.5 Consumer Configuration Parameters

Properties props = new Properties();
// Required settings
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// Offset settings
props.put("auto.offset.reset", "earliest");   // earliest/latest/none
props.put("enable.auto.commit", "true");      // automatic offset commits
props.put("auto.commit.interval.ms", "5000"); // auto-commit interval
// Fetch settings
props.put("max.poll.records", "500");         // max records per poll
props.put("max.poll.interval.ms", "300000");  // max time between polls before the consumer is considered failed
props.put("fetch.min.bytes", "1");            // min bytes per fetch
props.put("fetch.max.wait.ms", "500");        // max wait time for a fetch
// Session settings
props.put("session.timeout.ms", "45000");     // session timeout
props.put("heartbeat.interval.ms", "3000");   // heartbeat interval
Note: starting with Kafka 4.1, ConsumerGroupListing and the related listConsumerGroups API are marked as deprecated, so the examples below use the newer generic group APIs such as listGroups.
5. Consumer Group APIs

5.1 List Consumer Groups

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.GroupState;
import org.apache.kafka.common.GroupType;
import java.util.Collection;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class ListGroupsAPI {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // Use listGroups instead of the deprecated listConsumerGroups
            ListGroupsResult result = admin.listGroups(new ListGroupsOptions());
            // Fetch all groups
            Collection<GroupListing> groups = result.all().get();
            System.out.println("=== All groups ===");
            for (GroupListing group : groups) {
                System.out.printf("Group ID: %s%n", group.groupId());
                // Group type (e.g. CLASSIC, CONSUMER, SHARE, STREAMS)
                Optional<GroupType> type = group.type();
                System.out.printf("  Type: %s%n", type.map(GroupType::toString).orElse("unknown"));
                // Protocol
                System.out.printf("  Protocol: %s%n", group.protocol());
                // Group state
                Optional<GroupState> state = group.groupState();
                System.out.printf("  State: %s%n", state.map(GroupState::toString).orElse("unknown"));
                // Whether this is a simple consumer group
                System.out.printf("  Simple consumer group: %s%n", group.isSimpleConsumerGroup());
                System.out.println();
            }
        }
    }
}
5.2 Describe a Consumer Group

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.GroupState;
import org.apache.kafka.common.GroupType;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class DescribeConsumerGroupAPI {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // describeConsumerGroups is still available
            DescribeConsumerGroupsResult result = admin.describeConsumerGroups(
                    Collections.singleton("my-spring-group"),
                    new DescribeConsumerGroupsOptions()
            );
            ConsumerGroupDescription desc = result.all().get().get("my-spring-group");
            System.out.println("=== Consumer group details ===");
            System.out.println("Group ID: " + desc.groupId());
            System.out.println("Simple consumer group: " + desc.isSimpleConsumerGroup());
            System.out.println("Group type: " + desc.type());        // e.g. CLASSIC or CONSUMER
            System.out.println("Group state: " + desc.groupState()); // e.g. STABLE, EMPTY, COMPLETING_REBALANCE
            System.out.println("Partition assignor: " + desc.partitionAssignor());
            System.out.println("Coordinator: " + desc.coordinator());
            // Fields added in newer versions
            System.out.println("Group epoch: " + desc.groupEpoch().orElse(null));
            System.out.println("Target assignment epoch: " + desc.targetAssignmentEpoch().orElse(null));
            System.out.println("\nMembers (" + desc.members().size() + "):");
            for (MemberDescription member : desc.members()) {
                System.out.printf("  - Member ID: %s%n", member.consumerId());
                System.out.printf("    Host: %s%n", member.host());
                System.out.printf("    Client ID: %s%n", member.clientId());
                if (member.assignment() != null && !member.assignment().topicPartitions().isEmpty()) {
                    System.out.printf("    Assigned partitions: %s%n", member.assignment().topicPartitions());
                }
            }
        }
    }
}
5.3 View Consumer Group Offsets

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class ListConsumerGroupOffsetsAPI {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // listConsumerGroupOffsets is still available
            ListConsumerGroupOffsetsResult result = admin.listConsumerGroupOffsets(
                    "my-spring-group",
                    new ListConsumerGroupOffsetsOptions()
            );
            Map<TopicPartition, OffsetAndMetadata> offsets = result.partitionsToOffsetAndMetadata().get();
            System.out.println("=== Consumption progress ===");
            if (offsets.isEmpty()) {
                System.out.println("This group has no committed offsets");
                return;
            }
            for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
                TopicPartition tp = entry.getKey();
                OffsetAndMetadata offset = entry.getValue();
                System.out.printf("Topic: %s, partition: %d, committed offset: %d, metadata: %s%n",
                        tp.topic(), tp.partition(), offset.offset(), offset.metadata());
            }
        }
    }
}
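Committed offsets alone don't tell you how far behind a group is. Comparing them with each partition's log-end offset gives the consumer lag; a sketch combining listConsumerGroupOffsets with listOffsets (group name as above):

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class ConsumerLagAPI {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // 1. Committed offsets of the group
            Map<TopicPartition, OffsetAndMetadata> committed = admin
                    .listConsumerGroupOffsets("my-spring-group")
                    .partitionsToOffsetAndMetadata().get();
            // 2. Log-end offsets for the same partitions
            Map<TopicPartition, OffsetSpec> request = new HashMap<>();
            committed.keySet().forEach(tp -> request.put(tp, OffsetSpec.latest()));
            Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> ends =
                    admin.listOffsets(request).all().get();
            // 3. Lag = log-end offset - committed offset
            committed.forEach((tp, offset) -> {
                long lag = ends.get(tp).offset() - offset.offset();
                System.out.printf("%s-%d lag=%d%n", tp.topic(), tp.partition(), lag);
            });
        }
    }
}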
5.4 Reset Consumer Group Offsets

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class ResetConsumerGroupOffsetsAPI {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            Map<TopicPartition, OffsetAndMetadata> newOffsets = new HashMap<>();
            // Reset to offset 0
            TopicPartition tp = new TopicPartition("cli-test", 0);
            newOffsets.put(tp, new OffsetAndMetadata(0));
            // Note: this only works while the group has no active members
            AlterConsumerGroupOffsetsResult result = admin.alterConsumerGroupOffsets(
                    "my-spring-group",
                    newOffsets,
                    new AlterConsumerGroupOffsetsOptions()
            );
            result.all().get();
            System.out.println("Offsets reset successfully");
        }
    }
}
5.5 Delete Consumer Groups

import org.apache.kafka.clients.admin.*;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class DeleteConsumerGroupAPI {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // deleteConsumerGroups is still available; groups must be empty to be deleted
            DeleteConsumerGroupsResult result = admin.deleteConsumerGroups(
                    Arrays.asList("old-group1", "old-group2"),
                    new DeleteConsumerGroupsOptions()
            );
            result.all().get();
            System.out.println("Consumer groups deleted successfully");
        }
    }
}
6. Kafka Transactions in Depth

Kafka transactions, introduced in Kafka 0.11.0.0, are the key feature behind exactly-once semantics.

6.1 What Is a Kafka Transaction?

A transaction lets a producer atomically send a batch of messages to multiple partitions: either all of them become visible, or none do. Transactions also extend idempotence across producer sessions.
6.2 Why Are Transactions Needed?

// Problem scenarios without transactions (pseudocode)

// Scenario 1: no atomicity between consuming and producing
consumer.poll();           // consume a message from topic A
processMessage();          // process it
producer.send(topicB);     // send the result to topic B
consumer.commit();         // commit the offset
// Problem: if the program crashes after the send but before the offset commit,
// the message is processed again after restart (re-consumed + re-sent).

// Scenario 2: no atomicity across multiple partitions/topics
producer.send(topicA, msg1); // write 1
producer.send(topicB, msg2); // write 2
// Problem: if msg1 succeeds but msg2 fails, the data is inconsistent.
6.3 Core Transaction Concepts

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.clients.consumer.*;
import java.util.Properties;

public class TransactionConcepts {
    // 1. Transaction-related producer configuration
    public static Properties getTransactionalConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        // Transactions require idempotence
        props.put("enable.idempotence", "true");
        // Transactional ID (unique identifier of this producer instance)
        props.put("transactional.id", "my-transactional-id");
        // Transaction timeout (default 1 minute)
        props.put("transaction.timeout.ms", "60000");
        // Serializers
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    // 2. Consumer transaction isolation level
    public static Properties getIsolationLevelConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "transactional-consumer");
        // Isolation levels:
        // read_uncommitted: reads uncommitted transactional messages (including ones that may be aborted)
        // read_committed: reads only committed transactional messages
        props.put("isolation.level", "read_committed");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest");
        return props;
    }
}
6.4 Complete Transactional Producer Example

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import java.util.Properties;

public class TransactionalProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Transactions require idempotence
        props.put("enable.idempotence", "true");
        // Transactional ID (uniquely identifies this producer instance)
        props.put("transactional.id", "txn-producer-001");
        // Transaction timeout
        props.put("transaction.timeout.ms", "30000");
        // Producer retries
        props.put("retries", Integer.MAX_VALUE);
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // 1. Initialize transactions (must be called first)
            producer.initTransactions();
            // 2. Begin the transaction
            producer.beginTransaction();
            try {
                // 3. Send messages to several topics
                ProducerRecord<String, String> record1 = new ProducerRecord<>(
                        "topic-A", "key1", "message 1"
                );
                producer.send(record1);
                ProducerRecord<String, String> record2 = new ProducerRecord<>(
                        "topic-B", "key2", "message 2"
                );
                producer.send(record2);
                // A specific partition also works
                ProducerRecord<String, String> record3 = new ProducerRecord<>(
                        "topic-C", 2, "key3", "message 3"
                );
                producer.send(record3);
                // 4. Commit the transaction (all messages become visible atomically)
                producer.commitTransaction();
                System.out.println("Transaction committed, all messages sent");
            } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
                // Fatal errors: abortTransaction() cannot be called here; just close the producer
                System.err.println("Fatal producer error: " + e.getMessage());
            } catch (KafkaException e) {
                // 5. On a recoverable error, abort (roll back) the transaction
                producer.abortTransaction();
                System.err.println("Transaction aborted, messages not sent: " + e.getMessage());
            }
        }
    }
}
6.5 Consume-Transform-Produce Transaction (Atomic Read-Write)

This is the most common scenario: consume from Kafka, process, and produce to another topic, all atomically.

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.*;

public class ConsumeTransformProduceTransactional {
    public static void main(String[] args) {
        // 1. Producer configuration (transactional)
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9092");
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("enable.idempotence", "true");
        producerProps.put("transactional.id", "consume-transform-produce");
        producerProps.put("transaction.timeout.ms", "60000");
        // 2. Consumer configuration
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "localhost:9092");
        consumerProps.put("group.id", "transactional-consumer-group");
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // read_committed: only read committed messages
        consumerProps.put("isolation.level", "read_committed");
        consumerProps.put("auto.offset.reset", "earliest");
        // Disable auto-commit: offsets are committed through the transaction
        consumerProps.put("enable.auto.commit", "false");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
             KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
            // 3. Subscribe to the source topic
            consumer.subscribe(Arrays.asList("source-topic"));
            // 4. Initialize the producer's transactions
            producer.initTransactions();
            while (true) {
                // 5. Poll messages
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                if (records.isEmpty()) {
                    continue;
                }
                // 6. Begin the transaction
                producer.beginTransaction();
                try {
                    // 7. Process each record and send the result to the target topic
                    for (ConsumerRecord<String, String> record : records) {
                        String processedValue = "PROCESSED: " + record.value();
                        ProducerRecord<String, String> outputRecord = new ProducerRecord<>(
                                "target-topic",
                                record.key(),
                                processedValue
                        );
                        producer.send(outputRecord);
                        System.out.printf("Processed: offset=%d, value=%s%n",
                                record.offset(), record.value());
                    }
                    // 8. Collect the offsets to commit
                    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                    for (TopicPartition partition : records.partitions()) {
                        List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                        long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                        offsets.put(partition, new OffsetAndMetadata(lastOffset + 1));
                    }
                    // 9. Get real group metadata from the consumer
                    //    (recommended; do not use the deprecated constructor)
                    ConsumerGroupMetadata groupMetadata = consumer.groupMetadata();
                    // 10. Commit the consumer offsets as part of the transaction
                    producer.sendOffsetsToTransaction(offsets, groupMetadata);
                    // 11. Commit: message sends + offset commit complete atomically
                    producer.commitTransaction();
                    System.out.println("Transaction committed");
                } catch (Exception e) {
                    // 12. On error, abort the transaction.
                    //     Note: you should also seek the consumer back to the last
                    //     committed offsets, or the aborted batch will be skipped.
                    producer.abortTransaction();
                    System.err.println("Transaction aborted: " + e.getMessage());
                }
            }
        }
    }
}
6.6 Transaction API Walkthrough

import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class TransactionAPIExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("enable.idempotence", "true");
        props.put("transactional.id", "my-transactional-id");
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "localhost:9092");
        consumerProps.put("group.id", "transactional-consumer-group");
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // read_committed: only read committed messages
        consumerProps.put("isolation.level", "read_committed");
        consumerProps.put("auto.offset.reset", "earliest");
        // Disable auto-commit
        consumerProps.put("enable.auto.commit", "false");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
        consumer.subscribe(Collections.singletonList("source-topic"));
        // 1. initTransactions() - initialize transactions
        // Must be called before sending; it:
        // - registers with the transaction coordinator
        // - completes or aborts any unfinished transaction from a previous instance
        producer.initTransactions();
        // 2. beginTransaction() - marks the start of a new transaction
        producer.beginTransaction();
        try {
            // 3. send() - send messages (inside the transaction)
            producer.send(new ProducerRecord<>("topic1", "msg1"));
            producer.send(new ProducerRecord<>("topic2", "msg2"));
            // 4. sendOffsetsToTransaction() - commit consumer group offsets
            // as part of the transaction
            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
            offsets.put(new TopicPartition("source-topic", 0),
                    new OffsetAndMetadata(100));
            // Note: obtain the metadata via consumer.groupMetadata(), do not construct it by hand
            ConsumerGroupMetadata groupMetadata = consumer.groupMetadata();
            producer.sendOffsetsToTransaction(offsets, groupMetadata);
            // 5. commitTransaction() - messages and offsets become visible atomically
            producer.commitTransaction();
        } catch (Exception e) {
            // 6. abortTransaction() - all messages are discarded, offsets are not committed
            producer.abortTransaction();
            e.printStackTrace();
        }
        producer.close();
        consumer.close();
    }
}
6.7 Transactional Consumer

import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class TransactionalConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Important: set the isolation level
        // read_committed: only committed transactional messages
        // read_uncommitted: all messages (including uncommitted ones)
        props.put("isolation.level", "read_committed");
        props.put("auto.offset.reset", "earliest");
        props.put("enable.auto.commit", "true");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("target-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    // Under read_committed, only committed messages are visible
                    System.out.printf("offset=%d, key=%s, value=%s%n",
                            record.offset(), record.key(), record.value());
                }
            }
        }
    }
}
6.8 Advanced Transaction Features

import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class AdvancedTransactionalProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("enable.idempotence", "true");
        // 1. Uniqueness of the transactional ID:
        // only one active producer per transactional.id at a time;
        // a new producer "fences" the old one, preventing duplicate writes
        props.put("transactional.id", "unique-transactional-id");
        // 2. Transaction timeout
        props.put("transaction.timeout.ms", "30000");
        // 3. Producer retry settings
        props.put("retries", Integer.MAX_VALUE);
        props.put("max.in.flight.requests.per.connection", "5");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        // initTransactions() must be called exactly once per producer instance (not inside the loop)
        producer.initTransactions();
        // 4. Multiple transactions executed serially on one producer
        for (int i = 0; i < 10; i++) {
            producer.beginTransaction();
            try {
                producer.send(new ProducerRecord<>("topic", "batch-" + i + "-msg1"));
                producer.send(new ProducerRecord<>("topic", "batch-" + i + "-msg2"));
                producer.commitTransaction();
                System.out.println("Batch " + i + " committed");
            } catch (Exception e) {
                producer.abortTransaction();
                System.err.println("Batch " + i + " aborted");
            }
        }
        producer.close();
    }
}
6.9 The Fencing Mechanism

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.errors.ProducerFencedException;
import java.util.Properties;

public class FencingMechanism {
    // Only one active producer is allowed per transactional.id.
    // If a second producer starts with the same ID, the first one is fenced.
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("transactional.id", "same-id");
        props.put("enable.idempotence", "true");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.initTransactions();
        try {
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("topic", "message"));
            // If another producer with the same transactional.id starts now,
            // further calls on this producer throw ProducerFencedException
            producer.commitTransaction();
        } catch (ProducerFencedException e) {
            // This producer has been fenced; it must be closed
            System.err.println("Producer fenced: " + e.getMessage());
            producer.close();
        }
    }
}
6.10 Transaction Limitations and Caveats

public class TransactionLimitations {
    /**
     * 1. Transactions cannot span Kafka clusters
     */
    public static void limitation1() {
        // A transaction is only valid within a single Kafka cluster;
        // you cannot atomically write to two different clusters.
    }
    /**
     * 2. Keep transactions reasonably small
     */
    public static void limitation2() {
        // The transaction coordinator keeps transaction state in memory.
        // Suggested: fewer than 10,000 messages per transaction,
        // and less than 100 MB total per transaction.
    }
    /**
     * 3. Timed-out transactions are aborted automatically
     */
    public static void limitation3() {
        // If a transaction is not committed within transaction.timeout.ms,
        // the transaction coordinator aborts it automatically.
    }
    /**
     * 4. Recovery after a producer crash
     */
    public static void limitation4() {
        // When a new producer starts with the same transactional.id,
        // any unfinished transaction from the crashed instance is aborted automatically.
    }
    /**
     * 5. Performance overhead
     */
    public static void limitation5() {
        // Transactions add latency (extra round-trips to the transaction coordinator).
        // Use them only when exactly-once semantics are required;
        // for ordinary workloads an idempotent producer is enough.
    }
    /**
     * 6. The ConsumerGroupMetadata constructor is deprecated
     */
    public static void limitation6() {
        // In Kafka 4.2+ the ConsumerGroupMetadata constructors are deprecated.
        // Correct: obtain it via consumer.groupMetadata()
        // Wrong:   new ConsumerGroupMetadata(groupId)
    }
}
6.11 When to Use Transactions

| Scenario | Transactions needed? | Notes |
|---|---|---|
| Plain log collection | ❌ No | at-least-once is enough |
| Financial transactions | ✅ Yes | must neither lose nor duplicate |
| Inventory deduction | ✅ Yes | exactly-once semantics |
| Real-time ETL | ✅ Yes | atomic consume-transform-produce |
| Monitoring alerts | ❌ No | losing a few is acceptable |
| Clickstream analytics | ❌ No | small amounts of duplication are tolerable |
| Multi-partition writes | ✅ Yes | atomicity across partitions |
| Database CDC | ✅ Yes | exactly-once replication |
6.12 Summary

Transactions solve three core problems:
- Atomic writes: atomic writes across multiple topics/partitions
- Consume-produce atomicity: consuming and producing as a single atomic unit
- Failure recovery: recovering state after a crash

Key APIs (Kafka 4.2+):

| Method | Description |
|---|---|
| initTransactions() | Initialize transactions |
| beginTransaction() | Begin a transaction |
| sendOffsetsToTransaction(offsets, groupMetadata) | Commit consumer offsets within the transaction |
| commitTransaction() | Commit the transaction |
| abortTransaction() | Abort (roll back) the transaction |
Important caveats:
- Must set enable.idempotence=true
- Must set transactional.id
- Set isolation.level=read_committed on the consumer side
- Do not construct ConsumerGroupMetadata directly; use consumer.groupMetadata()
- Transactions cannot span clusters
- Transactions add overhead; use them only when needed

Usage recommendations:
- Use transactions only when exactly-once semantics are required
- Give the transactional ID a clear naming convention: it identifies a producer instance, not a message, so use a stable value (such as the service name) so transactions can be recovered after a restart
- Set a reasonable transaction timeout
- Remember to set isolation.level=read_committed on the consumer side
- Obtain group metadata via consumer.groupMetadata() instead of constructing it by hand
This article is licensed under CC BY-NC-SA 4.0. Please credit the source when reposting.