1. 首页 > Hadoop教程 > 正文

大数据教程FG058-Kafka流处理基础实战

本文档风哥主要介绍Kafka Streams流处理基础实战,包括Kafka Streams核心概念、Kafka Streams架构原理、Kafka Streams应用开发、Kafka Streams状态管理等内容,风哥教程参考Kafka官方文档Streams、Developer Guide等内容,适合大数据开发运维人员在学习和测试中使用,如果要应用于生产环境则需要自行确认。更多视频教程www.fgedu.net.cn

Part01-基础概念与理论知识

1.1 Kafka Streams核心概念

Kafka Streams是Apache Kafka开源的流处理库,用于构建实时流处理应用。它是一个轻量级的Java库,可以嵌入到任何Java应用中。学习交流加群风哥微信: itpux-com

Kafka Streams核心概念:

  • Stream:无界、连续的数据流
  • Table:表的变更日志流,支持状态查询
  • Topology:流处理拓扑,定义处理逻辑
  • Processor:处理节点,执行具体处理逻辑
  • State Store:状态存储,支持有状态操作

1.2 Kafka Streams架构原理

Kafka Streams架构设计:

# Kafka Streams架构组件

1. 流处理拓扑
– Source Processor: 从Topic读取数据
– Stream Processor: 处理数据转换
– Sink Processor: 写入数据到Topic

2. 任务模型
– Task: 分区级别的处理单元
– Thread: 运行多个Task的线程
– Instance: 运行多个Thread的实例

3. 状态管理
– Local State Store: 本地状态存储
– Global State Store: 全局状态存储
– Changelog Topic: 状态变更日志

4. 容错机制
– 状态持久化到Kafka
– 自动恢复和重平衡
– 精确一次语义

# 处理模型
Source Topic -> Source Processor -> Stream Processor -> Sink Processor -> Sink Topic
| | | |
v v v v
Partition Task 1 Task 1 Task 1

1.3 Kafka Streams核心特性

Kafka Streams核心特性:

  • 轻量级:只是一个Java库,无需独立集群
  • 高可用:支持故障自动恢复
  • 精确一次:支持精确一次语义
  • 状态管理:支持有状态操作
  • 窗口操作:支持时间窗口聚合
  • 交互式查询:支持状态查询
风哥提示:Kafka Streams是一个轻量级的流处理框架,不需要独立部署集群,可以直接嵌入到Java应用中,非常适合构建微服务架构的流处理应用。

Part02-生产环境规划与建议

2.1 流处理应用规划

流处理应用规划需要考虑以下因素:

# 应用规划要点

1. 数据规模
– 消息吞吐量
– 消息大小
– 分区数量

2. 处理逻辑
– 无状态处理: 简单转换
– 有状态处理: 聚合、连接
– 窗口操作: 时间窗口聚合

3. 资源规划
– CPU: 处理逻辑复杂度
– 内存: 状态存储大小
– 网络: 数据传输量

# 应用实例规划
– 单实例: 开发测试
– 多实例: 生产环境,支持扩展
– 实例数量 = 总分区数 / 每实例处理分区数

# 示例规划
Topic: fgedu-orders (12 partitions)
应用配置: num.stream.threads=3
实例数量: 4个实例
每个实例处理: 3个分区

2.2 Streams配置规划

Kafka Streams核心配置:

# Streams核心配置

# 基础配置
application.id=fgedu-streams-app # 应用ID,唯一标识
bootstrap.servers=192.168.1.51:9092 # Kafka集群地址

# 线程配置
num.stream.threads=3 # 流处理线程数

# 状态存储配置
state.dir=/bigdata/streams/state # 状态存储目录

# 缓存配置
cache.max.bytes.buffering=10485760 # 缓存大小

# 提交配置
commit.interval.ms=1000 # 提交间隔

# 容错配置
processing.guarantee=exactly_once_v2 # 精确一次语义

# 生产者配置
producer.acks=all
producer.enable.idempotence=true

# 消费者配置
consumer.auto.offset.reset=earliest
consumer.max.poll.records=1000

# 完整配置示例
application.id=fgedu-order-streams
bootstrap.servers=192.168.1.51:9092,192.168.1.52:9092,192.168.1.53:9092
num.stream.threads=3
state.dir=/bigdata/streams/state
cache.max.bytes.buffering=10485760
commit.interval.ms=1000
processing.guarantee=exactly_once_v2
producer.acks=all
producer.enable.idempotence=true
consumer.auto.offset.reset=earliest

2.3 部署模式规划

部署模式选择:

# 部署模式

1. 独立应用模式
– 优点: 部署简单,资源隔离
– 缺点: 需要独立运维
– 适用: 传统部署环境

2. 容器化部署
– 优点: 弹性扩展,易于管理
– 缺点: 需要容器平台
– 适用: Kubernetes环境

3. 微服务集成
– 优点: 与业务服务集成
– 缺点: 资源竞争
– 适用: 微服务架构

# 部署建议
– 开发环境: 独立应用模式
– 生产环境: 容器化部署
– 微服务架构: 集成部署

生产环境建议:生产环境建议使用容器化部署,配合Kubernetes实现弹性扩展和高可用。学习交流加群风哥QQ113257174

Part03-生产环境项目实施方案

3.1 Streams应用开发实战

3.1.1 创建Streams应用

# 创建Maven项目
$ mvn archetype:generate \
-DgroupId=com.fgedu.kafka \
-DartifactId=fgedu-streams-demo \
-DarchetypeArtifactId=maven-archetype-quickstart \
-DinteractiveMode=false

# 添加依赖
$ cat > pom.xml << 'EOF'

org.apache.kafka
kafka-streams
3.6.1


org.slf4j
slf4j-simple
2.0.9


EOF

# 创建应用配置
$ cat > src/main/resources/streams.properties << 'EOF' application.id=fgedu-streams-demo bootstrap.servers=192.168.1.51:9092 num.stream.threads=2 state.dir=/bigdata/streams/state processing.guarantee=exactly_once_v2 EOF # 创建主类 $ cat > src/main/java/com/fgedu/kafka/StreamsDemo.java << 'EOF' package com.fgedu.kafka; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.serialization.StringDeserializer; import java.util.Properties; public class StreamsDemo { public static void main(String[] args) { // 1. 配置Streams Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "fgedu-streams-demo"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.51:9092"); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); // 2. 构建拓扑 StreamsBuilder builder = new StreamsBuilder(); // 从源Topic读取数据 KStream source = builder.stream(“fgedu-input”);

// 处理数据
KStream processed = source
.filter((key, value) -> value != null && !value.isEmpty())
.mapValues(value -> value.toUpperCase());

// 写入目标Topic
processed.to(“fgedu-output”);

// 3. 启动应用
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

// 4. 添加关闭钩子
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
}
EOF

3.1.2 编译运行应用

# 编译项目
$ mvn clean package

[INFO] Scanning for projects…
[INFO] Building fgedu-streams-demo 1.0-SNAPSHOT
[INFO]
[INFO] — maven-clean-plugin:2.5:clean —
[INFO] — maven-resources-plugin:2.6:resources —
[INFO] — maven-compiler-plugin:3.1:compile —
[INFO] — maven-jar-plugin:2.4:jar —
[INFO] BUILD SUCCESS

# 创建输入输出Topic
$ /bigdata/app/kafka/bin/kafka-topics.sh –create \
–topic fgedu-input \
–partitions 3 \
–replication-factor 3 \
–bootstrap-server 192.168.1.51:9092

Created topic fgedu-input.

$ /bigdata/app/kafka/bin/kafka-topics.sh –create \
–topic fgedu-output \
–partitions 3 \
–replication-factor 3 \
–bootstrap-server 192.168.1.51:9092

Created topic fgedu-output.

# 运行应用
$ java -cp target/fgedu-streams-demo-1.0-SNAPSHOT.jar:target/dependency/* \
com.fgedu.kafka.StreamsDemo

# 发送测试数据
$ /bigdata/app/kafka/bin/kafka-console-producer.sh \
–topic fgedu-input \
–bootstrap-server 192.168.1.51:9092

>hello world
>kafka streams
>fgedu demo

# 查看输出结果
$ /bigdata/app/kafka/bin/kafka-console-consumer.sh \
–topic fgedu-output \
–from-beginning \
–bootstrap-server 192.168.1.51:9092

HELLO WORLD
KAFKA STREAMS
FGEDU DEMO

3.2 流处理操作实战

3.2.1 常用操作算子

# 常用操作算子示例

// 1. 过滤操作
KStream filtered = source
.filter((key, value) -> value.length() > 10);

// 2. 映射操作
KStream mapped = source
.map((key, value) -> new KeyValue<>(key.toUpperCase(), value.toLowerCase()));

// 3. 值映射操作
KStream valueMapped = source
.mapValues(value -> value.toUpperCase());

// 4. 分支操作
KStream[] branches = source
.branch(
(key, value) -> value.startsWith(“A”), // 分支1
(key, value) -> value.startsWith(“B”), // 分支2
(key, value) -> true // 默认分支
);

// 5. 合并操作
KStream merged = branches[0]
.merge(branches[1]);

// 6. 分组操作
KGroupedStream grouped = source
.groupBy((key, value) -> key);

// 7. 聚合操作
KTable counts = source
.groupBy((key, value) -> value)
.count();

// 8. 连接操作
KStream joined = stream1
.join(stream2,
(value1, value2) -> value1 + “:” + value2,
JoinWindows.of(Duration.ofMinutes(5)));

3.2.2 窗口操作

# 窗口操作示例

import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import java.time.Duration;

// 1. 滚动窗口
KTable, Long> tumblingCounts = source
.groupBy((key, value) -> value)
.windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
.count();

// 2. 滑动窗口
KTable, Long> hoppingCounts = source
.groupBy((key, value) -> value)
.windowedBy(TimeWindows.of(Duration.ofMinutes(5))
.advanceBy(Duration.ofMinutes(1)))
.count();

// 3. 会话窗口
KTable, Long> sessionCounts = source
.groupBy((key, value) -> value)
.windowedBy(SessionWindows.with(Duration.ofMinutes(5)))
.count();

// 4. 窗口结果输出
tumblingCounts.toStream()
.map((key, value) -> new KeyValue<>(
key.key() + “@” + key.window().start(),
value.toString()))
.to(“fgedu-window-output”);

风哥提示:窗口操作是流处理的核心能力,Kafka Streams支持滚动窗口、滑动窗口和会话窗口三种窗口类型,可以根据业务需求选择合适的窗口类型。更多学习教程公众号风哥教程itpux_com

3.3 状态存储实战

3.3.1 本地状态存储

# 状态存储配置

// 1. 创建状态存储
StoreBuilder> storeBuilder =
Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore(“fgedu-state-store”),
Serdes.String(),
Serdes.Long()
);

// 2. 添加状态存储到拓扑
builder.addStateStore(storeBuilder);

// 3. 在Processor中使用状态存储
KStream processed = source
.process(() -> new Processor() {
private KeyValueStore stateStore;

@Override
public void init(ProcessorContext context) {
this.stateStore = context.getStateStore(“fgedu-state-store”);
}

@Override
public void process(String key, String value) {
Long count = stateStore.get(key);
if (count == null) {
count = 0L;
}
stateStore.put(key, count + 1);
}

@Override
public void close() {}
}, “fgedu-state-store”);

# 状态存储类型
– RocksDB: 持久化存储,适合大数据量
– InMemory: 内存存储,适合小数据量
– Persistent: 持久化存储
– Caching: 缓存存储

3.3.2 交互式查询

# 交互式查询配置

// 1. 配置应用服务器
props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, “192.168.1.51:8080”);

// 2. 创建可查询的状态存储
KTable counts = source
.groupBy((key, value) -> value)
.count(Materialized.as(“fgedu-counts-store”));

// 3. 查询状态存储
ReadOnlyKeyValueStore store =
streams.store(“fgedu-counts-store”, QueryableStoreTypes.keyValueStore());

// 4. 查询特定Key的值
Long count = store.get(“hello”);

// 5. 查询所有Key
KeyValueIterator all = store.all();
while (all.hasNext()) {
KeyValue next = all.next();
System.out.println(next.key + “: ” + next.value);
}

// 6. 范围查询
KeyValueIterator range = store.range(“a”, “z”);

Part04-生产案例与实战讲解

4.1 WordCount实时统计实战

4.1.1 WordCount应用开发

# WordCount完整示例

package com.fgedu.kafka;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.common.serialization.Serdes;
import java.util.Arrays;
import java.util.Properties;
import java.util.regex.Pattern;

public class WordCountDemo {
public static void main(String[] args) {
// 配置
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, “fgedu-wordcount”);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, “192.168.1.51:9092”);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

// 构建拓扑
StreamsBuilder builder = new StreamsBuilder();

// 从Topic读取文本
KStream textLines = builder.stream(“fgedu-text-input”);

// 分词并统计
Pattern pattern = Pattern.compile(“\\W+”, Pattern.UNICODE_CHARACTER_CLASS);
KTable wordCounts = textLines
.flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase())))
.groupBy((key, word) -> word)
.count();

// 输出结果
wordCounts.toStream().to(
“fgedu-wordcount-output”,
Produced.with(Serdes.String(), Serdes.Long())
);

// 启动应用
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
}

# 创建Topic
$ /bigdata/app/kafka/bin/kafka-topics.sh –create \
–topic fgedu-text-input \
–partitions 3 \
–replication-factor 3 \
–bootstrap-server 192.168.1.51:9092

$ /bigdata/app/kafka/bin/kafka-topics.sh –create \
–topic fgedu-wordcount-output \
–partitions 3 \
–replication-factor 3 \
–bootstrap-server 192.168.1.51:9092

# 运行应用并发送测试数据
$ /bigdata/app/kafka/bin/kafka-console-producer.sh \
–topic fgedu-text-input \
–bootstrap-server 192.168.1.51:9092

>hello world hello kafka
>kafka streams kafka demo
>fgedu kafka fgedu demo

# 查看统计结果
$ /bigdata/app/kafka/bin/kafka-console-consumer.sh \
–topic fgedu-wordcount-output \
–from-beginning \
–property print.key=true \
–property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
–property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer \
–bootstrap-server 192.168.1.51:9092

demo 2
fgedu 2
hello 2
kafka 4
streams 1
world 1

4.2 流连接实战

4.2.1 流-流连接

# 流-流连接示例

// 订单流
KStream orders = builder.stream(“fgedu-orders”);

// 支付流
KStream payments = builder.stream(“fgedu-payments”);

// 连接两个流
KStream joined = orders
.join(payments,
(order, payment) -> new OrderPayment(order, payment),
JoinWindows.of(Duration.ofMinutes(5)),
StreamJoined.with(
Serdes.String(),
new JsonSerde<>(Order.class),
new JsonSerde<>(Payment.class)
)
);

// 输出连接结果
joined.to(“fgedu-order-payment”);

# 创建Topic
$ /bigdata/app/kafka/bin/kafka-topics.sh –create \
–topic fgedu-orders \
–partitions 3 \
–replication-factor 3 \
–bootstrap-server 192.168.1.51:9092

$ /bigdata/app/kafka/bin/kafka-topics.sh –create \
–topic fgedu-payments \
–partitions 3 \
–replication-factor 3 \
–bootstrap-server 192.168.1.51:9092

$ /bigdata/app/kafka/bin/kafka-topics.sh –create \
–topic fgedu-order-payment \
–partitions 3 \
–replication-factor 3 \
–bootstrap-server 192.168.1.51:9092

4.3 常见问题处理

4.3.1 应用启动失败

# 问题现象:应用启动失败

# 排查步骤
# 1. 检查配置
$ grep -E “application.id|bootstrap.servers” streams.properties

application.id=fgedu-streams-demo
bootstrap.servers=192.168.1.51:9092

# 2. 检查Topic是否存在
$ /bigdata/app/kafka/bin/kafka-topics.sh –list \
–bootstrap-server 192.168.1.51:9092

# 3. 检查状态目录权限
$ ls -la /bigdata/streams/state

# 4. 检查日志
$ tail -100 logs/streams.log

# 常见错误
# 1. application.id冲突
# 解决:使用唯一的application.id

# 2. Topic不存在
# 解决:先创建Topic

# 3. 状态目录权限问题
# 解决:chmod 755 /bigdata/streams/state

4.3.2 处理延迟问题

# 问题现象:处理延迟高

# 排查步骤
# 1. 检查消费者延迟
$ /bigdata/app/kafka/bin/kafka-consumer-groups.sh –describe \
–group fgedu-streams-demo \
–bootstrap-server 192.168.1.51:9092

# 2. 检查线程数
# 增加num.stream.threads

# 3. 检查状态存储大小
$ du -sh /bigdata/streams/state

# 优化方案
# 1. 增加处理线程
num.stream.threads=4

# 2. 增加缓存大小
cache.max.bytes.buffering=52428800

# 3. 调整提交间隔
commit.interval.ms=100

# 4. 增加实例数量
# 部署更多应用实例

Part05-风哥经验总结与分享

5.1 Streams最佳实践

Kafka Streams最佳实践建议:

# 开发最佳实践
1. 使用唯一的application.id
2. 合理设置num.stream.threads
3. 配置状态存储目录
4. 启用精确一次语义
5. 实现优雅关闭

# 部署最佳实践
1. 监控消费者延迟
2. 监控状态存储大小
3. 配置合理的资源
4. 实现健康检查
5. 制定故障恢复预案

# 性能优化
1. 增加处理线程
2. 增加缓存大小
3. 优化状态存储
4. 调整提交间隔

5.2 开发检查清单

开发检查清单:

Kafka Streams开发检查清单:

  • application.id是否唯一
  • bootstrap.servers是否正确
  • Topic是否存在
  • 状态存储目录是否有权限
  • 处理线程数是否合理
  • 精确一次语义是否启用
  • 健康检查是否实现

5.3 调试工具推荐

Kafka Streams调试工具:

  • kafka-streams-application-reset:重置应用状态
  • Kafka Tool:查看Topic数据
  • JMX:监控应用指标
  • Logs:查看应用日志
  • Interactive Queries:查询状态存储
风哥提示:Kafka Streams是构建实时流处理应用的优秀选择,轻量级、高可用、支持精确一次语义。建议在开发时遵循最佳实践,确保应用稳定可靠。from bigdata视频:www.itpux.com

本文由风哥教程整理发布,仅用于学习测试使用,转载注明出处:http://www.fgedu.net.cn/10327.html

联系我们

在线咨询:点击这里给我发消息

微信号:itpux-com

工作日:9:30-18:30,节假日休息