一,Spring Boot集成Kafka快速入门

1.1 环境准备

确保你的 Kafka 已启动:

kafka-topics --list --bootstrap-server localhost:9092

1.2 创建Spring Boot项目

项目结构:

kafka-springboot-demo/
├── pom.xml
└── src/main/java/com/example/kafka/
    ├── KafkaSpringbootApplication.java
    ├── config/
    │   └── KafkaConfig.java
    ├── producer/
    │   └── MessageProducer.java
    ├── model/
    │   └── Message.java
    ├── consumer/
    │   └── MessageConsumer.java
    └── controller/
        └── MessageController.java

1.3 Maven依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.2.5</version> 
        <relativePath/>
    </parent>

    <groupId>com.example</groupId>
    <artifactId>SpringBootDemo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>SpringBootDemo</name>
    <description>SpringBootDemo</description>

    <properties>
        <java.version>17</java.version>  
        <spring-kafka.version>3.1.5</spring-kafka.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>  
        </dependency>

        <!-- Lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <!-- Jackson -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>

        <!-- Spring Boot Kafka Starter -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <!-- 测试依赖 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>17</source>
                    <target>17</target>
                    <annotationProcessorPaths>
                        <path>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                            <version>${lombok.version}</version>
                        </path>
                    </annotationProcessorPaths>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

1.4 配置文件

application.yml:

spring:
  kafka:
    # Kafka服务器地址(注意:用下划线,不是横线)
    bootstrap-servers: 192.168.116.127:9092

    # 生产者配置
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 其他配置
      properties:
        acks: all
        retries: 3
        batch.size: 16384
        buffer.memory: 33554432
        compression.type: snappy

    # 消费者配置
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      group-id: my-spring-group
      properties:
        auto.offset.reset: earliest
        enable.auto.commit: true
        auto.commit.interval.ms: 5000
        isolation.level: read_committed

    # 监听器配置
    listener:
      ack-mode: RECORD
kafka:
  topic: spring-topic

配置类 KafkaConfig

@Configuration
@EnableKafka
public class KafkaConfig {

    /**
     * 自动创建Topic(可选)
     * 如果Topic已存在,不会重复创建
     */
    @Bean
    public NewTopic cliTestTopic() {
        return new NewTopic("spring-topic", 3, (short) 1);
    }
}

1.5 生产者

package com.example.springbootdemo.producer;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RequiredArgsConstructor
public class MessageProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    @Value("${kafka.topic:spring-topic}")
    private String topic;
    /**
     * 发送带key的消息
     */
    public void sendWithKey(String key, String message) {
        kafkaTemplate.send(topic,key, message);
        log.info("发送消息: key={}, message={}", key, message);
    }
}

1.6 消费者

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class MessageConsumer {
    
    @KafkaListener(topics = "${kafka.topic:cli-test}", groupId = "my-spring-group")
    public void consume(ConsumerRecord<String, String> record) {
        log.info("收到消息: topic={}, partition={}, offset={}, key={}, value={}",
            record.topic(),
            record.partition(),
            record.offset(),
            record.key(),
            record.value());
    }
}

1.7 REST Controller

import com.example.springbootdemo.model.Message;
import com.example.springbootdemo.producer.MessageProducer;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping("/api/kafka")
@RequiredArgsConstructor
public class MessageController {

    private final MessageProducer producer;

    @PostMapping("/send-with-key")
    public String sendWithKey(@RequestBody Message message) {
        producer.sendWithKey(message.getKey(), message.getMessage());
        return String.format("消息已发送: key=%s, message=%s", message.getKey(), message.getMessage());
    }
}

1.8 消息接收对象

import lombok.Data;

@Data
public class Message {
    private String key;
    private String message;
}

二,Kafka配置详解

2.1 核心配置结构

完整结构是这样的:

spring:
  kafka:
    bootstrap-servers: localhost:9092
    
    producer:
      ...
    
    consumer:
      ...
    
    listener:
      ...
    
    admin:
      ...
    
    properties:
      ..

2.2 基础连接配置

spring:
  kafka:
    bootstrap-servers: localhost:9092

作用: Kafka 集群地址

本质:

bootstrap.servers

注意:可以多个

bootstrap-servers: host1:9092,host2:9092

2.3 Producer 配置

spring:
  kafka:
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      
      acks: all
      retries: 3
      batch-size: 16384
      linger-ms: 5
      buffer-memory: 33554432

重点配置解释(非常重要)

  1. 序列化器

    key-serializer
    value-serializer
    

    必须配置,否则无法发送

    常见:

    类型类名作用特点场景推荐度
    StringStringSerializerString → byte[]简单、可读日志、简单消息⭐⭐⭐⭐
    ByteArrayByteArraySerializerbyte[] → byte[]不做任何处理已有二进制数据⭐⭐⭐⭐
    ByteBufferByteBufferSerializerByteBuffer → byte[]NIO支持特殊场景⭐⭐
    IntegerIntegerSerializerint → byte[]固定4字节计数类key⭐⭐⭐
    LongLongSerializerlong → byte[]固定8字节ID/时间戳⭐⭐⭐⭐
    DoubleDoubleSerializerdouble → byte[]数值型统计数据⭐⭐
    UUIDUUIDSerializerUUID → byte[]标准UUID分布式ID⭐⭐⭐
    JSON(这个是spring提供的)JsonSerializer对象 → JSON → byte[]可读、通用微服务通信⭐⭐⭐⭐⭐

    示例

    spring:
      kafka:
        producer:
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    

    注意: Kafka 只认 byte[]

  2. acks(可靠性核心)

    acks: all
    

    含义:

    含义
    0不等确认(最快,可能丢数据)
    1leader确认
    all所有副本确认(最安全)
  3. retries(重试)

    发送失败重试次数

    retries: 3
    
  4. batch-size(批量发送)

    提高吞吐量(Kafka 高性能核心)

    batch-size: 16384  # 16KB
    

    核心理解:Kafka 不是一条条发,而是攒一批再发

  5. linger-ms(延迟发送)

    等待更多消息一起发送(提高吞吐)

    linger-ms: 5
    
  6. buffer-memory

    Producer 缓存大小(32MB)

    buffer-memory: 33554432  # 32MB
    

    作用:Producer 内存缓冲区

  7. compressionType(压缩)

 **作用**: 减少网络传输
 ```yaml
 compression-type: gzip
 ```

 | 类型   | 特点           |
 | ------ | -------------- |
 | none   | 默认           |
 | gzip   | 压缩高,慢     |
 | snappy | 平衡           |
 | lz4    | 快             |
 | zstd   | 推荐(新版本) |

8. clientId(客户端标识)

 ```yaml
 client-id: order-producer
 ```

 用于:日志,监控,metrics

9. transactionIdPrefix(事务)

 **作用**:开启 Kafka 事务

 ```yaml
 transaction-id-prefix: tx-
 ```

 背后自动开启:

 ```yaml
 enable.idempotence = true
 transactional.id = xxx
 ```

 **使用前提:**

 -   Kafka >= 0.11
 -   consumer 配合 read_committed

 >   当设置了 `transactional.id` 时,Kafka 会**隐式修改配置**:
 >
 >   ```properties
 >   enable.idempotence = true
 >   acks = all
 >   retries = Integer.MAX_VALUE
 >   max.in.flight.requests.per.connection <= 5
 >   ```
 >
 >   Spring 会做这几件事(核心逻辑在 `DefaultKafkaProducerFactory`):
 >
 >   自动推导 transactional.id
 >
 >   ```
 >   tx- + 递增序号
 >   ```
 >
 >   例如:
 >
 >   ```
 >   tx-0
 >   tx-1
 >   tx-2
 >   ```
 >
 >    **开启事务后,`acks` 必须是 `all`(或 `-1`)**
 >
 >   而且:不是 Spring 要求的,是 Kafka 客户端强制要求的

10. properties(万能入口)

  ```yaml
  properties:
    enable.idempotence: true
    max.in.flight.requests.per.connection: 5
  ```

  **作用:** 覆盖 Kafka 原生所有配置

  常用高级参数:

  -   幂等性

      防止重复消息

      ```yaml
      enable.idempotence: true
      ```

  -   **飞行请求数**

      定义(核心):**一个连接上,已经发送但“还没收到响应”的请求数量**

      ```yaml
      max.in.flight.requests.per.connection: 5
      ```

       作用:防止乱序

      >   通俗理解:Producer 发消息是异步的:
      >
      >   ```
      >   发送1 → 未返回
      >   发送2 → 未返回
      >   发送3 → 未返回
      >   ```
      >
      >   这 3 个请求就叫:
      >
      >   ```
      >   in-flight requests(飞行中的请求)
      >   ```
      >
      >   为什么需要这个东西?--》 **提高吞吐量**
      >
      >   对比两种模式
      >
      >   -   串行(in-flight = 1)
      >
      >        慢但安全
      >
      >       ```
      >       发1 → 等返回 → 发2 → 等返回
      >       ```
      >
      >   -   并行(in-flight = 5)
      >
      >       快很多
      >
      >       ```
      >       发1
      >       发2
      >       发3
      >       发4
      >       发5
      >       ```
      >
      >    **乱序问题**
      >
      >   场景
      >
      >   ```
      >   发送顺序:A → B
      >   ```
      >
      >   发生异常:
      >
      >   ```
      >   A 发送失败(需要重试)
      >   B 发送成功
      >   ```
      >
      >   最终结果:
      >
      >   ```
      >   Broker里顺序:B → A   ❌
      >   ```
      >
      >    顺序被打乱
      >
      >   **Kafka怎么解决这个问题?**
      >
      >   **普通模式(默认)**
      >
      >   -   不保证顺序(除非你设置 in-flight=1)
      >
      >   **幂等性模式**
      >
      >   当你开启:
      >
      >   ```
      >   enable.idempotence: true
      >   ```
      >
      >   Kafka要求:
      >
      >   ```
      >   max.in.flight.requests.per.connection ≤ 5
      >   ```
      >
      >   **为什么是 5?**
      >
      >   因为 Kafka 内部设计,每个 Producer 有:
      >
      >   -   PID(生产者ID)
      >   -   Sequence Number(序列号)
      >
      >   Kafka 能保证:
      >
      >   -    **最多缓存 5 个未确认的 sequence**
      >
      >   超过就可能:
      >
      >   -   **无法正确去重 / 排序**

  -   最大请求大小

      ```yaml
      max.request.size: 1048576
      ```

  -   重试间隔

      ```yaml
      retry.backoff.ms: 100
      ```

2.4 Consumer 配置

基本配置

spring:
  kafka:
    consumer:
      group-id: test-group
      
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      
      auto-offset-reset: earliest
      enable-auto-commit: false

Consumer 的本质不是“读消息”,而是“维护消费进度(offset)

所以配置可以分成三类:

分类作用
拉取控制怎么拉数据
消费进度offset怎么提交
组协调怎么做负载均衡
  1. groupId(消费者组)

    ** 作用: ** 标识“消费身份”

    group-id: order-group
    

    核心能力

    能力说明
    负载均衡一个 group 内分区分摊
    消费进度offset 按 group 维护

    举例:

    topic 有 3 个分区
    group 有 3 个消费者
    

    每人一个分区

  2. enableAutoCommit(是否自动提交 offset)

    ** 作用: ** 是否自动提交 offset

    enable-auto-commit: false
    

    两种模式:

    • 自动提交:

      Kafka 自动定时提交 offset

      简单,但有风险

    • 手动提交(推荐)

      业务成功 → 手动 ack
      

      保证不丢数据

  3. autoCommitInterval(自动提交间隔)

    作用:多久提交一次 offset

    auto-commit-interval: 5s
    

    仅在:

    enable-auto-commit: true
    

    时生效

  4. autoOffsetReset(起始消费位置)

    什么时候生效? 没有 offset 时

    auto-offset-reset: earliest
    

    可选值:

    含义
    earliest从最早开始
    latest从最新开始
    none报错

    常见坑:新 group + latest → 收不到历史数据

  5. keyDeserializer / valueDeserializer(反序列化)

    作用:

    byte[] → Java对象
    

    常见:

    StringDeserializer
    JsonDeserializer
    

    必须和 Producer 匹配!

    就是将 Producer 的 xxxSerializer --》XXXDeserializer

  6. maxPollRecords(每次拉多少条)

    作用:一次 poll() 返回多少条

    max-poll-records: 500
    
  7. fetchMinSize(最小拉取字节数)

    作用:至少攒这么多数据才返回,类似 Producer 的 batch

    fetch-min-size: 1KB
    
  8. fetchMaxWait(最大等待时间)

    作用: 等待 fetchMinSize 的最长时间

    fetch-max-wait: 500ms
    

    和 fetchMinSize 配合:

    参数控制
    fetchMinSize数据量
    fetchMaxWait时间
  9. heartbeatInterval(心跳)

    作用:Consumer 向 Group Coordinator 发心跳

    heartbeat-interval: 3s
    

    如果不发:

    被踢出消费者组
    → 触发 rebalance
    

    影响稳定性

  10. isolationLevel(事务读取)

  ```yaml
  isolation-level: read_committed
  ```

  **可选:**

  | 值               | 含义       |
  | ---------------- | ---------- |
  | read_uncommitted | 读所有     |
  | read_committed   | 只读已提交 |

11. properties(高级配置入口)

  ```yaml
  properties:
    max.poll.interval.ms: 300000
  ```

  常见高级配置:

  -   max.poll.interval.ms

      ```yaml
      properties:
        max.poll.interval.ms: 300000
      ```

      如果你处理太慢:Kafka认为你挂了 → 触发rebalance

  -   fetch.max.bytes

      ```yaml
      properties:
        fetch.max.bytes: 300000
      ```

  -   **session.timeout.ms**

       **作用**:Consumer 多久没心跳就被认为“死了”

      ```yaml
      properties:
        session.timeout.ms: 15000
      ```

      举例:

      ```
      session.timeout.ms: 15000
      heartbeat.interval.ms: 3000
      ```

      表示:

      -   每 3 秒发一次心跳
      -   15 秒没收到 → 判定挂了

2.5 Listener 配置(Spring特有)

Spring Kafka 和原生 Kafka 最大的差异点

  • Kafka 原生只有 poll()
  • Spring 帮你封装成:@KafkaListener + Listener Container

所以这一块本质是:“Spring 是怎么帮你消费消息的(线程 + ack + 容器模型)”

Listener 配置整体结构

spring:
  kafka:
    listener:
      concurrency: 3
      ack-mode: manual
      poll-timeout: 1500
      type: single
      idle-event-interval: 60000

核心配置

  1. ackMode(确认机制)
```yaml
spring:
  kafka:
    listener:
      ack-mode: manual
```

ack-mode 是

-   **决定“什么时候提交 offset”**

**可选值**

| 值               | 含义      | 解释                        | 优点               | 缺点           |
| ---------------- | --------- | --------------------------- | ------------------ | -------------- |
| RECORD           | 逐条提交  | 处理一条 → 提交一条         | 最安全             | 性能差         |
| BATCH            | 批量提交  | 一批消息处理完 → 一起提交   | 性能好             | 失败时可能重复 |
| TIME             | 定时提交  | 每隔 N 秒提交,类似自动提交 |                    |                |
| COUNT            | 条数提交  | 处理 N 条 → 提交            |                    |                |
| COUNT_TIME       | 条数+时间 |                             |                    |                |
| MANUAL           | 手动      | ack.acknowledge();          | 你自己控制提交时机 |                |
| MANUAL_IMMEDIATE | 立即提交  | 调用 ack 立刻提交(同步)   |                    |                |

**手动提交示例**

```java
@Slf4j
@Component
public class MessageConsumer {

    @KafkaListener(topics = "spring-topic", groupId = "my-spring-group")
    public void consume(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
        log.info("收到消息: topic={}, partition={}, offset={}, key={}, value={}",
                record.topic(),
                record.partition(),
                record.offset(),
                record.key(),
                record.value());
        try{
            // 执行业务
            acknowledgment.acknowledge();
        }catch (Exception e){
            log.error("消费消息异常: {}", e.getMessage());
        }
        
    }
}
```

2. asyncAcks(异步提交 offset)

**作用: **  `ack()` 是不是**异步提交 offset**,false(同步)

**开启:**

```
spring:
  kafka:
    listener:
      async-acks: true
```

**行为**:

| 模式  | 行为       |
| ----- | ---------- |
| false | ack 阻塞   |
| true  | ack 不阻塞 |

风险:业务成功了,但 offset 提交失败

一般建议: **默认别开,除非你在做高吞吐优化**

3. idleBetweenPolls(poll 间隔)

```yaml
spring:
  kafka:
    listener:
      idleBetweenPolls: 500ms
```

**作用:**  poll() → 处理 → sleep → 再 poll

**防止**:CPU 空转(一直 poll)

4. idleEventInterval(空闲事件)

**作用**: 多久没消息 → 触发事件

```yaml
spring:
  kafka:
    listener:
      idleEventInterval: 100m
```

使用:

```java
@EventListener
public void onIdle(ListenerContainerIdleEvent event) {
    // 可以做监控 / 定时任务
}
```

 **实战用途:**

-   判断系统是否“没流量”
-   定时扫描

5. noPollThreshold(卡死检测)

**作用: **  检测 consumer 是否“卡住了”

```yaml
spring:
  kafka:
    # kafka连接池
    bootstrap-servers: 192.168.116.127:9092
    listener:
      # 卡死检测
      noPollThreshold: 10
```

**原理:**

```
如果:
超过 noPollThreshold × pollTimeout
还没 poll
→ 认为异常
```

**举例**

```
poll-timeout: 1s
no-poll-threshold: 3
```

超过 3 秒没 poll → 告警

```java
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.kafka.event.NonResponsiveConsumerEvent;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class KafkaConsumerMonitor {

    @EventListener
    public void handleNonResponsiveConsumer(NonResponsiveConsumerEvent event) {

        log.error("⚠️ Kafka消费者卡死检测到!");

        // 1️⃣ 基本信息
        log.error("监听器ID: {}", event.getListenerId());
        log.error("消费者: {}", event.getConsumer());
        log.error("主题分区: {}", event.getTopicPartitions());

        // 2️⃣ 卡住时间(关键)
        log.error("多久没poll了(ms): {}", event.getTimeSinceLastPoll());

        // 3️⃣ Consumer对象(可以做高级操作)
        log.error("Consumer实例: {}", event.getConsumer());

        // 4️⃣ 可选:自动恢复(慎用)
        // event.getSource(MessageListenerContainer.class).stop();
        // event.getSource(MessageListenerContainer.class).start();
    }
}
```

6. missingTopicsFatal(Topic 不存在)

默认:true(报错)

```yaml
spring:
  kafka:
    # kafka连接池
    bootstrap-servers: 192.168.116.127:9092
    listener:
      missingTopicsFatal: true
```

7. immediateStop(停止策略)

```yaml
spring:
  kafka:
    # kafka连接池
    bootstrap-servers: 192.168.116.127:9092
    listener:
      immediateStop: false
```

**含义:**

| 值    | 行为       |
| ----- | ---------- |
| true  | 立刻停     |
| false | 处理完再停 |

8. changeConsumerThreadName(修改线程名)

```yaml
spring:
  kafka:
    # kafka连接池
    bootstrap-servers: 192.168.116.127:9092
    listener:
      changeConsumerThreadName: true
```

**作用**:是否让 Spring Kafka 给消费者线程设置“有意义的名字”

```
kafka-consumer-xxx
```

9. logContainerConfig(启动时打印配置)

```yaml
spring:
  kafka:
    # kafka连接池
    bootstrap-servers: 192.168.116.127:9092
    listener:
      logContainerConfig: true
```

2.6 Admin 配置(可选)

spring:
  kafka:
    admin:
      auto-create: true

常用配置

  1. clientId(客户端标识)

    spring:
      kafka:
        admin:
          client-id: my-admin
    

    用于:

    • 日志
    • 监控
  2. autoCreate(自动创建 topic)

    含义:启动时自动创建 Topic(通过 Bean)

    spring:
      kafka:
        admin:
          auto-create: true
    

    注意:不是 Kafka 自动创建(不是 broker 配置)

    写法

    import org.apache.kafka.clients.admin.NewTopic;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class KafkaTopicConfig {
    
        @Bean
        public NewTopic testTopic() {
            return new NewTopic("spring-topic", 3, (short) 1);
        }
    }
    

    含义:

    topic = spring-topic
    分区 = 3
    副本 = 1
    
  3. failFast(快速失败)

 **含义**:启动时连接 Kafka 失败 → 是否直接报错
 ```yaml
 spring:
   kafka:
     admin:
       fail-fast: true
 ```

 | 值    | 行为           |
 | ----- | -------------- |
 | true  | 启动失败       |
 | false | 忽略,继续启动 |

三,自定义Bean配置

3.1 Topic 级别(Topic 管理)

Spring 提供了一个非常关键的类: NewTopic(配合 KafkaAdmin 自动创建)

3.1.1 自定义 Topic Bean

@Bean
public NewTopic orderTopic() {
    return TopicBuilder.name("order-topic")
            .partitions(3)          // 分区数
            .replicas(1)            // 副本数
            .config("retention.ms", "86400000") // 保留时间 1天
            .build();
}

3.1.2 自定义 KafkaAdmin

@Bean
public KafkaAdmin kafkaAdmin() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    return new KafkaAdmin(configs);
}

这些配置可以看这些 config 的枚举,他们有描述的,基本上都可以用 yaml 进行配置

3.2 Producer(生产者

核心链路:

ProducerFactory → KafkaTemplate

3.2.1 自定义 ProducerFactory

@Bean
public ProducerFactory<String, String> producerFactory() {
    Map<String, Object> props = new HashMap<>();
	// 记得生产者和消费者都加,要不然会报错
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

    // 高级配置
    props.put(ProducerConfig.ACKS_CONFIG, "all");              // 最强可靠性
    props.put(ProducerConfig.RETRIES_CONFIG, 3);               // 重试
    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // 幂等性
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);        // 批量大小
    props.put(ProducerConfig.LINGER_MS_CONFIG, 5);             // 延迟发送

    return new DefaultKafkaProducerFactory<>(props);
}

3.2.2 自定义 KafkaTemplate

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}

3.2.3 ProducerListener(发送监听)

@Bean
public ProducerListener<String, String> producerListener() {
    return new ProducerListener<>() {
        @Override
        public void onSuccess(ProducerRecord<String, String> record, RecordMetadata metadata) {
            System.out.println("发送成功: " + record.value());
        }

        @Override
        public void onError(ProducerRecord<String, String> record, RecordMetadata metadata, Exception exception) {
            System.out.println("发送失败: " + exception.getMessage());
        }
    };
}

绑定:

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
    KafkaTemplate<String, String> template = new KafkaTemplate<>(producerFactory());
    template.setProducerListener(producerListener());
    return template;
}

3.2.4 自定义分区器

public class MyPartitioner implements Partitioner {

    @Override
    public void configure(Map<String, ?> configs) {
        // 可以读取配置
    }

    @Override
    public int partition(String topic,
                         Object key,
                         byte[] keyBytes,
                         Object value,
                         byte[] valueBytes,
                         Cluster cluster) {

        // 获取可用分区
        List<PartitionInfo> partitions = cluster.availablePartitionsForTopic(topic);

        int numPartitions = partitions.size();

        // 1. key 不为空 → hash
        if (keyBytes != null) {
            return Math.abs(Utils.murmur2(keyBytes)) % numPartitions;
        }

        // 2. key 为空 → 轮询(更合理)
        return ThreadLocalRandom.current().nextInt(numPartitions);
    }

    @Override
    public void close() {
    }
}

注册:

props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class);

3.3 Consumer(消费者)

核心链路:

ConsumerFactory → ListenerContainerFactory → @KafkaListener

3.3.1 自定义ConsumerFactory

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();

    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-group");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

    // 高级配置
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 手动提交
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);      // 批量消费
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    return new DefaultKafkaConsumerFactory<>(props);
}

3.3.2 消费者监听ListenerContainerFactory

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {

    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();

    factory.setConsumerFactory(consumerFactory());

    // 并发消费
    factory.setConcurrency(3);

    // 手动ACK
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);

    // 批量消费
    factory.setBatchListener(true);

    return factory;
}

3.3.3 消费者

@Slf4j
@Component
public class MessageConsumer {

    @KafkaListener(topics = "spring-topic",containerFactory = "kafkaListenerContainerFactory")
    public void consume(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
        log.info("收到消息: topic={}, partition={}, offset={}, key={}, value={}",
                record.topic(),
                record.partition(),
                record.offset(),
                record.key(),
                record.value());
        try{
            // 执行业务
            acknowledgment.acknowledge();
        }catch (Exception e){
            log.error("消费消息异常: {}", e.getMessage());
        }

    }
}

记得生产者和消费者都加,要不然会报错
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

四,拦截器(Interceptor)

4.1 Producer 拦截器

/**
 * 生产者拦截器
 */
public class MyProducerInterceptor implements ProducerInterceptor<String, String> {

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        System.out.println("发送前拦截");
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
		// ack的时候拦截
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {
		// 读取配置
    }
}

注册:

props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MyProducerInterceptor.class.getName());

4.2 Consumer 拦截器

/**
 * 消费者拦截器
 */
public class MyConsumerInterceptor implements ConsumerInterceptor<String, String> {

    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
        System.out.println("消费前拦截");
        return records;
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> map) {
        
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}

也要注册!

五,生产者详解

5.1 生产者核心结构

在 Spring Kafka 里:

ProducerFactory → KafkaTemplate → send()

所有发送能力都在 KafkaTemplate

5.2 KafkaTemplate 核心 API

核心方法签名

CompletableFuture<SendResult<K, V>> send(...)

5.3 send 方法全家桶

  1. 最简单发送

    kafkaTemplate.send("topic", "value");
    

    默认:

    • key = null
    • partition = Kafka 自动选
  2. 带 key

    kafkaTemplate.send("topic", "key", "value");
    

    作用:

    特性说明
    分区固定相同 key → 同一 partition
    顺序保证同 key 顺序消费
  3. 指定 partition

    kafkaTemplate.send("topic", 1, "key", "value");
    

    强制发到 partition=1

  4. 指定 timestamp

    kafkaTemplate.send("topic", 1, System.currentTimeMillis(), "key", "value");
    

    用于时间语义(日志 / 流处理)

  5. 发送 ProducerRecord

    ProducerRecord<String, String> record =
            new ProducerRecord<>("topic", "key", "value");
    
    record.headers().add("traceId", "123".getBytes());
    record.headers().add("source", "order-service".getBytes());
    
    kafkaTemplate.send(record);
    

    优点:

    • 可加 header
    • 可控制全部字段

5.4 发送结果处理

  1. 异步回调(推荐)

    kafkaTemplate.send("topic", "msg")
        .whenComplete((result, ex) -> {
            if (ex == null) {
                System.out.println("成功:" + result.getRecordMetadata());
            } else {
                System.out.println("失败:" + ex.getMessage());
            }
        });
    
  2. 同步发送(慎用)

SendResult<String, String> result =
        kafkaTemplate.send("topic", "msg").get();

风险:

  • 阻塞线程
  • 降低吞吐
  1. 获取元数据

    RecordMetadata meta = result.getRecordMetadata();
    
    meta.topic();
    meta.partition();
    meta.offset();
    

5.5 批量发送 / 性能相关

5.5.1 批量其实是“自动”的

Kafka Producer 内部:

batch.size=16384
linger.ms=5

Spring 不需要你手写 batch API


5.5.2 强制 flush

kafkaTemplate.flush();

使用场景:

  • 程序结束前
  • 测试环境

5.6 生产者高级能力

5.6.1 事务

配置事务

@Bean
public KafkaTemplate<String, String> kafkaTemplate(
        ProducerFactory<String, String> factory) {

    KafkaTemplate<String, String> template = new KafkaTemplate<>(factory);
    template.setTransactionIdPrefix("tx-");
    return template;
}

使用事务

kafkaTemplate.executeInTransaction(operations -> {

    operations.send("topic1", "msg1");
    operations.send("topic2", "msg2");

    return true;
});

特性:

  • 要么全成功
  • 要么全失败

5.6.2 幂等性(必须开)

props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

解决:

  • 重试导致重复消息

5.6.3 自定义序列化

props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

常见:

  • JSON
  • Avro
  • Protobuf

5.6.5 路由(多 KafkaTemplate)

@Bean
public KafkaTemplate<String, String> kafkaTemplate1(...)
@Bean
public KafkaTemplate<String, String> kafkaTemplate2(...)

用于:

  • 多集群
  • 多业务隔离

5.7 易错点

  1. key 乱用 → 顺序丢失

    没 key:分区随机

    send("topic", null, value)
    
  2. 不处理发送失败

  3. 事务 + 非事务混用 :会导致异常

  4. partition 手动指定错误 : 超出范围直接报错

六,消费者详解

6.1 消费者基本结构

Spring Kafka 中,消费者的核心类是 KafkaListener 注解和 ConcurrentMessageListenerContainer,你会通过 @KafkaListener 注解创建消费者,底层的消息消费逻辑会交给 Spring 处理。

简要模型:

KafkaListener → ConsumerFactory → MessageListenerContainer → KafkaConsumer

6.2 消费者核心 API

6.2.1 创建简单消费者

通过 @KafkaListener 注解,最常见的方式是直接用注解来定义消费者。

@KafkaListener(topics = "my-topic", groupId = "my-group")
public void listen(String message) {
    System.out.println("收到消息: " + message);
}
  • topics:消费哪个主题(一个或多个)
  • groupId:消费者组 ID(多消费者时同组 ID)
  • 返回类型void 或者 Future/CompletableFuture

@KafkaListener 是 Spring 提供的用于消费消息的便捷方法,它背后是基于 MessageListener 的。

6.2.2 手动提交偏移量

Kafka 消费者默认会自动提交偏移量,但有时你需要手动控制提交。手动提交适用于一些场景,例如:

  • 需要保证消息的处理顺序。
  • 需要在消息处理成功后再提交偏移量,确保消息不丢失。
@KafkaListener(topics = "my-topic", groupId = "my-group")
public void listen(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
    System.out.println("收到消息: " + record.value());
    // 处理消息后手动提交
    acknowledgment.acknowledge();
}
  • ConsumerRecord:包装了接收到的消息内容、偏移量等信息
  • Acknowledgment:手动提交偏移量

通过调用 acknowledge() 方法,你就可以手动提交消息的偏移量。

6.2.3 设置消费并发(线程池)

为了提高消费能力,可以配置 并发消费者。Spring Kafka 支持通过配置 并发消费者线程池 来提高吞吐量。

@KafkaListener(topics = "my-topic", groupId = "my-group", concurrency = "3")
public void listen(String message) {
    System.out.println("收到消息: " + message);
}
  • concurrency:配置消费者线程数。此参数会启动多个消费者实例来并行消费消息。

6.2.4 消费特定分区

如果你需要消费特定分区的消息,可以通过 @PartitionOffset 注解来指定从哪个分区、哪个偏移量开始消费。

@KafkaListener(
    topicPartitions = @TopicPartition(
        topic = "my-topic",
        partitions = { "0", "1" }, // 指定要消费的分区
        partitionOffsets = { 
            @PartitionOffset(partition = "0", initialOffset = "0"),  // 从分区 0 的偏移量 0 开始
            @PartitionOffset(partition = "1", initialOffset = "10")  // 从分区 1 的偏移量 10 开始
        }
    ),
    groupId = "my-group"
)
public void listen(String message) {
    System.out.println("接收到的消息: " + message);
}

解释

  • @TopicPartition

    :定义了监听的主题和分区,并且通过

    partitionOffsets
    

    指定了从哪些偏移量开始消费。

    • topic:要消费的 Kafka 主题 my-topic

    • partitions:指定要消费的分区 01

    • partitionOffsets
      

      :指定从各个分区的特定偏移量开始消费。

      • @PartitionOffset(partition = "0", initialOffset = "0") 表示从分区 0 的偏移量 0 开始消费。
      • @PartitionOffset(partition = "1", initialOffset = "10") 表示从分区 1 的偏移量 10 开始消费。

6.2.5 异常处理

在生产中,消息消费过程中可能会遇到异常。Spring Kafka 提供了 ErrorHandler 来处理消费中的异常。

a) 配置 ErrorHandler

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.listener.MessageListenerErrorHandler;
import org.springframework.kafka.listener.config.ErrorHandler;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;

@EnableKafka
public class KafkaConsumer {

    // 使用默认的异常处理方式
    @KafkaListener(topics = "my-topic", groupId = "my-group")
    public void listen(String message) {
        System.out.println("Received message: " + message);
        // 模拟异常
        if (message.contains("error")) {
            throw new RuntimeException("Simulated exception!");
        }
    }

    // 自定义异常处理方法
    @KafkaListener(
        topics = "my-topic",
        groupId = "my-group",
        errorHandler = "errorHandler" // 引用一个异常处理器
    )
    public void listenWithErrorHandling(String message) {
        System.out.println("Received message: " + message);
        if (message.contains("error")) {
            throw new RuntimeException("Simulated exception!");
        }
    }

    // 异常处理器方法
    public void errorHandler(ConsumerRecord<?, ?> record, Exception exception) {
        // 这里可以处理异常的逻辑,比如日志记录或重新发送消息
        System.out.println("Error occurred while consuming message: " + exception.getMessage());
    }
}

解释:

  1. @KafkaListener:该注解标注了消费者的方法。消费者从 my-topic 中消费消息,并且有一个异常处理器 errorHandler
  2. errorHandler:这是一个自定义的异常处理方法,用于捕获消费者处理过程中发生的异常。在这个方法中,你可以执行日志记录、报警、重新投递消息等操作。

b) 默认异常处理

DefaultErrorHandler 是 Spring Kafka 2.8.0 引入的一个新的默认错误处理器。它具有以下特点:

  • 重试机制:可以配置重试次数和间隔时间,支持控制重试行为。
  • 死信队列(Dead-letter Queue,DLQ):允许在消息处理失败时,将失败的消息发送到一个专门的死信队列。
  • 消息回滚:能够在特定的情况下回滚消费(例如,当消息处理异常时)。

示例:使用 DefaultErrorHandler 配置消费者

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.listener.MessageListenerErrorHandler;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.annotation.EnableKafka;

@EnableKafka
public class KafkaConsumer {

    @KafkaListener(topics = "my-topic", groupId = "my-group")
    public void listen(String message) {
        System.out.println("Received message: " + message);
        
        // 假设消息处理过程中抛出异常
        if (message.contains("error")) {
            throw new RuntimeException("Simulated processing error!");
        }
        
        System.out.println("Message processed successfully.");
    }

    // 配置 DefaultErrorHandler
    @Bean
    public DefaultErrorHandler errorHandler() {
        // 创建 DefaultErrorHandler
        // 配置最大重试次数 3 次,每次重试间隔 1 秒
        return new DefaultErrorHandler(
            (record, exception) -> {
                // 当消费失败时,打印错误信息
                System.out.println("Error processing message, retrying... " + record.value());
            },
            3,  // 重试 3 次
            1000  // 每次重试间隔 1 秒
        );
    }
}

详细说明:

  • DefaultErrorHandler:这个类用来处理消费者消息处理过程中的错误。它不仅支持重试,还支持自定义错误处理逻辑。我们可以指定失败消息的重试次数、重试间隔时间等。
  • 重试机制
    • 重试次数:在这个例子中,消费者遇到异常时会最多重试 3 次。
    • 重试间隔:每次重试间隔 1 秒(1000 毫秒)。
  • 失败后的处理逻辑
    • 我们在错误发生时打印错误信息,当然,你也可以在这里定义其他的处理方式,如将失败的消息转发到死信队列(DLQ)。

配置死信队列

你可以配置死信队列来处理那些无法成功消费的消息。通过 DeadLetterPublishingRecoverer,你可以将失败的消息发送到指定的死信队列(一个独立的 Kafka 主题)。例如:

@Bean
public DefaultErrorHandler errorHandler(KafkaTemplate<String, String> kafkaTemplate) {
    // 配置重试策略,每次重试间隔 1 秒,最多重试 3 次
    BackOff backOff = new FixedBackOff(1000L, 3);
    
    // 创建 DeadLetterPublishingRecoverer,将失败消息发送到死信队列
    DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate);
    
    // 创建 DefaultErrorHandler,配置重试和死信队列
    return new DefaultErrorHandler(recoverer, backOff);
}

死信队列工作原理:

  • 失败消息:当消费失败且无法成功重试时,DeadLetterPublishingRecoverer 会将失败的消息发送到一个新的 Kafka 主题(即死信队列)。
  • 死信队列的作用:可以用于持久化失败消息,以便后续进行处理或分析,避免丢失数据。

6.3 高级消费者能力

6.3.1 消费者事务

以下是一个简单的消费者事务的示例,展示了如何使用 Spring Kafka 在消费者端实现事务消费。

示例代码

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class TransactionalConsumer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @KafkaListener(topics = "my-topic", groupId = "my-group")
    public void listen(String message) {
        // 开始事务
        kafkaTemplate.executeInTransaction(operations -> {
            // 消费消息后执行的操作
            System.out.println("消费的消息: " + message);

            // 发送事务消息到另一个 topic
            operations.send("other-topic", message);

            // 可以进行其他的数据库操作或业务操作
            // 如果没有问题,提交事务
            return true; // 提交事务
        });
    }
}

详细解释

  1. @KafkaListener 注解:通过 @KafkaListener 注解,我们定义了一个消费者方法,监听 my-topic 这个 Kafka topic。
  2. kafkaTemplate.executeInTransaction():这个方法包裹了消费者消费消息后的所有操作。executeInTransaction 是 KafkaTemplate 提供的方法,它确保我们在执行操作时保持事务的一致性。在这个事务块内,我们不仅消费了消息,还向另一个 Kafka topic 发送了一个新消息。
  3. operations.send("other-topic", message):这行代码会将消费的消息发送到另一个 Kafka topic other-topic。这个发送操作是和消费操作在同一个事务内进行的。如果发送失败,事务会被回滚,消费的消息不会被确认。
  4. 事务的提交与回滚
    • 如果 operations.send 成功执行,executeInTransaction 方法会返回 true,表示事务成功,所有操作被提交。
    • 如果在事务过程中出现任何错误(如发送消息失败),Spring Kafka 会自动回滚事务,确保没有消息被消费和处理。

6.3.2 延迟消费

可以通过 延迟队列(Delay Queue) 模式来设计延迟消费,通常会用到 ScheduledExecutorService 来模拟。

一种常见的方式是通过 ScheduledExecutorService 来模拟延迟队列。在这个模型中,消息会被消费后,放入一个调度队列中,消费者线程通过调度器来定期检查是否该消费。

基本代码示例:模拟延迟消费

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

@Component
public class DelayedConsumer {

    // KafkaTemplate 用于发送消息
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // 延迟执行器
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

    @KafkaListener(topics = "my-topic", groupId = "my-group")
    public void listen(String message) {
        // 假设消息中带有延迟时间,单位为秒
        long delay = extractDelayFromMessage(message);

        // 模拟延迟消费,将消息在指定时间后消费
        scheduler.schedule(() -> {
            processMessage(message);
        }, delay, TimeUnit.SECONDS);
    }

    private long extractDelayFromMessage(String message) {
        // 假设消息中包含延迟时间(单位:秒),实际根据应用场景修改
        return 5; // 延迟 5 秒消费
    }

    private void processMessage(String message) {
        // 模拟消息的处理
        System.out.println("消费消息: " + message);
        
        // 消息处理完成后,发送到另一个 topic(可选)
        kafkaTemplate.send("processed-topic", message);
    }
}

解释

  1. ScheduledExecutorService:这个 Java 类提供了一个调度执行器,它能够在指定的延迟后执行任务。在此例中,我们用它来实现延迟消费。
  2. listen 方法:这是 Kafka 消费者方法,当有消息进入时,我们首先从消息中提取出延迟时间(例如,消息中带有一个延迟时间字段),然后使用 scheduler.schedule 方法延迟执行消息处理。
  3. processMessage 方法:这是模拟消息处理的方法,可以是任何业务逻辑,例如调用数据库、外部服务等。处理完成后,消息可以被转发到另一个 Kafka topic,或者进行其他的业务操作。
  4. scheduler.schedule():它会将任务提交到延迟队列,在延迟时间到达后再执行。在这里,我们传入了一个延迟时间,表示多少秒后执行消息的消费。

6.4 消费者常见坑

  1. 没有设置 group.id 或者设置错误

    Kafka 中消费者需要设置 group.id 来标识消费者组。如果不设置,Kafka 会把每个消费者当作独立消费者对待。

  2. 偏移量提交失败

    当消费者在处理消息时,如果没有正确提交偏移量,可能会导致消息丢失或重复消费。

    解决办法:

    • 定期 commit 偏移量。
    • 异常处理后手动提交
  3. 消费过多消息导致堆积

    如果消费者消费速度慢,消息堆积在 Kafka 中。解决办法:

    • 增加并发消费者数(concurrency)。
    • 优化消费处理速度。