1. 首页 > Cassandra教程 > 正文

Cassandra教程FG007-Cassandra增删改查与批量操作实战

本文档风哥主要介绍Cassandra数据库增删改查与批量操作实战,包括CRUD操作概述、批量操作概念、事务特性、插入操作策略、更新操作策略、批量操作策略、插入操作实战、更新操作实战、批量操作实战、订单管理实战、用户管理实战、日志管理实战等内容,风哥教程参考Cassandra官方文档CQL和Data Modeling内容编写,适合DBA人员和开发人员在学习和测试中使用,如果要应用于生产环境则需要自行确认。

Part01-基础概念与理论知识

1.1 Cassandra数据库CRUD操作概述

CRUD是Create(创建)、Read(读取)、Update(更新)、Delete(删除)四种基本数据库操作的缩写。Cassandra数据库的CRUD操作具有其独特的特点,与关系型数据库有显著差异。更多视频教程www.fgedu.net.cn

1.1.1 Cassandra数据库CRUD操作特点

# Cassandra CRUD操作特点

# 1. Create(插入)
– INSERT语句
– 支持TTL自动过期
– 支持IF NOT EXISTS条件
– 插入即更新(Upsert语义)

# 2. Read(查询)
– SELECT语句
– 必须指定分区键
– 支持范围查询
– 支持分页查询

# 3. Update(更新)
– UPDATE语句
– 插入和更新语义相同
– 支持条件更新
– 支持集合操作

# 4. Delete(删除)
– DELETE语句
– 支持删除整行或部分列
– 支持范围删除
– 删除是墓碑标记

# 与关系型数据库差异
1. 无事务回滚
2. 插入即更新
3. 删除是墓碑
4. 查询必须指定分区键
5. 不支持JOIN

1.1.2 Cassandra数据库CRUD操作语法

# CRUD操作语法

# INSERT语法
INSERT INTO table_name (column_names)
VALUES (values)
[USING TTL seconds]
[USING TIMESTAMP timestamp]
[IF NOT EXISTS];

# SELECT语法
SELECT [column_names]
FROM table_name
[WHERE where_clause]
[ORDER BY column_name]
[LIMIT n]
[ALLOW FILTERING];

# UPDATE语法
UPDATE table_name
SET column_name = value [, …]
[USING TTL seconds]
[USING TIMESTAMP timestamp]
WHERE where_clause
[IF EXISTS | IF condition];

# DELETE语法
DELETE [column_names]
FROM table_name
[USING TIMESTAMP timestamp]
WHERE where_clause
[IF EXISTS | IF condition];

# 示例
# 插入
INSERT INTO fgedu_users (user_id, user_name, email)
VALUES (uuid(), ‘zhangsan’, ‘zhangsan@fgedu.net.cn’);

# 查询
SELECT * FROM fgedu_users WHERE user_id = ?;

# 更新
UPDATE fgedu_users SET email = ‘new@fgedu.net.cn’ WHERE user_id = ?;

# 删除
DELETE FROM fgedu_users WHERE user_id = ?;

1.2 Cassandra数据库批量操作概念

Cassandra数据库批量操作(BATCH)允许将多个DML语句组合成一个原子操作,保证操作的原子性和隔离性。

1.2.1 Cassandra数据库BATCH操作特点

# BATCH操作特点

# 1. 原子性
– 所有操作要么全部成功,要么全部失败
– 保证数据一致性

# 2. 隔离性
– 批量操作对其他操作不可见
– 直到批量操作完成

# 3. 性能优化
– 减少网络往返
– 提高写入效率

# 4. 限制
– 单个BATCH不能超过65535个语句
– 建议单个BATCH不超过100个语句
– 不建议跨分区BATCH

# BATCH语法
BEGIN [LOGGED | UNLOGGED | COUNTER] BATCH
[USING TIMESTAMP timestamp]
statement1;
statement2;

APPLY BATCH;

# BATCH类型
1. LOGGED BATCH(默认)
– 保证原子性
– 有额外开销
– 适合跨分区操作

2. UNLOGGED BATCH
– 不保证原子性
– 性能更好
– 适合单分区操作

3. COUNTER BATCH
– 专门用于Counter表
– 保证Counter操作原子性

1.2.2 Cassandra数据库BATCH使用场景

# BATCH使用场景

# 适合使用BATCH的场景
1. 同一分区的多个操作
2. 需要保证原子性的操作
3. 多表关联数据写入
4. 索引表数据同步

# 不适合使用BATCH的场景
1. 大量数据导入
2. 跨分区的大批量操作
3. 不需要原子性的操作

# BATCH示例
# 场景1: 用户注册(多表写入)
BEGIN BATCH
INSERT INTO fgedu_users (user_id, user_name, email)
VALUES (?, ‘zhangsan’, ‘zhangsan@fgedu.net.cn’);
INSERT INTO fgedu_users_by_email (email, user_id)
VALUES (‘zhangsan@fgedu.net.cn’, ?);
APPLY BATCH;

# 场景2: 订单创建(多行写入)
BEGIN BATCH
INSERT INTO fgedu_orders (order_id, user_id, amount)
VALUES (?, ?, 1000.00);
INSERT INTO fgedu_order_items (order_id, item_id, quantity)
VALUES (?, ?, 2);
APPLY BATCH;

# 场景3: Counter更新
BEGIN COUNTER BATCH
UPDATE fgedu_user_counters SET login_count = login_count + 1 WHERE user_id = ?;
UPDATE fgedu_user_counters SET total_login_time = total_login_time + 3600 WHERE user_id = ?;
APPLY BATCH;

1.3 Cassandra数据库事务特性

Cassandra数据库的事务特性与传统关系型数据库不同,需要理解其独特的机制。

1.3.1 Cassandra数据库事务特性说明

# Cassandra事务特性

# 1. 原子性(Atomicity)
– 单分区操作保证原子性
– BATCH操作保证原子性
– 跨分区操作不保证原子性

# 2. 一致性(Consistency)
– 可调一致性级别
– 最终一致性模型
– 支持轻量级事务(LWT)

# 3. 隔离性(Isolation)
– 单行操作隔离
– BATCH操作隔离
– 无传统事务隔离级别

# 4. 持久性(Durability)
– 提交日志保证持久性
– 可配置持久化策略
– 支持批量提交

# 轻量级事务(LWT)
# 使用Paxos协议实现
# 支持条件操作
# 性能开销较大

# LWT语法
INSERT INTO table_name (columns)
VALUES (values)
IF NOT EXISTS;

UPDATE table_name
SET column = value
WHERE condition
IF condition;

# LWT示例
# 条件插入
INSERT INTO fgedu_users (user_id, user_name)
VALUES (?, ‘zhangsan’)
IF NOT EXISTS;

# 条件更新
UPDATE fgedu_users
SET email = ‘new@fgedu.net.cn’
WHERE user_id = ?
IF email = ‘old@fgedu.net.cn’;

1.3.2 Cassandra数据库与关系型数据库事务对比

# 事务特性对比

# 关系型数据库(RDBMS)
– ACID事务
– 支持事务回滚
– 支持多表事务
– 支持隔离级别
– 支持锁机制

# Cassandra
– 单分区原子性
– 无事务回滚
– BATCH保证原子性
– 无隔离级别
– 无锁机制
– 最终一致性

# 选择建议
# 需要强一致性事务的场景
→ 选择关系型数据库

# 需要高可用性和可扩展性的场景
→ 选择Cassandra

# 混合场景
→ 根据业务特点选择合适的数据库
→ 或使用多数据库架构

Part02-生产环境规划与建议

2.1 Cassandra数据库插入操作策略

Cassandra数据库插入操作策略详解:

2.1.1 Cassandra数据库插入操作设计原则

# 插入操作设计原则

# 1. 批量插入优化
– 使用BATCH减少网络往返
– 控制BATCH大小
– 同一分区优先

# 2. TTL使用策略
– 根据数据生命周期设置TTL
– 避免TTL过期导致墓碑
– 监控TTL过期情况

# 3. 时间戳管理
– 使用客户端时间戳
– 避免时钟不同步
– 处理时间戳冲突

# 4. 一致性级别选择
– 写入一致性级别影响性能
– 根据业务需求选择
– 平衡一致性和性能

# 插入操作最佳实践
1. 使用Prepared语句
2. 合理设置TTL
3. 使用BATCH优化
4. 选择合适的一致性级别
5. 处理插入失败

2.1.2 Cassandra数据库插入性能优化

# 插入性能优化

# 1. 使用Prepared语句
# 好的做法
PreparedStatement prepared = session.prepare(
“INSERT INTO fgedu_users (user_id, user_name, email) VALUES (?, ?, ?)”
);

# 不好的做法
String query = String.format(
“INSERT INTO fgedu_users (user_id, user_name, email) VALUES (%s, ‘%s’, ‘%s’)”,
userId, userName, email
);

# 2. 批量插入
# 使用UNLOGGED BATCH提高性能
BEGIN UNLOGGED BATCH
INSERT INTO fgedu_users (user_id, user_name) VALUES (?, ?);
INSERT INTO fgedu_users (user_id, user_name) VALUES (?, ?);
INSERT INTO fgedu_users (user_id, user_name) VALUES (?, ?);
APPLY BATCH;

# 3. 异步插入
# 使用异步API提高吞吐量
ResultSetFuture future = session.executeAsync(
“INSERT INTO fgedu_users (user_id, user_name) VALUES (?, ?)”,
userId, userName
);

# 4. 调整一致性级别
# 降低一致性级别提高性能
CONSISTENCY ANY; — 最快,但可能丢失数据
CONSISTENCY ONE; — 平衡性能和可靠性
CONSISTENCY QUORUM; — 较高可靠性
CONSISTENCY ALL; — 最高可靠性,性能最差

2.2 Cassandra数据库更新操作策略

Cassandra数据库更新操作策略详解:

2.2.1 Cassandra数据库更新操作设计原则

# 更新操作设计原则

# 1. 理解Upsert语义
– INSERT和UPDATE语义相同
– 不存在的行会自动创建
– 需要处理空值问题

# 2. 条件更新
– 使用IF EXISTS条件
– 使用轻量级事务
– 处理更新失败

# 3. 集合操作
– 使用+/-操作符
– 避免读取-修改-写入模式
– 使用原子操作

# 4. Counter操作
– 使用专门的Counter表
– 使用COUNTER BATCH
– 注意Counter限制

# 更新操作最佳实践
1. 使用条件更新保证正确性
2. 使用原子集合操作
3. 合理使用轻量级事务
4. 处理并发更新
5. 监控更新性能

2.2.2 Cassandra数据库更新性能优化

# 更新性能优化

# 1. 避免读取-修改-写入
# 不好的做法
row = SELECT * FROM fgedu_users WHERE user_id = ?;
newTags = row.tags + {‘new_tag’};
UPDATE fgedu_users SET tags = newTags WHERE user_id = ?;

# 好的做法
UPDATE fgedu_users SET tags = tags + {‘new_tag’} WHERE user_id = ?;

# 2. 使用条件更新
# 避免不必要的更新
UPDATE fgedu_users
SET email = ‘new@fgedu.net.cn’
WHERE user_id = ?
IF email != ‘new@fgedu.net.cn’;

# 3. 批量更新
BEGIN BATCH
UPDATE fgedu_users SET status = ‘ACTIVE’ WHERE user_id = ?;
UPDATE fgedu_user_status SET last_update = toTimestamp(now()) WHERE user_id = ?;
APPLY BATCH;

# 4. 使用TTL
# 设置字段过期时间
UPDATE fgedu_users
SET temp_token = ‘token123’
USING TTL 3600
WHERE user_id = ?;

# 5. 使用时间戳
# 控制更新顺序
UPDATE fgedu_users
USING TIMESTAMP 1234567890
SET email = ‘new@fgedu.net.cn’
WHERE user_id = ?;

2.3 Cassandra数据库批量操作策略

Cassandra数据库批量操作策略详解:

2.3.1 Cassandra数据库批量操作设计原则

# 批量操作设计原则

# 1. BATCH大小控制
– 单个BATCH不超过100条语句
– 单个BATCH不超过50KB
– 超大批量分批执行

# 2. BATCH类型选择
– 单分区操作: UNLOGGED BATCH
– 跨分区操作: LOGGED BATCH
– Counter操作: COUNTER BATCH

# 3. BATCH使用场景
– 多表关联写入
– 索引表同步
– 原子性要求高的操作

# 4. BATCH性能考虑
– LOGGED BATCH有额外开销
– UNLOGGED BATCH性能更好
– 避免跨分区BATCH

# 批量操作最佳实践
1. 控制BATCH大小
2. 选择合适的BATCH类型
3. 同一分区优先
4. 处理BATCH失败
5. 监控BATCH性能

2.3.2 Cassandra数据库批量操作性能优化

# 批量操作性能优化

# 1. 使用UNLOGGED BATCH
# 单分区操作
BEGIN UNLOGGED BATCH
INSERT INTO fgedu_logs (log_id, log_time, message) VALUES (?, ?, ?);
INSERT INTO fgedu_logs (log_id, log_time, message) VALUES (?, ?, ?);
INSERT INTO fgedu_logs (log_id, log_time, message) VALUES (?, ?, ?);
APPLY BATCH;

# 2. 分批处理大数据量
#!/bin/bash
# batch_insert.sh
# from:www.itpux.com.qq113257174.wx:itpux-com
# web: http://www.fgedu.net.cn

BATCH_SIZE=100
TOTAL=10000

for ((i=0; i futures = new ArrayList<>();
for (Statement statement : statements) {
futures.add(session.executeAsync(statement));
}
for (ResultSetFuture future : futures) {
future.get(); // 等待完成
}

# 4. 使用DSBulk工具
# 大数据量导入使用DSBulk
# dsbulk load -k fgedudb -t fgedu_users -url /path/to/data.csv

风哥提示:批量操作是提高Cassandra写入性能的重要手段。合理使用BATCH可以减少网络往返,提高吞吐量。但需要注意BATCH大小控制,避免过大BATCH导致性能下降。学习交流加群风哥微信: itpux-com

Part03-生产环境项目实施方案

3.1 Cassandra数据库插入操作实战

Cassandra数据库插入操作实战:

3.1.1 Cassandra数据库基本插入操作

# 连接Cassandra
# cqlsh 192.168.1.101 9042 -u fgedu -p Fgedu@2024
Connected to fgedu_cluster at 192.168.1.101:9042

# 切换到Keyspace
cqlsh> USE fgedudb;
cqlsh:fgedudb>

# 基本插入
cqlsh:fgedudb> INSERT INTO fgedu_users
… (user_id, user_name, email, phone, age, created_at)
… VALUES (
… uuid(),
… ‘zhangsan’,
… ‘zhangsan@fgedu.net.cn’,
… ‘13800138000’,
… 25,
… toTimestamp(now())
… );

# 查询验证
cqlsh:fgedudb> SELECT * FROM fgedu_users LIMIT 1;

user_id | age | created_at | email | phone | user_name
————————————–+—–+———————————-+———————-+————-+———–
8e5f6a7b-8c9d-0e1f-2a3b-4c5d6e7f8a9b | 25 | 2024-01-15 10:30:00.123000+0000 | zhangsan@fgedu.net.cn | 13800138000 | zhangsan

(1 row)

# 使用TTL插入
cqlsh:fgedudb> INSERT INTO fgedu_temp_tokens
… (token_id, user_id, token, created_at)
… VALUES (uuid(), uuid(), ‘temp_token_123’, toTimestamp(now()))
… USING TTL 3600;

# 条件插入
cqlsh:fgedudb> INSERT INTO fgedu_users
… (user_id, user_name, email)
… VALUES (uuid(), ‘lisi’, ‘lisi@fgedu.net.cn’)
… IF NOT EXISTS;

[applied]
———–
True

(1 rows)

# 重复插入(已存在)
cqlsh:fgedudb> INSERT INTO fgedu_users
… (user_id, user_name, email)
… VALUES (8e5f6a7b-8c9d-0e1f-2a3b-4c5d6e7f8a9b, ‘wangwu’, ‘wangwu@fgedu.net.cn’)
… IF NOT EXISTS;

[applied]
———–
False

(1 rows)

# 使用JSON插入
cqlsh:fgedudb> INSERT INTO fgedu_users JSON ‘{
… “user_id”: “uuid()”,
… “user_name”: “wangwu”,
… “email”: “wangwu@fgedu.net.cn”,
… “age”: 30
… }’;

3.1.2 Cassandra数据库批量插入操作

# 批量插入(同一分区)
cqlsh:fgedudb> BEGIN UNLOGGED BATCH
… INSERT INTO fgedu_logs (log_id, log_time, log_level, message)
… VALUES (uuid(), toTimestamp(now()), ‘INFO’, ‘Log message 1’);
… INSERT INTO fgedu_logs (log_id, log_time, log_level, message)
… VALUES (uuid(), toTimestamp(now()), ‘INFO’, ‘Log message 2’);
… INSERT INTO fgedu_logs (log_id, log_time, log_level, message)
… VALUES (uuid(), toTimestamp(now()), ‘INFO’, ‘Log message 3’);
… APPLY BATCH;

# 批量插入(多表关联)
cqlsh:fgedudb> BEGIN BATCH
… INSERT INTO fgedu_users (user_id, user_name, email)
… VALUES (uuid(), ‘user001’, ‘user001@fgedu.net.cn’);
… INSERT INTO fgedu_users_by_email (email, user_id)
… VALUES (‘user001@fgedu.net.cn’, uuid());
… INSERT INTO fgedu_user_profiles (user_id, nickname, avatar)
… VALUES (uuid(), ‘用户001’, ‘avatar001.jpg’);
… APPLY BATCH;

# 批量插入(使用变量)
cqlsh:fgedudb> PREPARE INSERT INTO fgedu_users (user_id, user_name, email) VALUES (?, ?, ?);
cqlsh:fgedudb> EXECUTE (?, ?, ?) USING CONSISTENCY QUORUM;

# 大批量插入脚本
#!/bin/bash
# bulk_insert.sh
# from:www.itpux.com.qq113257174.wx:itpux-com
# web: http://www.fgedu.net.cn

KEYSPACE=”fgedudb”
TABLE=”fgedu_users”
BATCH_SIZE=50
TOTAL_RECORDS=1000

echo “开始批量插入 $TOTAL_RECORDS 条记录…”

for ((i=1; i<=TOTAL_RECORDS; i+=BATCH_SIZE)); do echo "BEGIN UNLOGGED BATCH" for ((j=0; j

3.2 Cassandra数据库更新操作实战

Cassandra数据库更新操作实战:

3.2.1 Cassandra数据库基本更新操作

# 基本更新
cqlsh:fgedudb> UPDATE fgedu_users
… SET email = ‘newemail@fgedu.net.cn’,
… phone = ‘13900139000’,
… updated_at = toTimestamp(now())
… WHERE user_id = 8e5f6a7b-8c9d-0e1f-2a3b-4c5d6e7f8a9b;

# 查询验证
cqlsh:fgedudb> SELECT user_id, email, phone, updated_at FROM fgedu_users
… WHERE user_id = 8e5f6a7b-8c9d-0e1f-2a3b-4c5d6e7f8a9b;

user_id | email | phone | updated_at
————————————–+———————-+————-+———————————-
8e5f6a7b-8c9d-0e1f-2a3b-4c5d6e7f8a9b | newemail@fgedu.net.cn | 13900139000 | 2024-01-15 11:00:00.123000+0000

(1 row)

# 条件更新
cqlsh:fgedudb> UPDATE fgedu_users
… SET email = ‘updated@fgedu.net.cn’
… WHERE user_id = 8e5f6a7b-8c9d-0e1f-2a3b-4c5d6e7f8a9b
… IF EXISTS;

[applied]
———–
True

(1 rows)

# 带条件的轻量级事务
cqlsh:fgedudb> UPDATE fgedu_users
… SET email = ‘final@fgedu.net.cn’
… WHERE user_id = 8e5f6a7b-8c9d-0e1f-2a3b-4c5d6e7f8a9b
… IF email = ‘updated@fgedu.net.cn’;

[applied] | email
———–+———————-
True | updated@fgedu.net.cn

(1 rows)

# 使用TTL更新
cqlsh:fgedudb> UPDATE fgedu_users
… SET temp_field = ‘temporary_value’
… USING TTL 3600
… WHERE user_id = 8e5f6a7b-8c9d-0e1f-2a3b-4c5d6e7f8a9b;

# 使用时间戳更新
cqlsh:fgedudb> UPDATE fgedu_users
… USING TIMESTAMP 1705312800000
… SET email = ‘timestamped@fgedu.net.cn’
… WHERE user_id = 8e5f6a7b-8c9d-0e1f-2a3b-4c5d6e7f8a9b;

3.2.2 Cassandra数据库集合更新操作

# 更新SET集合
# 添加元素
cqlsh:fgedudb> UPDATE fgedu_users
… SET tags = tags + {‘new_tag1’, ‘new_tag2’}
… WHERE user_id = 8e5f6a7b-8c9d-0e1f-2a3b-4c5d6e7f8a9b;

# 删除元素
cqlsh:fgedudb> UPDATE fgedu_users
… SET tags = tags – {‘old_tag’}
… WHERE user_id = 8e5f6a7b-8c9d-0e1f-2a3b-4c5d6e7f8a9b;

# 替换整个集合
cqlsh:fgedudb> UPDATE fgedu_users
… SET tags = {‘tag1’, ‘tag2’, ‘tag3’}
… WHERE user_id = 8e5f6a7b-8c9d-0e1f-2a3b-4c5d6e7f8a9b;

# 更新LIST列表
# 添加元素到末尾
cqlsh:fgedudb> UPDATE fgedu_users
… SET items = items + [‘item1’, ‘item2’]
… WHERE user_id = 8e5f6a7b-8c9d-0e1f-2a3b-4c5d6e7f8a9b;

# 添加元素到开头
cqlsh:fgedudb> UPDATE fgedu_users
… SET items = [‘item0’] + items
… WHERE user_id = 8e5f6a7b-8c9d-0e1f-2a3b-4c5d6e7f8a9b;

# 删除元素
cqlsh:fgedudb> UPDATE fgedu_users
… SET items = items – [‘item1’]
… WHERE user_id = 8e5f6a7b-8c9d-0e1f-2a3b-4c5d6e7f8a9b;

# 按索引更新
cqlsh:fgedudb> UPDATE fgedu_users
… SET items[0] = ‘updated_item’
… WHERE user_id = 8e5f6a7b-8c9d-0e1f-2a3b-4c5d6e7f8a9b;

# 更新MAP映射
# 添加键值对
cqlsh:fgedudb> UPDATE fgedu_users
… SET attributes = attributes + {‘key1’: ‘value1’, ‘key2’: ‘value2’}
… WHERE user_id = 8e5f6a7b-8c9d-0e1f-2a3b-4c5d6e7f8a9b;

# 删除键值对
cqlsh:fgedudb> DELETE attributes[‘key1’] FROM fgedu_users
… WHERE user_id = 8e5f6a7b-8c9d-0e1f-2a3b-4c5d6e7f8a9b;

# 更新Counter
cqlsh:fgedudb> UPDATE fgedu_user_counters
… SET login_count = login_count + 1,
… page_views = page_views + 10
… WHERE user_id = 8e5f6a7b-8c9d-0e1f-2a3b-4c5d6e7f8a9b;

# 查询Counter
cqlsh:fgedudb> SELECT * FROM fgedu_user_counters
… WHERE user_id = 8e5f6a7b-8c9d-0e1f-2a3b-4c5d6e7f8a9b;

user_id | login_count | page_views
————————————–+————-+————
8e5f6a7b-8c9d-0e1f-2a3b-4c5d6e7f8a9b | 15 | 150

(1 row)

3.3 Cassandra数据库批量操作实战

Cassandra数据库批量操作实战:

3.3.1 Cassandra数据库LOGGED BATCH操作

# LOGGED BATCH(保证原子性)
cqlsh:fgedudb> BEGIN LOGGED BATCH
… INSERT INTO fgedu_users (user_id, user_name, email)
… VALUES (uuid(), ‘batch_user1’, ‘batch1@fgedu.net.cn’);
… INSERT INTO fgedu_users_by_email (email, user_id)
… VALUES (‘batch1@fgedu.net.cn’, uuid());
… UPDATE fgedu_user_counters
… SET total_users = total_users + 1
… WHERE counter_id = ‘global’;
… APPLY BATCH;

# 查询验证
cqlsh:fgedudb> SELECT * FROM fgedu_users WHERE user_name = ‘batch_user1’;

# 订单创建BATCH
cqlsh:fgedudb> BEGIN BATCH
… INSERT INTO fgedu_orders (order_id, user_id, order_time, amount, status)
… VALUES (uuid(), uuid(), toTimestamp(now()), 9999.00, ‘CREATED’);
… INSERT INTO fgedu_order_items (order_id, item_id, product_name, quantity, price)
… VALUES (uuid(), uuid(), ‘iPhone 15 Pro’, 1, 9999.00);
… UPDATE fgedu_user_counters
… SET order_count = order_count + 1
… WHERE user_id = ?;
… APPLY BATCH;

# 用户状态更新BATCH
cqlsh:fgedudb> BEGIN BATCH
… UPDATE fgedu_users
… SET status = ‘ACTIVE’, updated_at = toTimestamp(now())
… WHERE user_id = ?;
… INSERT INTO fgedu_user_activities (user_id, activity_time, activity_type)
… VALUES (?, toTimestamp(now()), ‘ACCOUNT_ACTIVATED’);
… APPLY BATCH;

3.3.2 Cassandra数据库UNLOGGED BATCH操作

# UNLOGGED BATCH(性能优先)
cqlsh:fgedudb> BEGIN UNLOGGED BATCH
… INSERT INTO fgedu_logs (log_id, log_time, log_level, message)
… VALUES (uuid(), toTimestamp(now()), ‘INFO’, ‘Batch log 1’);
… INSERT INTO fgedu_logs (log_id, log_time, log_level, message)
… VALUES (uuid(), toTimestamp(now()), ‘INFO’, ‘Batch log 2’);
… INSERT INTO fgedu_logs (log_id, log_time, log_level, message)
… VALUES (uuid(), toTimestamp(now()), ‘INFO’, ‘Batch log 3’);
… INSERT INTO fgedu_logs (log_id, log_time, log_level, message)
… VALUES (uuid(), toTimestamp(now()), ‘INFO’, ‘Batch log 4’);
… INSERT INTO fgedu_logs (log_id, log_time, log_level, message)
… VALUES (uuid(), toTimestamp(now()), ‘INFO’, ‘Batch log 5’);
… APPLY BATCH;

# 同一分区批量操作
cqlsh:fgedudb> BEGIN UNLOGGED BATCH
… INSERT INTO fgedu_user_messages (user_id, message_time, sender_id, content)
… VALUES (?, ‘2024-01-15 10:00:00’, ?, ‘Message 1’);
… INSERT INTO fgedu_user_messages (user_id, message_time, sender_id, content)
… VALUES (?, ‘2024-01-15 10:01:00’, ?, ‘Message 2’);
… INSERT INTO fgedu_user_messages (user_id, message_time, sender_id, content)
… VALUES (?, ‘2024-01-15 10:02:00’, ?, ‘Message 3’);
… APPLY BATCH;

# COUNTER BATCH
cqlsh:fgedudb> BEGIN COUNTER BATCH
… UPDATE fgedu_user_counters
… SET login_count = login_count + 1
… WHERE user_id = ?;
… UPDATE fgedu_user_counters
… SET session_count = session_count + 1
… WHERE user_id = ?;
… UPDATE fgedu_global_counters
… SET total_logins = total_logins + 1
… WHERE counter_id = ‘global’;
… APPLY BATCH;

操作建议:批量操作时选择合适的BATCH类型。LOGGED BATCH保证原子性但有性能开销,UNLOGGED BATCH性能更好但不保证原子性。根据业务需求选择合适的类型。更多学习教程公众号风哥教程itpux_com

Part04-生产案例与实战讲解

4.1 Cassandra数据库订单管理实战

Cassandra数据库订单管理实战案例:

4.1.1 Cassandra数据库订单创建实战

# 订单创建完整流程

# 步骤1: 创建订单(使用BATCH)
cqlsh:fgedudb> BEGIN BATCH
… — 插入订单主表
… INSERT INTO fgedu_orders (
… order_id, user_id, order_time,
… total_amount, status, created_at
… ) VALUES (
… 8e5f6a7b-8c9d-0e1f-2a3b-4c5d6e7f8a9b,
… 9f7g8h9i-0j1k-2l3m-4n5o-6p7q8r9s0t1u,
… toTimestamp(now()),
… 15998.00,
… ‘CREATED’,
… toTimestamp(now())
… );
… — 插入订单明细
… INSERT INTO fgedu_order_items (
… order_id, item_id, product_id,
… product_name, quantity, unit_price, total_price
… ) VALUES (
… 8e5f6a7b-8c9d-0e1f-2a3b-4c5d6e7f8a9b,
… uuid(),
… uuid(),
… ‘iPhone 15 Pro Max’,
… 1,
… 9999.00,
… 9999.00
… );
… INSERT INTO fgedu_order_items (
… order_id, item_id, product_id,
… product_name, quantity, unit_price, total_price
… ) VALUES (
… 8e5f6a7b-8c9d-0e1f-2a3b-4c5d6e7f8a9b,
… uuid(),
… uuid(),
… ‘AirPods Pro’,
… 1,
… 1999.00,
… 1999.00
… );
… — 更新用户订单计数
… UPDATE fgedu_user_counters
… SET order_count = order_count + 1
… WHERE user_id = 9f7g8h9i-0j1k-2l3m-4n5o-6p7q8r9s0t1u;
… APPLY BATCH;

# 步骤2: 查询订单
cqlsh:fgedudb> SELECT * FROM fgedu_orders
… WHERE order_id = 8e5f6a7b-8c9d-0e1f-2a3b-4c5d6e7f8a9b;

order_id | created_at | order_time | status | total_amount | user_id
————————————–+———————————-+———————————-+———+————–+————————————–
8e5f6a7b-8c9d-0e1f-2a3b-4c5d6e7f8a9b | 2024-01-15 10:30:00.123000+0000 | 2024-01-15 10:30:00.123000+0000 | CREATED | 15998.00 | 9f7g8h9i-0j1k-2l3m-4n5o-6p7q8r9s0t1u

(1 row)

# 步骤3: 查询订单明细
cqlsh:fgedudb> SELECT * FROM fgedu_order_items
… WHERE order_id = 8e5f6a7b-8c9d-0e1f-2a3b-4c5d6e7f8a9b;

order_id | item_id | product_id | product_name | quantity | total_price | unit_price
————————————–+————————————–+————————————–+——————+———-+————-+————
8e5f6a7b-8c9d-0e1f-2a3b-4c5d6e7f8a9b | a1b2c3d4-e5f6-7890-abcd-ef1234567890 | b2c3d4e5-f6a7-8901-bcde-f12345678901 | iPhone 15 Pro Max | 1 | 9999.00 | 9999.00
8e5f6a7b-8c9d-0e1f-2a3b-4c5d6e7f8a9b | c3d4e5f6-a7b8-9012-cdef-123456789012 | d4e5f6a7-b8c9-0123-def1-234567890123 | AirPods Pro | 1 | 1999.00 | 1999.00

(2 rows)

4.1.2 Cassandra数据库订单状态更新实战

# 订单状态更新流程

# 更新订单状态(使用轻量级事务)
cqlsh:fgedudb> UPDATE fgedu_orders
… SET status = ‘PAID’,
… paid_at = toTimestamp(now()),
… updated_at = toTimestamp(now())
… WHERE order_id = 8e5f6a7b-8c9d-0e1f-2a3b-4c5d6e7f8a9b
… IF status = ‘CREATED’;

[applied] | created_at | order_time | paid_at | status | total_amount | updated_at
———–+———————————-+———————————-+———+———+————–+———————————-
True | 2024-01-15 10:30:00.123000+0000 | 2024-01-15 10:30:00.123000+0000 | null | CREATED | 15998.00 | 2024-01-15 10:30:00.123000+0000

(1 rows)

# 记录状态变更日志
cqlsh:fgedudb> INSERT INTO fgedu_order_status_log
… (order_id, status_time, old_status, new_status, operator)
… VALUES (
… 8e5f6a7b-8c9d-0e1f-2a3b-4c5d6e7f8a9b,
… toTimestamp(now()),
… ‘CREATED’,
… ‘PAID’,
… ‘system’
… );

# 订单发货
cqlsh:fgedudb> BEGIN BATCH
… UPDATE fgedu_orders
… SET status = ‘SHIPPED’,
… shipped_at = toTimestamp(now()),
… tracking_number = ‘SF1234567890’
… WHERE order_id = 8e5f6a7b-8c9d-0e1f-2a3b-4c5d6e7f8a9b
… IF status = ‘PAID’;
… INSERT INTO fgedu_order_status_log
… (order_id, status_time, old_status, new_status, operator)
… VALUES (
… 8e5f6a7b-8c9d-0e1f-2a3b-4c5d6e7f8a9b,
… toTimestamp(now()),
… ‘PAID’,
… ‘SHIPPED’,
… ‘warehouse’
… );
… APPLY BATCH;

# 订单完成
cqlsh:fgedudb> BEGIN BATCH
… UPDATE fgedu_orders
… SET status = ‘COMPLETED’,
… completed_at = toTimestamp(now())
… WHERE order_id = 8e5f6a7b-8c9d-0e1f-2a3b-4c5d6e7f8a9b
… IF status = ‘SHIPPED’;
… UPDATE fgedu_user_counters
… SET completed_orders = completed_orders + 1
… WHERE user_id = ?;
… APPLY BATCH;

4.2 Cassandra数据库用户管理实战

Cassandra数据库用户管理实战案例:

4.2.1 Cassandra数据库用户注册实战

# 用户注册完整流程

# 步骤1: 检查邮箱是否已存在
cqlsh:fgedudb> SELECT user_id FROM fgedu_users_by_email
… WHERE email = ‘newuser@fgedu.net.cn’;

user_id
———
0

(0 rows)

# 步骤2: 创建用户(使用BATCH)
cqlsh:fgedudb> BEGIN BATCH
… — 插入用户主表
… INSERT INTO fgedu_users (
… user_id, user_name, email, password_hash,
… nickname, status, created_at, updated_at
… ) VALUES (
… uuid(),
… ‘newuser’,
… ‘newuser@fgedu.net.cn’,
… ‘hashed_password_123’,
… ‘新用户’,
… ‘PENDING_ACTIVATION’,
… toTimestamp(now()),
… toTimestamp(now())
… );
… — 插入邮箱索引表
… INSERT INTO fgedu_users_by_email (email, user_id)
… VALUES (‘newuser@fgedu.net.cn’, uuid());
… — 插入用户名索引表
… INSERT INTO fgedu_users_by_name (user_name, user_id)
… VALUES (‘newuser’, uuid());
… — 初始化用户计数器
… INSERT INTO fgedu_user_counters (user_id, login_count, order_count)
… VALUES (uuid(), 0, 0);
… APPLY BATCH;

# 步骤3: 创建激活令牌
cqlsh:fgedudb> INSERT INTO fgedu_activation_tokens
… (token, user_id, email, created_at)
… VALUES (
… ‘activation_token_abc123’,
… uuid(),
… ‘newuser@fgedu.net.cn’,
… toTimestamp(now())
… )
… USING TTL 86400; — 24小时后过期

# 步骤4: 发送激活邮件(应用层处理)
# 发送邮件到 newuser@fgedu.net.cn

4.2.2 Cassandra数据库用户信息更新实战

# 用户信息更新流程

# 更新用户昵称
cqlsh:fgedudb> UPDATE fgedu_users
… SET nickname = ‘新昵称’,
… updated_at = toTimestamp(now())
… WHERE user_id = ?;

# 更新用户头像
cqlsh:fgedudb> UPDATE fgedu_users
… SET avatar = ‘https://cdn.fgedu.net.cn/avatar/new_avatar.jpg’,
… updated_at = toTimestamp(now())
… WHERE user_id = ?;

# 添加用户标签
cqlsh:fgedudb> UPDATE fgedu_users
… SET tags = tags + {‘VIP’, ‘活跃用户’}
… WHERE user_id = ?;

# 更新用户地址
cqlsh:fgedudb> UPDATE fgedu_users
… SET address = {
… province: ‘北京’,
… city: ‘北京市’,
… street: ‘朝阳区xxx街道’,
… zip_code: ‘100000’
… }
… WHERE user_id = ?;

# 记录用户活动
cqlsh:fgedudb> INSERT INTO fgedu_user_activities
… (user_id, activity_time, activity_type, activity_data)
… VALUES (
… ?,
… toTimestamp(now()),
… ‘PROFILE_UPDATED’,
… ‘{“fields”: [“nickname”, “avatar”]}’
… );

# 批量更新用户状态
cqlsh:fgedudb> BEGIN BATCH
… UPDATE fgedu_users
… SET status = ‘ACTIVE’,
… activated_at = toTimestamp(now())
… WHERE user_id = ?;
… DELETE FROM fgedu_activation_tokens
… WHERE token = ‘activation_token_abc123’;
… UPDATE fgedu_global_counters
… SET active_users = active_users + 1
… WHERE counter_id = ‘global’;
… APPLY BATCH;

4.3 Cassandra数据库日志管理实战

Cassandra数据库日志管理实战案例:

4.3.1 Cassandra数据库日志写入实战

# 日志写入流程

# 单条日志写入
cqlsh:fgedudb> INSERT INTO fgedu_logs
… (log_id, log_time, log_level, logger, message, stack_trace)
… VALUES (
… uuid(),
… toTimestamp(now()),
… ‘ERROR’,
… ‘com.fgedu.service.UserService’,
… ‘用户登录失败: 用户名或密码错误’,
… ‘java.lang.Exception: Login failed\n\tat com.fgedu.service.UserService.login(UserService.java:123)’
… );

# 批量日志写入
cqlsh:fgedudb> BEGIN UNLOGGED BATCH
… INSERT INTO fgedu_logs (log_id, log_time, log_level, logger, message)
… VALUES (uuid(), toTimestamp(now()), ‘INFO’, ‘com.fgedu.App’, ‘应用启动’);
… INSERT INTO fgedu_logs (log_id, log_time, log_level, logger, message)
… VALUES (uuid(), toTimestamp(now()), ‘INFO’, ‘com.fgedu.App’, ‘数据库连接成功’);
… INSERT INTO fgedu_logs (log_id, log_time, log_level, logger, message)
… VALUES (uuid(), toTimestamp(now()), ‘INFO’, ‘com.fgedu.App’, ‘缓存初始化完成’);
… INSERT INTO fgedu_logs (log_id, log_time, log_level, logger, message)
… VALUES (uuid(), toTimestamp(now()), ‘INFO’, ‘com.fgedu.App’, ‘应用启动完成’);
… APPLY BATCH;

# 带TTL的日志写入(自动过期)
cqlsh:fgedudb> INSERT INTO fgedu_logs
… (log_id, log_time, log_level, logger, message)
… VALUES (uuid(), toTimestamp(now()), ‘DEBUG’, ‘com.fgedu.App’, ‘调试日志’)
… USING TTL 604800; — 7天后自动删除

# 日志写入脚本
#!/bin/bash
# log_writer.sh
# from:www.itpux.com.qq113257174.wx:itpux-com
# web: http://www.fgedu.net.cn

LOG_FILE=”/var/log/fgedu/app.log”
CQLSH=”cqlsh 192.168.1.101 9042 -u fgedu -p Fgedu@2024″

tail -f $LOG_FILE | while read line; do
# 解析日志级别
LOG_LEVEL=$(echo $line | grep -oP ‘(?<=\[)(ERROR|WARN|INFO|DEBUG)(?=\])' | head -1) [ -z "$LOG_LEVEL" ] && LOG_LEVEL="INFO" # 写入Cassandra $CQLSH -e "INSERT INTO fgedudb.fgedu_logs (log_id, log_time, log_level, message) VALUES (uuid(), toTimestamp(now()), '$LOG_LEVEL', '$(echo $line | sed "s/'/''/g")')" done

4.3.2 Cassandra数据库日志查询实战

# 日志查询流程

# 查询最近日志
cqlsh:fgedudb> SELECT * FROM fgedu_logs
… LIMIT 10;

# 按时间范围查询
cqlsh:fgedudb> SELECT * FROM fgedu_logs
… WHERE log_date = ‘2024-01-15’
… AND log_time >= ‘2024-01-15 10:00:00’
… AND log_time < '2024-01-15 11:00:00'; # 查询错误日志 cqlsh:fgedudb> SELECT * FROM fgedu_logs
… WHERE log_date = ‘2024-01-15’
… AND log_level = ‘ERROR’;

# 统计日志数量
cqlsh:fgedudb> SELECT log_level, COUNT(*)
… FROM fgedu_logs
… WHERE log_date = ‘2024-01-15’
… GROUP BY log_level;

log_level | count
———–+——-
ERROR | 15
INFO | 1234
DEBUG | 5678
WARN | 23

(4 rows)

# 清理过期日志
cqlsh:fgedudb> DELETE FROM fgedu_logs
… WHERE log_date < '2024-01-01';

风哥提示:日志管理是系统运维的重要组成部分。使用TTL自动清理过期日志,避免数据无限增长。对于重要日志,建议同时备份到其他存储系统。from Cassandra视频:www.itpux.com

Part05-风哥经验总结与分享

5.1 Cassandra数据库CRUD最佳实践

Cassandra数据库CRUD最佳实践总结:

  • 插入操作:使用Prepared语句,合理设置TTL,使用BATCH优化
  • 查询操作:必须指定分区键,避免ALLOW FILTERING,使用分页查询
  • 更新操作:使用条件更新,使用原子集合操作,合理使用轻量级事务
  • 删除操作:注意墓碑问题,使用TTL自动清理,定期执行compaction
  • 一致性级别:根据业务需求选择合适的一致性级别
  • 性能优化:使用异步操作,批量处理,合理设计数据模型

5.2 Cassandra数据库批量操作最佳实践

# 批量操作最佳实践

# 1. BATCH大小控制
– 单个BATCH不超过100条语句
– 单个BATCH不超过50KB
– 超大批量分批执行

# 2. BATCH类型选择
– 单分区操作: UNLOGGED BATCH
– 跨分区操作: LOGGED BATCH
– Counter操作: COUNTER BATCH

# 3. BATCH使用场景
– 多表关联写入
– 索引表同步
– 原子性要求高的操作

# 4. BATCH注意事项
– 避免跨分区的大批量操作
– 监控BATCH性能
– 处理BATCH失败
– 使用异步BATCH

# 5. 大数据量导入
– 使用DSBulk工具
– 分批导入
– 监控导入进度
– 验证导入结果

5.3 Cassandra数据库性能优化技巧

Cassandra数据库性能优化技巧:

5.3.1 Cassandra数据库写入优化

# 写入优化技巧

# 1. 使用Prepared语句
PreparedStatement prepared = session.prepare(
“INSERT INTO fgedu_users (user_id, user_name) VALUES (?, ?)”
);

# 2. 批量写入
BEGIN UNLOGGED BATCH
INSERT INTO fgedu_users (user_id, user_name) VALUES (?, ?);
INSERT INTO fgedu_users (user_id, user_name) VALUES (?, ?);
APPLY BATCH;

# 3. 异步写入
ResultSetFuture future = session.executeAsync(statement);

# 4. 调整一致性级别
CONSISTENCY ANY; — 最快
CONSISTENCY ONE; — 平衡

# 5. 使用Token感知
# 将数据写入正确的节点
TokenAwarePolicy policy = new TokenAwarePolicy(
new DCAwareRoundRobinPolicy()
);

# 6. 调整批处理大小
# 在cassandra.yaml中配置
batch_size_warn_threshold_in_kb: 64
batch_size_fail_threshold_in_kb: 128

5.3.2 Cassandra数据库读取优化

# 读取优化技巧

# 1. 指定分区键
SELECT * FROM fgedu_users WHERE user_id = ?;

# 2. 使用分页
SELECT * FROM fgedu_users WHERE user_id = ? LIMIT 100;

# 3. 使用索引
CREATE INDEX idx_email ON fgedu_users (email);

# 4. 避免ALLOW FILTERING
# 重新设计表结构

# 5. 使用缓存
# 调整缓存配置
key_cache_size_in_mb: 100
row_cache_size_in_mb: 0

# 6. 异步读取
ResultSetFuture future = session.executeAsync(statement);

# 7. 使用并发读取
# 同时读取多个分区
List futures = new ArrayList<>();
for (UUID userId : userIds) {
futures.add(session.executeAsync(
“SELECT * FROM fgedu_users WHERE user_id = ?”, userId
));
}

优化建议:性能优化需要结合具体业务场景。写入密集型场景重点优化写入,读取密集型场景重点优化查询。定期监控性能指标,及时发现和解决问题。

本文档详细介绍了Cassandra数据库增删改查与批量操作实战,包括CRUD操作概述、批量操作概念、事务特性、插入操作策略、更新操作策略、批量操作策略、插入操作实战、更新操作实战、批量操作实战、订单管理实战、用户管理实战、日志管理实战、CRUD最佳实践、批量操作最佳实践、性能优化技巧等内容。通过学习本文档,读者可以掌握Cassandra数据库CRUD操作和批量操作技能,为应用开发打下坚实基础。

本文由风哥教程整理发布,仅用于学习测试使用,转载注明出处:http://www.fgedu.net.cn/10327.html

联系我们

在线咨询:点击这里给我发消息

微信号:itpux-com

工作日:9:30-18:30,节假日休息