本文档风哥主要介绍Kafka Streams流处理基础实战,包括Kafka Streams核心概念、Kafka Streams架构原理、Kafka Streams应用开发、Kafka Streams状态管理等内容,风哥教程参考Kafka官方文档Streams、Developer Guide等内容,适合大数据开发运维人员在学习和测试中使用,如果要应用于生产环境则需要自行确认。更多视频教程www.fgedu.net.cn
Part01-基础概念与理论知识
1.1 Kafka Streams核心概念
Kafka Streams是Apache Kafka开源的流处理库,用于构建实时流处理应用。它是一个轻量级的Java库,可以嵌入到任何Java应用中。学习交流加群风哥微信: itpux-com
- Stream:无界、连续的数据流
- Table:表的变更日志流,支持状态查询
- Topology:流处理拓扑,定义处理逻辑
- Processor:处理节点,执行具体处理逻辑
- State Store:状态存储,支持有状态操作
1.2 Kafka Streams架构原理
Kafka Streams架构设计:
1. 流处理拓扑
– Source Processor: 从Topic读取数据
– Stream Processor: 处理数据转换
– Sink Processor: 写入数据到Topic
2. 任务模型
– Task: 分区级别的处理单元
– Thread: 运行多个Task的线程
– Instance: 运行多个Thread的实例
3. 状态管理
– Local State Store: 本地状态存储
– Global State Store: 全局状态存储
– Changelog Topic: 状态变更日志
4. 容错机制
– 状态持久化到Kafka
– 自动恢复和重平衡
– 精确一次语义
# 处理模型
Source Topic -> Source Processor -> Stream Processor -> Sink Processor -> Sink Topic
| | | |
v v v v
Partition Task 1 Task 1 Task 1
1.3 Kafka Streams核心特性
Kafka Streams核心特性:
- 轻量级:只是一个Java库,无需独立集群
- 高可用:支持故障自动恢复
- 精确一次:支持精确一次语义
- 状态管理:支持有状态操作
- 窗口操作:支持时间窗口聚合
- 交互式查询:支持状态查询
Part02-生产环境规划与建议
2.1 流处理应用规划
流处理应用规划需要考虑以下因素:
1. 数据规模
– 消息吞吐量
– 消息大小
– 分区数量
2. 处理逻辑
– 无状态处理: 简单转换
– 有状态处理: 聚合、连接
– 窗口操作: 时间窗口聚合
3. 资源规划
– CPU: 处理逻辑复杂度
– 内存: 状态存储大小
– 网络: 数据传输量
# 应用实例规划
– 单实例: 开发测试
– 多实例: 生产环境,支持扩展
– 实例数量 = 总分区数 / 每实例处理分区数
# 示例规划
Topic: fgedu-orders (12 partitions)
应用配置: num.stream.threads=3
实例数量: 4个实例
每个实例处理: 3个分区
2.2 Streams配置规划
Kafka Streams核心配置:
# 基础配置
application.id=fgedu-streams-app # 应用ID,唯一标识
bootstrap.servers=192.168.1.51:9092 # Kafka集群地址
# 线程配置
num.stream.threads=3 # 流处理线程数
# 状态存储配置
state.dir=/bigdata/streams/state # 状态存储目录
# 缓存配置
cache.max.bytes.buffering=10485760 # 缓存大小
# 提交配置
commit.interval.ms=1000 # 提交间隔
# 容错配置
processing.guarantee=exactly_once_v2 # 精确一次语义
# 生产者配置
producer.acks=all
producer.enable.idempotence=true
# 消费者配置
consumer.auto.offset.reset=earliest
consumer.max.poll.records=1000
# 完整配置示例
application.id=fgedu-order-streams
bootstrap.servers=192.168.1.51:9092,192.168.1.52:9092,192.168.1.53:9092
num.stream.threads=3
state.dir=/bigdata/streams/state
cache.max.bytes.buffering=10485760
commit.interval.ms=1000
processing.guarantee=exactly_once_v2
producer.acks=all
producer.enable.idempotence=true
consumer.auto.offset.reset=earliest
2.3 部署模式规划
部署模式选择:
1. 独立应用模式
– 优点: 部署简单,资源隔离
– 缺点: 需要独立运维
– 适用: 传统部署环境
2. 容器化部署
– 优点: 弹性扩展,易于管理
– 缺点: 需要容器平台
– 适用: Kubernetes环境
3. 微服务集成
– 优点: 与业务服务集成
– 缺点: 资源竞争
– 适用: 微服务架构
# 部署建议
– 开发环境: 独立应用模式
– 生产环境: 容器化部署
– 微服务架构: 集成部署
Part03-生产环境项目实施方案
3.1 Streams应用开发实战
3.1.1 创建Streams应用
$ mvn archetype:generate \
-DgroupId=com.fgedu.kafka \
-DartifactId=fgedu-streams-demo \
-DarchetypeArtifactId=maven-archetype-quickstart \
-DinteractiveMode=false
# 添加依赖
$ cat > pom.xml << 'EOF'
EOF
# 创建应用配置
$ cat > src/main/resources/streams.properties << 'EOF'
application.id=fgedu-streams-demo
bootstrap.servers=192.168.1.51:9092
num.stream.threads=2
state.dir=/bigdata/streams/state
processing.guarantee=exactly_once_v2
EOF
# 创建主类
$ cat > src/main/java/com/fgedu/kafka/StreamsDemo.java << 'EOF'
package com.fgedu.kafka;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Properties;
public class StreamsDemo {
public static void main(String[] args) {
// 1. 配置Streams
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "fgedu-streams-demo");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.51:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// 2. 构建拓扑
StreamsBuilder builder = new StreamsBuilder();
// 从源Topic读取数据
KStream
// 处理数据
KStream
.filter((key, value) -> value != null && !value.isEmpty())
.mapValues(value -> value.toUpperCase());
// 写入目标Topic
processed.to(“fgedu-output”);
// 3. 启动应用
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
// 4. 添加关闭钩子
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
}
EOF
3.1.2 编译运行应用
$ mvn clean package
[INFO] Scanning for projects…
[INFO] Building fgedu-streams-demo 1.0-SNAPSHOT
[INFO]
[INFO] — maven-clean-plugin:2.5:clean —
[INFO] — maven-resources-plugin:2.6:resources —
[INFO] — maven-compiler-plugin:3.1:compile —
[INFO] — maven-jar-plugin:2.4:jar —
[INFO] BUILD SUCCESS
# 创建输入输出Topic
$ /bigdata/app/kafka/bin/kafka-topics.sh –create \
–topic fgedu-input \
–partitions 3 \
–replication-factor 3 \
–bootstrap-server 192.168.1.51:9092
Created topic fgedu-input.
$ /bigdata/app/kafka/bin/kafka-topics.sh –create \
–topic fgedu-output \
–partitions 3 \
–replication-factor 3 \
–bootstrap-server 192.168.1.51:9092
Created topic fgedu-output.
# 运行应用
$ java -cp target/fgedu-streams-demo-1.0-SNAPSHOT.jar:target/dependency/* \
com.fgedu.kafka.StreamsDemo
# 发送测试数据
$ /bigdata/app/kafka/bin/kafka-console-producer.sh \
–topic fgedu-input \
–bootstrap-server 192.168.1.51:9092
>hello world
>kafka streams
>fgedu demo
# 查看输出结果
$ /bigdata/app/kafka/bin/kafka-console-consumer.sh \
–topic fgedu-output \
–from-beginning \
–bootstrap-server 192.168.1.51:9092
HELLO WORLD
KAFKA STREAMS
FGEDU DEMO
3.2 流处理操作实战
3.2.1 常用操作算子
// 1. 过滤操作
KStream
.filter((key, value) -> value.length() > 10);
// 2. 映射操作
KStream
.map((key, value) -> new KeyValue<>(key.toUpperCase(), value.toLowerCase()));
// 3. 值映射操作
KStream
.mapValues(value -> value.toUpperCase());
// 4. 分支操作
KStream
.branch(
(key, value) -> value.startsWith(“A”), // 分支1
(key, value) -> value.startsWith(“B”), // 分支2
(key, value) -> true // 默认分支
);
// 5. 合并操作
KStream
.merge(branches[1]);
// 6. 分组操作
KGroupedStream
.groupBy((key, value) -> key);
// 7. 聚合操作
KTable
.groupBy((key, value) -> value)
.count();
// 8. 连接操作
KStream
.join(stream2,
(value1, value2) -> value1 + “:” + value2,
JoinWindows.of(Duration.ofMinutes(5)));
3.2.2 窗口操作
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import java.time.Duration;
// 1. 滚动窗口
KTable
.groupBy((key, value) -> value)
.windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
.count();
// 2. 滑动窗口
KTable
.groupBy((key, value) -> value)
.windowedBy(TimeWindows.of(Duration.ofMinutes(5))
.advanceBy(Duration.ofMinutes(1)))
.count();
// 3. 会话窗口
KTable
.groupBy((key, value) -> value)
.windowedBy(SessionWindows.with(Duration.ofMinutes(5)))
.count();
// 4. 窗口结果输出
tumblingCounts.toStream()
.map((key, value) -> new KeyValue<>(
key.key() + “@” + key.window().start(),
value.toString()))
.to(“fgedu-window-output”);
3.3 状态存储实战
3.3.1 本地状态存储
// 1. 创建状态存储
StoreBuilder
Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore(“fgedu-state-store”),
Serdes.String(),
Serdes.Long()
);
// 2. 添加状态存储到拓扑
builder.addStateStore(storeBuilder);
// 3. 在Processor中使用状态存储
KStream
.process(() -> new Processor
private KeyValueStore
@Override
public void init(ProcessorContext context) {
this.stateStore = context.getStateStore(“fgedu-state-store”);
}
@Override
public void process(String key, String value) {
Long count = stateStore.get(key);
if (count == null) {
count = 0L;
}
stateStore.put(key, count + 1);
}
@Override
public void close() {}
}, “fgedu-state-store”);
# 状态存储类型
– RocksDB: 持久化存储,适合大数据量
– InMemory: 内存存储,适合小数据量
– Persistent: 持久化存储
– Caching: 缓存存储
3.3.2 交互式查询
// 1. 配置应用服务器
props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, “192.168.1.51:8080”);
// 2. 创建可查询的状态存储
KTable
.groupBy((key, value) -> value)
.count(Materialized.as(“fgedu-counts-store”));
// 3. 查询状态存储
ReadOnlyKeyValueStore
streams.store(“fgedu-counts-store”, QueryableStoreTypes.keyValueStore());
// 4. 查询特定Key的值
Long count = store.get(“hello”);
// 5. 查询所有Key
KeyValueIterator
while (all.hasNext()) {
KeyValue
System.out.println(next.key + “: ” + next.value);
}
// 6. 范围查询
KeyValueIterator
Part04-生产案例与实战讲解
4.1 WordCount实时统计实战
4.1.1 WordCount应用开发
package com.fgedu.kafka;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.common.serialization.Serdes;
import java.util.Arrays;
import java.util.Properties;
import java.util.regex.Pattern;
public class WordCountDemo {
public static void main(String[] args) {
// 配置
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, “fgedu-wordcount”);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, “192.168.1.51:9092”);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// 构建拓扑
StreamsBuilder builder = new StreamsBuilder();
// 从Topic读取文本
KStream
// 分词并统计
Pattern pattern = Pattern.compile(“\\W+”, Pattern.UNICODE_CHARACTER_CLASS);
KTable
.flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase())))
.groupBy((key, word) -> word)
.count();
// 输出结果
wordCounts.toStream().to(
“fgedu-wordcount-output”,
Produced.with(Serdes.String(), Serdes.Long())
);
// 启动应用
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
}
# 创建Topic
$ /bigdata/app/kafka/bin/kafka-topics.sh –create \
–topic fgedu-text-input \
–partitions 3 \
–replication-factor 3 \
–bootstrap-server 192.168.1.51:9092
$ /bigdata/app/kafka/bin/kafka-topics.sh –create \
–topic fgedu-wordcount-output \
–partitions 3 \
–replication-factor 3 \
–bootstrap-server 192.168.1.51:9092
# 运行应用并发送测试数据
$ /bigdata/app/kafka/bin/kafka-console-producer.sh \
–topic fgedu-text-input \
–bootstrap-server 192.168.1.51:9092
>hello world hello kafka
>kafka streams kafka demo
>fgedu kafka fgedu demo
# 查看统计结果
$ /bigdata/app/kafka/bin/kafka-console-consumer.sh \
–topic fgedu-wordcount-output \
–from-beginning \
–property print.key=true \
–property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
–property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer \
–bootstrap-server 192.168.1.51:9092
demo 2
fgedu 2
hello 2
kafka 4
streams 1
world 1
4.2 流连接实战
4.2.1 流-流连接
// 订单流
KStream
// 支付流
KStream
// 连接两个流
KStream
.join(payments,
(order, payment) -> new OrderPayment(order, payment),
JoinWindows.of(Duration.ofMinutes(5)),
StreamJoined.with(
Serdes.String(),
new JsonSerde<>(Order.class),
new JsonSerde<>(Payment.class)
)
);
// 输出连接结果
joined.to(“fgedu-order-payment”);
# 创建Topic
$ /bigdata/app/kafka/bin/kafka-topics.sh –create \
–topic fgedu-orders \
–partitions 3 \
–replication-factor 3 \
–bootstrap-server 192.168.1.51:9092
$ /bigdata/app/kafka/bin/kafka-topics.sh –create \
–topic fgedu-payments \
–partitions 3 \
–replication-factor 3 \
–bootstrap-server 192.168.1.51:9092
$ /bigdata/app/kafka/bin/kafka-topics.sh –create \
–topic fgedu-order-payment \
–partitions 3 \
–replication-factor 3 \
–bootstrap-server 192.168.1.51:9092
4.3 常见问题处理
4.3.1 应用启动失败
# 排查步骤
# 1. 检查配置
$ grep -E “application.id|bootstrap.servers” streams.properties
application.id=fgedu-streams-demo
bootstrap.servers=192.168.1.51:9092
# 2. 检查Topic是否存在
$ /bigdata/app/kafka/bin/kafka-topics.sh –list \
–bootstrap-server 192.168.1.51:9092
# 3. 检查状态目录权限
$ ls -la /bigdata/streams/state
# 4. 检查日志
$ tail -100 logs/streams.log
# 常见错误
# 1. application.id冲突
# 解决:使用唯一的application.id
# 2. Topic不存在
# 解决:先创建Topic
# 3. 状态目录权限问题
# 解决:chmod 755 /bigdata/streams/state
4.3.2 处理延迟问题
# 排查步骤
# 1. 检查消费者延迟
$ /bigdata/app/kafka/bin/kafka-consumer-groups.sh –describe \
–group fgedu-streams-demo \
–bootstrap-server 192.168.1.51:9092
# 2. 检查线程数
# 增加num.stream.threads
# 3. 检查状态存储大小
$ du -sh /bigdata/streams/state
# 优化方案
# 1. 增加处理线程
num.stream.threads=4
# 2. 增加缓存大小
cache.max.bytes.buffering=52428800
# 3. 调整提交间隔
commit.interval.ms=100
# 4. 增加实例数量
# 部署更多应用实例
Part05-风哥经验总结与分享
5.1 Streams最佳实践
Kafka Streams最佳实践建议:
1. 使用唯一的application.id
2. 合理设置num.stream.threads
3. 配置状态存储目录
4. 启用精确一次语义
5. 实现优雅关闭
# 部署最佳实践
1. 监控消费者延迟
2. 监控状态存储大小
3. 配置合理的资源
4. 实现健康检查
5. 制定故障恢复预案
# 性能优化
1. 增加处理线程
2. 增加缓存大小
3. 优化状态存储
4. 调整提交间隔
5.2 开发检查清单
开发检查清单:
- application.id是否唯一
- bootstrap.servers是否正确
- Topic是否存在
- 状态存储目录是否有权限
- 处理线程数是否合理
- 精确一次语义是否启用
- 健康检查是否实现
5.3 调试工具推荐
Kafka Streams调试工具:
- kafka-streams-application-reset:重置应用状态
- Kafka Tool:查看Topic数据
- JMX:监控应用指标
- Logs:查看应用日志
- Interactive Queries:查询状态存储
本文由风哥教程整理发布,仅用于学习测试使用,转载注明出处:http://www.fgedu.net.cn/10327.html
