一,Spring Boot集成Kafka快速入门
1.1 环境准备
确保你的 Kafka 已启动:
kafka-topics --list --bootstrap-server localhost:9092
1.2 创建Spring Boot项目
项目结构:
kafka-springboot-demo/
├── pom.xml
└── src/main/java/com/example/kafka/
├── KafkaSpringbootApplication.java
├── config/
│ └── KafkaConfig.java
├── producer/
│ └── MessageProducer.java
├── model/
│ └── Message.java
├── consumer/
│ └── MessageConsumer.java
└── controller/
└── MessageController.java
1.3 Maven依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.2.5</version>
<relativePath/>
</parent>
<groupId>com.example</groupId>
<artifactId>SpringBootDemo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>SpringBootDemo</name>
<description>SpringBootDemo</description>
<properties>
<java.version>17</java.version>
<spring-kafka.version>3.1.5</spring-kafka.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- Jackson -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<!-- Spring Boot Kafka Starter -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- 测试依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>17</source>
<target>17</target>
<annotationProcessorPaths>
<path>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
</path>
</annotationProcessorPaths>
</configuration>
</plugin>
</plugins>
</build>
</project>
1.4 配置文件
application.yml:
spring:
kafka:
# Kafka服务器地址(注意:用下划线,不是横线)
bootstrap-servers: 192.168.116.127:9092
# 生产者配置
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# 其他配置
properties:
acks: all
retries: 3
batch.size: 16384
buffer.memory: 33554432
compression.type: snappy
# 消费者配置
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
group-id: my-spring-group
properties:
auto.offset.reset: earliest
enable.auto.commit: true
auto.commit.interval.ms: 5000
isolation.level: read_committed
# 监听器配置
listener:
ack-mode: RECORD
kafka:
topic: spring-topic
配置类 KafkaConfig
@Configuration
@EnableKafka
public class KafkaConfig {
/**
* 自动创建Topic(可选)
* 如果Topic已存在,不会重复创建
*/
@Bean
public NewTopic cliTestTopic() {
return new NewTopic("spring-topic", 3, (short) 1);
}
}
1.5 生产者
package com.example.springbootdemo.producer;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@RequiredArgsConstructor
public class MessageProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
@Value("${kafka.topic:spring-topic}")
private String topic;
/**
* 发送带key的消息
*/
public void sendWithKey(String key, String message) {
kafkaTemplate.send(topic,key, message);
log.info("发送消息: key={}, message={}", key, message);
}
}
1.6 消费者
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class MessageConsumer {
@KafkaListener(topics = "${kafka.topic:cli-test}", groupId = "my-spring-group")
public void consume(ConsumerRecord<String, String> record) {
log.info("收到消息: topic={}, partition={}, offset={}, key={}, value={}",
record.topic(),
record.partition(),
record.offset(),
record.key(),
record.value());
}
}
1.7 REST Controller
import com.example.springbootdemo.model.Message;
import com.example.springbootdemo.producer.MessageProducer;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.*;
@RestController
@RequestMapping("/api/kafka")
@RequiredArgsConstructor
public class MessageController {
private final MessageProducer producer;
@PostMapping("/send-with-key")
public String sendWithKey(@RequestBody Message message) {
producer.sendWithKey(message.getKey(), message.getMessage());
return String.format("消息已发送: key=%s, message=%s", message.getKey(), message.getMessage());
}
}
1.8 消息接收对象
import lombok.Data;
@Data
public class Message {
private String key;
private String message;
}
二,Kafka配置详解
2.1 核心配置结构
完整结构是这样的:
spring:
kafka:
bootstrap-servers: localhost:9092
producer:
...
consumer:
...
listener:
...
admin:
...
properties:
..
2.2 基础连接配置
spring:
kafka:
bootstrap-servers: localhost:9092
作用: Kafka 集群地址
本质:
bootstrap.servers
注意:可以多个
bootstrap-servers: host1:9092,host2:9092
2.3 Producer 配置
spring:
kafka:
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
acks: all
retries: 3
batch-size: 16384
linger-ms: 5
buffer-memory: 33554432
重点配置解释(非常重要)
-
序列化器
key-serializer value-serializer必须配置,否则无法发送
常见:
类型 类名 作用 特点 场景 推荐度 String StringSerializerString → byte[] 简单、可读 日志、简单消息 ⭐⭐⭐⭐ ByteArray ByteArraySerializerbyte[] → byte[] 不做任何处理 已有二进制数据 ⭐⭐⭐⭐ ByteBuffer ByteBufferSerializerByteBuffer → byte[] NIO支持 特殊场景 ⭐⭐ Integer IntegerSerializerint → byte[] 固定4字节 计数类key ⭐⭐⭐ Long LongSerializerlong → byte[] 固定8字节 ID/时间戳 ⭐⭐⭐⭐ Double DoubleSerializerdouble → byte[] 数值型 统计数据 ⭐⭐ UUID UUIDSerializerUUID → byte[] 标准UUID 分布式ID ⭐⭐⭐ JSON(这个是spring提供的) JsonSerializer 对象 → JSON → byte[] 可读、通用 微服务通信 ⭐⭐⭐⭐⭐ 示例
spring: kafka: producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.springframework.kafka.support.serializer.JsonSerializer注意: Kafka 只认 byte[]
-
acks(可靠性核心)
acks: all含义:
值 含义 0 不等确认(最快,可能丢数据) 1 leader确认 all 所有副本确认(最安全) -
retries(重试)
发送失败重试次数
retries: 3 -
batch-size(批量发送)
提高吞吐量(Kafka 高性能核心)
batch-size: 16384 # 16KB核心理解:Kafka 不是一条条发,而是攒一批再发
-
linger-ms(延迟发送)
等待更多消息一起发送(提高吞吐)
linger-ms: 5 -
buffer-memory
Producer 缓存大小(32MB)
buffer-memory: 33554432 # 32MB作用:Producer 内存缓冲区
-
compressionType(压缩)
**作用**: 减少网络传输
```yaml
compression-type: gzip
```
| 类型 | 特点 |
| ------ | -------------- |
| none | 默认 |
| gzip | 压缩高,慢 |
| snappy | 平衡 |
| lz4 | 快 |
| zstd | 推荐(新版本) |
8. clientId(客户端标识)
```yaml
client-id: order-producer
```
用于:日志,监控,metrics
9. transactionIdPrefix(事务)
**作用**:开启 Kafka 事务
```yaml
transaction-id-prefix: tx-
```
背后自动开启:
```yaml
enable.idempotence = true
transactional.id = xxx
```
**使用前提:**
- Kafka >= 0.11
- consumer 配合 read_committed
> 当设置了 `transactional.id` 时,Kafka 会**隐式修改配置**:
>
> ```properties
> enable.idempotence = true
> acks = all
> retries = Integer.MAX_VALUE
> max.in.flight.requests.per.connection <= 5
> ```
>
> Spring 会做这几件事(核心逻辑在 `DefaultKafkaProducerFactory`):
>
> 自动推导 transactional.id
>
> ```
> tx- + 递增序号
> ```
>
> 例如:
>
> ```
> tx-0
> tx-1
> tx-2
> ```
>
> **开启事务后,`acks` 必须是 `all`(或 `-1`)**
>
> 而且:不是 Spring 要求的,是 Kafka 客户端强制要求的
10. properties(万能入口)
```yaml
properties:
enable.idempotence: true
max.in.flight.requests.per.connection: 5
```
**作用:** 覆盖 Kafka 原生所有配置
常用高级参数:
- 幂等性
防止重复消息
```yaml
enable.idempotence: true
```
- **飞行请求数**
定义(核心):**一个连接上,已经发送但“还没收到响应”的请求数量**
```yaml
max.in.flight.requests.per.connection: 5
```
作用:防止乱序
> 通俗理解:Producer 发消息是异步的:
>
> ```
> 发送1 → 未返回
> 发送2 → 未返回
> 发送3 → 未返回
> ```
>
> 这 3 个请求就叫:
>
> ```
> in-flight requests(飞行中的请求)
> ```
>
> 为什么需要这个东西?--》 **提高吞吐量**
>
> 对比两种模式
>
> - 串行(in-flight = 1)
>
> 慢但安全
>
> ```
> 发1 → 等返回 → 发2 → 等返回
> ```
>
> - 并行(in-flight = 5)
>
> 快很多
>
> ```
> 发1
> 发2
> 发3
> 发4
> 发5
> ```
>
> **乱序问题**
>
> 场景
>
> ```
> 发送顺序:A → B
> ```
>
> 发生异常:
>
> ```
> A 发送失败(需要重试)
> B 发送成功
> ```
>
> 最终结果:
>
> ```
> Broker里顺序:B → A ❌
> ```
>
> 顺序被打乱
>
> **Kafka怎么解决这个问题?**
>
> **普通模式(默认)**
>
> - 不保证顺序(除非你设置 in-flight=1)
>
> **幂等性模式**
>
> 当你开启:
>
> ```
> enable.idempotence: true
> ```
>
> Kafka要求:
>
> ```
> max.in.flight.requests.per.connection ≤ 5
> ```
>
> **为什么是 5?**
>
> 因为 Kafka 内部设计,每个 Producer 有:
>
> - PID(生产者ID)
> - Sequence Number(序列号)
>
> Kafka 能保证:
>
> - **最多缓存 5 个未确认的 sequence**
>
> 超过就可能:
>
> - **无法正确去重 / 排序**
- 最大请求大小
```yaml
max.request.size: 1048576
```
- 重试间隔
```yaml
retry.backoff.ms: 100
```
2.4 Consumer 配置
基本配置
spring:
kafka:
consumer:
group-id: test-group
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
auto-offset-reset: earliest
enable-auto-commit: false
Consumer 的本质不是“读消息”,而是“维护消费进度(offset)
所以配置可以分成三类:
| 分类 | 作用 |
|---|---|
| 拉取控制 | 怎么拉数据 |
| 消费进度 | offset怎么提交 |
| 组协调 | 怎么做负载均衡 |
-
groupId(消费者组)
** 作用: ** 标识“消费身份”
group-id: order-group核心能力:
能力 说明 负载均衡 一个 group 内分区分摊 消费进度 offset 按 group 维护 举例:
topic 有 3 个分区 group 有 3 个消费者每人一个分区
-
enableAutoCommit(是否自动提交 offset)
** 作用: ** 是否自动提交 offset
enable-auto-commit: false两种模式:
-
自动提交:
Kafka 自动定时提交 offset
简单,但有风险
-
手动提交(推荐)
业务成功 → 手动 ack保证不丢数据
-
-
autoCommitInterval(自动提交间隔)
作用:多久提交一次 offset
auto-commit-interval: 5s仅在:
enable-auto-commit: true时生效
-
autoOffsetReset(起始消费位置)
什么时候生效? 没有 offset 时
auto-offset-reset: earliest可选值:
值 含义 earliest 从最早开始 latest 从最新开始 none 报错 常见坑:新 group + latest → 收不到历史数据
-
keyDeserializer / valueDeserializer(反序列化)
作用:
byte[] → Java对象
常见:
StringDeserializer JsonDeserializer必须和 Producer 匹配!
就是将 Producer 的 xxxSerializer --》XXXDeserializer
-
maxPollRecords(每次拉多少条)
作用:一次 poll() 返回多少条
max-poll-records: 500 -
fetchMinSize(最小拉取字节数)
作用:至少攒这么多数据才返回,类似 Producer 的 batch
fetch-min-size: 1KB -
fetchMaxWait(最大等待时间)
作用: 等待 fetchMinSize 的最长时间
fetch-max-wait: 500ms和 fetchMinSize 配合:
参数 控制 fetchMinSize 数据量 fetchMaxWait 时间 -
heartbeatInterval(心跳)
作用:Consumer 向 Group Coordinator 发心跳
heartbeat-interval: 3s如果不发:
被踢出消费者组 → 触发 rebalance影响稳定性
-
isolationLevel(事务读取)
```yaml
isolation-level: read_committed
```
**可选:**
| 值 | 含义 |
| ---------------- | ---------- |
| read_uncommitted | 读所有 |
| read_committed | 只读已提交 |
11. properties(高级配置入口)
```yaml
properties:
max.poll.interval.ms: 300000
```
常见高级配置:
- max.poll.interval.ms
```yaml
properties:
max.poll.interval.ms: 300000
```
如果你处理太慢:Kafka认为你挂了 → 触发rebalance
- fetch.max.bytes
```yaml
properties:
fetch.max.bytes: 300000
```
- **session.timeout.ms**
**作用**:Consumer 多久没心跳就被认为“死了”
```yaml
properties:
session.timeout.ms: 15000
```
举例:
```
session.timeout.ms: 15000
heartbeat.interval.ms: 3000
```
表示:
- 每 3 秒发一次心跳
- 15 秒没收到 → 判定挂了
2.5 Listener 配置(Spring特有)
是 Spring Kafka 和原生 Kafka 最大的差异点。
- Kafka 原生只有
poll() - Spring 帮你封装成:
@KafkaListener + Listener Container
所以这一块本质是:“Spring 是怎么帮你消费消息的(线程 + ack + 容器模型)”
Listener 配置整体结构
spring:
kafka:
listener:
concurrency: 3
ack-mode: manual
poll-timeout: 1500
type: single
idle-event-interval: 60000
核心配置
- ackMode(确认机制)
```yaml
spring:
kafka:
listener:
ack-mode: manual
```
ack-mode 是
- **决定“什么时候提交 offset”**
**可选值**
| 值 | 含义 | 解释 | 优点 | 缺点 |
| ---------------- | --------- | --------------------------- | ------------------ | -------------- |
| RECORD | 逐条提交 | 处理一条 → 提交一条 | 最安全 | 性能差 |
| BATCH | 批量提交 | 一批消息处理完 → 一起提交 | 性能好 | 失败时可能重复 |
| TIME | 定时提交 | 每隔 N 秒提交,类似自动提交 | | |
| COUNT | 条数提交 | 处理 N 条 → 提交 | | |
| COUNT_TIME | 条数+时间 | | | |
| MANUAL | 手动 | ack.acknowledge(); | 你自己控制提交时机 | |
| MANUAL_IMMEDIATE | 立即提交 | 调用 ack 立刻提交(同步) | | |
**手动提交示例**
```java
@Slf4j
@Component
public class MessageConsumer {
@KafkaListener(topics = "spring-topic", groupId = "my-spring-group")
public void consume(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
log.info("收到消息: topic={}, partition={}, offset={}, key={}, value={}",
record.topic(),
record.partition(),
record.offset(),
record.key(),
record.value());
try{
// 执行业务
acknowledgment.acknowledge();
}catch (Exception e){
log.error("消费消息异常: {}", e.getMessage());
}
}
}
```
2. asyncAcks(异步提交 offset)
**作用: ** `ack()` 是不是**异步提交 offset**,false(同步)
**开启:**
```
spring:
kafka:
listener:
async-acks: true
```
**行为**:
| 模式 | 行为 |
| ----- | ---------- |
| false | ack 阻塞 |
| true | ack 不阻塞 |
风险:业务成功了,但 offset 提交失败
一般建议: **默认别开,除非你在做高吞吐优化**
3. idleBetweenPolls(poll 间隔)
```yaml
spring:
kafka:
listener:
idleBetweenPolls: 500ms
```
**作用:** poll() → 处理 → sleep → 再 poll
**防止**:CPU 空转(一直 poll)
4. idleEventInterval(空闲事件)
**作用**: 多久没消息 → 触发事件
```yaml
spring:
kafka:
listener:
idleEventInterval: 100m
```
使用:
```java
@EventListener
public void onIdle(ListenerContainerIdleEvent event) {
// 可以做监控 / 定时任务
}
```
**实战用途:**
- 判断系统是否“没流量”
- 定时扫描
5. noPollThreshold(卡死检测)
**作用: ** 检测 consumer 是否“卡住了”
```yaml
spring:
kafka:
# kafka连接池
bootstrap-servers: 192.168.116.127:9092
listener:
# 卡死检测
noPollThreshold: 10
```
**原理:**
```
如果:
超过 noPollThreshold × pollTimeout
还没 poll
→ 认为异常
```
**举例**
```
poll-timeout: 1s
no-poll-threshold: 3
```
超过 3 秒没 poll → 告警
```java
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.kafka.event.NonResponsiveConsumerEvent;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class KafkaConsumerMonitor {
@EventListener
public void handleNonResponsiveConsumer(NonResponsiveConsumerEvent event) {
log.error("⚠️ Kafka消费者卡死检测到!");
// 1️⃣ 基本信息
log.error("监听器ID: {}", event.getListenerId());
log.error("消费者: {}", event.getConsumer());
log.error("主题分区: {}", event.getTopicPartitions());
// 2️⃣ 卡住时间(关键)
log.error("多久没poll了(ms): {}", event.getTimeSinceLastPoll());
// 3️⃣ Consumer对象(可以做高级操作)
log.error("Consumer实例: {}", event.getConsumer());
// 4️⃣ 可选:自动恢复(慎用)
// event.getSource(MessageListenerContainer.class).stop();
// event.getSource(MessageListenerContainer.class).start();
}
}
```
6. missingTopicsFatal(Topic 不存在)
默认:true(报错)
```yaml
spring:
kafka:
# kafka连接池
bootstrap-servers: 192.168.116.127:9092
listener:
missingTopicsFatal: true
```
7. immediateStop(停止策略)
```yaml
spring:
kafka:
# kafka连接池
bootstrap-servers: 192.168.116.127:9092
listener:
immediateStop: false
```
**含义:**
| 值 | 行为 |
| ----- | ---------- |
| true | 立刻停 |
| false | 处理完再停 |
8. changeConsumerThreadName(修改线程名)
```yaml
spring:
kafka:
# kafka连接池
bootstrap-servers: 192.168.116.127:9092
listener:
changeConsumerThreadName: true
```
**作用**:是否让 Spring Kafka 给消费者线程设置“有意义的名字”
```
kafka-consumer-xxx
```
9. logContainerConfig(启动时打印配置)
```yaml
spring:
kafka:
# kafka连接池
bootstrap-servers: 192.168.116.127:9092
listener:
logContainerConfig: true
```
2.6 Admin 配置(可选)
spring:
kafka:
admin:
auto-create: true
常用配置
-
clientId(客户端标识)
spring: kafka: admin: client-id: my-admin用于:
- 日志
- 监控
-
autoCreate(自动创建 topic)
含义:启动时自动创建 Topic(通过 Bean)
spring: kafka: admin: auto-create: true注意:不是 Kafka 自动创建(不是 broker 配置)
写法
import org.apache.kafka.clients.admin.NewTopic; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class KafkaTopicConfig { @Bean public NewTopic testTopic() { return new NewTopic("spring-topic", 3, (short) 1); } }含义:
topic = spring-topic 分区 = 3 副本 = 1 -
failFast(快速失败)
**含义**:启动时连接 Kafka 失败 → 是否直接报错
```yaml
spring:
kafka:
admin:
fail-fast: true
```
| 值 | 行为 |
| ----- | -------------- |
| true | 启动失败 |
| false | 忽略,继续启动 |
三,自定义Bean配置
3.1 Topic 级别(Topic 管理)
Spring 提供了一个非常关键的类: NewTopic(配合 KafkaAdmin 自动创建)
3.1.1 自定义 Topic Bean
@Bean
public NewTopic orderTopic() {
return TopicBuilder.name("order-topic")
.partitions(3) // 分区数
.replicas(1) // 副本数
.config("retention.ms", "86400000") // 保留时间 1天
.build();
}
3.1.2 自定义 KafkaAdmin
@Bean
public KafkaAdmin kafkaAdmin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
return new KafkaAdmin(configs);
}
这些配置可以看这些 config 的枚举,他们有描述的,基本上都可以用 yaml 进行配置
3.2 Producer(生产者
核心链路:
ProducerFactory → KafkaTemplate
3.2.1 自定义 ProducerFactory
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> props = new HashMap<>();
// 记得生产者和消费者都加,要不然会报错
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// 高级配置
props.put(ProducerConfig.ACKS_CONFIG, "all"); // 最强可靠性
props.put(ProducerConfig.RETRIES_CONFIG, 3); // 重试
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // 幂等性
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // 批量大小
props.put(ProducerConfig.LINGER_MS_CONFIG, 5); // 延迟发送
return new DefaultKafkaProducerFactory<>(props);
}
3.2.2 自定义 KafkaTemplate
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
3.2.3 ProducerListener(发送监听)
@Bean
public ProducerListener<String, String> producerListener() {
return new ProducerListener<>() {
@Override
public void onSuccess(ProducerRecord<String, String> record, RecordMetadata metadata) {
System.out.println("发送成功: " + record.value());
}
@Override
public void onError(ProducerRecord<String, String> record, RecordMetadata metadata, Exception exception) {
System.out.println("发送失败: " + exception.getMessage());
}
};
}
绑定:
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
KafkaTemplate<String, String> template = new KafkaTemplate<>(producerFactory());
template.setProducerListener(producerListener());
return template;
}
3.2.4 自定义分区器
public class MyPartitioner implements Partitioner {
@Override
public void configure(Map<String, ?> configs) {
// 可以读取配置
}
@Override
public int partition(String topic,
Object key,
byte[] keyBytes,
Object value,
byte[] valueBytes,
Cluster cluster) {
// 获取可用分区
List<PartitionInfo> partitions = cluster.availablePartitionsForTopic(topic);
int numPartitions = partitions.size();
// 1. key 不为空 → hash
if (keyBytes != null) {
return Math.abs(Utils.murmur2(keyBytes)) % numPartitions;
}
// 2. key 为空 → 轮询(更合理)
return ThreadLocalRandom.current().nextInt(numPartitions);
}
@Override
public void close() {
}
}
注册:
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class);
3.3 Consumer(消费者)
核心链路:
ConsumerFactory → ListenerContainerFactory → @KafkaListener
3.3.1 自定义ConsumerFactory
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// 高级配置
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 手动提交
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10); // 批量消费
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return new DefaultKafkaConsumerFactory<>(props);
}
3.3.2 消费者监听ListenerContainerFactory
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
// 并发消费
factory.setConcurrency(3);
// 手动ACK
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
// 批量消费
factory.setBatchListener(true);
return factory;
}
3.3.3 消费者
@Slf4j
@Component
public class MessageConsumer {
@KafkaListener(topics = "spring-topic",containerFactory = "kafkaListenerContainerFactory")
public void consume(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
log.info("收到消息: topic={}, partition={}, offset={}, key={}, value={}",
record.topic(),
record.partition(),
record.offset(),
record.key(),
record.value());
try{
// 执行业务
acknowledgment.acknowledge();
}catch (Exception e){
log.error("消费消息异常: {}", e.getMessage());
}
}
}
记得生产者和消费者都加,要不然会报错
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
四,拦截器(Interceptor)
4.1 Producer 拦截器
/**
* 生产者拦截器
*/
public class MyProducerInterceptor implements ProducerInterceptor<String, String> {
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
System.out.println("发送前拦截");
return record;
}
@Override
public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
// ack的时候拦截
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> map) {
// 读取配置
}
}
注册:
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MyProducerInterceptor.class.getName());
4.2 Consumer 拦截器
/**
* 消费者拦截器
*/
public class MyConsumerInterceptor implements ConsumerInterceptor<String, String> {
@Override
public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
System.out.println("消费前拦截");
return records;
}
@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> map) {
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> map) {
}
}
也要注册!
五,生产者详解
5.1 生产者核心结构
在 Spring Kafka 里:
ProducerFactory → KafkaTemplate → send()
所有发送能力都在 KafkaTemplate
5.2 KafkaTemplate 核心 API
核心方法签名
CompletableFuture<SendResult<K, V>> send(...)
5.3 send 方法全家桶
-
最简单发送
kafkaTemplate.send("topic", "value");默认:
- key = null
- partition = Kafka 自动选
-
带 key
kafkaTemplate.send("topic", "key", "value");作用:
特性 说明 分区固定 相同 key → 同一 partition 顺序保证 同 key 顺序消费 -
指定 partition
kafkaTemplate.send("topic", 1, "key", "value");强制发到 partition=1
-
指定 timestamp
kafkaTemplate.send("topic", 1, System.currentTimeMillis(), "key", "value");用于时间语义(日志 / 流处理)
-
发送 ProducerRecord
ProducerRecord<String, String> record = new ProducerRecord<>("topic", "key", "value"); record.headers().add("traceId", "123".getBytes()); record.headers().add("source", "order-service".getBytes()); kafkaTemplate.send(record);优点:
- 可加 header
- 可控制全部字段
5.4 发送结果处理
-
异步回调(推荐)
kafkaTemplate.send("topic", "msg") .whenComplete((result, ex) -> { if (ex == null) { System.out.println("成功:" + result.getRecordMetadata()); } else { System.out.println("失败:" + ex.getMessage()); } }); -
同步发送(慎用)
SendResult<String, String> result =
kafkaTemplate.send("topic", "msg").get();
风险:
- 阻塞线程
- 降低吞吐
-
获取元数据
RecordMetadata meta = result.getRecordMetadata(); meta.topic(); meta.partition(); meta.offset();
5.5 批量发送 / 性能相关
5.5.1 批量其实是“自动”的
Kafka Producer 内部:
batch.size=16384
linger.ms=5
Spring 不需要你手写 batch API
5.5.2 强制 flush
kafkaTemplate.flush();
使用场景:
- 程序结束前
- 测试环境
5.6 生产者高级能力
5.6.1 事务
配置事务
@Bean
public KafkaTemplate<String, String> kafkaTemplate(
ProducerFactory<String, String> factory) {
KafkaTemplate<String, String> template = new KafkaTemplate<>(factory);
template.setTransactionIdPrefix("tx-");
return template;
}
使用事务
kafkaTemplate.executeInTransaction(operations -> {
operations.send("topic1", "msg1");
operations.send("topic2", "msg2");
return true;
});
特性:
- 要么全成功
- 要么全失败
5.6.2 幂等性(必须开)
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
解决:
- 重试导致重复消息
5.6.3 自定义序列化
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
常见:
- JSON
- Avro
- Protobuf
5.6.5 路由(多 KafkaTemplate)
@Bean
public KafkaTemplate<String, String> kafkaTemplate1(...)
@Bean
public KafkaTemplate<String, String> kafkaTemplate2(...)
用于:
- 多集群
- 多业务隔离
5.7 易错点
-
key 乱用 → 顺序丢失
没 key:分区随机
send("topic", null, value) -
不处理发送失败
-
事务 + 非事务混用 :会导致异常
-
partition 手动指定错误 : 超出范围直接报错
六,消费者详解
6.1 消费者基本结构
Spring Kafka 中,消费者的核心类是 KafkaListener 注解和 ConcurrentMessageListenerContainer,你会通过 @KafkaListener 注解创建消费者,底层的消息消费逻辑会交给 Spring 处理。
简要模型:
KafkaListener → ConsumerFactory → MessageListenerContainer → KafkaConsumer
6.2 消费者核心 API
6.2.1 创建简单消费者
通过 @KafkaListener 注解,最常见的方式是直接用注解来定义消费者。
@KafkaListener(topics = "my-topic", groupId = "my-group")
public void listen(String message) {
System.out.println("收到消息: " + message);
}
topics:消费哪个主题(一个或多个)groupId:消费者组 ID(多消费者时同组 ID)- 返回类型:
void或者Future/CompletableFuture
@KafkaListener 是 Spring 提供的用于消费消息的便捷方法,它背后是基于
MessageListener的。
6.2.2 手动提交偏移量
Kafka 消费者默认会自动提交偏移量,但有时你需要手动控制提交。手动提交适用于一些场景,例如:
- 需要保证消息的处理顺序。
- 需要在消息处理成功后再提交偏移量,确保消息不丢失。
@KafkaListener(topics = "my-topic", groupId = "my-group")
public void listen(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
System.out.println("收到消息: " + record.value());
// 处理消息后手动提交
acknowledgment.acknowledge();
}
ConsumerRecord:包装了接收到的消息内容、偏移量等信息Acknowledgment:手动提交偏移量
通过调用 acknowledge() 方法,你就可以手动提交消息的偏移量。
6.2.3 设置消费并发(线程池)
为了提高消费能力,可以配置 并发消费者。Spring Kafka 支持通过配置 并发消费者线程池 来提高吞吐量。
@KafkaListener(topics = "my-topic", groupId = "my-group", concurrency = "3")
public void listen(String message) {
System.out.println("收到消息: " + message);
}
concurrency:配置消费者线程数。此参数会启动多个消费者实例来并行消费消息。
6.2.4 消费特定分区
如果你需要消费特定分区的消息,可以通过 @PartitionOffset 注解来指定从哪个分区、哪个偏移量开始消费。
@KafkaListener(
topicPartitions = @TopicPartition(
topic = "my-topic",
partitions = { "0", "1" }, // 指定要消费的分区
partitionOffsets = {
@PartitionOffset(partition = "0", initialOffset = "0"), // 从分区 0 的偏移量 0 开始
@PartitionOffset(partition = "1", initialOffset = "10") // 从分区 1 的偏移量 10 开始
}
),
groupId = "my-group"
)
public void listen(String message) {
System.out.println("接收到的消息: " + message);
}
解释:
-
@TopicPartition:定义了监听的主题和分区,并且通过
partitionOffsets指定了从哪些偏移量开始消费。
-
topic:要消费的 Kafka 主题my-topic。 -
partitions:指定要消费的分区0和1。 -
partitionOffsets:指定从各个分区的特定偏移量开始消费。
@PartitionOffset(partition = "0", initialOffset = "0")表示从分区0的偏移量0开始消费。@PartitionOffset(partition = "1", initialOffset = "10")表示从分区1的偏移量10开始消费。
-
6.2.5 异常处理
在生产中,消息消费过程中可能会遇到异常。Spring Kafka 提供了 ErrorHandler 来处理消费中的异常。
a) 配置 ErrorHandler
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.listener.MessageListenerErrorHandler;
import org.springframework.kafka.listener.config.ErrorHandler;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
@EnableKafka
public class KafkaConsumer {
// 使用默认的异常处理方式
@KafkaListener(topics = "my-topic", groupId = "my-group")
public void listen(String message) {
System.out.println("Received message: " + message);
// 模拟异常
if (message.contains("error")) {
throw new RuntimeException("Simulated exception!");
}
}
// 自定义异常处理方法
@KafkaListener(
topics = "my-topic",
groupId = "my-group",
errorHandler = "errorHandler" // 引用一个异常处理器
)
public void listenWithErrorHandling(String message) {
System.out.println("Received message: " + message);
if (message.contains("error")) {
throw new RuntimeException("Simulated exception!");
}
}
// 异常处理器方法
public void errorHandler(ConsumerRecord<?, ?> record, Exception exception) {
// 这里可以处理异常的逻辑,比如日志记录或重新发送消息
System.out.println("Error occurred while consuming message: " + exception.getMessage());
}
}
解释:
@KafkaListener:该注解标注了消费者的方法。消费者从my-topic中消费消息,并且有一个异常处理器errorHandler。errorHandler:这是一个自定义的异常处理方法,用于捕获消费者处理过程中发生的异常。在这个方法中,你可以执行日志记录、报警、重新投递消息等操作。
b) 默认异常处理
DefaultErrorHandler 是 Spring Kafka 2.8.0 引入的一个新的默认错误处理器。它具有以下特点:
- 重试机制:可以配置重试次数和间隔时间,支持控制重试行为。
- 死信队列(Dead-letter Queue,DLQ):允许在消息处理失败时,将失败的消息发送到一个专门的死信队列。
- 消息回滚:能够在特定的情况下回滚消费(例如,当消息处理异常时)。
示例:使用 DefaultErrorHandler 配置消费者
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.listener.MessageListenerErrorHandler;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.annotation.EnableKafka;
@EnableKafka
public class KafkaConsumer {
@KafkaListener(topics = "my-topic", groupId = "my-group")
public void listen(String message) {
System.out.println("Received message: " + message);
// 假设消息处理过程中抛出异常
if (message.contains("error")) {
throw new RuntimeException("Simulated processing error!");
}
System.out.println("Message processed successfully.");
}
// 配置 DefaultErrorHandler
@Bean
public DefaultErrorHandler errorHandler() {
// 创建 DefaultErrorHandler
// 配置最大重试次数 3 次,每次重试间隔 1 秒
return new DefaultErrorHandler(
(record, exception) -> {
// 当消费失败时,打印错误信息
System.out.println("Error processing message, retrying... " + record.value());
},
3, // 重试 3 次
1000 // 每次重试间隔 1 秒
);
}
}
详细说明:
DefaultErrorHandler:这个类用来处理消费者消息处理过程中的错误。它不仅支持重试,还支持自定义错误处理逻辑。我们可以指定失败消息的重试次数、重试间隔时间等。- 重试机制
- 重试次数:在这个例子中,消费者遇到异常时会最多重试 3 次。
- 重试间隔:每次重试间隔 1 秒(1000 毫秒)。
- 失败后的处理逻辑
- 我们在错误发生时打印错误信息,当然,你也可以在这里定义其他的处理方式,如将失败的消息转发到死信队列(DLQ)。
配置死信队列
你可以配置死信队列来处理那些无法成功消费的消息。通过 DeadLetterPublishingRecoverer,你可以将失败的消息发送到指定的死信队列(一个独立的 Kafka 主题)。例如:
@Bean
public DefaultErrorHandler errorHandler(KafkaTemplate<String, String> kafkaTemplate) {
// 配置重试策略,每次重试间隔 1 秒,最多重试 3 次
BackOff backOff = new FixedBackOff(1000L, 3);
// 创建 DeadLetterPublishingRecoverer,将失败消息发送到死信队列
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate);
// 创建 DefaultErrorHandler,配置重试和死信队列
return new DefaultErrorHandler(recoverer, backOff);
}
死信队列工作原理:
- 失败消息:当消费失败且无法成功重试时,
DeadLetterPublishingRecoverer会将失败的消息发送到一个新的 Kafka 主题(即死信队列)。 - 死信队列的作用:可以用于持久化失败消息,以便后续进行处理或分析,避免丢失数据。
6.3 高级消费者能力
6.3.1 消费者事务
以下是一个简单的消费者事务的示例,展示了如何使用 Spring Kafka 在消费者端实现事务消费。
示例代码:
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class TransactionalConsumer {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@KafkaListener(topics = "my-topic", groupId = "my-group")
public void listen(String message) {
// 开始事务
kafkaTemplate.executeInTransaction(operations -> {
// 消费消息后执行的操作
System.out.println("消费的消息: " + message);
// 发送事务消息到另一个 topic
operations.send("other-topic", message);
// 可以进行其他的数据库操作或业务操作
// 如果没有问题,提交事务
return true; // 提交事务
});
}
}
详细解释
@KafkaListener注解:通过@KafkaListener注解,我们定义了一个消费者方法,监听my-topic这个 Kafka topic。kafkaTemplate.executeInTransaction():这个方法包裹了消费者消费消息后的所有操作。executeInTransaction是 KafkaTemplate 提供的方法,它确保我们在执行操作时保持事务的一致性。在这个事务块内,我们不仅消费了消息,还向另一个 Kafka topic 发送了一个新消息。operations.send("other-topic", message):这行代码会将消费的消息发送到另一个 Kafka topicother-topic。这个发送操作是和消费操作在同一个事务内进行的。如果发送失败,事务会被回滚,消费的消息不会被确认。- 事务的提交与回滚
- 如果
operations.send成功执行,executeInTransaction方法会返回true,表示事务成功,所有操作被提交。 - 如果在事务过程中出现任何错误(如发送消息失败),Spring Kafka 会自动回滚事务,确保没有消息被消费和处理。
- 如果
6.3.2 延迟消费
可以通过 延迟队列(Delay Queue) 模式来设计延迟消费,通常会用到 ScheduledExecutorService 来模拟。
一种常见的方式是通过 ScheduledExecutorService 来模拟延迟队列。在这个模型中,消息会被消费后,放入一个调度队列中,消费者线程通过调度器来定期检查是否该消费。
基本代码示例:模拟延迟消费
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@Component
public class DelayedConsumer {
// KafkaTemplate 用于发送消息
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
// 延迟执行器
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
@KafkaListener(topics = "my-topic", groupId = "my-group")
public void listen(String message) {
// 假设消息中带有延迟时间,单位为秒
long delay = extractDelayFromMessage(message);
// 模拟延迟消费,将消息在指定时间后消费
scheduler.schedule(() -> {
processMessage(message);
}, delay, TimeUnit.SECONDS);
}
private long extractDelayFromMessage(String message) {
// 假设消息中包含延迟时间(单位:秒),实际根据应用场景修改
return 5; // 延迟 5 秒消费
}
private void processMessage(String message) {
// 模拟消息的处理
System.out.println("消费消息: " + message);
// 消息处理完成后,发送到另一个 topic(可选)
kafkaTemplate.send("processed-topic", message);
}
}
解释
ScheduledExecutorService:这个 Java 类提供了一个调度执行器,它能够在指定的延迟后执行任务。在此例中,我们用它来实现延迟消费。listen方法:这是 Kafka 消费者方法,当有消息进入时,我们首先从消息中提取出延迟时间(例如,消息中带有一个延迟时间字段),然后使用scheduler.schedule方法延迟执行消息处理。processMessage方法:这是模拟消息处理的方法,可以是任何业务逻辑,例如调用数据库、外部服务等。处理完成后,消息可以被转发到另一个 Kafka topic,或者进行其他的业务操作。scheduler.schedule():它会将任务提交到延迟队列,在延迟时间到达后再执行。在这里,我们传入了一个延迟时间,表示多少秒后执行消息的消费。
6.4 消费者常见坑
-
没有设置
group.id或者设置错误Kafka 中消费者需要设置
group.id来标识消费者组。如果不设置,Kafka 会把每个消费者当作独立消费者对待。 -
偏移量提交失败
当消费者在处理消息时,如果没有正确提交偏移量,可能会导致消息丢失或重复消费。
解决办法:
- 定期
commit偏移量。 - 异常处理后手动提交
- 定期
-
消费过多消息导致堆积
如果消费者消费速度慢,消息堆积在 Kafka 中。解决办法:
- 增加并发消费者数(
concurrency)。 - 优化消费处理速度。
- 增加并发消费者数(
SpringBoot集成Kafka
本文采用 CC BY-NC-SA 4.0 许可协议,转载请注明出处。