1. 首页 > 国产数据库教程 > OceanBase教程 > 正文

OceanBase教程FG064-OceanBase批量插入优化实战

本文档风哥主要介绍OceanBase数据库批量插入优化相关知识,包括OceanBase批量插入概念、OceanBase批量插入优势、OceanBase批量插入场景、OceanBase批量大小规划、OceanBase SQL批量插入、OceanBase LOAD DATA、OceanBase JDBC批量插入等内容,风哥教程参考OceanBase官方文档性能优化、批量操作等内容编写,适合DBA人员在学习和测试中使用,如果要应用于生产环境则需要自行确认。

Part01-基础概念与理论知识

1.1 OceanBase批量插入概念

批量插入是将多条记录合并为一个操作批量提交到数据库,相比单条插入可以显著提升性能。更多视频教程www.fgedu.net.cn

OceanBase批量插入方式:

  • 多值INSERT:INSERT INTO … VALUES (…), (…), (…)
  • INSERT SELECT:INSERT INTO … SELECT … FROM …
  • LOAD DATA:从文件批量导入数据
  • JDBC Batch:使用addBatch和executeBatch
  • 并行插入:多线程并行批量插入

1.2 OceanBase批量插入优势

# 批量插入优势

1. 减少网络往返
– 单条插入:N条数据 = N次网络往返
– 批量插入:N条数据 = 1次网络往返
– 性能提升:10-100倍

2. 减少事务开销
– 单条插入:N条数据 = N个事务
– 批量插入:N条数据 = 1个事务
– 减少日志写入

3. 优化内存使用
– 减少内存分配次数
– 提高缓存命中率
– 降低GC压力

4. 提升磁盘效率
– 顺序写入优化
– 减少随机IO
– 提高磁盘吞吐量

1.3 OceanBase批量插入场景

# 批量插入场景

1. 数据迁移
– 从其他数据库迁移
– 历史数据导入
– 数据仓库加载

2. 数据同步
– 主从数据同步
– 跨系统数据同步
– 实时数据同步

3. ETL处理
– 数据抽取
– 数据转换
– 数据加载

4. 批量业务
– 批量订单导入
– 批量用户注册
– 批量数据更新

风哥提示:批量插入是提升数据导入性能的关键手段,合理选择批量大小和插入方式可以显著提升性能。

Part02-生产环境规划与建议

2.1 OceanBase批量大小规划

# 批量大小规划

1. 影响因素
– 网络延迟
– 内存限制
– 事务大小
– 表结构复杂度

2. 推荐批量大小
┌─────────────────┬─────────────────────────────────────┐
│ 场景 │ 推荐批量大小 │
├─────────────────┼─────────────────────────────────────┤
│ 小表简单结构 │ 1000-5000条 │
│ 大表复杂结构 │ 500-2000条 │
│ 网络延迟高 │ 500-1000条 │
│ 内存受限 │ 100-500条 │
│ 超大文本字段 │ 50-200条 │
└─────────────────┴─────────────────────────────────────┘

3. 批量大小测试
– 从小到大逐步测试
– 监控内存使用
– 观察性能曲线
– 找到最优值

2.2 OceanBase事务规划

# 事务规划

1. 事务大小
– 推荐:每事务1000-10000条
– 最大:不超过100000条
– 根据内存和性能调整

2. 事务提交策略
– 定时提交:每N秒提交
– 定量提交:每N条提交
– 混合策略:定时或定量

3. 错误处理
– 批量失败重试
– 单条失败记录
– 事务回滚处理

4. 并发控制
– 批量插入并发数
– 锁等待处理
– 死锁避免,风哥提示:。

2.3 OceanBase资源规划

# 资源规划

1. 内存规划
– 批量缓冲区:1-2GB
– 会话内存:512MB-1GB
– 系统预留:20%

2. CPU规划
– 批量插入线程数:CPU核数的1-2倍
– 并行度:根据表分区数
– 后台任务:预留20% CPU

3. 磁盘规划
– 数据文件:SSD存储
– 日志文件:独立磁盘
– 临时文件:高速存储

4. 网络规划
– 带宽:1Gbps以上
– 延迟:< 10ms - 稳定性:99.9%

生产环境建议:批量插入需要合理规划资源,避免影响正常业务。建议在业务低峰期执行大批量插入。学习交流加群风哥微信: itpux-com,学习交流加群风哥微信: itpux-com。

Part03-生产环境项目实施方案

3.1 OceanBase SQL批量插入

3.1.1 多值INSERT

# 多值INSERT

1. 基本语法
$ obclient -h192.168.1.101 -P2881 -uroot@fgedu_tenant -p -e ”
INSERT INTO fgedudb.fgedu_order (order_id, user_id, amount, create_time) VALUES
(1, 10001, 199.99, ‘2024-01-20 10:00:00’),
(2, 10002, 299.99, ‘2024-01-20 10:00:01’),
(3, 10003, 399.99, ‘2024-01-20 10:00:02’),

(1000, 11000, 99.99, ‘2024-01-20 10:00:59’);

Query OK, 1000 rows affected
Records: 1000 Duplicates: 0 Warnings: 0
Time: 0.5s

2. 性能对比
# 单条插入(1000条)
单条INSERT 1000次:50秒

# 批量插入(1000条)
批量INSERT 1次:0.5秒

性能提升:100倍

3. 自动生成批量SQL
#!/bin/bash
# generate_batch_insert.sh
# from:www.itpux.com.qq113257174.wx:itpux-com
# web: http://www.fgedu.net.cn

TABLE_NAME=”fgedudb.fgedu_order”
BATCH_SIZE=1000,学习交流加群风哥QQ113257174。
START_ID=1
END_ID=10000

echo “INSERT INTO $TABLE_NAME (order_id, user_id, amount, create_time) VALUES”

for ((i=START_ID; i<=END_ID; i++)); do USER_ID=$((10000 + i)) AMOUNT=$(echo "scale=2; $RANDOM/100" | bc) CREATE_TIME=$(date '+%Y-%m-%d %H:%M:%S') if [ $i -eq $END_ID ]; then echo "($i, $USER_ID, $AMOUNT, '$CREATE_TIME');" elif [ $((i % BATCH_SIZE)) -eq 0 ]; then echo "($i, $USER_ID, $AMOUNT, '$CREATE_TIME');" echo "INSERT INTO $TABLE_NAME (order_id, user_id, amount, create_time) VALUES" else echo "($i, $USER_ID, $AMOUNT, '$CREATE_TIME')," fi done

3.1.2 INSERT SELECT

# INSERT SELECT

$ obclient -h192.168.1.101 -P2881 -uroot@fgedu_tenant -p -e ”
— 从临时表导入数据
INSERT INTO fgedudb.fgedu_order
SELECT order_id, user_id, amount, create_time
FROM fgedudb.fgedu_order_temp
WHERE create_time >= ‘2024-01-01’;

Query OK, 100000 rows affected
Records: 100000 Duplicates: 0 Warnings: 0
Time: 5.2s

— 使用并行插入
INSERT /*+ PARALLEL(4) */ INTO fgedudb.fgedu_order
SELECT /*+ PARALLEL(4) */ order_id, user_id, amount, create_time
FROM fgedudb.fgedu_order_temp;更多视频教程www.fgedu.net.cn。

Query OK, 100000 rows affected
Time: 2.1s

3.2 OceanBase LOAD DATA

# LOAD DATA

1. 准备数据文件
$ cat /data/orders.csv
1,10001,199.99,2024-01-20 10:00:00
2,10002,299.99,2024-01-20 10:00:01
3,10003,399.99,2024-01-20 10:00:02

100000,110000,99.99,2024-01-20 10:10:00

2. 执行LOAD DATA
$ obclient -h192.168.1.101 -P2881 -uroot@fgedu_tenant -p -e ”
LOAD DATA INFILE ‘/data/orders.csv’
INTO TABLE fgedudb.fgedu_order
FIELDS TERMINATED BY ‘,’
LINES TERMINATED BY ‘\n’
(order_id, user_id, amount, create_time);

Query OK, 100000 rows affected
Records: 100000 Deleted: 0 Skipped: 0 Warnings: 0
Time: 3.5s

3. 高级选项
$ obclient -h192.168.1.101 -P2881 -uroot@fgedu_tenant -p -e ”
LOAD DATA INFILE ‘/data/orders.csv’
INTO TABLE fgedudb.fgedu_order
FIELDS TERMINATED BY ‘,’
ENCLOSED BY ‘\”‘
ESCAPED BY ‘\\’
LINES TERMINATED BY ‘\n’,更多学习教程公众号风哥教程itpux_com。
IGNORE 1 LINES
(order_id, user_id, amount, @create_time)
SET create_time = STR_TO_DATE(@create_time, ‘%Y-%m-%d %H:%i:%s’);

4. 性能对比
– 单条INSERT:100000条 = 500秒
– 批量INSERT:100000条 = 50秒
– LOAD DATA:100000条 = 3.5秒

LOAD DATA性能最优

3.3 OceanBase JDBC批量插入

# JDBC批量插入

1. 基本批量插入
“`java
// Java代码
String url = “jdbc:oceanbase://192.168.1.101:2881/fgedudb”;
Connection conn = DriverManager.getConnection(url, “fgedu”, “password”);

String sql = “INSERT INTO fgedu_order (order_id, user_id, amount) VALUES (?, ?, ?)”;
PreparedStatement pstmt = conn.prepareStatement(sql);

// 关闭自动提交
conn.setAutoCommit(false);from DB视频:www.itpux.com。

int batchSize = 1000;
int count = 0;

for (Order order : orders) {
pstmt.setLong(1, order.getOrderId());
pstmt.setLong(2, order.getUserId());
pstmt.setBigDecimal(3, order.getAmount());
pstmt.addBatch();

if (++count % batchSize == 0) {
pstmt.executeBatch();
conn.commit();
}
}

// 提交剩余批次
pstmt.executeBatch();
conn.commit();

pstmt.close();
conn.close();
“`

2. 重写批量插入
“`java
// 使用rewriteBatchedStatements优化
String url = “jdbc:oceanbase://192.168.1.101:2881/fgedudb?” +
“rewriteBatchedStatements=true&” +
“useServerPrepStmts=true&” +
“cachePrepStmts=true”;

Connection conn = DriverManager.getConnection(url, “fgedu”, “password”);
// … 批量插入代码
“`

3. 多线程批量插入
“`java
// 多线程并行批量插入
ExecutorService executor = Executors.newFixedThreadPool(4);

List> partitions = partitionOrders(orders, 4);

for (List partition : partitions) {
executor.submit(() -> {
insertBatch(partition);
});
}

executor.shutdown();
executor.awaitTermination(1, TimeUnit.HOURS);
“`

4. 性能对比
– 单条插入:1000 TPS
– 批量插入(1000条):10000 TPS
– 多线程批量(4线程):35000 TPS

风哥提示:JDBC批量插入是Java应用最常用的批量插入方式,合理配置连接参数可以显著提升性能。学习交流加群风哥QQ113257174

Part04-生产案例与实战讲解

4.1 OceanBase数据迁移批量插入案例

# 业务场景
– 从MySQL迁移到OceanBase
– 数据量:1000万条
– 表结构:订单表

# 迁移方案

1. 数据导出
$ mysqldump -h192.168.1.100 -P3306 -uroot -p fgedudb fgedu_order \
–no-create-info –extended-insert > /backup/orders.sql

2. 数据转换
#!/bin/bash
# convert_data.sh
# from:www.itpux.com.qq113257174.wx:itpux-com

# 转换SQL格式
sed -i ‘s/INSERT INTO `fgedu_order`/INSERT INTO fgedudb.fgedu_order/g’ /backup/orders.sql

3. 批量导入
$ obclient -h192.168.1.101 -P2881 -uroot@fgedu_tenant -p < /backup/orders.sql # 或使用LOAD DATA $ obclient -h192.168.1.101 -P2881 -uroot@fgedu_tenant -p -e " LOAD DATA INFILE '/backup/orders.csv' INTO TABLE fgedudb.fgedu_order FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'; " 4. 性能数据 - 数据量:1000万条 - 导入时间:15分钟 - 平均速度:11111条/秒 # 迁移完成

4.2 OceanBase ETL批量插入案例

# 业务场景
– 数据仓库ETL
– 每日处理:500万条
– 时间窗口:2小时

# ETL方案

1. 数据抽取
“`python
# Python ETL脚本
import pandas as pd
from sqlalchemy import create_engine

# 连接源数据库
source_engine = create_engine(‘mysql://user:pass@source/db’)

# 抽取数据
df = pd.read_sql(“SELECT * FROM source_table WHERE create_time >= CURDATE()”, source_engine)

# 数据转换
df[‘amount’] = df[‘amount’].astype(float)
df[‘create_time’] = pd.to_datetime(df[‘create_time’])
“`

2. 批量加载
“`python
# 连接OceanBase
target_engine = create_engine(‘oceanbase://fgedu:pass@192.168.1.101:2881/fgedudb’)

# 批量插入
chunk_size = 10000
for i in range(0, len(df), chunk_size):
chunk = df[i:i+chunk_size]
chunk.to_sql(‘fgedu_order’, target_engine, if_exists=’append’, index=False)
print(f”Inserted {i+len(chunk)} rows”)
“`

3. 性能优化
– 批量大小:10000条
– 并行度:4线程
– 预处理:禁用索引

4. 性能数据
– 数据量:500万条
– 处理时间:45分钟
– 平均速度:18518条/秒

# ETL完成

4.3 OceanBase数据同步批量插入案例

# 业务场景
– 实时数据同步
– 峰值:10000条/秒
– 延迟要求:< 5秒 # 同步方案 1. 架构设计 ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ 业务系统 │ --> │ 消息队列 │ –> │ 同步服务 │
└─────────────┘ └─────────────┘ └─────────────┘

v
┌─────────────┐
│ OceanBase │
└─────────────┘

2. 同步服务代码
“`java
// Kafka消费者批量插入
@KafkaListener(topics = “order_topic”)
public void consume(List> records) {
List orders = records.stream()
.map(ConsumerRecord::value)
.collect(Collectors.toList());

// 批量插入
orderService.batchInsert(orders);
}

// 批量插入服务
@Service
public class OrderService {
@Autowired
private OrderMapper orderMapper;

@Transactional
public void batchInsert(List orders) {
// 分批插入,每批1000条
Lists.partition(orders, 1000).forEach(batch -> {
orderMapper.batchInsert(batch);
});
}
}
“`

3. 性能优化
– 批量大小:1000条
– 消费并发:10线程
– 批量提交:每批一个事务

4. 性能数据
– 峰值处理:12000条/秒
– 平均延迟:2秒
– 成功率:99.99%

# 同步完成

生产环境建议:批量插入需要根据业务场景选择合适的方式,持续监控性能指标,及时调整优化。更多学习教程公众号风哥教程itpux_com

Part05-风哥经验总结与分享

5.1 OceanBase批量插入最佳实践

# 批量插入最佳实践

1. 批量大小选择
– 测试确定最优值
– 考虑网络和内存
– 监控性能指标

2. 事务管理
– 合理控制事务大小
– 及时提交避免锁等待
– 错误处理和重试

3. 并发控制
– 控制并发线程数
– 避免热点冲突
– 监控锁等待

4. 数据准备
– 数据清洗验证
– 格式转换
– 去重处理

5. 性能监控
– 监控插入速度
– 监控资源使用
– 及时调整优化

5.2 OceanBase批量插入性能调优

# 性能调优

1. 数据库参数
$ obclient -h192.168.1.101 -P2881 -uroot@sys -p -e ”
— 优化批量插入参数
ALTER SYSTEM SET memstore_limit_percentage=60;
ALTER SYSTEM SET freeze_trigger_percentage=80;
ALTER SYSTEM SET minor_freeze_times=5;

2. 表结构优化
– 减少索引数量
– 使用分区表
– 选择合适的数据类型

3. 应用优化
– 使用连接池
– 批量提交
– 异步处理

4. 监控指标
#!/bin/bash
# batch_insert_monitor.sh
# from:www.itpux.com.qq113257174.wx:itpux-com

echo “=== Batch Insert Monitor ===”

# 监控插入TPS
obclient -e “SELECT value FROM oceanbase.__all_virtual_stat WHERE stat_id=10000”

# 监控MemTable使用
obclient -e “SELECT active_memstore_used/1024/1024/1024 FROM oceanbase.__all_virtual_memstore_info”

# 监控锁等待
obclient -e “SELECT COUNT(*) FROM oceanbase.__all_virtual_lock_wait_stat”

echo “=============================”

5.3 OceanBase批量插入常见问题

# 批量插入常见问题及解决

Q1: 批量插入报内存不足?
A1: 减小批量大小,增加系统内存,优化MemTable配置

Q2: 批量插入速度慢?
A2: 增大批量大小,使用LOAD DATA,优化网络连接

Q3: 批量插入锁等待?
A3: 减小事务大小,调整并发度,避免热点

Q4: 批量插入数据重复?
A4: 使用主键或唯一索引,插入前数据去重

Q5: 批量插入部分失败?
A5: 分批重试,记录失败数据,单独处理

Q6: 如何选择批量大小?
A6: 根据测试确定,一般1000-5000条

Q7: LOAD DATA和INSERT哪个快?
A7: LOAD DATA更快,适合大批量导入

Q8: 批量插入如何监控进度?
A8: 使用进度条,记录已处理数量,定期输出

Q9: 批量插入失败如何回滚?
A9: 使用事务,失败回滚,记录失败数据

Q10: 如何提升批量插入并发?
A10: 多线程并行,分区插入,避免锁冲突

风哥提示:批量插入是数据导入的核心技术,掌握各种批量插入方式和优化技巧可以显著提升数据处理效率。from OceanBase视频:www.itpux.com

本文由风哥教程整理发布,仅用于学习测试使用,转载注明出处:http://www.fgedu.net.cn/10327.html

联系我们

在线咨询:点击这里给我发消息

微信号:itpux-com

工作日:9:30-18:30,节假日休息