本文档风哥主要介绍MongoDB批量文档写入相关知识,包括MongoDB批量写入的概念、优势、限制、规划、最佳实践、性能优化、实现方法、监控以及生产案例等内容,风哥教程参考MongoDB官方文档CRUD Operations内容编写,适合DBA人员在学习和测试中使用,如果要应用于生产环境则需要自行确认。
Part01-基础概念与理论知识
1.1 MongoDB批量写入的概念
MongoDB批量写入是指通过单次操作插入、更新或删除多个文档的技术,减少网络往返次数,提高操作效率。MongoDB提供了多种批量写入方法,包括insertMany、updateMany、deleteMany等。更多视频教程www.fgedu.net.cn
- 批量插入:使用insertMany插入多个文档
- 批量更新:使用updateMany更新多个文档
- 批量删除:使用deleteMany删除多个文档
- 混合批量操作:使用bulkWrite执行多种操作
1.2 MongoDB批量写入的优势
MongoDB批量写入的主要优势:
- 减少网络往返:单次请求处理多个操作,减少网络延迟
- 提高吞吐量:批量处理比单条处理速度更快
- 原子性操作:批量操作可以保证原子性
- 减少资源消耗:降低客户端和服务器的资源消耗
- 简化代码:减少代码复杂度,提高可维护性
1.3 MongoDB批量写入的限制
MongoDB批量写入的限制:
# 1. 文档大小限制
– 单个文档大小不超过16MB
– 批量操作的总大小受限于网络传输和服务器处理能力
# 2. 操作数量限制
– insertMany默认限制为1000个文档
– 可以通过参数调整,但建议合理设置以避免性能问题
# 3. 内存限制
– 批量操作需要在内存中构建操作列表
– 过大的批量操作可能导致内存不足
# 4. 错误处理
– 默认情况下,批量操作遇到错误会终止
– 可以设置ordered: false继续执行其他操作
Part02-生产环境规划与建议
2.1 MongoDB批量写入规划
MongoDB批量写入规划要点:
# 1. 数据量评估
– 预估批量操作的数据量
– 确定批量大小和频率
# 2. 性能需求分析
– 分析写入性能要求
– 评估网络带宽和延迟
– 考虑服务器处理能力
# 3. 错误处理策略
– 制定错误处理方案
– 确定是否需要事务支持
– 考虑数据一致性要求
# 4. 监控与告警
– 监控批量操作性能
– 设置合理的告警阈值
– 建立操作日志
2.2 MongoDB批量写入最佳实践
MongoDB批量写入最佳实践:
- 合理设置批量大小:根据数据大小和服务器能力设置合适的批量大小
- 使用ordered: false:在不需要严格顺序的情况下提高性能
- 分批处理大任务:将大任务拆分为多个小批量操作
- 使用连接池:保持连接复用,减少连接开销
- 监控操作时间:避免批量操作时间过长影响系统性能
- 合理使用索引:为批量操作的查询条件创建索引
- 考虑写入关注点:根据业务需求设置合适的write concern
- 错误处理:实现合理的错误处理和重试机制
2.3 MongoDB批量写入性能优化
MongoDB批量写入性能优化:
# 1. 服务器配置优化
– 增加服务器内存
– 优化存储系统
– 配置合适的WiredTiger缓存
# 2. MongoDB参数优化
– 设置合适的write concern
– 调整bulkWrite参数
– 优化索引策略
# 3. 应用程序优化
– 使用批量操作替代单条操作
– 合理设置批量大小
– 实现异步写入
– 使用连接池管理连接
# 4. 网络优化
– 减少网络延迟
– 增加网络带宽
– 使用本地连接或高速网络
Part03-生产环境项目实施方案
3.1 MongoDB批量写入方法
3.1.1 insertMany方法
# 基本语法
db.collection.insertMany(
[
{
writeConcern:
ordered:
}
)
# 示例
fgedudb> db.fgedu_users.insertMany([
{name: “fgedu01”, email: “fgedu01@fgedu.net.cn”, age: 25},
{name: “fgedu02”, email: “fgedu02@fgedu.net.cn”, age: 26},
{name: “fgedu03”, email: “fgedu03@fgedu.net.cn”, age: 27}
], {ordered: false})
{
acknowledged: true,
insertedIds: {
‘0’: ObjectId(“6614f8a0a1b2c3d4e5f6g7h8”),
‘1’: ObjectId(“6614f8a0a1b2c3d4e5f6g7h9”),
‘2’: ObjectId(“6614f8a0a1b2c3d4e5f6g7h0”)
}
}
3.1.2 updateMany方法
# 基本语法
db.collection.updateMany(
{
writeConcern:
upsert:
}
)
# 示例
fgedudb> db.fgedu_users.updateMany(
{age: { $lt: 30 }},
{ $set: { status: “active” }}
)
{
acknowledged: true,
matchedCount: 3,
modifiedCount: 3,
upsertedId: null
}
3.1.3 deleteMany方法
# 基本语法
db.collection.deleteMany(
{
writeConcern:
}
)
# 示例
fgedudb> db.fgedu_users.deleteMany(
{age: { $gt: 30 }}
)
{
acknowledged: true,
deletedCount: 0
}
3.1.4 bulkWrite方法
# 基本语法
db.collection.bulkWrite(
[
{ insertOne: { document:
{ updateOne: { filter:
{ updateMany: { filter:
{ deleteOne: { filter:
{ deleteMany: { filter:
{ replaceOne: { filter:
],
{
writeConcern:
ordered:
}
)
# 示例
fgedudb> db.fgedu_users.bulkWrite([
{ insertOne: { document: {name: “fgedu04”, email: “fgedu04@fgedu.net.cn”, age: 28} } },
{ updateOne: { filter: {name: “fgedu01”}, update: { $set: { age: 26 } } } },
{ deleteOne: { filter: {name: “fgedu05”} } }
], {ordered: false})
{
acknowledged: true,
insertedCount: 1,
matchedCount: 1,
modifiedCount: 1,
deletedCount: 0,
upsertedCount: 0,
upsertedIds: {}
}
3.2 MongoDB批量写入实现
3.2.1 使用Node.js实现批量写入
// batch_insert.js
// from:www.itpux.com.qq113257174.wx:itpux-com
// web: `http://www.fgedu.net.cn`
const { MongoClient } = require(‘mongodb’);
async function batchInsert() {
const uri = ‘mongodb://192.168.1.100:27017’;
const client = new MongoClient(uri);
try {
await client.connect();
const db = client.db(‘fgedudb’);
const collection = db.collection(‘fgedu_users’);
// 生成测试数据
const documents = [];
for (let i = 1; i <= 1000; i++) {
documents.push({
name: `fgedu${i.toString().padStart(3, '0')}`,
email: `fgedu${i.toString().padStart(3, '0')}@fgedu.net.cn`,
age: 20 + (i % 30),
created_at: new Date()
});
}
// 批量插入
console.time('batchInsert');
const result = await collection.insertMany(documents, { ordered: false });
console.timeEnd('batchInsert');
console.log(`插入了 ${result.insertedCount} 个文档`);
} catch (error) {
console.error('批量插入失败:', error);
} finally {
await client.close();
}
}
batchInsert();
3.2.2 使用Python实现批量写入
#!/usr/bin/env python3
# batch_insert.py
# from:www.itpux.com.qq113257174.wx:itpux-com
# web: `http://www.fgedu.net.cn`
from pymongo import MongoClient
import time
from datetime import datetime
client = MongoClient(‘mongodb://192.168.1.100:27017’)
db = client[‘fgedudb’]
collection = db[‘fgedu_users’]
# 生成测试数据
documents = []
for i in range(1, 1001):
documents.append({
‘name’: f’fgedu{i:03d}’,
’email’: f’fgedu{i:03d}@fgedu.net.cn’,
‘age’: 20 + (i % 30),
‘created_at’: datetime.now()
})
# 批量插入
start_time = time.time()
result = collection.insert_many(documents, ordered=False)
end_time = time.time()
print(f’插入了 {len(result.inserted_ids)} 个文档’)
print(f’耗时: {end_time – start_time:.2f} 秒’)
client.close()
3.3 MongoDB批量写入监控
3.3.1 监控批量写入性能
# 查看操作性能
fgedudb> db.currentOp()
{
inprog: [
{
opid: 1234,
active: true,
secs_running: 2,
microsecs_running: 2000000,
op: ‘insert’,
ns: ‘fgedudb.fgedu_users’,
command: {
insert: ‘fgedu_users’,
documents: […],
ordered: false
},
…
}
],
…
}
# 查看写入性能指标
fgedudb> db.serverStatus().wiredTiger.cache
{
bytes currently in the cache: 104857600,
bytes read into cache: 209715200,
bytes written from cache: 104857600,
…
}
# 查看集合统计信息
fgedudb> db.fgedu_users.stats()
{
ns: ‘fgedudb.fgedu_users’,
size: 10485760,
count: 10000,
avgObjSize: 1048.576,
storageSize: 41943040,
indexes: 2,
indexSize: 8388608,
…
}
Part04-生产案例与实战讲解
4.1 MongoDB批量写入案例一:用户数据导入
4.1.1 需求分析
需要从CSV文件导入大量用户数据到MongoDB数据库。
4.1.2 解决方案
#!/usr/bin/env python3
# import_users.py
# from:www.itpux.com.qq113257174.wx:itpux-com
# web: `http://www.fgedu.net.cn`
import csv
from pymongo import MongoClient
import time
from datetime import datetime
client = MongoClient(‘mongodb://192.168.1.100:27017’)
db = client[‘fgedudb’]
collection = db[‘fgedu_users’]
# 批量大小
BATCH_SIZE = 1000
# 读取CSV文件并批量导入
def import_users(csv_file):
batch = []
count = 0
start_time = time.time()
with open(csv_file, ‘r’, encoding=’utf-8′) as f:
reader = csv.DictReader(f)
for row in reader:
# 转换数据
user = {
‘name’: row[‘name’],
’email’: row[’email’],
‘phone’: row[‘phone’],
‘age’: int(row[‘age’]),
‘created_at’: datetime.now()
}
batch.append(user)
count += 1
# 达到批量大小,执行插入
if len(batch) >= BATCH_SIZE:
collection.insert_many(batch, ordered=False)
print(f’已导入 {count} 条记录’)
batch = []
# 插入剩余数据
if batch:
collection.insert_many(batch, ordered=False)
print(f’已导入 {count} 条记录’)
end_time = time.time()
print(f’导入完成,共导入 {count} 条记录,耗时: {end_time – start_time:.2f} 秒’)
# 执行导入
import_users(‘users.csv’)
client.close()
4.1.3 执行结果
$ python3 import_users.py
已导入 1000 条记录
已导入 2000 条记录
已导入 3000 条记录
已导入 4000 条记录
已导入 5000 条记录
导入完成,共导入 5000 条记录,耗时: 2.34 秒
# 验证数据
fgedudb> db.fgedu_users.countDocuments()
5000
fgedudb> db.fgedu_users.find().limit(5)
[
{ _id: ObjectId(“6614f8a0a1b2c3d4e5f6g7h8”), name: ‘fgedu001’, email: ‘fgedu001@fgedu.net.cn’, phone: ‘13800138001’, age: 25, created_at: ISODate(“2026-04-08T08:00:00Z”) },
{ _id: ObjectId(“6614f8a0a1b2c3d4e5f6g7h9”), name: ‘fgedu002’, email: ‘fgedu002@fgedu.net.cn’, phone: ‘13800138002’, age: 26, created_at: ISODate(“2026-04-08T08:00:00Z”) },
{ _id: ObjectId(“6614f8a0a1b2c3d4e5f6g7h0”), name: ‘fgedu003’, email: ‘fgedu003@fgedu.net.cn’, phone: ‘13800138003’, age: 27, created_at: ISODate(“2026-04-08T08:00:00Z”) },
{ _id: ObjectId(“6614f8a0a1b2c3d4e5f6g7h1”), name: ‘fgedu004’, email: ‘fgedu004@fgedu.net.cn’, phone: ‘13800138004’, age: 28, created_at: ISODate(“2026-04-08T08:00:00Z”) },
{ _id: ObjectId(“6614f8a0a1b2c3d4e5f6g7h2”), name: ‘fgedu005’, email: ‘fgedu005@fgedu.net.cn’, phone: ‘13800138005’, age: 29, created_at: ISODate(“2026-04-08T08:00:00Z”) }
]
4.2 MongoDB批量写入案例二:订单数据处理
4.2.1 需求分析
需要批量处理大量订单数据,包括插入新订单和更新订单状态。
4.2.2 解决方案
#!/usr/bin/env python3
# process_orders.py
# from:www.itpux.com.qq113257174.wx:itpux-com
# web: `http://www.fgedu.net.cn`
from pymongo import MongoClient
import time
from datetime import datetime
import random
client = MongoClient(‘mongodb://192.168.1.100:27017’)
db = client[‘fgedudb’]
orders_collection = db[‘fgedu_orders’]
# 批量大小
BATCH_SIZE = 500
# 生成测试订单数据
def generate_orders(count):
orders = []
for i in range(count):
order = {
‘order_id’: f’order{i:06d}’,
‘user_id’: f’user{i%1000:04d}’,
‘amount’: random.uniform(100, 10000),
‘status’: ‘pending’,
‘created_at’: datetime.now(),
‘updated_at’: datetime.now()
}
orders.append(order)
return orders
# 批量处理订单
def process_orders():
# 生成订单数据
orders = generate_orders(10000)
# 批量插入订单
start_time = time.time()
# 分批插入
for i in range(0, len(orders), BATCH_SIZE):
batch = orders[i:i+BATCH_SIZE]
orders_collection.insert_many(batch, ordered=False)
print(f’已插入 {i+BATCH_SIZE} 个订单’)
# 批量更新订单状态
update_result = orders_collection.updateMany(
{ ‘status’: ‘pending’ },
{ ‘$set’: { ‘status’: ‘processing’, ‘updated_at’: datetime.now() } }
)
end_time = time.time()
print(f’处理完成,共插入 {len(orders)} 个订单,更新 {update_result.modified_count} 个订单状态,耗时: {end_time – start_time:.2f} 秒’)
# 执行处理
process_orders()
client.close()
4.2.3 执行结果
$ python3 process_orders.py
已插入 500 个订单
已插入 1000 个订单
已插入 1500 个订单
已插入 2000 个订单
已插入 2500 个订单
已插入 3000 个订单
已插入 3500 个订单
已插入 4000 个订单
已插入 4500 个订单
已插入 5000 个订单
已插入 5500 个订单
已插入 6000 个订单
已插入 6500 个订单
已插入 7000 个订单
已插入 7500 个订单
已插入 8000 个订单
已插入 8500 个订单
已插入 9000 个订单
已插入 9500 个订单
已插入 10000 个订单
处理完成,共插入 10000 个订单,更新 10000 个订单状态,耗时: 5.67 秒
# 验证数据
fgedudb> db.fgedu_orders.countDocuments()
10000
fgedudb> db.fgedu_orders.find({ ‘status’: ‘processing’ }).count()
10000
4.3 MongoDB批量写入案例三:日志数据存储
4.3.1 需求分析
需要实时存储大量应用日志数据到MongoDB数据库。
4.3.2 解决方案
#!/usr/bin/env python3
# store_logs.py
# from:www.itpux.com.qq113257174.wx:itpux-com
# web: `http://www.fgedu.net.cn`
from pymongo import MongoClient
import time
from datetime import datetime
import random
import threading
client = MongoClient(‘mongodb://192.168.1.100:27017’)
db = client[‘fgedudb’]
logs_collection = db[‘fgedu_logs’]
# 批量大小
BATCH_SIZE = 1000
# 日志队列
log_queue = []
# 锁
lock = threading.Lock()
# 生成日志数据
def generate_log():
levels = [‘INFO’, ‘WARN’, ‘ERROR’, ‘DEBUG’]
services = [‘api’, ‘auth’, ‘payment’, ‘shipping’]
return {
‘timestamp’: datetime.now(),
‘level’: random.choice(levels),
‘service’: random.choice(services),
‘message’: f’Log message {random.randint(1, 1000000)}’,
‘user_id’: f’user{random.randint(1, 10000):05d}’,
‘ip’: f'{random.randint(1, 255)}.{random.randint(0, 255)}.{random.randint(0, 255)}.{random.randint(0, 255)}’
}
# 批量写入日志
def write_logs():
global log_queue
while True:
time.sleep(1)
# 每1秒检查一次
with lock:
if len(log_queue) >= BATCH_SIZE:
# 取出批量数据
batch = log_queue[:BATCH_SIZE]
log_queue = log_queue[BATCH_SIZE:]
# 批量插入
try:
logs_collection.insert_many(batch, ordered=False)
print(f’已写入 {len(batch)} 条日志’)
except Exception as e:
print(f’写入日志失败: {e}’)
# 模拟日志生成
def generate_logs():
for i in range(50000):
log = generate_log()
with lock:
log_queue.append(log)
time.sleep(0.001)
# 控制生成速度
print(‘日志生成完成’)
# 启动写入线程
writer_thread = threading.Thread(target=write_logs)
writer_thread.daemon = True
writer_thread.start()
# 开始生成日志
generate_logs()
# 等待所有日志写入完成
while len(log_queue) > 0:
time.sleep(1)
print(‘所有日志已写入’)
client.close()
4.3.3 执行结果
$ python3 store_logs.py
已写入 1000 条日志
已写入 1000 条日志
已写入 1000 条日志
已写入 1000 条日志
已写入 1000 条日志
已写入 1000 条日志
已写入 1000 条日志
已写入 1000 条日志
已写入 1000 条日志
已写入 1000 条日志
已写入 1000 条日志
已写入 1000 条日志
已写入 1000 条日志
已写入 1000 条日志
已写入 1000 条日志
已写入 1000 条日志
已写入 1000 条日志
已写入 1000 条日志
已写入 1000 条日志
已写入 1000 条日志
已写入 1000 条日志
已写入 1000 条日志
已写入 1000 条日志
已写入 1000 条日志
已写入 1000 条日志
已写入 1000 条日志
已写入 1000 条日志
已写入 1000 条日志
已写入 1000 条日志
已写入 1000 条日志
已写入 1000 条日志
已写入 1000 条日志
已写入 1000 条日志
已写入 1000 条日志
已写入 1000 条日志
已写入 1000 条日志
已写入 1000 条日志
已写入 1000 条日志
已写入 1000 条日志
已写入 1000 条日志
已写入 1000 条日志
已写入 1000 条日志
已写入 1000 条日志
已写入 1000 条日志
已写入 1000 条日志
已写入 1000 条日志
已写入 1000 条日志
已写入 1000 条日志
已写入 1000 条日志
日志生成完成
所有日志已写入
# 验证数据
fgedudb> db.fgedu_logs.countDocuments()
50000
fgedudb> db.fgedu_logs.find().limit(3)
[
{ _id: ObjectId(“6614f8a0a1b2c3d4e5f6g7h8”), timestamp: ISODate(“2026-04-08T08:00:00Z”), level: ‘INFO’, service: ‘api’, message: ‘Log message 123456’, user_id: ‘user00123’, ip: ‘192.168.1.100’ },
{ _id: ObjectId(“6614f8a0a1b2c3d4e5f6g7h9”), timestamp: ISODate(“2026-04-08T08:00:00Z”), level: ‘ERROR’, service: ‘auth’, message: ‘Log message 789012’, user_id: ‘user00456’, ip: ‘192.168.1.101’ },
{ _id: ObjectId(“6614f8a0a1b2c3d4e5f6g7h0”), timestamp: ISODate(“2026-04-08T08:00:00Z”), level: ‘WARN’, service: ‘payment’, message: ‘Log message 345678’, user_id: ‘user00789’, ip: ‘192.168.1.102’ }
]
Part05-风哥经验总结与分享
5.1 MongoDB批量写入技巧
MongoDB批量写入技巧:
- 合理设置批量大小:根据数据大小和服务器能力设置合适的批量大小,一般建议1000-5000个文档
- 使用ordered: false:在不需要严格顺序的情况下,使用ordered: false提高性能
- 分批处理大任务:将大任务拆分为多个小批量操作,避免内存不足
- 使用连接池:保持连接复用,减少连接开销
- 监控操作时间:避免批量操作时间过长影响系统性能
- 合理使用索引:为批量操作的查询条件创建索引
- 考虑写入关注点:根据业务需求设置合适的write concern
- 错误处理:实现合理的错误处理和重试机制
5.2 MongoDB批量写入脚本
#!/usr/bin/env mongosh
# batch_write.js
# from:www.itpux.com.qq113257174.wx:itpux-com
# web: `http://www.fgedu.net.cn`
// 连接到MongoDB实例
const conn = new Mongo(“192.168.1.100:27017”);
const db = conn.getDB(“fgedudb”);
// 批量插入函数
function batchInsert(collectionName, documents, batchSize = 1000) {
const collection = db.getCollection(collectionName);
let count = 0;
const startTime = new Date();
for (let i = 0; i < documents.length; i += batchSize) { const batch = documents.slice(i, i + batchSize); collection.insertMany(batch, { ordered: false }); count += batch.length; print(`已插入 ${count} 个文档`); } const endTime = new Date(); const duration = (endTime - startTime) / 1000; print(`批量插入完成,共插入 ${count} 个文档,耗时: ${duration.toFixed(2)} 秒`); } // 生成测试数据 function generateTestData(count) { const data = []; for (let i = 1; i <= count; i++) { data.push({ name: `fgedu${i.toString().padStart(5, '0')}`, email: `fgedu${i.toString().padStart(5, '0')}@fgedu.net.cn`, age: 20 + (i % 30), created_at: new Date() }); } return data; } // 执行批量插入 const testData = generateTestData(10000); batchInsert("fgedu_test", testData, 2000); print("操作完成!");
5.3 MongoDB批量写入优化
MongoDB批量写入优化建议:
- 服务器配置优化:增加服务器内存,优化存储系统,配置合适的WiredTiger缓存
- MongoDB参数优化:设置合适的write concern,调整bulkWrite参数,优化索引策略
- 应用程序优化:使用批量操作替代单条操作,合理设置批量大小,实现异步写入,使用连接池管理连接
- 网络优化:减少网络延迟,增加网络带宽,使用本地连接或高速网络
- 监控与调优:定期监控批量操作性能,根据实际情况调整批量大小和写入策略
- 数据模型优化:优化文档结构,减少文档大小,合理使用嵌入和引用
- 错误处理优化:实现合理的错误处理和重试机制,确保数据一致性
本文由风哥教程整理发布,仅用于学习测试使用,转载注明出处:http://www.fgedu.net.cn/10327.html
