Cassandra教程FG007-Cassandra增删改查与批量操作实战
本文档风哥主要介绍Cassandra数据库增删改查与批量操作实战,包括CRUD操作概述、批量操作概念、事务特性、插入操作策略、更新操作策略、批量操作策略、插入操作实战、更新操作实战、批量操作实战、订单管理实战、用户管理实战、日志管理实战等内容,风哥教程参考Cassandra官方文档CQL和Data Modeling内容编写,适合DBA人员和开发人员在学习和测试中使用,如果要应用于生产环境则需要自行确认。
Part01-基础概念与理论知识
1.1 Cassandra数据库CRUD操作概述
CRUD是Create(创建)、Read(读取)、Update(更新)、Delete(删除)四种基本数据库操作的缩写。Cassandra数据库的CRUD操作具有其独特的特点,与关系型数据库有显著差异。更多视频教程www.fgedu.net.cn
1.1.1 Cassandra数据库CRUD操作特点
# 1. Create(插入)
– INSERT语句
– 支持TTL自动过期
– 支持IF NOT EXISTS条件
– 插入即更新(Upsert语义)
# 2. Read(查询)
– SELECT语句
– 必须指定分区键
– 支持范围查询
– 支持分页查询
# 3. Update(更新)
– UPDATE语句
– 插入和更新语义相同
– 支持条件更新
– 支持集合操作
# 4. Delete(删除)
– DELETE语句
– 支持删除整行或部分列
– 支持范围删除
– 删除是墓碑标记
# 与关系型数据库差异
1. 无事务回滚
2. 插入即更新
3. 删除是墓碑
4. 查询必须指定分区键
5. 不支持JOIN
1.1.2 Cassandra数据库CRUD操作语法
# INSERT语法
INSERT INTO table_name (column_names)
VALUES (values)
[USING TTL seconds]
[USING TIMESTAMP timestamp]
[IF NOT EXISTS];
# SELECT语法
SELECT [column_names]
FROM table_name
[WHERE where_clause]
[ORDER BY column_name]
[LIMIT n]
[ALLOW FILTERING];
# UPDATE语法
UPDATE table_name
SET column_name = value [, …]
[USING TTL seconds]
[USING TIMESTAMP timestamp]
WHERE where_clause
[IF EXISTS | IF condition];
# DELETE语法
DELETE [column_names]
FROM table_name
[USING TIMESTAMP timestamp]
WHERE where_clause
[IF EXISTS | IF condition];
# 示例
# 插入
INSERT INTO fgedu_users (user_id, user_name, email)
VALUES (uuid(), ‘zhangsan’, ‘zhangsan@fgedu.net.cn’);
# 查询
SELECT * FROM fgedu_users WHERE user_id = ?;
# 更新
UPDATE fgedu_users SET email = ‘new@fgedu.net.cn’ WHERE user_id = ?;
# 删除
DELETE FROM fgedu_users WHERE user_id = ?;
1.2 Cassandra数据库批量操作概念
Cassandra数据库批量操作(BATCH)允许将多个DML语句组合成一个原子操作,保证操作的原子性和隔离性。
1.2.1 Cassandra数据库BATCH操作特点
# 1. 原子性
– 所有操作要么全部成功,要么全部失败
– 保证数据一致性
# 2. 隔离性
– 批量操作对其他操作不可见
– 直到批量操作完成
# 3. 性能优化
– 减少网络往返
– 提高写入效率
# 4. 限制
– 单个BATCH不能超过65535个语句
– 建议单个BATCH不超过100个语句
– 不建议跨分区BATCH
# BATCH语法
BEGIN [LOGGED | UNLOGGED | COUNTER] BATCH
[USING TIMESTAMP timestamp]
statement1;
statement2;
…
APPLY BATCH;
# BATCH类型
1. LOGGED BATCH(默认)
– 保证原子性
– 有额外开销
– 适合跨分区操作
2. UNLOGGED BATCH
– 不保证原子性
– 性能更好
– 适合单分区操作
3. COUNTER BATCH
– 专门用于Counter表
– 保证Counter操作原子性
1.2.2 Cassandra数据库BATCH使用场景
# 适合使用BATCH的场景
1. 同一分区的多个操作
2. 需要保证原子性的操作
3. 多表关联数据写入
4. 索引表数据同步
# 不适合使用BATCH的场景
1. 大量数据导入
2. 跨分区的大批量操作
3. 不需要原子性的操作
# BATCH示例
# 场景1: 用户注册(多表写入)
BEGIN BATCH
INSERT INTO fgedu_users (user_id, user_name, email)
VALUES (?, ‘zhangsan’, ‘zhangsan@fgedu.net.cn’);
INSERT INTO fgedu_users_by_email (email, user_id)
VALUES (‘zhangsan@fgedu.net.cn’, ?);
APPLY BATCH;
# 场景2: 订单创建(多行写入)
BEGIN BATCH
INSERT INTO fgedu_orders (order_id, user_id, amount)
VALUES (?, ?, 1000.00);
INSERT INTO fgedu_order_items (order_id, item_id, quantity)
VALUES (?, ?, 2);
APPLY BATCH;
# 场景3: Counter更新
BEGIN COUNTER BATCH
UPDATE fgedu_user_counters SET login_count = login_count + 1 WHERE user_id = ?;
UPDATE fgedu_user_counters SET total_login_time = total_login_time + 3600 WHERE user_id = ?;
APPLY BATCH;
1.3 Cassandra数据库事务特性
Cassandra数据库的事务特性与传统关系型数据库不同,需要理解其独特的机制。
1.3.1 Cassandra数据库事务特性说明
# 1. 原子性(Atomicity)
– 单分区操作保证原子性
– BATCH操作保证原子性
– 跨分区操作不保证原子性
# 2. 一致性(Consistency)
– 可调一致性级别
– 最终一致性模型
– 支持轻量级事务(LWT)
# 3. 隔离性(Isolation)
– 单行操作隔离
– BATCH操作隔离
– 无传统事务隔离级别
# 4. 持久性(Durability)
– 提交日志保证持久性
– 可配置持久化策略
– 支持批量提交
# 轻量级事务(LWT)
# 使用Paxos协议实现
# 支持条件操作
# 性能开销较大
# LWT语法
INSERT INTO table_name (columns)
VALUES (values)
IF NOT EXISTS;
UPDATE table_name
SET column = value
WHERE condition
IF condition;
# LWT示例
# 条件插入
INSERT INTO fgedu_users (user_id, user_name)
VALUES (?, ‘zhangsan’)
IF NOT EXISTS;
# 条件更新
UPDATE fgedu_users
SET email = ‘new@fgedu.net.cn’
WHERE user_id = ?
IF email = ‘old@fgedu.net.cn’;
1.3.2 Cassandra数据库与关系型数据库事务对比
# 关系型数据库(RDBMS)
– ACID事务
– 支持事务回滚
– 支持多表事务
– 支持隔离级别
– 支持锁机制
# Cassandra
– 单分区原子性
– 无事务回滚
– BATCH保证原子性
– 无隔离级别
– 无锁机制
– 最终一致性
# 选择建议
# 需要强一致性事务的场景
→ 选择关系型数据库
# 需要高可用性和可扩展性的场景
→ 选择Cassandra
# 混合场景
→ 根据业务特点选择合适的数据库
→ 或使用多数据库架构
Part02-生产环境规划与建议
2.1 Cassandra数据库插入操作策略
Cassandra数据库插入操作策略详解:
2.1.1 Cassandra数据库插入操作设计原则
# 1. 批量插入优化
– 使用BATCH减少网络往返
– 控制BATCH大小
– 同一分区优先
# 2. TTL使用策略
– 根据数据生命周期设置TTL
– 避免TTL过期导致墓碑
– 监控TTL过期情况
# 3. 时间戳管理
– 使用客户端时间戳
– 避免时钟不同步
– 处理时间戳冲突
# 4. 一致性级别选择
– 写入一致性级别影响性能
– 根据业务需求选择
– 平衡一致性和性能
# 插入操作最佳实践
1. 使用Prepared语句
2. 合理设置TTL
3. 使用BATCH优化
4. 选择合适的一致性级别
5. 处理插入失败
2.1.2 Cassandra数据库插入性能优化
# 1. 使用Prepared语句
# 好的做法
PreparedStatement prepared = session.prepare(
“INSERT INTO fgedu_users (user_id, user_name, email) VALUES (?, ?, ?)”
);
# 不好的做法
String query = String.format(
“INSERT INTO fgedu_users (user_id, user_name, email) VALUES (%s, ‘%s’, ‘%s’)”,
userId, userName, email
);
# 2. 批量插入
# 使用UNLOGGED BATCH提高性能
BEGIN UNLOGGED BATCH
INSERT INTO fgedu_users (user_id, user_name) VALUES (?, ?);
INSERT INTO fgedu_users (user_id, user_name) VALUES (?, ?);
INSERT INTO fgedu_users (user_id, user_name) VALUES (?, ?);
APPLY BATCH;
# 3. 异步插入
# 使用异步API提高吞吐量
ResultSetFuture future = session.executeAsync(
“INSERT INTO fgedu_users (user_id, user_name) VALUES (?, ?)”,
userId, userName
);
# 4. 调整一致性级别
# 降低一致性级别提高性能
CONSISTENCY ANY; — 最快,但可能丢失数据
CONSISTENCY ONE; — 平衡性能和可靠性
CONSISTENCY QUORUM; — 较高可靠性
CONSISTENCY ALL; — 最高可靠性,性能最差
2.2 Cassandra数据库更新操作策略
Cassandra数据库更新操作策略详解:
2.2.1 Cassandra数据库更新操作设计原则
# 1. 理解Upsert语义
– INSERT和UPDATE语义相同
– 不存在的行会自动创建
– 需要处理空值问题
# 2. 条件更新
– 使用IF EXISTS条件
– 使用轻量级事务
– 处理更新失败
# 3. 集合操作
– 使用+/-操作符
– 避免读取-修改-写入模式
– 使用原子操作
# 4. Counter操作
– 使用专门的Counter表
– 使用COUNTER BATCH
– 注意Counter限制
# 更新操作最佳实践
1. 使用条件更新保证正确性
2. 使用原子集合操作
3. 合理使用轻量级事务
4. 处理并发更新
5. 监控更新性能
2.2.2 Cassandra数据库更新性能优化
# 1. 避免读取-修改-写入
# 不好的做法
row = SELECT * FROM fgedu_users WHERE user_id = ?;
newTags = row.tags + {‘new_tag’};
UPDATE fgedu_users SET tags = newTags WHERE user_id = ?;
# 好的做法
UPDATE fgedu_users SET tags = tags + {‘new_tag’} WHERE user_id = ?;
# 2. 使用条件更新
# 避免不必要的更新
UPDATE fgedu_users
SET email = ‘new@fgedu.net.cn’
WHERE user_id = ?
IF email != ‘new@fgedu.net.cn’;
# 3. 批量更新
BEGIN BATCH
UPDATE fgedu_users SET status = ‘ACTIVE’ WHERE user_id = ?;
UPDATE fgedu_user_status SET last_update = toTimestamp(now()) WHERE user_id = ?;
APPLY BATCH;
# 4. 使用TTL
# 设置字段过期时间
UPDATE fgedu_users
SET temp_token = ‘token123’
USING TTL 3600
WHERE user_id = ?;
# 5. 使用时间戳
# 控制更新顺序
UPDATE fgedu_users
USING TIMESTAMP 1234567890
SET email = ‘new@fgedu.net.cn’
WHERE user_id = ?;
2.3 Cassandra数据库批量操作策略
Cassandra数据库批量操作策略详解:
2.3.1 Cassandra数据库批量操作设计原则
# 1. BATCH大小控制
– 单个BATCH不超过100条语句
– 单个BATCH不超过50KB
– 超大批量分批执行
# 2. BATCH类型选择
– 单分区操作: UNLOGGED BATCH
– 跨分区操作: LOGGED BATCH
– Counter操作: COUNTER BATCH
# 3. BATCH使用场景
– 多表关联写入
– 索引表同步
– 原子性要求高的操作
# 4. BATCH性能考虑
– LOGGED BATCH有额外开销
– UNLOGGED BATCH性能更好
– 避免跨分区BATCH
# 批量操作最佳实践
1. 控制BATCH大小
2. 选择合适的BATCH类型
3. 同一分区优先
4. 处理BATCH失败
5. 监控BATCH性能
2.3.2 Cassandra数据库批量操作性能优化
# 1. 使用UNLOGGED BATCH
# 单分区操作
BEGIN UNLOGGED BATCH
INSERT INTO fgedu_logs (log_id, log_time, message) VALUES (?, ?, ?);
INSERT INTO fgedu_logs (log_id, log_time, message) VALUES (?, ?, ?);
INSERT INTO fgedu_logs (log_id, log_time, message) VALUES (?, ?, ?);
APPLY BATCH;
# 2. 分批处理大数据量
#!/bin/bash
# batch_insert.sh
# from:www.itpux.com.qq113257174.wx:itpux-com
# web: http://www.fgedu.net.cn
BATCH_SIZE=100
TOTAL=10000
for ((i=0; i
for (Statement statement : statements) {
futures.add(session.executeAsync(statement));
}
for (ResultSetFuture future : futures) {
future.get(); // 等待完成
}
# 4. 使用DSBulk工具
# 大数据量导入使用DSBulk
# dsbulk load -k fgedudb -t fgedu_users -url /path/to/data.csv
Part03-生产环境项目实施方案
3.1 Cassandra数据库插入操作实战
Cassandra数据库插入操作实战:
3.1.1 Cassandra数据库基本插入操作
# cqlsh 192.168.1.101 9042 -u fgedu -p Fgedu@2024
Connected to fgedu_cluster at 192.168.1.101:9042
# 切换到Keyspace
cqlsh> USE fgedudb;
cqlsh:fgedudb>
# 基本插入
cqlsh:fgedudb> INSERT INTO fgedu_users
… (user_id, user_name, email, phone, age, created_at)
… VALUES (
… uuid(),
… ‘zhangsan’,
… ‘zhangsan@fgedu.net.cn’,
… ‘13800138000’,
… 25,
… toTimestamp(now())
… );
# 查询验证
cqlsh:fgedudb> SELECT * FROM fgedu_users LIMIT 1;
user_id | age | created_at | email | phone | user_name
————————————–+—–+———————————-+———————-+————-+———–
8e5f6a7b-8c9d-0e1f-2a3b-4c5d6e7f8a9b | 25 | 2024-01-15 10:30:00.123000+0000 | zhangsan@fgedu.net.cn | 13800138000 | zhangsan
(1 row)
# 使用TTL插入
cqlsh:fgedudb> INSERT INTO fgedu_temp_tokens
… (token_id, user_id, token, created_at)
… VALUES (uuid(), uuid(), ‘temp_token_123’, toTimestamp(now()))
… USING TTL 3600;
# 条件插入
cqlsh:fgedudb> INSERT INTO fgedu_users
… (user_id, user_name, email)
… VALUES (uuid(), ‘lisi’, ‘lisi@fgedu.net.cn’)
… IF NOT EXISTS;
[applied]
———–
True
(1 rows)
# 重复插入(已存在)
cqlsh:fgedudb> INSERT INTO fgedu_users
… (user_id, user_name, email)
… VALUES (8e5f6a7b-8c9d-0e1f-2a3b-4c5d6e7f8a9b, ‘wangwu’, ‘wangwu@fgedu.net.cn’)
… IF NOT EXISTS;
[applied]
———–
False
(1 rows)
# 使用JSON插入
cqlsh:fgedudb> INSERT INTO fgedu_users JSON ‘{
… “user_id”: “uuid()”,
… “user_name”: “wangwu”,
… “email”: “wangwu@fgedu.net.cn”,
… “age”: 30
… }’;
3.1.2 Cassandra数据库批量插入操作
cqlsh:fgedudb> BEGIN UNLOGGED BATCH
… INSERT INTO fgedu_logs (log_id, log_time, log_level, message)
… VALUES (uuid(), toTimestamp(now()), ‘INFO’, ‘Log message 1’);
… INSERT INTO fgedu_logs (log_id, log_time, log_level, message)
… VALUES (uuid(), toTimestamp(now()), ‘INFO’, ‘Log message 2’);
… INSERT INTO fgedu_logs (log_id, log_time, log_level, message)
… VALUES (uuid(), toTimestamp(now()), ‘INFO’, ‘Log message 3’);
… APPLY BATCH;
# 批量插入(多表关联)
cqlsh:fgedudb> BEGIN BATCH
… INSERT INTO fgedu_users (user_id, user_name, email)
… VALUES (uuid(), ‘user001’, ‘user001@fgedu.net.cn’);
… INSERT INTO fgedu_users_by_email (email, user_id)
… VALUES (‘user001@fgedu.net.cn’, uuid());
… INSERT INTO fgedu_user_profiles (user_id, nickname, avatar)
… VALUES (uuid(), ‘用户001’, ‘avatar001.jpg’);
… APPLY BATCH;
# 批量插入(使用变量)
cqlsh:fgedudb> PREPARE INSERT INTO fgedu_users (user_id, user_name, email) VALUES (?, ?, ?);
cqlsh:fgedudb> EXECUTE (?, ?, ?) USING CONSISTENCY QUORUM;
# 大批量插入脚本
#!/bin/bash
# bulk_insert.sh
# from:www.itpux.com.qq113257174.wx:itpux-com
# web: http://www.fgedu.net.cn
KEYSPACE=”fgedudb”
TABLE=”fgedu_users”
BATCH_SIZE=50
TOTAL_RECORDS=1000
echo “开始批量插入 $TOTAL_RECORDS 条记录…”
for ((i=1; i<=TOTAL_RECORDS; i+=BATCH_SIZE)); do
echo "BEGIN UNLOGGED BATCH"
for ((j=0; j Cassandra数据库更新操作实战: # 查询验证 user_id | email | phone | updated_at (1 row) # 条件更新 [applied] (1 rows) # 带条件的轻量级事务 [applied] | email (1 rows) # 使用TTL更新 # 使用时间戳更新 # 删除元素 # 替换整个集合 # 更新LIST列表 # 添加元素到开头 # 删除元素 # 按索引更新 # 更新MAP映射 # 删除键值对 # 更新Counter # 查询Counter user_id | login_count | page_views (1 row)
Cassandra数据库批量操作实战: # 查询验证 # 订单创建BATCH # 用户状态更新BATCH # 同一分区批量操作 # COUNTER BATCH Cassandra数据库订单管理实战案例: # 步骤1: 创建订单(使用BATCH) # 步骤2: 查询订单 order_id | created_at | order_time | status | total_amount | user_id (1 row) # 步骤3: 查询订单明细 order_id | item_id | product_id | product_name | quantity | total_price | unit_price (2 rows)
# 更新订单状态(使用轻量级事务) [applied] | created_at | order_time | paid_at | status | total_amount | updated_at (1 rows) # 记录状态变更日志 # 订单发货 # 订单完成 Cassandra数据库用户管理实战案例: # 步骤1: 检查邮箱是否已存在 user_id (0 rows) # 步骤2: 创建用户(使用BATCH) # 步骤3: 创建激活令牌 # 步骤4: 发送激活邮件(应用层处理) # 更新用户昵称 # 更新用户头像 # 添加用户标签 # 更新用户地址 # 记录用户活动 # 批量更新用户状态 Cassandra数据库日志管理实战案例: # 单条日志写入 # 批量日志写入 # 带TTL的日志写入(自动过期) # 日志写入脚本 LOG_FILE=”/var/log/fgedu/app.log” tail -f $LOG_FILE | while read line; do # 查询最近日志 # 按时间范围查询 # 统计日志数量 log_level | count (4 rows) # 清理过期日志 Cassandra数据库CRUD最佳实践总结: # 1. BATCH大小控制 # 2. BATCH类型选择 # 3. BATCH使用场景 # 4. BATCH注意事项 # 5. 大数据量导入 Cassandra数据库性能优化技巧: # 1. 使用Prepared语句 # 2. 批量写入 # 3. 异步写入 # 4. 调整一致性级别 # 5. 使用Token感知 # 6. 调整批处理大小 # 1. 指定分区键 # 2. 使用分页 # 3. 使用索引 # 4. 避免ALLOW FILTERING # 5. 使用缓存 # 6. 异步读取 # 7. 使用并发读取 本文档详细介绍了Cassandra数据库增删改查与批量操作实战,包括CRUD操作概述、批量操作概念、事务特性、插入操作策略、更新操作策略、批量操作策略、插入操作实战、更新操作实战、批量操作实战、订单管理实战、用户管理实战、日志管理实战、CRUD最佳实践、批量操作最佳实践、性能优化技巧等内容。通过学习本文档,读者可以掌握Cassandra数据库CRUD操作和批量操作技能,为应用开发打下坚实基础。 本文由风哥教程整理发布,仅用于学习测试使用,转载注明出处:http://www.fgedu.net.cn/10327.html3.2 Cassandra数据库更新操作实战
3.2.1 Cassandra数据库基本更新操作
cqlsh:fgedudb> UPDATE fgedu_users
… SET email = ‘newemail@fgedu.net.cn’,
… phone = ‘13900139000’,
… updated_at = toTimestamp(now())
… WHERE user_id = 8e5f6a7b-8c9d-0e1f-2a3b-4c5d6e7f8a9b;
cqlsh:fgedudb> SELECT user_id, email, phone, updated_at FROM fgedu_users
… WHERE user_id = 8e5f6a7b-8c9d-0e1f-2a3b-4c5d6e7f8a9b;
————————————–+———————-+————-+———————————-
8e5f6a7b-8c9d-0e1f-2a3b-4c5d6e7f8a9b | newemail@fgedu.net.cn | 13900139000 | 2024-01-15 11:00:00.123000+0000
cqlsh:fgedudb> UPDATE fgedu_users
… SET email = ‘updated@fgedu.net.cn’
… WHERE user_id = 8e5f6a7b-8c9d-0e1f-2a3b-4c5d6e7f8a9b
… IF EXISTS;
———–
True
cqlsh:fgedudb> UPDATE fgedu_users
… SET email = ‘final@fgedu.net.cn’
… WHERE user_id = 8e5f6a7b-8c9d-0e1f-2a3b-4c5d6e7f8a9b
… IF email = ‘updated@fgedu.net.cn’;
———–+———————-
True | updated@fgedu.net.cn
cqlsh:fgedudb> UPDATE fgedu_users
… SET temp_field = ‘temporary_value’
… USING TTL 3600
… WHERE user_id = 8e5f6a7b-8c9d-0e1f-2a3b-4c5d6e7f8a9b;
cqlsh:fgedudb> UPDATE fgedu_users
… USING TIMESTAMP 1705312800000
… SET email = ‘timestamped@fgedu.net.cn’
… WHERE user_id = 8e5f6a7b-8c9d-0e1f-2a3b-4c5d6e7f8a9b;
3.2.2 Cassandra数据库集合更新操作
# 添加元素
cqlsh:fgedudb> UPDATE fgedu_users
… SET tags = tags + {‘new_tag1’, ‘new_tag2’}
… WHERE user_id = 8e5f6a7b-8c9d-0e1f-2a3b-4c5d6e7f8a9b;
cqlsh:fgedudb> UPDATE fgedu_users
… SET tags = tags – {‘old_tag’}
… WHERE user_id = 8e5f6a7b-8c9d-0e1f-2a3b-4c5d6e7f8a9b;
cqlsh:fgedudb> UPDATE fgedu_users
… SET tags = {‘tag1’, ‘tag2’, ‘tag3’}
… WHERE user_id = 8e5f6a7b-8c9d-0e1f-2a3b-4c5d6e7f8a9b;
# 添加元素到末尾
cqlsh:fgedudb> UPDATE fgedu_users
… SET items = items + [‘item1’, ‘item2’]
… WHERE user_id = 8e5f6a7b-8c9d-0e1f-2a3b-4c5d6e7f8a9b;
cqlsh:fgedudb> UPDATE fgedu_users
… SET items = [‘item0’] + items
… WHERE user_id = 8e5f6a7b-8c9d-0e1f-2a3b-4c5d6e7f8a9b;
cqlsh:fgedudb> UPDATE fgedu_users
… SET items = items – [‘item1’]
… WHERE user_id = 8e5f6a7b-8c9d-0e1f-2a3b-4c5d6e7f8a9b;
cqlsh:fgedudb> UPDATE fgedu_users
… SET items[0] = ‘updated_item’
… WHERE user_id = 8e5f6a7b-8c9d-0e1f-2a3b-4c5d6e7f8a9b;
# 添加键值对
cqlsh:fgedudb> UPDATE fgedu_users
… SET attributes = attributes + {‘key1’: ‘value1’, ‘key2’: ‘value2’}
… WHERE user_id = 8e5f6a7b-8c9d-0e1f-2a3b-4c5d6e7f8a9b;
cqlsh:fgedudb> DELETE attributes[‘key1’] FROM fgedu_users
… WHERE user_id = 8e5f6a7b-8c9d-0e1f-2a3b-4c5d6e7f8a9b;
cqlsh:fgedudb> UPDATE fgedu_user_counters
… SET login_count = login_count + 1,
… page_views = page_views + 10
… WHERE user_id = 8e5f6a7b-8c9d-0e1f-2a3b-4c5d6e7f8a9b;
cqlsh:fgedudb> SELECT * FROM fgedu_user_counters
… WHERE user_id = 8e5f6a7b-8c9d-0e1f-2a3b-4c5d6e7f8a9b;
————————————–+————-+————
8e5f6a7b-8c9d-0e1f-2a3b-4c5d6e7f8a9b | 15 | 1503.3 Cassandra数据库批量操作实战
3.3.1 Cassandra数据库LOGGED BATCH操作
cqlsh:fgedudb> BEGIN LOGGED BATCH
… INSERT INTO fgedu_users (user_id, user_name, email)
… VALUES (uuid(), ‘batch_user1’, ‘batch1@fgedu.net.cn’);
… INSERT INTO fgedu_users_by_email (email, user_id)
… VALUES (‘batch1@fgedu.net.cn’, uuid());
… UPDATE fgedu_user_counters
… SET total_users = total_users + 1
… WHERE counter_id = ‘global’;
… APPLY BATCH;
cqlsh:fgedudb> SELECT * FROM fgedu_users WHERE user_name = ‘batch_user1’;
cqlsh:fgedudb> BEGIN BATCH
… INSERT INTO fgedu_orders (order_id, user_id, order_time, amount, status)
… VALUES (uuid(), uuid(), toTimestamp(now()), 9999.00, ‘CREATED’);
… INSERT INTO fgedu_order_items (order_id, item_id, product_name, quantity, price)
… VALUES (uuid(), uuid(), ‘iPhone 15 Pro’, 1, 9999.00);
… UPDATE fgedu_user_counters
… SET order_count = order_count + 1
… WHERE user_id = ?;
… APPLY BATCH;
cqlsh:fgedudb> BEGIN BATCH
… UPDATE fgedu_users
… SET status = ‘ACTIVE’, updated_at = toTimestamp(now())
… WHERE user_id = ?;
… INSERT INTO fgedu_user_activities (user_id, activity_time, activity_type)
… VALUES (?, toTimestamp(now()), ‘ACCOUNT_ACTIVATED’);
… APPLY BATCH;
3.3.2 Cassandra数据库UNLOGGED BATCH操作
cqlsh:fgedudb> BEGIN UNLOGGED BATCH
… INSERT INTO fgedu_logs (log_id, log_time, log_level, message)
… VALUES (uuid(), toTimestamp(now()), ‘INFO’, ‘Batch log 1’);
… INSERT INTO fgedu_logs (log_id, log_time, log_level, message)
… VALUES (uuid(), toTimestamp(now()), ‘INFO’, ‘Batch log 2’);
… INSERT INTO fgedu_logs (log_id, log_time, log_level, message)
… VALUES (uuid(), toTimestamp(now()), ‘INFO’, ‘Batch log 3’);
… INSERT INTO fgedu_logs (log_id, log_time, log_level, message)
… VALUES (uuid(), toTimestamp(now()), ‘INFO’, ‘Batch log 4’);
… INSERT INTO fgedu_logs (log_id, log_time, log_level, message)
… VALUES (uuid(), toTimestamp(now()), ‘INFO’, ‘Batch log 5’);
… APPLY BATCH;
cqlsh:fgedudb> BEGIN UNLOGGED BATCH
… INSERT INTO fgedu_user_messages (user_id, message_time, sender_id, content)
… VALUES (?, ‘2024-01-15 10:00:00’, ?, ‘Message 1’);
… INSERT INTO fgedu_user_messages (user_id, message_time, sender_id, content)
… VALUES (?, ‘2024-01-15 10:01:00’, ?, ‘Message 2’);
… INSERT INTO fgedu_user_messages (user_id, message_time, sender_id, content)
… VALUES (?, ‘2024-01-15 10:02:00’, ?, ‘Message 3’);
… APPLY BATCH;
cqlsh:fgedudb> BEGIN COUNTER BATCH
… UPDATE fgedu_user_counters
… SET login_count = login_count + 1
… WHERE user_id = ?;
… UPDATE fgedu_user_counters
… SET session_count = session_count + 1
… WHERE user_id = ?;
… UPDATE fgedu_global_counters
… SET total_logins = total_logins + 1
… WHERE counter_id = ‘global’;
… APPLY BATCH;
Part04-生产案例与实战讲解
4.1 Cassandra数据库订单管理实战
4.1.1 Cassandra数据库订单创建实战
cqlsh:fgedudb> BEGIN BATCH
… — 插入订单主表
… INSERT INTO fgedu_orders (
… order_id, user_id, order_time,
… total_amount, status, created_at
… ) VALUES (
… 8e5f6a7b-8c9d-0e1f-2a3b-4c5d6e7f8a9b,
… 9f7g8h9i-0j1k-2l3m-4n5o-6p7q8r9s0t1u,
… toTimestamp(now()),
… 15998.00,
… ‘CREATED’,
… toTimestamp(now())
… );
… — 插入订单明细
… INSERT INTO fgedu_order_items (
… order_id, item_id, product_id,
… product_name, quantity, unit_price, total_price
… ) VALUES (
… 8e5f6a7b-8c9d-0e1f-2a3b-4c5d6e7f8a9b,
… uuid(),
… uuid(),
… ‘iPhone 15 Pro Max’,
… 1,
… 9999.00,
… 9999.00
… );
… INSERT INTO fgedu_order_items (
… order_id, item_id, product_id,
… product_name, quantity, unit_price, total_price
… ) VALUES (
… 8e5f6a7b-8c9d-0e1f-2a3b-4c5d6e7f8a9b,
… uuid(),
… uuid(),
… ‘AirPods Pro’,
… 1,
… 1999.00,
… 1999.00
… );
… — 更新用户订单计数
… UPDATE fgedu_user_counters
… SET order_count = order_count + 1
… WHERE user_id = 9f7g8h9i-0j1k-2l3m-4n5o-6p7q8r9s0t1u;
… APPLY BATCH;
cqlsh:fgedudb> SELECT * FROM fgedu_orders
… WHERE order_id = 8e5f6a7b-8c9d-0e1f-2a3b-4c5d6e7f8a9b;
————————————–+———————————-+———————————-+———+————–+————————————–
8e5f6a7b-8c9d-0e1f-2a3b-4c5d6e7f8a9b | 2024-01-15 10:30:00.123000+0000 | 2024-01-15 10:30:00.123000+0000 | CREATED | 15998.00 | 9f7g8h9i-0j1k-2l3m-4n5o-6p7q8r9s0t1u
cqlsh:fgedudb> SELECT * FROM fgedu_order_items
… WHERE order_id = 8e5f6a7b-8c9d-0e1f-2a3b-4c5d6e7f8a9b;
————————————–+————————————–+————————————–+——————+———-+————-+————
8e5f6a7b-8c9d-0e1f-2a3b-4c5d6e7f8a9b | a1b2c3d4-e5f6-7890-abcd-ef1234567890 | b2c3d4e5-f6a7-8901-bcde-f12345678901 | iPhone 15 Pro Max | 1 | 9999.00 | 9999.00
8e5f6a7b-8c9d-0e1f-2a3b-4c5d6e7f8a9b | c3d4e5f6-a7b8-9012-cdef-123456789012 | d4e5f6a7-b8c9-0123-def1-234567890123 | AirPods Pro | 1 | 1999.00 | 1999.004.1.2 Cassandra数据库订单状态更新实战
cqlsh:fgedudb> UPDATE fgedu_orders
… SET status = ‘PAID’,
… paid_at = toTimestamp(now()),
… updated_at = toTimestamp(now())
… WHERE order_id = 8e5f6a7b-8c9d-0e1f-2a3b-4c5d6e7f8a9b
… IF status = ‘CREATED’;
———–+———————————-+———————————-+———+———+————–+———————————-
True | 2024-01-15 10:30:00.123000+0000 | 2024-01-15 10:30:00.123000+0000 | null | CREATED | 15998.00 | 2024-01-15 10:30:00.123000+0000
cqlsh:fgedudb> INSERT INTO fgedu_order_status_log
… (order_id, status_time, old_status, new_status, operator)
… VALUES (
… 8e5f6a7b-8c9d-0e1f-2a3b-4c5d6e7f8a9b,
… toTimestamp(now()),
… ‘CREATED’,
… ‘PAID’,
… ‘system’
… );
cqlsh:fgedudb> BEGIN BATCH
… UPDATE fgedu_orders
… SET status = ‘SHIPPED’,
… shipped_at = toTimestamp(now()),
… tracking_number = ‘SF1234567890’
… WHERE order_id = 8e5f6a7b-8c9d-0e1f-2a3b-4c5d6e7f8a9b
… IF status = ‘PAID’;
… INSERT INTO fgedu_order_status_log
… (order_id, status_time, old_status, new_status, operator)
… VALUES (
… 8e5f6a7b-8c9d-0e1f-2a3b-4c5d6e7f8a9b,
… toTimestamp(now()),
… ‘PAID’,
… ‘SHIPPED’,
… ‘warehouse’
… );
… APPLY BATCH;
cqlsh:fgedudb> BEGIN BATCH
… UPDATE fgedu_orders
… SET status = ‘COMPLETED’,
… completed_at = toTimestamp(now())
… WHERE order_id = 8e5f6a7b-8c9d-0e1f-2a3b-4c5d6e7f8a9b
… IF status = ‘SHIPPED’;
… UPDATE fgedu_user_counters
… SET completed_orders = completed_orders + 1
… WHERE user_id = ?;
… APPLY BATCH;
4.2 Cassandra数据库用户管理实战
4.2.1 Cassandra数据库用户注册实战
cqlsh:fgedudb> SELECT user_id FROM fgedu_users_by_email
… WHERE email = ‘newuser@fgedu.net.cn’;
———
0
cqlsh:fgedudb> BEGIN BATCH
… — 插入用户主表
… INSERT INTO fgedu_users (
… user_id, user_name, email, password_hash,
… nickname, status, created_at, updated_at
… ) VALUES (
… uuid(),
… ‘newuser’,
… ‘newuser@fgedu.net.cn’,
… ‘hashed_password_123’,
… ‘新用户’,
… ‘PENDING_ACTIVATION’,
… toTimestamp(now()),
… toTimestamp(now())
… );
… — 插入邮箱索引表
… INSERT INTO fgedu_users_by_email (email, user_id)
… VALUES (‘newuser@fgedu.net.cn’, uuid());
… — 插入用户名索引表
… INSERT INTO fgedu_users_by_name (user_name, user_id)
… VALUES (‘newuser’, uuid());
… — 初始化用户计数器
… INSERT INTO fgedu_user_counters (user_id, login_count, order_count)
… VALUES (uuid(), 0, 0);
… APPLY BATCH;
cqlsh:fgedudb> INSERT INTO fgedu_activation_tokens
… (token, user_id, email, created_at)
… VALUES (
… ‘activation_token_abc123’,
… uuid(),
… ‘newuser@fgedu.net.cn’,
… toTimestamp(now())
… )
… USING TTL 86400; — 24小时后过期
# 发送邮件到 newuser@fgedu.net.cn
4.2.2 Cassandra数据库用户信息更新实战
cqlsh:fgedudb> UPDATE fgedu_users
… SET nickname = ‘新昵称’,
… updated_at = toTimestamp(now())
… WHERE user_id = ?;
cqlsh:fgedudb> UPDATE fgedu_users
… SET avatar = ‘https://cdn.fgedu.net.cn/avatar/new_avatar.jpg’,
… updated_at = toTimestamp(now())
… WHERE user_id = ?;
cqlsh:fgedudb> UPDATE fgedu_users
… SET tags = tags + {‘VIP’, ‘活跃用户’}
… WHERE user_id = ?;
cqlsh:fgedudb> UPDATE fgedu_users
… SET address = {
… province: ‘北京’,
… city: ‘北京市’,
… street: ‘朝阳区xxx街道’,
… zip_code: ‘100000’
… }
… WHERE user_id = ?;
cqlsh:fgedudb> INSERT INTO fgedu_user_activities
… (user_id, activity_time, activity_type, activity_data)
… VALUES (
… ?,
… toTimestamp(now()),
… ‘PROFILE_UPDATED’,
… ‘{“fields”: [“nickname”, “avatar”]}’
… );
cqlsh:fgedudb> BEGIN BATCH
… UPDATE fgedu_users
… SET status = ‘ACTIVE’,
… activated_at = toTimestamp(now())
… WHERE user_id = ?;
… DELETE FROM fgedu_activation_tokens
… WHERE token = ‘activation_token_abc123’;
… UPDATE fgedu_global_counters
… SET active_users = active_users + 1
… WHERE counter_id = ‘global’;
… APPLY BATCH;
4.3 Cassandra数据库日志管理实战
4.3.1 Cassandra数据库日志写入实战
cqlsh:fgedudb> INSERT INTO fgedu_logs
… (log_id, log_time, log_level, logger, message, stack_trace)
… VALUES (
… uuid(),
… toTimestamp(now()),
… ‘ERROR’,
… ‘com.fgedu.service.UserService’,
… ‘用户登录失败: 用户名或密码错误’,
… ‘java.lang.Exception: Login failed\n\tat com.fgedu.service.UserService.login(UserService.java:123)’
… );
cqlsh:fgedudb> BEGIN UNLOGGED BATCH
… INSERT INTO fgedu_logs (log_id, log_time, log_level, logger, message)
… VALUES (uuid(), toTimestamp(now()), ‘INFO’, ‘com.fgedu.App’, ‘应用启动’);
… INSERT INTO fgedu_logs (log_id, log_time, log_level, logger, message)
… VALUES (uuid(), toTimestamp(now()), ‘INFO’, ‘com.fgedu.App’, ‘数据库连接成功’);
… INSERT INTO fgedu_logs (log_id, log_time, log_level, logger, message)
… VALUES (uuid(), toTimestamp(now()), ‘INFO’, ‘com.fgedu.App’, ‘缓存初始化完成’);
… INSERT INTO fgedu_logs (log_id, log_time, log_level, logger, message)
… VALUES (uuid(), toTimestamp(now()), ‘INFO’, ‘com.fgedu.App’, ‘应用启动完成’);
… APPLY BATCH;
cqlsh:fgedudb> INSERT INTO fgedu_logs
… (log_id, log_time, log_level, logger, message)
… VALUES (uuid(), toTimestamp(now()), ‘DEBUG’, ‘com.fgedu.App’, ‘调试日志’)
… USING TTL 604800; — 7天后自动删除
#!/bin/bash
# log_writer.sh
# from:www.itpux.com.qq113257174.wx:itpux-com
# web: http://www.fgedu.net.cn
CQLSH=”cqlsh 192.168.1.101 9042 -u fgedu -p Fgedu@2024″
# 解析日志级别
LOG_LEVEL=$(echo $line | grep -oP ‘(?<=\[)(ERROR|WARN|INFO|DEBUG)(?=\])' | head -1)
[ -z "$LOG_LEVEL" ] && LOG_LEVEL="INFO"
# 写入Cassandra
$CQLSH -e "INSERT INTO fgedudb.fgedu_logs (log_id, log_time, log_level, message)
VALUES (uuid(), toTimestamp(now()), '$LOG_LEVEL', '$(echo $line | sed "s/'/''/g")')"
done
4.3.2 Cassandra数据库日志查询实战
cqlsh:fgedudb> SELECT * FROM fgedu_logs
… LIMIT 10;
cqlsh:fgedudb> SELECT * FROM fgedu_logs
… WHERE log_date = ‘2024-01-15’
… AND log_time >= ‘2024-01-15 10:00:00’
… AND log_time < '2024-01-15 11:00:00';
# 查询错误日志
cqlsh:fgedudb> SELECT * FROM fgedu_logs
… WHERE log_date = ‘2024-01-15’
… AND log_level = ‘ERROR’;
cqlsh:fgedudb> SELECT log_level, COUNT(*)
… FROM fgedu_logs
… WHERE log_date = ‘2024-01-15’
… GROUP BY log_level;
———–+——-
ERROR | 15
INFO | 1234
DEBUG | 5678
WARN | 23
cqlsh:fgedudb> DELETE FROM fgedu_logs
… WHERE log_date < '2024-01-01';
Part05-风哥经验总结与分享
5.1 Cassandra数据库CRUD最佳实践
5.2 Cassandra数据库批量操作最佳实践
– 单个BATCH不超过100条语句
– 单个BATCH不超过50KB
– 超大批量分批执行
– 单分区操作: UNLOGGED BATCH
– 跨分区操作: LOGGED BATCH
– Counter操作: COUNTER BATCH
– 多表关联写入
– 索引表同步
– 原子性要求高的操作
– 避免跨分区的大批量操作
– 监控BATCH性能
– 处理BATCH失败
– 使用异步BATCH
– 使用DSBulk工具
– 分批导入
– 监控导入进度
– 验证导入结果
5.3 Cassandra数据库性能优化技巧
5.3.1 Cassandra数据库写入优化
PreparedStatement prepared = session.prepare(
“INSERT INTO fgedu_users (user_id, user_name) VALUES (?, ?)”
);
BEGIN UNLOGGED BATCH
INSERT INTO fgedu_users (user_id, user_name) VALUES (?, ?);
INSERT INTO fgedu_users (user_id, user_name) VALUES (?, ?);
APPLY BATCH;
ResultSetFuture future = session.executeAsync(statement);
CONSISTENCY ANY; — 最快
CONSISTENCY ONE; — 平衡
# 将数据写入正确的节点
TokenAwarePolicy policy = new TokenAwarePolicy(
new DCAwareRoundRobinPolicy()
);
# 在cassandra.yaml中配置
batch_size_warn_threshold_in_kb: 64
batch_size_fail_threshold_in_kb: 128
5.3.2 Cassandra数据库读取优化
SELECT * FROM fgedu_users WHERE user_id = ?;
SELECT * FROM fgedu_users WHERE user_id = ? LIMIT 100;
CREATE INDEX idx_email ON fgedu_users (email);
# 重新设计表结构
# 调整缓存配置
key_cache_size_in_mb: 100
row_cache_size_in_mb: 0
ResultSetFuture future = session.executeAsync(statement);
# 同时读取多个分区
List
for (UUID userId : userIds) {
futures.add(session.executeAsync(
“SELECT * FROM fgedu_users WHERE user_id = ?”, userId
));
}
