本文档风哥主要介绍MongoDB聚合管道统计相关知识,包括MongoDB聚合管道的概念、聚合操作符、聚合原理、规划、最佳实践、性能考虑、实现方法、优化、使用场景以及生产案例等内容,风哥教程参考MongoDB官方文档Aggregation Pipeline内容编写,适合DBA人员在学习和测试中使用,如果要应用于生产环境则需要自行确认。
Part01-基础概念与理论知识
1.1 MongoDB聚合管道的概念
MongoDB聚合管道是指将多个聚合操作组合成一个管道,数据通过管道中的每个阶段进行处理和转换,最终生成聚合结果。聚合管道是MongoDB中用于数据处理和分析的强大工具,可以实现复杂的数据统计和分析功能。更多视频教程www.fgedu.net.cn
- 支持多个聚合阶段的组合
- 数据在管道中流式处理
- 支持复杂的数据转换和计算
- 可以处理大量数据
- 支持索引优化
- 提供丰富的聚合操作符
1.2 MongoDB聚合操作符
MongoDB聚合操作符是用于在聚合管道中处理数据的特殊操作符,主要包括:
# 1. 管道操作符
– $match:过滤文档
– $project:投影字段
– $group:分组聚合
– $sort:排序
– $limit:限制结果数量
– $skip:跳过结果数量
– $unwind:展开数组
– $lookup:关联查询
– $graphLookup:图关联查询
– $facet:多维度聚合
– $bucket:分桶聚合
– $bucketAuto:自动分桶
– $addFields:添加字段
– $replaceRoot:替换根文档
– $merge:合并结果到集合
– $out:输出结果到集合
# 2. 聚合表达式操作符
– $sum:求和
– $avg:平均值
– $min:最小值
– $max:最大值
– $first:第一个值
– $last:最后一个值
– $push:将值添加到数组
– $addToSet:将值添加到集合(去重)
– $count:计数
– $stdDevPop:总体标准差
– $stdDevSamp:样本标准差
# 3. 表达式操作符
– $add:加法
– $subtract:减法
– $multiply:乘法
– $divide:除法
– $mod:取模
– $concat:字符串连接
– $toUpper:转大写
– $toLower:转小写
– $dateToString:日期转字符串
– $cond:条件表达式
– $ifNull:空值处理
– $switch:多条件判断
1.3 MongoDB聚合原理
MongoDB聚合原理基于以下几点:
- 管道处理:数据通过多个阶段的管道进行处理,每个阶段对数据进行转换和处理
- 流式处理:数据在管道中流式处理,减少内存使用
- 并行处理:在分片集群中,聚合操作可以在多个分片上并行执行
- 索引使用:聚合操作可以使用索引提高性能
- 内存限制:聚合操作有内存使用限制,超过限制会使用磁盘
Part02-生产环境规划与建议
2.1 MongoDB聚合规划
MongoDB聚合规划要点:
# 1. 分析业务需求
– 识别需要的统计和分析功能
– 确定数据来源和处理逻辑
– 评估数据量和增长趋势
# 2. 设计聚合管道
– 确定管道阶段的顺序
– 选择合适的聚合操作符
– 优化管道性能
# 3. 索引策略规划
– 为$match阶段创建索引
– 为$sort阶段创建索引
– 考虑覆盖索引
# 4. 性能目标设定
– 设定聚合操作的响应时间目标
– 评估系统的聚合处理能力
– 制定监控和优化策略
# 5. 测试计划
– 制定聚合性能测试计划
– 模拟真实负载测试
– 分析测试结果,优化系统
2.2 MongoDB聚合最佳实践
MongoDB聚合最佳实践:
- 使用$match减少数据量:在管道开始阶段使用$match过滤数据,减少后续阶段处理的数据量
- 使用$project减少字段数:使用$project只保留需要的字段,减少数据传输和处理开销
- 合理使用$group:根据业务需求合理设计分组键,避免过度分组
- 使用索引:为$match和$sort阶段的字段创建索引
- 避免使用$unwind处理大数组:处理大数组会增加内存使用和处理时间
- 使用$out或$merge:对于复杂的聚合操作,考虑将结果输出到集合,避免重复计算
- 监控聚合性能:定期分析慢聚合操作
- 优化聚合管道顺序:将过滤和投影操作放在管道的前面
2.3 MongoDB聚合性能考虑
MongoDB聚合性能考虑:
# 1. 数据量影响
– 处理大量数据会增加聚合时间
– 考虑使用分片集群分散处理负载
– 对于超大数据集,考虑使用$sample采样
# 2. 内存使用
– 聚合操作有内存限制(默认100MB)
– 超过内存限制会使用磁盘,性能下降
– 可以调整aggregationMemoryLimit参数
# 3. 索引使用
– 使用索引可以显著提高聚合性能
– 为$match和$sort阶段的字段创建索引
– 考虑使用覆盖索引
# 4. 管道顺序
– 优化管道顺序,将过滤和投影操作放在前面
– 减少后续阶段处理的数据量
# 5. 硬件影响
– 内存大小会影响聚合性能
– CPU性能会影响聚合计算速度
– 磁盘IO会影响使用磁盘时的性能
# 6. 配置影响
– aggregationMemoryLimit参数设置
– wiredTiger缓存大小
– 并发连接数
Part03-生产环境项目实施方案
3.1 MongoDB聚合实现
3.1.1 基本聚合操作
# 基本语法
db.collection.aggregate([
{ $match:
{ $group:
{ $sort:
…
])
# 示例:统计每个用户的订单总数
fgedudb> db.fgedu_orders.aggregate([
{ $match: { status: “completed” } },
{ $group: { _id: “$user_id”, order_count: { $sum: 1 } } },
{ $sort: { order_count: -1 } }
])
# 示例:统计每个用户的订单总金额
fgedudb> db.fgedu_orders.aggregate([
{ $match: { status: “completed” } },
{ $group: { _id: “$user_id”, total_amount: { $sum: “$amount” } } },
{ $sort: { total_amount: -1 } }
])
# 示例:统计每个分类的商品数量
fgedudb> db.fgedu_products.aggregate([
{ $group: { _id: “$category”, product_count: { $sum: 1 } } },
{ $sort: { product_count: -1 } }
])
3.1.2 高级聚合操作
# 1. 使用$lookup关联查询
fgedudb> db.fgedu_orders.aggregate([
{ $match: { status: “completed” } },
{ $lookup: {
from: “fgedu_users”,
localField: “user_id”,
foreignField: “_id”,
as: “user_info”
}
},
{ $unwind: “$user_info” },
{ $group: {
_id: “$user_info.name”,
total_amount: { $sum: “$amount” }
}
},
{ $sort: { total_amount: -1 } }
])
# 2. 使用$facet多维度聚合
fgedudb> db.fgedu_orders.aggregate([
{ $match: { status: “completed” } },
{ $facet: {
“by_status”: [
{ $group: { _id: “$status”, count: { $sum: 1 } } }
],
“by_user”: [
{ $group: { _id: “$user_id”, count: { $sum: 1 } } },
{ $sort: { count: -1 } },
{ $limit: 5 }
],
“total_amount”: [
{ $group: { _id: null, total: { $sum: “$amount” } } }
]
}
}
])
# 3. 使用$bucket分桶聚合
fgedudb> db.fgedu_products.aggregate([
{ $bucket: {
groupBy: “$price”,
boundaries: [0, 100, 500, 1000, 5000],
default: “Other”,
output: {
count: { $sum: 1 },
avg_price: { $avg: “$price” }
}
}
}
])
3.1.3 聚合管道优化
# 1. 优化管道顺序
# 不推荐
db.fgedu_orders.aggregate([
{ $group: { _id: “$user_id”, total: { $sum: “$amount” } } },
{ $match: { total: { $gt: 1000 } } }
])
# 推荐
db.fgedu_orders.aggregate([
{ $match: { amount: { $gt: 0 } } },
{ $group: { _id: “$user_id”, total: { $sum: “$amount” } } },
{ $match: { total: { $gt: 1000 } } }
])
# 2. 使用索引加速$match
fgedudb> db.fgedu_orders.createIndex({ status: 1, created_at: -1 })
fgedudb> db.fgedu_orders.aggregate([
{ $match: { status: “completed”, created_at: { $gte: new Date(“2026-04-01”) } } },
{ $group: { _id: “$user_id”, total: { $sum: “$amount” } } }
])
# 3. 使用$project减少字段
fgedudb> db.fgedu_orders.aggregate([
{ $match: { status: “completed” } },
{ $project: { user_id: 1, amount: 1, _id: 0 } },
{ $group: { _id: “$user_id”, total: { $sum: “$amount” } } }
])
# 4. 使用$out输出结果
fgedudb> db.fgedu_orders.aggregate([
{ $match: { status: “completed” } },
{ $group: { _id: “$user_id”, total: { $sum: “$amount” } } },
{ $sort: { total: -1 } },
{ $out: “fgedu_user_order_totals” }
])
3.2 MongoDB聚合优化
3.2.1 索引优化
# 1. 为$match阶段创建索引
fgedudb> db.fgedu_orders.createIndex({ status: 1, created_at: -1 })
# 2. 为$sort阶段创建索引
fgedudb> db.fgedu_orders.createIndex({ user_id: 1, amount: -1 })
# 3. 使用复合索引
fgedudb> db.fgedu_orders.createIndex({ status: 1, user_id: 1, created_at: -1 })
# 4. 检查索引使用情况
fgedudb> db.fgedu_orders.aggregate([
{ $match: { status: “completed”, created_at: { $gte: new Date(“2026-04-01”) } } },
{ $group: { _id: “$user_id”, total: { $sum: “$amount” } } }
]).explain()
3.2.2 内存优化
# 1. 调整聚合内存限制
fgedudb> db.adminCommand({ setParameter: 1, aggregationMemoryLimitMiB: 200 })
# 2. 使用allowDiskUse
fgedudb> db.fgedu_orders.aggregate([
{ $match: { status: “completed” } },
{ $group: { _id: “$user_id”, orders: { $push: “$$ROOT” } } },
{ $sort: { “orders.0.created_at”: -1 } }
], { allowDiskUse: true })
# 3. 避免处理大数组
# 不推荐
db.fgedu_users.aggregate([
{ $unwind: “$orders” },
{ $group: { _id: “$orders.status”, count: { $sum: 1 } } }
])
# 推荐
db.fgedu_orders.aggregate([
{ $group: { _id: “$status”, count: { $sum: 1 } } }
])
3.3 MongoDB聚合使用场景
MongoDB聚合使用场景:
- 销售统计:统计销售额、订单数量、客单价等
- 用户行为分析:分析用户登录、浏览、购买等行为
- 产品库存分析:分析产品库存水平、销售趋势等
- 日志分析:分析系统日志、用户操作日志等
- 数据报表:生成各种业务报表
- 数据清洗:处理和转换数据
- 实时分析:实时分析业务数据
- 数据挖掘:发现数据中的模式和趋势
Part04-生产案例与实战讲解
4.1 MongoDB聚合案例一:销售统计
4.1.1 需求分析
需要实现销售数据的统计分析,包括销售额、订单数量、客单价、销售趋势等。
4.1.2 解决方案
# 1. 创建索引
fgedudb> db.fgedu_orders.createIndex({ status: 1, created_at: -1 })
fgedudb> db.fgedu_orders.createIndex({ user_id: 1, status: 1 })
# 2. 统计总销售额和订单数量
fgedudb> db.fgedu_orders.aggregate([
{ $match: { status: “completed” } },
{ $group: {
_id: null,
total_sales: { $sum: “$amount” },
total_orders: { $sum: 1 },
avg_order_value: { $avg: “$amount” }
}
}
])
# 3. 按日期统计销售趋势
fgedudb> db.fgedu_orders.aggregate([
{ $match: { status: “completed”, created_at: { $gte: new Date(“2026-04-01”), $lte: new Date(“2026-04-30”) } } },
{ $project: {
date: { $dateToString: { format: “%Y-%m-%d”, date: “$created_at” } },
amount: 1
}
},
{ $group: {
_id: “$date”,
daily_sales: { $sum: “$amount” },
order_count: { $sum: 1 }
}
},
{ $sort: { _id: 1 } }
])
# 4. 按用户统计销售额
fgedudb> db.fgedu_orders.aggregate([
{ $match: { status: “completed” } },
{ $lookup: {
from: “fgedu_users”,
localField: “user_id”,
foreignField: “_id”,
as: “user_info”
}
},
{ $unwind: “$user_info” },
{ $group: {
_id: “$user_info.name”,
total_sales: { $sum: “$amount” },
order_count: { $sum: 1 }
}
},
{ $sort: { total_sales: -1 } },
{ $limit: 10 }
])
# 5. 按产品类别统计销售额
fgedudb> db.fgedu_orders.aggregate([
{ $match: { status: “completed” } },
{ $unwind: “$items” },
{ $lookup: {
from: “fgedu_products”,
localField: “items.product_id”,
foreignField: “_id”,
as: “product_info”
}
},
{ $unwind: “$product_info” },
{ $group: {
_id: “$product_info.category”,
total_sales: { $sum: { $multiply: [“$items.quantity”, “$items.price”] } },
product_count: { $sum: “$items.quantity” }
}
},
{ $sort: { total_sales: -1 } }
])
4.2 MongoDB聚合案例二:用户行为分析
4.2.1 需求分析
需要实现用户行为的分析,包括登录次数、浏览次数、购买次数、活跃用户等。
4.2.2 解决方案
# 1. 创建索引
fgedudb> db.fgedu_user_actions.createIndex({ user_id: 1, action_type: 1, action_time: -1 })
# 2. 统计用户行为
fgedudb> db.fgedu_user_actions.aggregate([
{ $match: { action_time: { $gte: new Date(“2026-04-01”), $lte: new Date(“2026-04-30”) } } },
{ $group: {
_id: { user_id: “$user_id”, action_type: “$action_type” },
count: { $sum: 1 }
}
},
{ $group: {
_id: “$user_id”,
actions: { $push: { action_type: “$action_type”, count: “$count” } }
}
},
{ $lookup: {
from: “fgedu_users”,
localField: “_id”,
foreignField: “_id”,
as: “user_info”
}
},
{ $unwind: “$user_info” },
{ $project: {
user_id: “$_id”,
user_name: “$user_info.name”,
actions: 1,
_id: 0
}
}
])
# 3. 统计活跃用户
fgedudb> db.fgedu_user_actions.aggregate([
{ $match: { action_time: { $gte: new Date(Date.now() – 7 * 24 * 60 * 60 * 1000) } } },
{ $group: {
_id: “$user_id”,
action_count: { $sum: 1 }
}
},
{ $match: { action_count: { $gte: 5 } } },
{ $lookup: {
from: “fgedu_users”,
localField: “_id”,
foreignField: “_id”,
as: “user_info”
}
},
{ $unwind: “$user_info” },
{ $project: {
user_id: “$_id”,
user_name: “$user_info.name”,
action_count: 1,
_id: 0
}
},
{ $sort: { action_count: -1 } }
])
# 4. 分析用户购买行为
fgedudb> db.fgedu_user_actions.aggregate([
{ $match: { action_type: “purchase”, action_time: { $gte: new Date(“2026-04-01”), $lte: new Date(“2026-04-30”) } } },
{ $lookup: {
from: “fgedu_orders”,
localField: “order_id”,
foreignField: “_id”,
as: “order_info”
}
},
{ $unwind: “$order_info” },
{ $group: {
_id: “$user_id”,
purchase_count: { $sum: 1 },
total_amount: { $sum: “$order_info.amount” }
}
},
{ $lookup: {
from: “fgedu_users”,
localField: “_id”,
foreignField: “_id”,
as: “user_info”
}
},
{ $unwind: “$user_info” },
{ $project: {
user_id: “$_id”,
user_name: “$user_info.name”,
purchase_count: 1,
total_amount: 1,
avg_purchase: { $divide: [“$total_amount”, “$purchase_count”] },
_id: 0
}
},
{ $sort: { total_amount: -1 } }
])
4.3 MongoDB聚合案例三:产品库存分析
4.3.1 需求分析
需要实现产品库存的分析,包括库存水平、库存周转率、缺货情况等。
4.3.2 解决方案
# 1. 创建索引
fgedudb> db.fgedu_products.createIndex({ category: 1, stock: 1 })
fgedudb> db.fgedu_orders.createIndex({ status: 1, created_at: -1 })
# 2. 分析库存水平
fgedudb> db.fgedu_products.aggregate([
{ $project: {
name: 1,
category: 1,
stock: 1,
stock_status: {
$cond: {
if: { $lte: [“$stock”, 0] },
then: “out_of_stock”,
else: {
$cond: {
if: { $lte: [“$stock”, 10] },
then: “low_stock”,
else: “in_stock”
}
}
}
}
}
},
{ $group: {
_id: { category: “$category”, stock_status: “$stock_status” },
product_count: { $sum: 1 }
}
},
{ $sort: { “_id.category”: 1, “_id.stock_status”: 1 } }
])
# 3. 分析库存周转率
fgedudb> db.fgedu_orders.aggregate([
{ $match: { status: “completed”, created_at: { $gte: new Date(Date.now() – 30 * 24 * 60 * 60 * 1000) } } },
{ $unwind: “$items” },
{ $group: {
_id: “$items.product_id”,
sold_quantity: { $sum: “$items.quantity” }
}
},
{ $lookup: {
from: “fgedu_products”,
localField: “_id”,
foreignField: “_id”,
as: “product_info”
}
},
{ $unwind: “$product_info” },
{ $project: {
product_id: “$_id”,
product_name: “$product_info.name”,
category: “$product_info.category”,
stock: “$product_info.stock”,
sold_quantity: 1,
turnover_rate: { $divide: [“$sold_quantity”, { $add: [“$product_info.stock”, “$sold_quantity”] }] }
}
},
{ $sort: { turnover_rate: -1 } }
])
# 4. 分析缺货情况
fgedudb> db.fgedu_products.aggregate([
{ $match: { stock: { $lte: 0 } } },
{ $group: {
_id: “$category”,
out_of_stock_count: { $sum: 1 }
}
},
{ $sort: { out_of_stock_count: -1 } }
])
# 5. 预测库存需求
fgedudb> db.fgedu_orders.aggregate([
{ $match: { status: “completed”, created_at: { $gte: new Date(Date.now() – 90 * 24 * 60 * 60 * 1000) } } },
{ $unwind: “$items” },
{ $project: {
product_id: “$items.product_id”,
quantity: “$items.quantity”,
month: { $dateToString: { format: “%Y-%m”, date: “$created_at” } }
}
},
{ $group: {
_id: { product_id: “$product_id”, month: “$month” },
monthly_sales: { $sum: “$quantity” }
}
},
{ $group: {
_id: “$product_id”,
avg_monthly_sales: { $avg: “$monthly_sales” },
max_monthly_sales: { $max: “$monthly_sales” }
}
},
{ $lookup: {
from: “fgedu_products”,
localField: “_id”,
foreignField: “_id”,
as: “product_info”
}
},
{ $unwind: “$product_info” },
{ $project: {
product_id: “$_id”,
product_name: “$product_info.name”,
current_stock: “$product_info.stock”,
avg_monthly_sales: 1,
max_monthly_sales: 1,
recommended_stock: { $multiply: [“$max_monthly_sales”, 1.5] },
stock_shortage: {
$cond: {
if: { $lt: [“$product_info.stock”, { $multiply: [“$avg_monthly_sales”, 0.5] }] },
then: “Yes”,
else: “No”
}
}
}
},
{ $sort: { stock_shortage: 1 } }
])
Part05-风哥经验总结与分享
5.1 MongoDB聚合技巧
MongoDB聚合技巧:
- 优化管道顺序:将过滤和投影操作放在管道的前面,减少后续阶段处理的数据量
- 使用索引:为$match和$sort阶段的字段创建索引,提高查询性能
- 减少数据传输:使用$project只保留需要的字段,减少数据传输和处理开销
- 合理使用$group:根据业务需求合理设计分组键,避免过度分组
- 使用$out或$merge:对于复杂的聚合操作,考虑将结果输出到集合,避免重复计算
- 监控内存使用:注意聚合操作的内存使用,避免超过内存限制
- 使用allowDiskUse:对于大型聚合操作,使用allowDiskUse参数允许使用磁盘
- 测试性能:定期测试聚合操作的性能,优化聚合管道
5.2 MongoDB聚合脚本
#!/usr/bin/env mongosh
# aggregation_scripts.js
# from:www.itpux.com.qq113257174.wx:itpux-com
# web: `http://www.fgedu.net.cn`
// 连接到MongoDB实例
const conn = new Mongo(“192.168.1.100:27017”);
const db = conn.getDB(“fgedudb”);
// 销售统计函数
function salesReport() {
print(“生成销售统计报表…”);
const result = db.fgedu_orders.aggregate([
{ $match: { status: “completed”, created_at: { $gte: new Date(Date.now() – 30 * 24 * 60 * 60 * 1000) } } },
{ $group: {
_id: { $dateToString: { format: “%Y-%m-%d”, date: “$created_at” } },
daily_sales: { $sum: “$amount” },
order_count: { $sum: 1 },
avg_order_value: { $avg: “$amount” }
}
},
{ $sort: { _id: 1 } },
{ $out: “fgedu_daily_sales_report” }
]);
print(“销售统计报表生成完成,结果已输出到 fgedu_daily_sales_report 集合”);
}
// 用户行为分析函数
function userBehaviorAnalysis() {
print(“分析用户行为…”);
const result = db.fgedu_user_actions.aggregate([
{ $match: { action_time: { $gte: new Date(Date.now() – 7 * 24 * 60 * 60 * 1000) } } },
{ $group: {
_id: { user_id: “$user_id”, action_type: “$action_type” },
count: { $sum: 1 }
}
},
{ $group: {
_id: “$user_id”,
actions: { $push: { action_type: “$action_type”, count: “$count” } }
}
},
{ $lookup: {
from: “fgedu_users”,
localField: “_id”,
foreignField: “_id”,
as: “user_info”
}
},
{ $unwind: “$user_info” },
{ $project: {
user_id: “$_id”,
user_name: “$user_info.name”,
actions: 1,
_id: 0
}
},
{ $out: “fgedu_user_behavior_report” }
]);
print(“用户行为分析完成,结果已输出到 fgedu_user_behavior_report 集合”);
}
// 库存分析函数
function inventoryAnalysis() {
print(“分析库存情况…”);
const result = db.fgedu_products.aggregate([
{ $project: {
name: 1,
category: 1,
stock: 1,
stock_status: {
$cond: {
if: { $lte: [“$stock”, 0] },
then: “out_of_stock”,
else: {
$cond: {
if: { $lte: [“$stock”, 10] },
then: “low_stock”,
else: “in_stock”
}
}
}
}
}
},
{ $group: {
_id: { category: “$category”, stock_status: “$stock_status” },
product_count: { $sum: 1 }
}
},
{ $sort: { “_id.category”: 1, “_id.stock_status”: 1 } },
{ $out: “fgedu_inventory_report” }
]);
print(“库存分析完成,结果已输出到 fgedu_inventory_report 集合”);
}
// 执行聚合操作
salesReport();
userBehaviorAnalysis();
inventoryAnalysis();
print(“聚合操作完成!”);
5.3 MongoDB聚合监控
MongoDB聚合监控建议:
- 启用慢查询日志:配置慢查询阈值,记录慢聚合操作
- 使用数据库分析器:启用数据库分析器,收集聚合操作的统计信息
- 监控聚合执行时间:定期检查聚合操作的执行时间,识别性能瓶颈
- 分析聚合执行计划:使用explain()分析聚合操作的执行计划
- 监控内存使用:监控聚合操作的内存使用情况,避免内存不足
- 使用MongoDB Atlas:如果使用MongoDB Atlas,利用其监控工具
- 设置聚合告警:为慢聚合操作设置告警,及时发现问题
- 定期性能分析:定期进行性能分析,优化聚合操作
本文由风哥教程整理发布,仅用于学习测试使用,转载注明出处:http://www.fgedu.net.cn/10327.html
