本文档风哥主要介绍Redis Stream类型的消息流实战,包括Stream类型概念、Stream操作命令、Stream使用场景、Stream规划、消费者组、性能考虑、基础操作、高级操作、消费者组操作、过期操作以及实战案例等内容,风哥教程参考Redis官方文档Data types guide等内容编写,适合DBA人员和开发人员在生产环境中使用。
Part01-基础概念与理论知识
1.1 Stream类型概念
Redis Stream是一种用于存储和处理消息流的数据结构,它支持消息的持久化、消费组、消息确认等功能。Stream类型的特点:
- 持久化:消息会持久化到磁盘,确保数据不丢失
- 消费组:支持多个消费者组,每个组可以独立消费消息
- 消息确认:支持消息的确认机制,确保消息被正确处理
- 消息ID:每个消息都有唯一的ID,格式为时间戳-序列号
- 范围查询:支持根据消息ID进行范围查询
1.2 Stream操作命令
## 1. 基础命令
– XADD:添加消息到Stream
– XLEN:获取Stream的消息数量
– XREAD:读取Stream中的消息
– XRANGE:获取指定范围的消息
– XREVRANGE:反向获取指定范围的消息
– XDEL:删除Stream中的消息
– XTRIM:修剪Stream,保留指定数量的消息
## 2. 消费者组命令
– XGROUP CREATE:创建消费者组
– XGROUP DESTROY:销毁消费者组
– XGROUP CREATECONSUMER:创建消费者
– XGROUP DELCONSUMER:删除消费者
– XREADGROUP:从消费者组读取消息
– XACK:确认消息已处理
– XPENDING:查看待处理的消息
– XCLAIM:转移消息所有权
## 3. 过期命令
– EXPIRE:设置Stream的过期时间
– TTL:查看Stream的剩余过期时间
– PERSIST:移除Stream的过期时间
1.3 Stream使用场景
Stream类型的使用场景:
- 消息队列:实现可靠的消息队列
- 事件流:处理实时事件流
- 日志处理:存储和处理日志数据
- 实时处理:实时数据处理和分析
- 监控系统:收集和处理监控数据
- IoT数据:处理物联网设备产生的数据
更多视频教程www.fgedu.net.cn
Part02-生产环境规划与建议
2.1 Stream规划
生产环境Stream规划:
- Stream命名规范:使用业务前缀+Stream类型的方式,如order:events
- 消息大小:控制消息大小,避免过大的消息
- 消息保留:根据业务需求设置合理的消息保留策略
- 消费者组:根据业务需求设计合理的消费者组结构
- 过期时间:根据数据生命周期设置合理的过期时间
2.2 消费者组
- 组内消费者:一个消费者组可以有多个消费者,共同处理组内的消息
- 消息分配:消息会均匀分配给组内的消费者
- 消息确认:消费者处理完消息后需要确认,否则消息会重新分配
- 消费位置:每个消费者组维护自己的消费位置
- 待处理消息:未确认的消息会被标记为待处理
2.3 性能考虑
## 1. 操作性能
– XADD:O(1)
– XLEN:O(1)
– XREAD:O(N)
– XRANGE:O(N)
– XREADGROUP:O(N)
– XACK:O(1)
## 2. 性能优化
– 合理控制Stream的大小
– 优化消费者组的数量和消费者数量
– 及时确认处理完的消息
– 避免长时间的阻塞读取
– 使用管道批量执行操作
## 3. 最佳实践
– 对于大Stream,使用XTRIM定期修剪
– 合理设置消费者组的数量
– 监控消费者的处理速度
– 定期清理过期的Stream数据
学习交流加群风哥QQ113257174
Part03-生产环境项目实施方案
3.1 基础操作
## 1. 添加消息到Stream
$ /redis/app/bin/redis-cli -h 192.168.1.100 -p 6379 -a fgedu@2026 xadd order:events * type order status created user_id 1001 order_id 2001
$ /redis/app/bin/redis-cli -h 192.168.1.100 -p 6379 -a fgedu@2026 xadd order:events * type order status paid user_id 1001 order_id 2001
$ /redis/app/bin/redis-cli -h 192.168.1.100 -p 6379 -a fgedu@2026 xadd order:events * type order status shipped user_id 1001 order_id 2001
# 输出示例
“1704067200000-0”
“1704067201000-0”
“1704067202000-0”
## 2. 获取Stream的消息数量
$ /redis/app/bin/redis-cli -h 192.168.1.100 -p 6379 -a fgedu@2026 xlen order:events
# 输出示例
(integer) 3
## 3. 读取Stream中的消息
$ /redis/app/bin/redis-cli -h 192.168.1.100 -p 6379 -a fgedu@2026 xread count 2 streams order:events 0
# 输出示例
1) 1) “order:events”
2) 1) 1) “1704067200000-0”
2) 1) “type”
2) “order”
3) “status”
4) “created”
5) “user_id”
6) “1001”
7) “order_id”
8) “2001”
2) 1) “1704067201000-0”
2) 1) “type”
2) “order”
3) “status”
4) “paid”
5) “user_id”
6) “1001”
7) “order_id”
8) “2001”
## 4. 获取指定范围的消息
$ /redis/app/bin/redis-cli -h 192.168.1.100 -p 6379 -a fgedu@2026 xrange order:events – +
# 输出示例
1) 1) “1704067200000-0”
2) 1) “type”
2) “order”
3) “status”
4) “created”
5) “user_id”
6) “1001”
7) “order_id”
8) “2001”
2) 1) “1704067201000-0”
2) 1) “type”
2) “order”
3) “status”
4) “paid”
5) “user_id”
6) “1001”
7) “order_id”
8) “2001”
3) 1) “1704067202000-0”
2) 1) “type”
2) “order”
3) “status”
4) “shipped”
5) “user_id”
6) “1001”
7) “order_id”
8) “2001”
## 5. 删除Stream中的消息
$ /redis/app/bin/redis-cli -h 192.168.1.100 -p 6379 -a fgedu@2026 xdel order:events 1704067200000-0
# 输出示例
(integer) 1
## 6. 修剪Stream
$ /redis/app/bin/redis-cli -h 192.168.1.100 -p 6379 -a fgedu@2026 xtrim order:events MAXLEN 2
# 输出示例
(integer) 1
## 7. 查看修剪后的消息数量
$ /redis/app/bin/redis-cli -h 192.168.1.100 -p 6379 -a fgedu@2026 xlen order:events
# 输出示例
(integer) 2
3.2 高级操作
## 1. 反向获取指定范围的消息
$ /redis/app/bin/redis-cli -h 192.168.1.100 -p 6379 -a fgedu@2026 xrevrange order:events + –
# 输出示例
1) 1) “1704067202000-0”
2) 1) “type”
2) “order”
3) “status”
4) “shipped”
5) “user_id”
6) “1001”
7) “order_id”
8) “2001”
2) 1) “1704067201000-0”
2) 1) “type”
2) “order”
3) “status”
4) “paid”
5) “user_id”
6) “1001”
7) “order_id”
8) “2001”
## 2. 阻塞读取消息
$ /redis/app/bin/redis-cli -h 192.168.1.100 -p 6379 -a fgedu@2026 xread block 0 count 1 streams order:events $
# 输出示例(需要等待新消息)
## 3. 批量添加消息
$ /redis/app/bin/redis-cli -h 192.168.1.100 -p 6379 -a fgedu@2026 xadd order:events * type order status created user_id 1002 order_id 2002
$ /redis/app/bin/redis-cli -h 192.168.1.100 -p 6379 -a fgedu@2026 xadd order:events * type order status paid user_id 1002 order_id 2002
$ /redis/app/bin/redis-cli -h 192.168.1.100 -p 6379 -a fgedu@2026 xadd order:events * type order status shipped user_id 1002 order_id 2002
# 输出示例
“1704067203000-0”
“1704067204000-0”
“1704067205000-0”
## 4. 查看Stream的消息数量
$ /redis/app/bin/redis-cli -h 192.168.1.100 -p 6379 -a fgedu@2026 xlen order:events
# 输出示例
(integer) 5
3.3 消费者组操作
## 1. 创建消费者组
$ /redis/app/bin/redis-cli -h 192.168.1.100 -p 6379 -a fgedu@2026 xgroup create order:events group1 0
# 输出示例
OK
## 2. 从消费者组读取消息
$ /redis/app/bin/redis-cli -h 192.168.1.100 -p 6379 -a fgedu@2026 xreadgroup group group1 consumer1 count 2 streams order:events >
# 输出示例
1) 1) “order:events”
2) 1) 1) “1704067201000-0”
2) 1) “type”
2) “order”
3) “status”
4) “paid”
5) “user_id”
6) “1001”
7) “order_id”
8) “2001”
2) 1) “1704067202000-0”
2) 1) “type”
2) “order”
3) “status”
4) “shipped”
5) “user_id”
6) “1001”
7) “order_id”
8) “2001”
## 3. 确认消息已处理
$ /redis/app/bin/redis-cli -h 192.168.1.100 -p 6379 -a fgedu@2026 xack order:events group1 1704067201000-0 1704067202000-0
# 输出示例
(integer) 2
## 4. 查看待处理的消息
$ /redis/app/bin/redis-cli -h 192.168.1.100 -p 6379 -a fgedu@2026 xpending order:events group1
# 输出示例
1) (integer) 0
2) “-”
3) “+”
4) (empty list or set)
## 5. 从消费者组读取新消息
$ /redis/app/bin/redis-cli -h 192.168.1.100 -p 6379 -a fgedu@2026 xreadgroup group group1 consumer1 count 2 streams order:events >
# 输出示例
1) 1) “order:events”
2) 1) 1) “1704067203000-0”
2) 1) “type”
2) “order”
3) “status”
4) “created”
5) “user_id”
6) “1002”
7) “order_id”
8) “2002”
2) 1) “1704067204000-0”
2) 1) “type”
2) “order”
3) “status”
4) “paid”
5) “user_id”
6) “1002”
7) “order_id”
8) “2002”
## 6. 确认新消息已处理
$ /redis/app/bin/redis-cli -h 192.168.1.100 -p 6379 -a fgedu@2026 xack order:events group1 1704067203000-0 1704067204000-0
# 输出示例
(integer) 2
## 7. 销毁消费者组
$ /redis/app/bin/redis-cli -h 192.168.1.100 -p 6379 -a fgedu@2026 xgroup destroy order:events group1
# 输出示例
(integer) 1
3.4 过期操作
## 1. 设置Stream的过期时间(秒)
$ /redis/app/bin/redis-cli -h 192.168.1.100 -p 6379 -a fgedu@2026 xadd temp:events * type temp value 123
$ /redis/app/bin/redis-cli -h 192.168.1.100 -p 6379 -a fgedu@2026 expire temp:events 300
# 输出示例
“1704067206000-0”
(integer) 1
## 2. 查看Stream的剩余过期时间(秒)
$ /redis/app/bin/redis-cli -h 192.168.1.100 -p 6379 -a fgedu@2026 ttl temp:events
# 输出示例
(integer) 299
## 3. 移除Stream的过期时间
$ /redis/app/bin/redis-cli -h 192.168.1.100 -p 6379 -a fgedu@2026 persist temp:events
# 输出示例
(integer) 1
## 4. 为Stream设置不同的过期时间
$ /redis/app/bin/redis-cli -h 192.168.1.100 -p 6379 -a fgedu@2026 expire order:events 604800
# 输出示例
(integer) 1
风哥提示:Redis接口限流是保护系统的重要机制,合理的限流策略可以防止系统过载,确保系统的稳定性和可用性。在实际应用中,需要根据具体业务场景和数据特点,选择合适的限流算法和策略。
Part04-生产案例与实战讲解
4.1 消息队列
## 1. 创建消息队列
$ /redis/app/bin/redis-cli -h 192.168.1.100 -p 6379 -a fgedu@2026 xadd queue:tasks * type task action process user_id 1001 task_id 3001
$ /redis/app/bin/redis-cli -h 192.168.1.100 -p 6379 -a fgedu@2026 xadd queue:tasks * type task action process user_id 1002 task_id 3002
$ /redis/app/bin/redis-cli -h 192.168.1.100 -p 6379 -a fgedu@2026 xadd queue:tasks * type task action process user_id 1003 task_id 3003
# 输出示例
“1704067207000-0”
“1704067208000-0”
“1704067209000-0”
## 2. 创建消费者组
$ /redis/app/bin/redis-cli -h 192.168.1.100 -p 6379 -a fgedu@2026 xgroup create queue:tasks workers 0
# 输出示例
OK
## 3. 消费者1处理消息
$ /redis/app/bin/redis-cli -h 192.168.1.100 -p 6379 -a fgedu@2026 xreadgroup group workers consumer1 count 1 streams queue:tasks >
# 输出示例
1) 1) “queue:tasks”
2) 1) 1) “1704067207000-0”
2) 1) “type”
2) “task”
3) “action”
4) “process”
5) “user_id”
6) “1001”
7) “task_id”
8) “3001”
## 4. 消费者1确认消息
$ /redis/app/bin/redis-cli -h 192.168.1.100 -p 6379 -a fgedu@2026 xack queue:tasks workers 1704067207000-0
# 输出示例
(integer) 1
## 5. 消费者2处理消息
$ /redis/app/bin/redis-cli -h 192.168.1.100 -p 6379 -a fgedu@2026 xreadgroup group workers consumer2 count 1 streams queue:tasks >
# 输出示例
1) 1) “queue:tasks”
2) 1) 1) “1704067208000-0”
2) 1) “type”
2) “task”
3) “action”
4) “process”
5) “user_id”
6) “1002”
7) “task_id”
8) “3002”
## 6. 消费者2确认消息
$ /redis/app/bin/redis-cli -h 192.168.1.100 -p 6379 -a fgedu@2026 xack queue:tasks workers 1704067208000-0
# 输出示例
(integer) 1
## 7. 查看待处理的消息
$ /redis/app/bin/redis-cli -h 192.168.1.100 -p 6379 -a fgedu@2026 xpending queue:tasks workers
# 输出示例
1) (integer) 1
2) “1704067209000-0”
3) “1704067209000-0”
4) 1) 1) “consumer1”
2) “1”
## 8. 消费者1处理剩余消息
$ /redis/app/bin/redis-cli -h 192.168.1.100 -p 6379 -a fgedu@2026 xreadgroup group workers consumer1 count 1 streams queue:tasks >
# 输出示例
1) 1) “queue:tasks”
2) 1) 1) “1704067209000-0”
2) 1) “type”
2) “task”
3) “action”
4) “process”
5) “user_id”
6) “1003”
7) “task_id”
8) “3003”
## 9. 消费者1确认消息
$ /redis/app/bin/redis-cli -h 192.168.1.100 -p 6379 -a fgedu@2026 xack queue:tasks workers 1704067209000-0
# 输出示例
(integer) 1
## 10. 查看待处理的消息
$ /redis/app/bin/redis-cli -h 192.168.1.100 -p 6379 -a fgedu@2026 xpending queue:tasks workers
# 输出示例
1) (integer) 0
2) “-”
3) “+”
4) (empty list or set)
4.2 事件流
## 1. 记录用户事件
$ /redis/app/bin/redis-cli -h 192.168.1.100 -p 6379 -a fgedu@2026 xadd user:events * type login user_id 1001 timestamp 1704067200
$ /redis/app/bin/redis-cli -h 192.168.1.100 -p 6379 -a fgedu@2026 xadd user:events * type logout user_id 1001 timestamp 1704067260
$ /redis/app/bin/redis-cli -h 192.168.1.100 -p 6379 -a fgedu@2026 xadd user:events * type login user_id 1002 timestamp 1704067320
$ /redis/app/bin/redis-cli -h 192.168.1.100 -p 6379 -a fgedu@2026 xadd user:events * type logout user_id 1002 timestamp 1704067380
# 输出示例
“1704067210000-0”
“1704067211000-0”
“1704067212000-0”
“1704067213000-0”
## 2. 创建消费者组
$ /redis/app/bin/redis-cli -h 192.168.1.100 -p 6379 -a fgedu@2026 xgroup create user:events analytics 0
# 输出示例
OK
## 3. 读取并处理事件
$ /redis/app/bin/redis-cli -h 192.168.1.100 -p 6379 -a fgedu@2026 xreadgroup group analytics consumer1 count 4 streams user:events >
# 输出示例
1) 1) “user:events”
2) 1) 1) “1704067210000-0”
2) 1) “type”
2) “login”
3) “user_id”
4) “1001”
5) “timestamp”
6) “1704067200”
2) 1) “1704067211000-0”
2) 1) “type”
2) “logout”
3) “user_id”
4) “1001”
5) “timestamp”
6) “1704067260”
3) 1) “1704067212000-0”
2) 1) “type”
2) “login”
3) “user_id”
4) “1002”
5) “timestamp”
6) “1704067320”
4) 1) “1704067213000-0”
2) 1) “type”
2) “logout”
3) “user_id”
4) “1002”
5) “timestamp”
6) “1704067380”
## 4. 确认事件已处理
$ /redis/app/bin/redis-cli -h 192.168.1.100 -p 6379 -a fgedu@2026 xack user:events analytics 1704067210000-0 1704067211000-0 1704067212000-0 1704067213000-0
# 输出示例
(integer) 4
## 5. 修剪事件流
$ /redis/app/bin/redis-cli -h 192.168.1.100 -p 6379 -a fgedu@2026 xtrim user:events MAXLEN 1000
# 输出示例
(integer) 0
4.3 日志处理
## 1. 记录系统日志
$ /redis/app/bin/redis-cli -h 192.168.1.100 -p 6379 -a fgedu@2026 xadd system:logs * level info message “System started” timestamp 1704067200
$ /redis/app/bin/redis-cli -h 192.168.1.100 -p 6379 -a fgedu@2026 xadd system:logs * level warning message “Disk usage high” timestamp 1704067260
$ /redis/app/bin/redis-cli -h 192.168.1.100 -p 6379 -a fgedu@2026 xadd system:logs * level error message “Connection failed” timestamp 1704067320
$ /redis/app/bin/redis-cli -h 192.168.1.100 -p 6379 -a fgedu@2026 xadd system:logs * level info message “Service restarted” timestamp 1704067380
# 输出示例
“1704067214000-0”
“1704067215000-0”
“1704067216000-0”
“1704067217000-0”
## 2. 创建消费者组
$ /redis/app/bin/redis-cli -h 192.168.1.100 -p 6379 -a fgedu@2026 xgroup create system:logs log_processor 0
# 输出示例
OK
## 3. 读取并处理日志
$ /redis/app/bin/redis-cli -h 192.168.1.100 -p 6379 -a fgedu@2026 xreadgroup group log_processor consumer1 count 4 streams system:logs >
# 输出示例
1) 1) “system:logs”
2) 1) 1) “1704067214000-0”
2) 1) “level”
2) “info”
3) “message”
4) “System started”
5) “timestamp”
6) “1704067200”
2) 1) “1704067215000-0”
2) 1) “level”
2) “warning”
3) “message”
4) “Disk usage high”
5) “timestamp”
6) “1704067260”
3) 1) “1704067216000-0”
2) 1) “level”
2) “error”
3) “message”
4) “Connection failed”
5) “timestamp”
6) “1704067320”
4) 1) “1704067217000-0”
2) 1) “level”
2) “info”
3) “message”
4) “Service restarted”
5) “timestamp”
6) “1704067380”
## 4. 确认日志已处理
$ /redis/app/bin/redis-cli -h 192.168.1.100 -p 6379 -a fgedu@2026 xack system:logs log_processor 1704067214000-0 1704067215000-0 1704067216000-0 1704067217000-0
# 输出示例
(integer) 4
## 5. 设置日志流过期时间
$ /redis/app/bin/redis-cli -h 192.168.1.100 -p 6379 -a fgedu@2026 expire system:logs 86400
# 输出示例
(integer) 1
4.4 实时处理
## 1. 记录实时数据
$ /redis/app/bin/redis-cli -h 192.168.1.100 -p 6379 -a fgedu@2026 xadd sensor:data * sensor_id 4001 value 25.5 timestamp 1704067200
$ /redis/app/bin/redis-cli -h 192.168.1.100 -p 6379 -a fgedu@2026 xadd sensor:data * sensor_id 4001 value 25.6 timestamp 1704067260
$ /redis/app/bin/redis-cli -h 192.168.1.100 -p 6379 -a fgedu@2026 xadd sensor:data * sensor_id 4001 value 25.7 timestamp 1704067320
$ /redis/app/bin/redis-cli -h 192.168.1.100 -p 6379 -a fgedu@2026 xadd sensor:data * sensor_id 4001 value 25.8 timestamp 1704067380
# 输出示例
“1704067218000-0”
“1704067219000-0”
“1704067220000-0”
“1704067221000-0”
## 2. 创建消费者组
$ /redis/app/bin/redis-cli -h 192.168.1.100 -p 6379 -a fgedu@2026 xgroup create sensor:data data_processor 0
# 输出示例
OK
## 3. 读取并处理实时数据
$ /redis/app/bin/redis-cli -h 192.168.1.100 -p 6379 -a fgedu@2026 xreadgroup group data_processor consumer1 count 4 streams sensor:data >
# 输出示例
1) 1) “sensor:data”
2) 1) 1) “1704067218000-0”
2) 1) “sensor_id”
2) “4001”
3) “value”
4) “25.5”
5) “timestamp”
6) “1704067200”
2) 1) “1704067219000-0”
2) 1) “sensor_id”
2) “4001”
3) “value”
4) “25.6”
5) “timestamp”
6) “1704067260”
3) 1) “1704067220000-0”
2) 1) “sensor_id”
2) “4001”
3) “value”
4) “25.7”
5) “timestamp”
6) “1704067320”
4) 1) “1704067221000-0”
2) 1) “sensor_id”
2) “4001”
3) “value”
4) “25.8”
5) “timestamp”
6) “1704067380”
## 4. 确认数据已处理
$ /redis/app/bin/redis-cli -h 192.168.1.100 -p 6379 -a fgedu@2026 xack sensor:data data_processor 1704067218000-0 1704067219000-0 1704067220000-0 1704067221000-0
# 输出示例
(integer) 4
## 5. 修剪数据流
$ /redis/app/bin/redis-cli -h 192.168.1.100 -p 6379 -a fgedu@2026 xtrim sensor:data MAXLEN 100
# 输出示例
(integer) 0
更多学习教程公众号风哥教程itpux_com
Part05-风哥经验总结与分享
5.1 最佳实践
Redis Stream消息流实战最佳实践:
- Stream命名规范:使用业务前缀+Stream类型的方式,如order:events,学习交流加群风哥微信: itpux-com
- 消息大小:控制消息大小,避免过大的消息
- 合理设置过期时间:根据数据生命周期设置合适的过期时间
- 消费者组:根据业务需求设计合理的消费者组结构
- 消息确认:及时确认处理完的消息
- 定期修剪:使用XTRIM定期修剪Stream,避免过大
5.2 常见问题
- 消息堆积:及时处理消息,避免消息堆积
- 消费者组阻塞:合理设置阻塞时间,避免长时间阻塞
- 内存使用:定期修剪Stream,控制内存使用
- 消息丢失:使用消费者组和消息确认机制确保消息不丢失
- 性能问题:优化消费者数量和处理速度
5.3 优化技巧
## 1. Stream设计优化
– 使用简短的Stream名称
– 合理设计消息结构,避免过大的消息
– 为不同的业务场景设计不同的Stream
– 考虑数据的生命周期,设置合理的过期时间
## 2. 操作优化
– 使用XADD批量添加消息
– 优化XREAD和XREADGROUP操作,合理设置count参数
– 及时确认处理完的消息
– 使用管道批量执行操作
## 3. 性能优化
– 监控Stream的大小,及时修剪
– 优化消费者组的数量和消费者数量
– 避免长时间的阻塞读取
– 定期清理过期的Stream数据
## 4. 内存优化
– 合理控制Stream的大小
– 设置合理的过期时间,自动清理过期数据
– 选择合适的内存淘汰策略
## 5. 监控优化
– 监控Stream的大小和内存使用
– 监控消费者的处理速度和消息堆积情况
– 设置合理的告警机制
– 定期分析Stream的使用情况
通过本文档的学习,您应该掌握了Redis Stream类型的消息流实战,能够在生产环境中合理使用Stream类型实现消息队列、事件流、日志处理、实时处理等功能。在实际应用中,需要根据具体业务场景选择合适的操作命令和Stream设计,确保系统的可靠性和性能。
风哥提示:Redis Stream是一种强大的消息流数据结构,适合实现可靠的消息队列、事件流处理、日志处理等功能。在实际应用中,需要根据业务需求合理设计Stream结构和消费者组策略,确保系统的可靠性和性能。
from Redis视频:www.itpux.com
本文由风哥教程整理发布,仅用于学习测试使用,转载注明出处:http://www.fgedu.net.cn/10327.html
