一,快速入门

  1. 引入 maven 依赖

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>4.2.0</version>
    </dependency>
    
  2. Producer

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import java.util.Properties;
    
    public class QuickStartProducer {
        public static void main(String[] args) {
            // Producer configuration: broker address plus key/value serializers.
            Properties config = new Properties();
            config.put("bootstrap.servers", "localhost:9092");
            config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            
            // try-with-resources closes the producer when done.
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(config)) {
                
                // Send 10 messages asynchronously, each with a completion callback.
                int messageCount = 10;
                for (int i = 0; i < messageCount; i++) {
                    ProducerRecord<String, String> message =
                        new ProducerRecord<>("cli-test", "key-" + i, "Hello Kafka " + i);
                    
                    producer.send(message, (metadata, exception) -> {
                        if (exception != null) {
                            exception.printStackTrace();
                        } else {
                            System.out.printf("发送成功: topic=%s, partition=%d, offset=%d%n",
                                metadata.topic(), metadata.partition(), metadata.offset());
                        }
                    });
                }
                
                // Block until every buffered record has been transmitted.
                producer.flush();
            }
        }
    }
    
  3. Consumer

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Properties;
    
    public class QuickStartConsumer {
        public static void main(String[] args) {
            // Consumer configuration: broker, group id, deserializers.
            Properties config = new Properties();
            config.put("bootstrap.servers", "localhost:9092");
            config.put("group.id", "quick-start-group");
            config.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            config.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            // When the group has no committed offset, start from the beginning.
            config.put("auto.offset.reset", "earliest");
            
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config)) {
                
                // Subscribe; partition assignment is managed by the group coordinator.
                consumer.subscribe(Arrays.asList("cli-test"));
                
                // Poll forever; each poll waits up to one second for records.
                for (;;) {
                    ConsumerRecords<String, String> batch = consumer.poll(Duration.ofMillis(1000));
                    
                    for (ConsumerRecord<String, String> record : batch) {
                        System.out.printf("收到消息: topic=%s, partition=%d, offset=%d, key=%s, value=%s%n",
                            record.topic(), record.partition(), record.offset(),
                            record.key(), record.value());
                    }
                }
            }
        }
    }
    
  4. 创建 topic

    # Step 1: verify Kafka is reachable by listing existing topics
    kafka-topics --list --bootstrap-server localhost:9092
    
    # Step 2: create the topic (if it does not exist yet)
    kafka-topics --create \
      --bootstrap-server localhost:9092 \
      --topic cli-test \
      --partitions 3 \
      --replication-factor 1
    


二,Topic相关API

2.1 创建Topic

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.errors.TopicExistsException;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class TopicAdminAPI {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 1. AdminClient configuration: only the broker address is required.
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        
        // 2. Create the AdminClient (closed automatically by try-with-resources).
        try (AdminClient admin = AdminClient.create(props)) {
            
            // 3. Create a topic with 3 partitions and replication factor 1.
            NewTopic newTopic = new NewTopic("java-topic", 3, (short) 1);
            
            CreateTopicsResult result = admin.createTopics(Collections.singleton(newTopic));
            
            // Wait for completion. The original imported TopicExistsException but
            // never handled it, so re-running the demo crashed; treat "already
            // exists" as a non-fatal outcome instead.
            try {
                result.all().get();
                System.out.println("Topic创建成功");
            } catch (ExecutionException e) {
                if (e.getCause() instanceof TopicExistsException) {
                    System.out.println("Topic已存在");
                } else {
                    throw e;
                }
            }
        }
    }
}

2.2 查看Topic列表

import org.apache.kafka.clients.admin.*;
import java.util.Collection;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class ListTopicsAPI {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties adminProps = new Properties();
        adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        
        try (AdminClient admin = AdminClient.create(adminProps)) {
            // Fetch the listing of every visible topic.
            ListTopicsResult topicsResult = admin.listTopics();
            Collection<TopicListing> topicListings = topicsResult.listings().get();
            
            // Print name and whether the topic is an internal one (e.g. __consumer_offsets).
            topicListings.forEach(listing ->
                System.out.printf("Topic: %s, isInternal: %s%n",
                    listing.name(), listing.isInternal()));
        }
    }
}

2.3 查看Topic详情

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.TopicPartitionInfo;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class DescribeTopicAPI {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        
        try (AdminClient admin = AdminClient.create(props)) {
            // Describe the topic. DescribeTopicsResult.values() was deprecated in
            // Kafka 3.1 in favor of topicNameValues(); use the modern accessor to
            // match the 4.x client this note targets.
            DescribeTopicsResult result = admin.describeTopics(Arrays.asList("java-topic"));
            
            TopicDescription description = result.topicNameValues().get("java-topic").get();
            
            System.out.println("Topic名称: " + description.name());
            System.out.println("分区数: " + description.partitions().size());
            
            // Per-partition placement: leader broker, replica set, in-sync replicas.
            for (TopicPartitionInfo partition : description.partitions()) {
                System.out.printf("分区: %d, Leader: %s, Replicas: %s, ISR: %s%n",
                    partition.partition(),
                    partition.leader(),
                    partition.replicas(),
                    partition.isr());
            }
        }
    }
}

2.4 修改Topic(增加分区)

import org.apache.kafka.clients.admin.*;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class AlterTopicAPI {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        
        try (Admin admin = Admin.create(props)) {
            // Grow "java-topic" to 5 partitions (the partition count can only increase).
            Map<String, NewPartitions> request = new HashMap<>();
            request.put("java-topic", NewPartitions.increaseTo(5));
            
            CreatePartitionsResult result =
                admin.createPartitions(request, new CreatePartitionsOptions());
            result.all().get();  // block until the broker acknowledges
            System.out.println("分区数已增加到5");
        }
    }
}

2.5 修改Topic配置

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.config.ConfigResource;
import java.util.*;
import java.util.concurrent.ExecutionException;

public class AlterTopicConfigAPI {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        
        try (Admin admin = Admin.create(props)) {
            // Target resource: the topic whose configuration we want to change.
            ConfigResource topicResource =
                new ConfigResource(ConfigResource.Type.TOPIC, "java-topic");
            
            // The incremental API applies per-entry operations instead of
            // replacing the whole config set: SET retention.ms to one day.
            AlterConfigOp setRetention = new AlterConfigOp(
                new ConfigEntry("retention.ms", "86400000"),
                AlterConfigOp.OpType.SET
            );
            
            Map<ConfigResource, Collection<AlterConfigOp>> request = new HashMap<>();
            request.put(topicResource, Collections.singletonList(setRetention));
            
            AlterConfigsResult result =
                admin.incrementalAlterConfigs(request, new AlterConfigsOptions());
            result.all().get();  // wait for the change to be applied
            System.out.println("配置修改成功");
        }
    }
}

2.6 删除Topic

import org.apache.kafka.clients.admin.*;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class DeleteTopicAPI {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        
        try (Admin admin = Admin.create(props)) {
            // Newer clients address topics to delete through a TopicCollection of names.
            DeleteTopicsResult result = admin.deleteTopics(
                TopicCollection.ofTopicNames(Collections.singleton("java-topic")),
                new DeleteTopicsOptions());
            result.all().get();  // wait for the deletion to be acknowledged
            System.out.println("Topic删除成功");
        }
    }
}

三,生产者相关API

3.1 基础生产者

import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class BasicProducer {
    public static void main(String[] args) {
        Properties config = new Properties();
        config.put("bootstrap.servers", "localhost:9092");
        config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(config)) {
            
            // Fire-and-forget send: no callback, no blocking on the result.
            producer.send(new ProducerRecord<>("cli-test", "key-001", "Hello Kafka Java"));
            
            // Ensure the buffered record actually reaches the broker before close.
            producer.flush();
        }
    }
}

3.2 同步发送

import org.apache.kafka.clients.producer.*;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class SyncProducer {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties config = new Properties();
        config.put("bootstrap.servers", "localhost:9092");
        config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(config)) {
            
            ProducerRecord<String, String> message =
                new ProducerRecord<>("cli-test", "sync-key", "Sync message");
            
            // Synchronous send: send() returns a Future, and get() blocks until
            // the broker acknowledges, yielding the record's metadata.
            RecordMetadata metadata = producer.send(message).get();
            
            System.out.println("同步发送成功");
            System.out.println("分区: " + metadata.partition());
            System.out.println("偏移量: " + metadata.offset());
        }
    }
}

3.3 带回调的生产者

import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class CallbackProducer {
    public static void main(String[] args) {
        Properties config = new Properties();
        config.put("bootstrap.servers", "localhost:9092");
        config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        
        // Optional tuning: strongest acks, a few retries, small batching window.
        config.put("acks", "all");
        config.put("retries", 3);
        config.put("batch.size", 16384);
        config.put("linger.ms", 5);
        
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(config)) {
            
            ProducerRecord<String, String> message = new ProducerRecord<>(
                "cli-test", "key-callback", "Message with callback"
            );
            
            // Style 1: anonymous Callback implementation.
            producer.send(message, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        System.err.println("发送失败: " + exception.getMessage());
                    } else {
                        System.out.println("发送成功!");
                        System.out.println("Topic: " + metadata.topic());
                        System.out.println("Partition: " + metadata.partition());
                        System.out.println("Offset: " + metadata.offset());
                    }
                }
            });
            
            // Style 2: lambda callback (the same record is sent a second time,
            // purely to demonstrate both callback styles).
            producer.send(message, (metadata, exception) -> {
                if (exception != null) {
                    exception.printStackTrace();
                } else {
                    System.out.println("成功: " + metadata.offset());
                }
            });
            
            producer.flush();
        }
    }
}

3.4 发送到指定分区

import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class PartitionProducer {
    public static void main(String[] args) {
        Properties config = new Properties();
        config.put("bootstrap.servers", "localhost:9092");
        config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(config)) {
            
            // Option 1: pin the record to partition 2 explicitly (partitions are 0-based).
            ProducerRecord<String, String> pinned = new ProducerRecord<>(
                "cli-test",
                2,
                "key-part",
                "指定分区的消息"
            );
            
            // Option 2: let the partitioner hash the key — records sharing a key
            // always land in the same partition.
            ProducerRecord<String, String> keyed = new ProducerRecord<>(
                "cli-test",
                "user-001",
                "User message"
            );
            
            producer.send(pinned);
            producer.send(keyed);
            
            producer.flush();
        }
    }
}

3.5 发送带Header的消息

import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class HeaderProducer {
    public static void main(String[] args) {
        Properties config = new Properties();
        config.put("bootstrap.servers", "localhost:9092");
        config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(config)) {
            
            ProducerRecord<String, String> message = new ProducerRecord<>(
                "cli-test",
                "key-header",
                "Message with headers"
            );
            
            // Headers carry per-record metadata as raw bytes (e.g. tracing ids).
            // NOTE(review): getBytes() uses the platform default charset; consider
            // StandardCharsets.UTF_8 for portable encoding.
            message.headers().add("trace-id", "123456".getBytes());
            message.headers().add("source", "java-api".getBytes());
            message.headers().add("user-id", "user001".getBytes());
            
            producer.send(message, (metadata, exception) -> {
                if (exception == null) {
                    System.out.println("带Header的消息发送成功");
                }
            });
            
            producer.flush();
        }
    }
}

3.6 生产者配置参数

Properties props = new Properties();

// Required settings
props.put("bootstrap.servers", "localhost:9092,localhost:9093");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// Reliability settings
props.put("acks", "all");                    // 0/1/all — "all" is the most durable
props.put("retries", 3);                     // number of send retries
props.put("enable.idempotence", "true");     // idempotent producer, prevents duplicates
props.put("max.in.flight.requests.per.connection", 5);  // max unacknowledged requests

// Performance settings
props.put("batch.size", 16384);              // batch size (bytes)
props.put("linger.ms", 5);                   // how long to wait for batching (ms)
props.put("buffer.memory", 33554432);        // send buffer size (32MB)
props.put("compression.type", "snappy");     // compression: gzip/snappy/lz4/zstd
props.put("max.request.size", 1048576);      // max request size (1MB)

// Timeout settings
props.put("request.timeout.ms", 30000);      // per-request timeout
props.put("delivery.timeout.ms", 120000);    // overall delivery timeout
props.put("retry.backoff.ms", 100);          // delay between retries

四,消费者相关API

4.1 基础消费者

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class BasicConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "basic-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest");
        
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("cli-test"));
            
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("topic=%s, partition=%d, offset=%d, key=%s, value=%s%n",
                        record.topic(), record.partition(), record.offset(),
                        record.key(), record.value());
                }
            }
        }
    }
}

4.2 手动提交偏移量

import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class ManualCommitConsumer {
    public static void main(String[] args) {
        Properties config = new Properties();
        config.put("bootstrap.servers", "localhost:9092");
        config.put("group.id", "manual-commit-group");
        config.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        config.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        config.put("enable.auto.commit", "false");  // we commit offsets ourselves
        config.put("auto.offset.reset", "earliest");
        
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config)) {
            consumer.subscribe(Arrays.asList("cli-test"));
            
            for (;;) {
                ConsumerRecords<String, String> batch = consumer.poll(Duration.ofMillis(1000));
                
                for (ConsumerRecord<String, String> record : batch) {
                    System.out.println("处理消息: " + record.value());
                    
                    // ... business logic goes here ...
                }
                
                // Synchronous commit: blocks until the broker confirms the offsets.
                consumer.commitSync();
                System.out.println("偏移量已提交");
                
                // Non-blocking alternative:
                // consumer.commitAsync();
            }
        }
    }
}

4.3 指定分区消费

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class PartitionConsumer {
    public static void main(String[] args) {
        Properties config = new Properties();
        config.put("bootstrap.servers", "localhost:9092");
        config.put("group.id", "partition-group");
        config.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        config.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        config.put("enable.auto.commit", "false");
        
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config)) {
            // assign() bypasses the group's automatic balancing and attaches the
            // consumer directly to partition 0 of "cli-test".
            TopicPartition p0 = new TopicPartition("cli-test", 0);
            consumer.assign(Arrays.asList(p0));
            
            // Start reading at offset 10.
            consumer.seek(p0, 10);
            
            // Other starting points:
            // consumer.seekToBeginning(Arrays.asList(p0));  // from the start
            // consumer.seekToEnd(Arrays.asList(p0));        // from the end
            
            for (;;) {
                ConsumerRecords<String, String> batch = consumer.poll(Duration.ofMillis(1000));
                
                for (ConsumerRecord<String, String> record : batch) {
                    System.out.printf("分区=%d, offset=%d, value=%s%n",
                        record.partition(), record.offset(), record.value());
                }
                
                consumer.commitSync();
            }
        }
    }
}

4.4 控制消费数量

import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;

public class LimitedConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "limited-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("max.poll.records", "10");  // fetch at most 10 records per poll
        props.put("auto.offset.reset", "earliest");  // consistent with the other consumer demos
        
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("cli-test"));
            
            AtomicInteger count = new AtomicInteger(0);
            int maxMessages = 50;  // stop after 50 messages
            
            while (count.get() < maxMessages) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                
                for (ConsumerRecord<String, String> record : records) {
                    // Check BEFORE incrementing: the original incremented first,
                    // so the counter could overshoot the cap by one.
                    if (count.get() >= maxMessages) {
                        break;
                    }
                    System.out.println("消费第 " + count.incrementAndGet() + " 条: " + record.value());
                }
                
                // NOTE(review): commitSync() commits offsets for everything the last
                // poll returned, including records skipped once the cap was reached.
                consumer.commitSync();
            }
            
            System.out.println("已消费 " + count.get() + " 条消息,退出");
        }
    }
}

4.5 消费者配置参数

Properties props = new Properties();

// Required settings
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

// Offset settings
props.put("auto.offset.reset", "earliest");  // earliest/latest/none
props.put("enable.auto.commit", "true");     // auto-commit offsets
props.put("auto.commit.interval.ms", "5000"); // auto-commit interval (ms)

// Fetch settings
props.put("max.poll.records", "500");         // max records per poll
props.put("max.poll.interval.ms", "300000");  // max allowed delay between polls
props.put("fetch.min.bytes", "1");            // minimum bytes per fetch
props.put("fetch.max.wait.ms", "500");        // max broker wait time for a fetch

// Session settings
props.put("session.timeout.ms", "45000");     // session timeout
props.put("heartbeat.interval.ms", "3000");   // heartbeat interval

注:从 Kafka 4.1 开始,ConsumerGroupListing 及 listConsumerGroups 等相关 API 已被标记为废弃,下面的消费者组示例改用新的 listGroups API。

五,消费者组相关API

5.1 查看消费者组列表

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.GroupState;
import org.apache.kafka.common.GroupType;
import java.util.Collection;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class ListGroupsAPI {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        // Use localhost like every other example in this note (the original
        // hard-coded the LAN address 192.168.116.127).
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (Admin admin = Admin.create(props)) {
            // listGroups() replaces the listConsumerGroups() API deprecated in Kafka 4.1.
            ListGroupsResult result = admin.listGroups(new ListGroupsOptions());

            // Fetch every group the brokers know about.
            Collection<GroupListing> groups = result.all().get();

            System.out.println("=== 所有组列表 ===");
            for (GroupListing group : groups) {
                System.out.printf("组ID: %s%n", group.groupId());

                // Group type (may be absent on older brokers).
                Optional<GroupType> type = group.type();
                System.out.printf("  类型: %s%n", type.map(GroupType::toString).orElse("unknown"));

                // Rebalance protocol in use.
                System.out.printf("  协议: %s%n", group.protocol());

                // Group state (may be absent).
                Optional<GroupState> state = group.groupState();
                System.out.printf("  状态: %s%n", state.map(GroupState::toString).orElse("unknown"));

                // Whether this is a "simple" consumer group.
                System.out.printf("  是否简单消费者组: %s%n", group.isSimpleConsumerGroup());
                System.out.println();
            }
        }
    }
}

5.2 查看消费者组详情

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.GroupState;
import org.apache.kafka.common.GroupType;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class DescribeConsumerGroupAPI {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        
        try (Admin admin = Admin.create(props)) {
            // describeConsumerGroups remains a supported API.
            DescribeConsumerGroupsResult result = admin.describeConsumerGroups(
                Collections.singleton("my-spring-group"),
                new DescribeConsumerGroupsOptions()
            );
            
            ConsumerGroupDescription desc = result.all().get().get("my-spring-group");
            
            System.out.println("=== 消费者组详情 ===");
            System.out.println("组ID: " + desc.groupId());
            System.out.println("是否简单消费者组: " + desc.isSimpleConsumerGroup());
            System.out.println("组类型: " + desc.type());  // e.g. CLASSIC or STREAMS
            System.out.println("组状态: " + desc.groupState());  // STABLE, EMPTY, COMPLETING_REBALANCE, ...
            System.out.println("分区分配策略: " + desc.partitionAssignor());
            System.out.println("协调器: " + desc.coordinator());
            
            // Epoch fields added in newer client versions.
            System.out.println("组纪元: " + desc.groupEpoch().orElse(null));
            System.out.println("目标分配纪元: " + desc.targetAssignmentEpoch().orElse(null));
            
            // Dump every member with its host, client id, and assigned partitions.
            System.out.println("\n成员列表 (" + desc.members().size() + "):");
            desc.members().forEach(member -> {
                System.out.printf("  - 成员ID: %s%n", member.consumerId());
                System.out.printf("    Host: %s%n", member.host());
                System.out.printf("    Client ID: %s%n", member.clientId());
                MemberAssignment assignment = member.assignment();
                if (assignment != null && !assignment.topicPartitions().isEmpty()) {
                    System.out.printf("    分配的分区: %s%n", assignment.topicPartitions());
                }
            });
        }
    }
}

5.3 查看消费者组偏移量

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class ListConsumerGroupOffsetsAPI {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        
        try (Admin admin = Admin.create(props)) {
            // Fetch the committed offsets of every partition this group has consumed.
            ListConsumerGroupOffsetsResult result = admin.listConsumerGroupOffsets(
                "my-spring-group",
                new ListConsumerGroupOffsetsOptions()
            );
            
            Map<TopicPartition, OffsetAndMetadata> committed =
                result.partitionsToOffsetAndMetadata().get();
            
            System.out.println("=== 消费进度 ===");
            if (committed.isEmpty()) {
                System.out.println("该组没有消费记录");
                return;
            }
            
            committed.forEach((tp, offset) ->
                System.out.printf("Topic: %s, 分区: %d, 当前偏移量: %d, 元数据: %s%n",
                    tp.topic(), tp.partition(), offset.offset(), offset.metadata()));
        }
    }
}

5.4 重置消费者组偏移量

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class ResetConsumerGroupOffsetsAPI {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        
        try (Admin admin = Admin.create(props)) {
            // Rewind partition 0 of "cli-test" back to offset 0.
            // NOTE(review): this typically requires the group to have no active
            // members — confirm the consumers are stopped before running it.
            Map<TopicPartition, OffsetAndMetadata> rewound = new HashMap<>();
            rewound.put(new TopicPartition("cli-test", 0), new OffsetAndMetadata(0));
            
            AlterConsumerGroupOffsetsResult result = admin.alterConsumerGroupOffsets(
                "my-spring-group",
                rewound,
                new AlterConsumerGroupOffsetsOptions()
            );
            
            result.all().get();  // wait for the broker to accept the new offsets
            System.out.println("偏移量重置成功");
        }
    }
}

5.5 删除消费者组

import org.apache.kafka.clients.admin.*;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class DeleteConsumerGroupAPI {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        
        try (Admin admin = Admin.create(props)) {
            // deleteConsumerGroups remains a supported API; it deletes the listed groups.
            DeleteConsumerGroupsResult result = admin.deleteConsumerGroups(
                Arrays.asList("old-group1", "old-group2"),
                new DeleteConsumerGroupsOptions()
            );
            
            result.all().get();  // wait for the deletions to complete
            System.out.println("消费者组删除成功");
        }
    }
}


六,Kafka事务详解

Kafka 事务是 Kafka 0.11.0.0 版本引入的重要特性,用于实现精确一次(Exactly-Once)语义

6.1 什么是Kafka事务?

事务允许生产者原子性地向多个分区发送一批消息,这些消息要么全部成功,要么全部失败。同时支持跨会话的幂等性。

6.2 为什么需要事务?

// 没有事务的问题场景
// 场景1:消费-生产原子性问题
consumer.poll();           // 从topic A消费消息
processMessage();          // 处理消息
producer.send(topic B);    // 发送结果到topic B
consumer.commit();         // 提交偏移量

// 问题:如果发送消息后、提交偏移量前程序崩溃
// 结果:消息被重复处理(重新消费 + 重复发送)

// 场景2:多分区原子性写入
producer.send(topicA, msg1);  // 写入分区1
producer.send(topicB, msg2);  // 写入分区2
// 问题:msg1成功但msg2失败,数据不一致

6.3 事务核心概念

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.clients.consumer.*;
import java.util.Properties;

public class TransactionConcepts {

    /**
     * Producer-side settings required for Kafka transactions:
     * idempotence must be on, and a transactional.id must be set.
     */
    public static Properties getTransactionalConfig() {
        Properties config = new Properties();
        config.put("bootstrap.servers", "localhost:9092");
        // Transactions are built on top of the idempotent producer.
        config.put("enable.idempotence", "true");
        // Unique identifier for this transactional producer instance.
        config.put("transactional.id", "my-transactional-id");
        // Transaction timeout (broker default is 1 minute).
        config.put("transaction.timeout.ms", "60000");
        // Serializers.
        config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return config;
    }

    /**
     * Consumer-side settings showing the transaction isolation level.
     */
    public static Properties getIsolationLevelConfig() {
        Properties config = new Properties();
        config.put("bootstrap.servers", "localhost:9092");
        config.put("group.id", "transactional-consumer");
        // isolation.level:
        //   read_uncommitted - returns all records, including ones that may be aborted
        //   read_committed   - returns only records from committed transactions
        config.put("isolation.level", "read_committed");
        config.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        config.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        config.put("auto.offset.reset", "earliest");
        return config;
    }
}

6.4 事务生产者完整示例

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.errors.ProducerFencedException;
import java.util.Properties;

public class TransactionalProducer {

    /**
     * Atomically sends messages to several topics inside one transaction.
     * Either all three records become visible, or none of them do.
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Transactions require the idempotent producer.
        props.put("enable.idempotence", "true");

        // transactional.id uniquely identifies this producer instance.
        props.put("transactional.id", "txn-producer-001");

        // Abort the transaction automatically if not committed within 30s.
        props.put("transaction.timeout.ms", "30000");

        // Retry indefinitely; idempotence prevents duplicates on retry.
        props.put("retries", Integer.MAX_VALUE);

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {

            // 1. Initialize transactional state (must be the first transactional call).
            producer.initTransactions();

            // 2. Start the transaction.
            producer.beginTransaction();

            try {
                // 3. Send messages to multiple topics within the transaction.
                ProducerRecord<String, String> record1 = new ProducerRecord<>(
                    "topic-A", "key1", "消息1"
                );
                producer.send(record1);

                ProducerRecord<String, String> record2 = new ProducerRecord<>(
                    "topic-B", "key2", "消息2"
                );
                producer.send(record2);

                // A specific partition can also be targeted.
                ProducerRecord<String, String> record3 = new ProducerRecord<>(
                    "topic-C", 2, "key3", "消息3"
                );
                producer.send(record3);

                // 4. Commit: all messages become visible atomically.
                producer.commitTransaction();
                System.out.println("事务提交成功,所有消息已发送");

            } catch (ProducerFencedException e) {
                // FIX: a fenced producer must NOT call abortTransaction() — that call
                // itself throws. The only valid action is to close the producer,
                // which try-with-resources already guarantees.
                System.err.println("生产者被隔离,事务无法回滚,关闭生产者: " + e.getMessage());
            } catch (Exception e) {
                // 5. Any other failure: abort so none of the messages become visible.
                producer.abortTransaction();
                System.err.println("事务回滚,消息未发送: " + e.getMessage());
            }
        }
    }
}

6.5 消费-生产事务(原子性读写)

这是最常见的场景:从 Kafka 消费消息,处理后发送到另一个 topic,保证原子性。

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.*;

public class ConsumeTransformProduceTransactional {
    
    /**
     * Consume-transform-produce loop with exactly-once semantics: records read
     * from "source-topic" are transformed and written to "target-topic", and
     * the consumer group offsets are committed inside the SAME producer
     * transaction, so processing and offset commit succeed or fail atomically.
     */
    public static void main(String[] args) {
        // 1. Producer configuration (transactional: idempotence + transactional.id).
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9092");
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("enable.idempotence", "true");
        producerProps.put("transactional.id", "consume-transform-produce");
        producerProps.put("transaction.timeout.ms", "60000");
        
        // 2. Consumer configuration.
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "localhost:9092");
        consumerProps.put("group.id", "transactional-consumer-group");
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // read_committed: only records from committed transactions are returned.
        consumerProps.put("isolation.level", "read_committed");
        consumerProps.put("auto.offset.reset", "earliest");
        // Offsets are committed through the producer transaction, so the
        // consumer's own auto-commit must be disabled.
        consumerProps.put("enable.auto.commit", "false");
        
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
             KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
            
            // 3. Subscribe to the input topic.
            consumer.subscribe(Arrays.asList("source-topic"));
            
            // 4. Initialize the producer's transactional state (required once,
            //    before any transactional send).
            producer.initTransactions();
            
            while (true) {
                // 5. Fetch the next batch of input records.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                
                if (records.isEmpty()) {
                    continue;
                }
                
                // 6. One transaction per polled batch.
                producer.beginTransaction();
                
                try {
                    // 7. Transform each record and send it to the output topic.
                    for (ConsumerRecord<String, String> record : records) {
                        String processedValue = "PROCESSED: " + record.value();
                        
                        ProducerRecord<String, String> outputRecord = new ProducerRecord<>(
                            "target-topic",
                            record.key(),
                            processedValue
                        );
                        producer.send(outputRecord);
                        
                        System.out.printf("已处理: offset=%d, value=%s%n", 
                            record.offset(), record.value());
                    }
                    
                    // 8. Compute the offsets to commit: last consumed offset + 1
                    //    for each partition in this batch.
                    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                    for (TopicPartition partition : records.partitions()) {
                        List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                        long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                        offsets.put(partition, new OffsetAndMetadata(lastOffset + 1));
                    }
                    
                    // 9. Obtain the real group metadata from the consumer
                    //    (do not use the deprecated ConsumerGroupMetadata constructor).
                    ConsumerGroupMetadata groupMetadata = consumer.groupMetadata();
                    
                    // 10. Attach the consumer offsets to the transaction.
                    producer.sendOffsetsToTransaction(offsets, groupMetadata);
                    
                    // 11. Commit: output messages + offset commit become visible atomically.
                    producer.commitTransaction();
                    System.out.println("事务提交成功");
                    
                } catch (Exception e) {
                    // 12. On failure, abort so neither messages nor offsets are committed.
                    // NOTE(review): after an abort the consumer's in-memory position has
                    // still advanced past this batch, so it is only re-read after a
                    // rebalance/restart — consider seeking back to the last committed
                    // offsets here. Also, a ProducerFencedException cannot be aborted;
                    // the producer should be closed in that case. Verify for production use.
                    producer.abortTransaction();
                    System.err.println("事务回滚: " + e.getMessage());
                }
            }
        }
    }
}

6.6 事务API详解


import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class TransactionAPIExample {

    /**
     * Walks through the five transactional producer API calls:
     * initTransactions, beginTransaction, send, sendOffsetsToTransaction,
     * and commitTransaction/abortTransaction.
     */
    public static void main(String[] args) {
        // Producer config: transactions require idempotence + a transactional.id.
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("enable.idempotence", "true");
        props.put("transactional.id", "my-transactional-id");

        Properties consumerProps = new Properties();
        // FIX: was "192.168.116.127:9092" — inconsistent with the producer above
        // and with every other example in these notes.
        consumerProps.put("bootstrap.servers", "localhost:9092");
        consumerProps.put("group.id", "transactional-consumer-group");
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // read_committed: only read messages from committed transactions.
        consumerProps.put("isolation.level", "read_committed");
        consumerProps.put("auto.offset.reset", "earliest");
        // Offsets are committed through the transaction, so auto-commit is off.
        consumerProps.put("enable.auto.commit", "false");

        // FIX: try-with-resources — the original never closed the consumer and
        // skipped producer.close() if anything threw before the last line.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props);
             KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {

            // 1. initTransactions() — registers with the transaction coordinator
            //    and recovers/aborts any unfinished transaction from a previous run.
            //    Must be called before any send.
            producer.initTransactions();

            // 2. beginTransaction() — marks the start of a new transaction.
            producer.beginTransaction();

            try {
                // 3. send() — messages written inside the transaction.
                producer.send(new ProducerRecord<>("topic1", "msg1"));
                producer.send(new ProducerRecord<>("topic2", "msg2"));

                // 4. sendOffsetsToTransaction() — commits consumer group offsets
                //    as part of the same transaction.
                Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                offsets.put(new TopicPartition("source-topic", 0),
                        new OffsetAndMetadata(100));

                // Use consumer.groupMetadata() rather than constructing
                // ConsumerGroupMetadata directly (the constructor is deprecated).
                ConsumerGroupMetadata groupMetadata = consumer.groupMetadata();
                producer.sendOffsetsToTransaction(offsets, groupMetadata);

                // 5. commitTransaction() — messages and offsets become visible atomically.
                producer.commitTransaction();

            } catch (Exception e) {
                // 6. abortTransaction() — messages are discarded, offsets stay uncommitted.
                // NOTE(review): if e is a ProducerFencedException the abort itself
                // throws — production code should close the producer instead.
                producer.abortTransaction();
                e.printStackTrace();
            }
        }
    }
}

6.7 事务消费者

import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class TransactionalConsumer {

    /**
     * Consumer configured with read_committed isolation: it never sees
     * records belonging to open or aborted transactions.
     */
    public static void main(String[] args) {
        Properties config = new Properties();
        config.put("bootstrap.servers", "localhost:9092");
        config.put("group.id", "my-group");
        config.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        config.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // isolation.level:
        //   read_committed   - only records from committed transactions
        //   read_uncommitted - every record, including not-yet-committed ones
        config.put("isolation.level", "read_committed");
        config.put("auto.offset.reset", "earliest");
        config.put("enable.auto.commit", "true");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config)) {
            consumer.subscribe(Arrays.asList("target-topic"));

            while (true) {
                ConsumerRecords<String, String> batch = consumer.poll(Duration.ofMillis(1000));

                // Under read_committed only committed records ever appear here.
                batch.forEach(record ->
                    System.out.printf("offset=%d, key=%s, value=%s%n",
                        record.offset(), record.key(), record.value()));
            }
        }
    }
}

6.8 事务高级特性

import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class AdvancedTransactionalProducer {

    /**
     * Demonstrates transaction-related producer settings and running several
     * transactions sequentially on a single producer instance.
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("enable.idempotence", "true");

        // 1. transactional.id uniqueness: only one active producer per id;
        //    a newer instance fences the older one to prevent duplicate writes.
        props.put("transactional.id", "unique-transactional-id");

        // 2. Transaction timeout.
        props.put("transaction.timeout.ms", "30000");

        // 3. Retry configuration.
        props.put("retries", Integer.MAX_VALUE);
        props.put("max.in.flight.requests.per.connection", "5");

        // FIX: try-with-resources — the original leaked the producer if any
        // iteration threw before the final close().
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {

            // FIX: initTransactions() must be called exactly ONCE per producer,
            // before the first transaction. The original called it inside the
            // loop, which throws on the second iteration.
            producer.initTransactions();

            // 4. Several transactions executed back to back on the same producer.
            for (int i = 0; i < 10; i++) {
                producer.beginTransaction();

                try {
                    producer.send(new ProducerRecord<>("topic", "batch-" + i + "-msg1"));
                    producer.send(new ProducerRecord<>("topic", "batch-" + i + "-msg2"));
                    producer.commitTransaction();
                    System.out.println("批次 " + i + " 事务提交成功");
                } catch (Exception e) {
                    producer.abortTransaction();
                    System.err.println("批次 " + i + " 事务回滚");
                }
            }
        }
    }
}

6.9 事务处理器的Fencing机制

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.errors.ProducerFencedException;

import java.util.Properties;

public class FencingMechanism {

    // Only one active producer may use a given transactional.id at a time.
    // When a second producer with the same id calls initTransactions(), the
    // first one is "fenced": all its further transactional calls fail with
    // ProducerFencedException.

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("transactional.id", "same-id");
        props.put("enable.idempotence", "true");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // FIX: try-with-resources closes the producer on EVERY path — the
        // original only closed it inside the ProducerFencedException handler
        // and leaked it on the success path.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();

            try {
                producer.beginTransaction();
                producer.send(new ProducerRecord<>("topic", "message"));

                // If another producer with the same transactional.id started in
                // the meantime, this call throws ProducerFencedException.
                producer.commitTransaction();

            } catch (ProducerFencedException e) {
                // A fenced producer can neither abort nor continue; closing
                // (handled by try-with-resources) is the only valid action.
                System.err.println("生产者被隔离: " + e.getMessage());
            }
        }
    }
}

6.10 事务的限制和注意事项

public class TransactionLimitations {
    
    /**
     * 1. Transactions cannot span Kafka clusters.
     */
    public static void limitation1() {
        // A transaction is only valid within a single Kafka cluster;
        // it cannot atomically write to two different clusters.
    }
    
    /**
     * 2. Keep transactions reasonably small.
     */
    public static void limitation2() {
        // The transaction coordinator keeps transaction state in memory.
        // Guidance from these notes: < 10000 messages and < 100MB per transaction.
        // NOTE(review): these figures are rules of thumb, not hard broker
        // limits — verify against the actual cluster configuration.
    }
    
    /**
     * 3. Transactions are aborted automatically on timeout.
     */
    public static void limitation3() {
        // If a transaction is not committed within transaction.timeout.ms,
        // the transaction coordinator aborts it.
    }
    
    /**
     * 4. Recovery after a producer crash.
     */
    public static void limitation4() {
        // A new producer reusing the same transactional.id automatically
        // aborts any unfinished transaction left by the previous instance.
    }
    
    /**
     * 5. Performance overhead.
     */
    public static void limitation5() {
        // Transactions add latency (extra round trips to the coordinator).
        // Use them only where exactly-once semantics are required;
        // a plain idempotent producer suffices for ordinary workloads.
    }
    
    /**
     * 6. The ConsumerGroupMetadata constructor is deprecated.
     */
    public static void limitation6() {
        // In Kafka 4.2+ the ConsumerGroupMetadata constructor is deprecated.
        // Correct:   consumer.groupMetadata()
        // Incorrect: new ConsumerGroupMetadata(groupId)
    }
}

6.11 事务使用场景

| 场景 | 是否需要事务 | 说明 |
| --- | --- | --- |
| 普通日志收集 | ❌ 不需要 | at-least-once 就够了 |
| 金融交易 | ✅ 需要 | 不能丢也不能重复 |
| 库存扣减 | ✅ 需要 | 精确一次语义 |
| 实时ETL | ✅ 需要 | 消费-处理-生产原子性 |
| 监控告警 | ❌ 不需要 | 丢几条无所谓 |
| 点击流分析 | ❌ 不需要 | 允许少量重复 |
| 跨分区写入 | ✅ 需要 | 保证多分区原子性 |
| 数据库CDC | ✅ 需要 | 精确一次同步 |

6.12 总结

事务解决了三个核心问题:

  1. 原子性写入:跨多个topic/分区的原子写入
  2. 消费-生产原子性:消费和生产的原子性
  3. 故障恢复:崩溃后的状态恢复

关键 API(Kafka 4.2+):

| 方法 | 说明 |
| --- | --- |
| initTransactions() | 初始化事务 |
| beginTransaction() | 开始事务 |
| sendOffsetsToTransaction(offsets, groupMetadata) | 提交消费偏移量 |
| commitTransaction() | 提交事务 |
| abortTransaction() | 回滚事务 |

重要注意事项:

  1. 必须设置 enable.idempotence=true
  2. 必须设置 transactional.id
  3. 消费端设置 isolation.level=read_committed
  4. 不要直接构造 ConsumerGroupMetadata,使用 consumer.groupMetadata()
  5. 事务不能跨集群
  6. 事务有性能开销,只在需要时使用

使用建议:

  • 只在需要精确一次语义时使用
  • 事务ID要有明确的命名规范,是生产者实例的标识,不是消息ID,应该用固定值(如服务名),这样才能在重启后恢复事务
  • 合理设置事务超时时间
  • 消费端记得设置isolation.level=read_committed
  • 使用 consumer.groupMetadata() 获取元数据,不要手动构造