OceanBase教程FG064-OceanBase批量插入优化实战
本文档风哥主要介绍OceanBase数据库批量插入优化相关知识,包括OceanBase批量插入概念、OceanBase批量插入优势、OceanBase批量插入场景、OceanBase批量大小规划、OceanBase SQL批量插入、OceanBase LOAD DATA、OceanBase JDBC批量插入等内容,风哥教程参考OceanBase官方文档性能优化、批量操作等内容编写,适合DBA人员在学习和测试中使用,如果要应用于生产环境则需要自行确认。
Part01-基础概念与理论知识
1.1 OceanBase批量插入概念
批量插入是将多条记录合并为一个操作批量提交到数据库,相比单条插入可以显著提升性能。更多视频教程www.fgedu.net.cn
- 多值INSERT:INSERT INTO … VALUES (…), (…), (…)
- INSERT SELECT:INSERT INTO … SELECT … FROM …
- LOAD DATA:从文件批量导入数据
- JDBC Batch:使用addBatch和executeBatch
- 并行插入:多线程并行批量插入
1.2 OceanBase批量插入优势
1. 减少网络往返
– 单条插入:N条数据 = N次网络往返
– 批量插入:N条数据 = 1次网络往返
– 性能提升:10-100倍
2. 减少事务开销
– 单条插入:N条数据 = N个事务
– 批量插入:N条数据 = 1个事务
– 减少日志写入
3. 优化内存使用
– 减少内存分配次数
– 提高缓存命中率
– 降低GC压力
4. 提升磁盘效率
– 顺序写入优化
– 减少随机IO
– 提高磁盘吞吐量
1.3 OceanBase批量插入场景
1. 数据迁移
– 从其他数据库迁移
– 历史数据导入
– 数据仓库加载
2. 数据同步
– 主从数据同步
– 跨系统数据同步
– 实时数据同步
3. ETL处理
– 数据抽取
– 数据转换
– 数据加载
4. 批量业务
– 批量订单导入
– 批量用户注册
– 批量数据更新
Part02-生产环境规划与建议
2.1 OceanBase批量大小规划
1. 影响因素
– 网络延迟
– 内存限制
– 事务大小
– 表结构复杂度
2. 推荐批量大小
┌─────────────────┬─────────────────────────────────────┐
│ 场景 │ 推荐批量大小 │
├─────────────────┼─────────────────────────────────────┤
│ 小表简单结构 │ 1000-5000条 │
│ 大表复杂结构 │ 500-2000条 │
│ 网络延迟高 │ 500-1000条 │
│ 内存受限 │ 100-500条 │
│ 超大文本字段 │ 50-200条 │
└─────────────────┴─────────────────────────────────────┘
3. 批量大小测试
– 从小到大逐步测试
– 监控内存使用
– 观察性能曲线
– 找到最优值
2.2 OceanBase事务规划
1. 事务大小
– 推荐:每事务1000-10000条
– 最大:不超过100000条
– 根据内存和性能调整
2. 事务提交策略
– 定时提交:每N秒提交
– 定量提交:每N条提交
– 混合策略:定时或定量
3. 错误处理
– 批量失败重试
– 单条失败记录
– 事务回滚处理
4. 并发控制
– 批量插入并发数
– 锁等待处理
– 死锁避免,风哥提示:。
2.3 OceanBase资源规划
1. 内存规划
– 批量缓冲区:1-2GB
– 会话内存:512MB-1GB
– 系统预留:20%
2. CPU规划
– 批量插入线程数:CPU核数的1-2倍
– 并行度:根据表分区数
– 后台任务:预留20% CPU
3. 磁盘规划
– 数据文件:SSD存储
– 日志文件:独立磁盘
– 临时文件:高速存储
4. 网络规划
– 带宽:1Gbps以上
– 延迟:< 10ms
- 稳定性:99.9%
Part03-生产环境项目实施方案
3.1 OceanBase SQL批量插入
3.1.1 多值INSERT
1. 基本语法
$ obclient -h192.168.1.101 -P2881 -uroot@fgedu_tenant -p -e ”
INSERT INTO fgedudb.fgedu_order (order_id, user_id, amount, create_time) VALUES
(1, 10001, 199.99, ‘2024-01-20 10:00:00’),
(2, 10002, 299.99, ‘2024-01-20 10:00:01’),
(3, 10003, 399.99, ‘2024-01-20 10:00:02’),
…
(1000, 11000, 99.99, ‘2024-01-20 10:00:59’);
”
Query OK, 1000 rows affected
Records: 1000 Duplicates: 0 Warnings: 0
Time: 0.5s
2. 性能对比
# 单条插入(1000条)
单条INSERT 1000次:50秒
# 批量插入(1000条)
批量INSERT 1次:0.5秒
性能提升:100倍
3. 自动生成批量SQL
#!/bin/bash
# generate_batch_insert.sh
# from:www.itpux.com.qq113257174.wx:itpux-com
# web: http://www.fgedu.net.cn
TABLE_NAME=”fgedudb.fgedu_order”
BATCH_SIZE=1000,学习交流加群风哥QQ113257174。
START_ID=1
END_ID=10000
echo “INSERT INTO $TABLE_NAME (order_id, user_id, amount, create_time) VALUES”
for ((i=START_ID; i<=END_ID; i++)); do USER_ID=$((10000 + i)) AMOUNT=$(echo "scale=2; $RANDOM/100" | bc) CREATE_TIME=$(date '+%Y-%m-%d %H:%M:%S') if [ $i -eq $END_ID ]; then echo "($i, $USER_ID, $AMOUNT, '$CREATE_TIME');" elif [ $((i % BATCH_SIZE)) -eq 0 ]; then echo "($i, $USER_ID, $AMOUNT, '$CREATE_TIME');" echo "INSERT INTO $TABLE_NAME (order_id, user_id, amount, create_time) VALUES" else echo "($i, $USER_ID, $AMOUNT, '$CREATE_TIME')," fi done
3.1.2 INSERT SELECT
$ obclient -h192.168.1.101 -P2881 -uroot@fgedu_tenant -p -e ”
— 从临时表导入数据
INSERT INTO fgedudb.fgedu_order
SELECT order_id, user_id, amount, create_time
FROM fgedudb.fgedu_order_temp
WHERE create_time >= ‘2024-01-01’;
Query OK, 100000 rows affected
Records: 100000 Duplicates: 0 Warnings: 0
Time: 5.2s
— 使用并行插入
INSERT /*+ PARALLEL(4) */ INTO fgedudb.fgedu_order
SELECT /*+ PARALLEL(4) */ order_id, user_id, amount, create_time
FROM fgedudb.fgedu_order_temp;更多视频教程www.fgedu.net.cn。
Query OK, 100000 rows affected
Time: 2.1s
”
3.2 OceanBase LOAD DATA
1. 准备数据文件
$ cat /data/orders.csv
1,10001,199.99,2024-01-20 10:00:00
2,10002,299.99,2024-01-20 10:00:01
3,10003,399.99,2024-01-20 10:00:02
…
100000,110000,99.99,2024-01-20 10:10:00
2. 执行LOAD DATA
$ obclient -h192.168.1.101 -P2881 -uroot@fgedu_tenant -p -e ”
LOAD DATA INFILE ‘/data/orders.csv’
INTO TABLE fgedudb.fgedu_order
FIELDS TERMINATED BY ‘,’
LINES TERMINATED BY ‘\n’
(order_id, user_id, amount, create_time);
”
Query OK, 100000 rows affected
Records: 100000 Deleted: 0 Skipped: 0 Warnings: 0
Time: 3.5s
3. 高级选项
$ obclient -h192.168.1.101 -P2881 -uroot@fgedu_tenant -p -e ”
LOAD DATA INFILE ‘/data/orders.csv’
INTO TABLE fgedudb.fgedu_order
FIELDS TERMINATED BY ‘,’
ENCLOSED BY ‘\”‘
ESCAPED BY ‘\\’
LINES TERMINATED BY ‘\n’,更多学习教程公众号风哥教程itpux_com。
IGNORE 1 LINES
(order_id, user_id, amount, @create_time)
SET create_time = STR_TO_DATE(@create_time, ‘%Y-%m-%d %H:%i:%s’);
”
4. 性能对比
– 单条INSERT:100000条 = 500秒
– 批量INSERT:100000条 = 50秒
– LOAD DATA:100000条 = 3.5秒
LOAD DATA性能最优
3.3 OceanBase JDBC批量插入
1. 基本批量插入
“`java
// Java代码
String url = “jdbc:oceanbase://192.168.1.101:2881/fgedudb”;
Connection conn = DriverManager.getConnection(url, “fgedu”, “password”);
String sql = “INSERT INTO fgedu_order (order_id, user_id, amount) VALUES (?, ?, ?)”;
PreparedStatement pstmt = conn.prepareStatement(sql);
// 关闭自动提交
conn.setAutoCommit(false);from DB视频:www.itpux.com。
int batchSize = 1000;
int count = 0;
for (Order order : orders) {
pstmt.setLong(1, order.getOrderId());
pstmt.setLong(2, order.getUserId());
pstmt.setBigDecimal(3, order.getAmount());
pstmt.addBatch();
if (++count % batchSize == 0) {
pstmt.executeBatch();
conn.commit();
}
}
// 提交剩余批次
pstmt.executeBatch();
conn.commit();
pstmt.close();
conn.close();
“`
2. 重写批量插入
“`java
// 使用rewriteBatchedStatements优化
String url = “jdbc:oceanbase://192.168.1.101:2881/fgedudb?” +
“rewriteBatchedStatements=true&” +
“useServerPrepStmts=true&” +
“cachePrepStmts=true”;
Connection conn = DriverManager.getConnection(url, “fgedu”, “password”);
// … 批量插入代码
“`
3. 多线程批量插入
“`java
// 多线程并行批量插入
ExecutorService executor = Executors.newFixedThreadPool(4);
List> partitions = partitionOrders(orders, 4);
for (List
executor.submit(() -> {
insertBatch(partition);
});
}
executor.shutdown();
executor.awaitTermination(1, TimeUnit.HOURS);
“`
4. 性能对比
– 单条插入:1000 TPS
– 批量插入(1000条):10000 TPS
– 多线程批量(4线程):35000 TPS
Part04-生产案例与实战讲解
4.1 OceanBase数据迁移批量插入案例
– 从MySQL迁移到OceanBase
– 数据量:1000万条
– 表结构:订单表
# 迁移方案
1. 数据导出
$ mysqldump -h192.168.1.100 -P3306 -uroot -p fgedudb fgedu_order \
–no-create-info –extended-insert > /backup/orders.sql
2. 数据转换
#!/bin/bash
# convert_data.sh
# from:www.itpux.com.qq113257174.wx:itpux-com
# 转换SQL格式
sed -i ‘s/INSERT INTO `fgedu_order`/INSERT INTO fgedudb.fgedu_order/g’ /backup/orders.sql
3. 批量导入
$ obclient -h192.168.1.101 -P2881 -uroot@fgedu_tenant -p < /backup/orders.sql
# 或使用LOAD DATA
$ obclient -h192.168.1.101 -P2881 -uroot@fgedu_tenant -p -e "
LOAD DATA INFILE '/backup/orders.csv'
INTO TABLE fgedudb.fgedu_order
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n';
"
4. 性能数据
- 数据量:1000万条
- 导入时间:15分钟
- 平均速度:11111条/秒
# 迁移完成
4.2 OceanBase ETL批量插入案例
– 数据仓库ETL
– 每日处理:500万条
– 时间窗口:2小时
# ETL方案
1. 数据抽取
“`python
# Python ETL脚本
import pandas as pd
from sqlalchemy import create_engine
# 连接源数据库
source_engine = create_engine(‘mysql://user:pass@source/db’)
# 抽取数据
df = pd.read_sql(“SELECT * FROM source_table WHERE create_time >= CURDATE()”, source_engine)
# 数据转换
df[‘amount’] = df[‘amount’].astype(float)
df[‘create_time’] = pd.to_datetime(df[‘create_time’])
“`
2. 批量加载
“`python
# 连接OceanBase
target_engine = create_engine(‘oceanbase://fgedu:pass@192.168.1.101:2881/fgedudb’)
# 批量插入
chunk_size = 10000
for i in range(0, len(df), chunk_size):
chunk = df[i:i+chunk_size]
chunk.to_sql(‘fgedu_order’, target_engine, if_exists=’append’, index=False)
print(f”Inserted {i+len(chunk)} rows”)
“`
3. 性能优化
– 批量大小:10000条
– 并行度:4线程
– 预处理:禁用索引
4. 性能数据
– 数据量:500万条
– 处理时间:45分钟
– 平均速度:18518条/秒
# ETL完成
4.3 OceanBase数据同步批量插入案例
– 实时数据同步
– 峰值:10000条/秒
– 延迟要求:< 5秒 # 同步方案 1. 架构设计 ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ 业务系统 │ --> │ 消息队列 │ –> │ 同步服务 │
└─────────────┘ └─────────────┘ └─────────────┘
│
v
┌─────────────┐
│ OceanBase │
└─────────────┘
2. 同步服务代码
“`java
// Kafka消费者批量插入
@KafkaListener(topics = “order_topic”)
public void consume(List
List
.map(ConsumerRecord::value)
.collect(Collectors.toList());
// 批量插入
orderService.batchInsert(orders);
}
// 批量插入服务
@Service
public class OrderService {
@Autowired
private OrderMapper orderMapper;
@Transactional
public void batchInsert(List
// 分批插入,每批1000条
Lists.partition(orders, 1000).forEach(batch -> {
orderMapper.batchInsert(batch);
});
}
}
“`
3. 性能优化
– 批量大小:1000条
– 消费并发:10线程
– 批量提交:每批一个事务
4. 性能数据
– 峰值处理:12000条/秒
– 平均延迟:2秒
– 成功率:99.99%
# 同步完成
Part05-风哥经验总结与分享
5.1 OceanBase批量插入最佳实践
1. 批量大小选择
– 测试确定最优值
– 考虑网络和内存
– 监控性能指标
2. 事务管理
– 合理控制事务大小
– 及时提交避免锁等待
– 错误处理和重试
3. 并发控制
– 控制并发线程数
– 避免热点冲突
– 监控锁等待
4. 数据准备
– 数据清洗验证
– 格式转换
– 去重处理
5. 性能监控
– 监控插入速度
– 监控资源使用
– 及时调整优化
5.2 OceanBase批量插入性能调优
1. 数据库参数
$ obclient -h192.168.1.101 -P2881 -uroot@sys -p -e ”
— 优化批量插入参数
ALTER SYSTEM SET memstore_limit_percentage=60;
ALTER SYSTEM SET freeze_trigger_percentage=80;
ALTER SYSTEM SET minor_freeze_times=5;
”
2. 表结构优化
– 减少索引数量
– 使用分区表
– 选择合适的数据类型
3. 应用优化
– 使用连接池
– 批量提交
– 异步处理
4. 监控指标
#!/bin/bash
# batch_insert_monitor.sh
# from:www.itpux.com.qq113257174.wx:itpux-com
echo “=== Batch Insert Monitor ===”
# 监控插入TPS
obclient -e “SELECT value FROM oceanbase.__all_virtual_stat WHERE stat_id=10000”
# 监控MemTable使用
obclient -e “SELECT active_memstore_used/1024/1024/1024 FROM oceanbase.__all_virtual_memstore_info”
# 监控锁等待
obclient -e “SELECT COUNT(*) FROM oceanbase.__all_virtual_lock_wait_stat”
echo “=============================”
5.3 OceanBase批量插入常见问题
Q1: 批量插入报内存不足?
A1: 减小批量大小,增加系统内存,优化MemTable配置
Q2: 批量插入速度慢?
A2: 增大批量大小,使用LOAD DATA,优化网络连接
Q3: 批量插入锁等待?
A3: 减小事务大小,调整并发度,避免热点
Q4: 批量插入数据重复?
A4: 使用主键或唯一索引,插入前数据去重
Q5: 批量插入部分失败?
A5: 分批重试,记录失败数据,单独处理
Q6: 如何选择批量大小?
A6: 根据测试确定,一般1000-5000条
Q7: LOAD DATA和INSERT哪个快?
A7: LOAD DATA更快,适合大批量导入
Q8: 批量插入如何监控进度?
A8: 使用进度条,记录已处理数量,定期输出
Q9: 批量插入失败如何回滚?
A9: 使用事务,失败回滚,记录失败数据
Q10: 如何提升批量插入并发?
A10: 多线程并行,分区插入,避免锁冲突
本文由风哥教程整理发布,仅用于学习测试使用,转载注明出处:http://www.fgedu.net.cn/10327.html
