1. 首页 > MongoDB教程 > 正文

MongoDB教程FG096-MongoDB数据库数据仓库实战

本文主要介绍如何使用MongoDB构建数据仓库,包括ETL流程、数据建模、聚合分析、BI集成等内容。风哥教程参考MongoDB官方文档Data Warehouse相关章节。

目录大纲

Part01-基础概念与理论知识

Part02-生产环境规划与建议

Part03-生产环境项目实施方案

Part04-生产案例与实战讲解

Part05-风哥经验总结与分享

Part01-基础概念与理论知识

1.1 数据仓库概述

数据仓库是一个面向主题的、集成的、相对稳定的、反映历史变化的数据集合,用于支持管理决策。数据仓库的主要特点包括:

  • 面向主题:数据仓库围绕主题组织数据,而不是围绕应用程序
  • 集成性:数据仓库集成来自多个数据源的数据
  • 稳定性:数据仓库中的数据一般不进行更新操作
  • 时变性:数据仓库包含历史数据,反映数据的变化

数据仓库的基本架构包括:

  • 数据源层:包括业务数据库、日志文件、外部数据等
  • ETL层:负责数据的抽取、转换和加载
  • 存储层:存储清洗后的数据
  • 分析层:提供数据分析和报表功能

学习交流加群风哥微信: itpux-com

1.2 MongoDB数据仓库特点

MongoDB作为数据仓库的优势:

  • 灵活的文档模型:支持复杂的嵌套数据结构
  • 强大的聚合框架:支持复杂的数据分析
  • 水平扩展:支持分片集群,处理大规模数据
  • 高性能:支持内存计算和索引优化
  • 实时分析:支持实时数据摄入和分析

MongoDB数据仓库适用场景:

  • 需要处理半结构化或非结构化数据
  • 需要灵活的数据模型
  • 需要实时数据分析
  • 需要水平扩展能力

更多视频教程www.fgedu.net.cn

Part02-生产环境规划与建议

2.1 数据仓库架构设计

MongoDB数据仓库架构设计:

  • 数据源层:业务数据库(MySQL、Oracle等)、日志系统、外部API
  • 数据采集层:使用Change Streams、Kafka、Logstash等工具采集数据
  • 数据存储层:MongoDB分片集群存储原始数据和清洗后的数据
  • 数据处理层:使用Aggregation Pipeline进行数据转换和聚合
  • 数据服务层:提供API接口供BI工具和报表系统使用

数据仓库分层设计:

  • ODS层(操作数据存储):存储原始数据
  • DWD层(明细数据层):存储清洗后的明细数据
  • DWS层(汇总数据层):存储轻度汇总数据
  • ADS层(应用数据层):存储应用所需的数据

风哥提示:数据仓库架构设计需要考虑数据量、查询性能和扩展性。

2.2 硬件与环境规划

硬件规划:

  • CPU:选择多核CPU,建议16核以上
  • 内存:建议内存大小为数据大小的50-80%,最低64GB
  • 存储:使用SSD存储,建议配置RAID 10
  • 网络:使用万兆网络,确保节点间通信

环境规划:

  • 部署模式:使用分片集群部署,支持水平扩展
  • 副本集配置:每个分片配置3节点副本集
  • Config Server:配置3节点Config Server副本集
  • Mongos路由:部署多个Mongos实例,使用负载均衡

更多学习教程公众号风哥教程itpux_com

Part03-生产环境项目实施方案

3.1 ETL流程实现

ETL流程实现:

# 1. 数据抽取(Extract)
# 使用Python从MySQL抽取数据
import pymysql
from pymongo import MongoClient
import json

# 连接MySQL
mysql_conn = pymysql.connect(
host=’192.168.1.100′,
port=3306,
user=’fgedu’,
password=’fgedu123′,
database=’fgedudb’
)

# 连接MongoDB
mongo_client = MongoClient(‘mongodb://fgedu:fgedu123@192.168.1.101:27017/admin’)
mongo_db = mongo_client[‘fgedu_warehouse’]

# 抽取数据
with mysql_conn.cursor() as cursor:
cursor.execute(“SELECT * FROM fgedu_users”)
results = cursor.fetchall()

# 转换数据
for row in results:
doc = {
‘user_id’: row[0],
‘name’: row[1],
‘age’: row[2],
’email’: row[3],
‘created_at’: row[4]
}
# 加载到MongoDB
mongo_db.fgedu_users_ods.insert_one(doc)

# 2. 数据转换(Transform)
# 使用Aggregation Pipeline清洗数据
pipeline = [
{
‘$project’: {
‘user_id’: 1,
‘name’: 1,
‘age’: 1,
’email’: 1,
‘created_at’: 1,
‘age_group’: {
‘$switch’: {
‘branches’: [
{ ‘case’: { ‘$lt’: [‘$age’, 18] }, ‘then’: ‘未成年’ },
{ ‘case’: { ‘$lt’: [‘$age’, 35] }, ‘then’: ‘青年’ },
{ ‘case’: { ‘$lt’: [‘$age’, 50] }, ‘then’: ‘中年’ },
{ ‘case’: { ‘$gte’: [‘$age’, 50] }, ‘then’: ‘老年’ }
],
‘default’: ‘未知’
}
}
}
},
{
‘$merge’: {
‘into’: ‘fgedu_users_dwd’,
‘on’: ‘user_id’,
‘whenMatched’: ‘merge’,
‘whenNotMatched’: ‘insert’
}
}
]
mongo_db.fgedu_users_ods.aggregate(pipeline)

# 3. 数据加载(Load)
# 使用$out将聚合结果输出到新集合
pipeline = [
{
‘$group’: {
‘_id’: ‘$age_group’,
‘count’: { ‘$sum’: 1 },
‘avg_age’: { ‘$avg’: ‘$age’ }
}
},
{
‘$out’: ‘fgedu_users_dws’
}
]
mongo_db.fgedu_users_dwd.aggregate(pipeline)

3.2 数据建模

数据建模方法:

# 1. 星型模型
# 事实表:订单事实表
db.fgedu_orders_fact.insertOne({
order_id: “ORD001”,
user_id: “USER001”,
product_id: “PROD001”,
order_date: ISODate(“2026-04-08T10:00:00Z”),
quantity: 2,
amount: 199.98,
status: “completed”
})

# 维度表:用户维度
db.fgedu_users_dim.insertOne({
user_id: “USER001”,
name: “张三”,
age: 30,
gender: “男”,
city: “北京”,
register_date: ISODate(“2025-01-01T00:00:00Z”)
})

# 维度表:产品维度
db.fgedu_products_dim.insertOne({
product_id: “PROD001”,
name: “iPhone 15”,
category: “手机”,
brand: “Apple”,
price: 99.99
})

# 维度表:时间维度
db.fgedu_time_dim.insertOne({
date: ISODate(“2026-04-08T00:00:00Z”),
year: 2026,
month: 4,
day: 8,
quarter: 2,
week: 14,
is_weekend: false
})

# 2. 雪花模型
# 产品维度表关联品牌维度表
db.fgedu_brands_dim.insertOne({
brand_id: “BRAND001”,
name: “Apple”,
country: “美国”,
category: “电子产品”
})

db.fgedu_products_dim.insertOne({
product_id: “PROD001”,
name: “iPhone 15”,
category: “手机”,
brand_id: “BRAND001”,
price: 99.99
})

3.3 聚合分析

聚合分析示例:

# 1. 按日期统计订单数量和金额
db.fgedu_orders_fact.aggregate([
{
$group: {
_id: {
year: { $year: “$order_date” },
month: { $month: “$order_date” },
day: { $dayOfMonth: “$order_date” }
},
order_count: { $sum: 1 },
total_amount: { $sum: “$amount” },
avg_amount: { $avg: “$amount” }
}
},
{
$sort: { “_id.year”: -1, “_id.month”: -1, “_id.day”: -1 }
}
])

# 2. 按用户统计消费金额
db.fgedu_orders_fact.aggregate([
{
$group: {
_id: “$user_id”,
order_count: { $sum: 1 },
total_amount: { $sum: “$amount” },
avg_amount: { $avg: “$amount” },
max_amount: { $max: “$amount” },
min_amount: { $min: “$amount” }
}
},
{
$sort: { total_amount: -1 }
},
{
$limit: 10
}
])

# 3. 按产品类别统计销售情况
db.fgedu_orders_fact.aggregate([
{
$lookup: {
from: “fgedu_products_dim”,
localField: “product_id”,
foreignField: “product_id”,
as: “product”
}
},
{
$unwind: “$product”
},
{
$group: {
_id: “$product.category”,
order_count: { $sum: 1 },
total_amount: { $sum: “$amount” },
unique_products: { $addToSet: “$product_id” }
}
},
{
$project: {
category: “$_id”,
order_count: 1,
total_amount: 1,
product_count: { $size: “$unique_products” }
}
}
])

3.4 BI集成

BI工具集成:

# 1. 配置MongoDB
BI Connector
# mongosqld.conf
systemLog:
verbosity: 1
logAppend: true
path: /mongodb/logs/mongosqld.log

mongodb:
net:
uri: mongodb://fgedu:fgedu123@192.168.1.101:27017/admin

net:
bindIp: 0.0.0.0
port: 3307

# 启动BI Connector
mongosqld –config /mongodb/app/mongosqld.conf

# 2. 使用Tableau连接
# 在Tableau中选择”其他数据库(ODBC)”
# 配置ODBC数据源
# 服务器: 192.168.1.101
# 端口: 3307
# 数据库: fgedu_warehouse
# 用户名: fgedu
# 密码: fgedu123

# 3. 使用Power
BI连接
# 在Power BI中选择”获取数据” -> “更多…”
# 选择”ODBC”
# 配置数据源名称(DSN)
# 输入SQL查询
SELECT * FROM fgedu_orders_fact

# 4. 使用Python进行数据分析
import pandas as pd
from pymongo import MongoClient

client = MongoClient(‘mongodb://fgedu:fgedu123@192.168.1.101:27017/admin’)
db = client[‘fgedu_warehouse’]

# 读取数据
cursor = db.fgedu_orders_fact.find()
df = pd.DataFrame(list(cursor))

# 数据分析
print(df.describe())
print(df.groupby(‘status’)[‘amount’].sum())

Part04-生产案例与实战讲解

4.1 ETL实战

ETL实战案例:

# 场景:从MySQL同步订单数据到MongoDB数据仓库

# 1. 创建ODS层集合
db.createCollection(“fgedu_orders_ods”)
db.createCollection(“fgedu_users_ods”)
db.createCollection(“fgedu_products_ods”)

# 2. 数据抽取和加载
# 使用Python脚本定时执行
import schedule
import time

def etl_job():
print(“开始ETL任务…”)
# 抽取MySQL数据
# 转换数据
# 加载到MongoDB
print(“ETL任务完成”)

# 每天凌晨2点执行
schedule.every().day.at(“02:00”).do(etl_job)

while True:
schedule.run_pending()
time.sleep(60)

# 3. 验证数据
db.fgedu_orders_ods.countDocuments()
# 输出:100000

db.fgedu_orders_ods.findOne()
# 输出:
{
“_id”: ObjectId(“60a7b8c9d0e1f2a3b4c5d6e7”),
“order_id”: “ORD001”,
“user_id”: “USER001”,
“product_id”: “PROD001”,
“order_date”: ISODate(“2026-04-08T10:00:00Z”),
“quantity”: 2,
“amount”: 199.98,
“status”: “completed”
}

from MongoDB视频:www.itpux.com

4.2 数据建模实战

数据建模实战案例:

# 场景:电商数据仓库建模

# 1. 创建事实表
db.fgedu_orders_fact.createIndex({ order_id: 1 }, { unique: true })
db.fgedu_orders_fact.createIndex({ user_id: 1 })
db.fgedu_orders_fact.createIndex({ product_id: 1 })
db.fgedu_orders_fact.createIndex({ order_date: 1 })

# 2. 创建维度表
db.fgedu_users_dim.createIndex({ user_id: 1 }, { unique: true })
db.fgedu_products_dim.createIndex({ product_id: 1 }, { unique: true })
db.fgedu_time_dim.createIndex({ date: 1 }, { unique: true })

# 3. 插入测试数据
# 时间维度数据
for (let year = 2025; year <= 2026; year++) { for (let month = 1; month <= 12; month++) { for (let day = 1; day <= 28; day++) { db.fgedu_time_dim.insertOne({ date: new Date(year, month - 1, day), year: year, month: month, day: day, quarter: Math.ceil(month / 3), week: Math.ceil(day / 7), is_weekend: [0, 6].includes(new Date(year, month - 1, day).getDay()) }) } } } # 4. 验证数据 db.fgedu_time_dim.countDocuments() # 输出:672

4.3 聚合分析实战

聚合分析实战案例:

# 场景:销售数据分析

# 1. 按月份统计销售额
db.fgedu_orders_fact.aggregate([
{
$group: {
_id: {
year: { $year: “$order_date” },
month: { $month: “$order_date” }
},
total_sales: { $sum: “$amount” },
order_count: { $sum: 1 }
}
},
{
$project: {
year: “$_id.year”,
month: “$_id.month”,
total_sales: 1,
order_count: 1,
avg_order_value: { $divide: [“$total_sales”, “$order_count”] }
}
},
{
$sort: { year: -1, month: -1 }
}
])

# 2. 用户购买行为分析
db.fgedu_orders_fact.aggregate([
{
$group: {
_id: “$user_id”,
first_order: { $min: “$order_date” },
last_order: { $max: “$order_date” },
order_count: { $sum: 1 },
total_spent: { $sum: “$amount” }
}
},
{
$project: {
user_id: “$_id”,
first_order: 1,
last_order: 1,
order_count: 1,
total_spent: 1,
customer_lifetime_days: {
$divide: [
{ $subtract: [“$last_order”, “$first_order”] },
1000 * 60 * 60 * 24
]
}
}
},
{
$sort: { total_spent: -1 }
},
{
$limit: 10
}
])

# 3. 产品关联分析
db.fgedu_orders_fact.aggregate([
{
$group: {
_id: “$user_id”,
products: { $addToSet: “$product_id” }
}
},
{
$unwind: “$products”
},
{
$group: {
_id: “$products”,
user_count: { $sum: 1 },
co_products: { $addToSet: “$products” }
}
},
{
$sort: { user_count: -1 }
},
{
$limit: 10
}
])

风哥提示:聚合分析是数据仓库的核心功能,需要根据业务需求设计合适的聚合查询。

Part05-风哥经验总结与分享

5.1 数据仓库最佳实践

风哥建议的数据仓库最佳实践:

  • 数据分层:按照ODS、DWD、DWS、ADS分层存储数据
  • 索引优化:为常用查询字段创建索引
  • 分区策略:按时间或范围对数据进行分区
  • ETL调度:使用调度工具定时执行ETL任务
  • 数据质量:建立数据质量检查机制
  • 监控告警:监控ETL任务执行情况和数据仓库性能
  • 备份恢复:定期备份数据仓库数据
  • 安全管理:控制数据访问权限,保护敏感数据

学习交流加群风哥QQ113257174

5.2 常见问题与解决方案

常见问题与解决方案:

  • 问题:ETL任务执行缓慢
  • 解决方案:优化查询语句,增加硬件资源,使用并行处理
  • 问题:数据仓库查询性能差
  • 解决方案:创建合适的索引,优化数据模型,使用物化视图
  • 问题:数据不一致
  • 解决方案:建立数据质量检查机制,使用事务保证数据一致性
  • 问题:存储空间不足
  • 解决方案:数据归档,压缩存储,扩展存储容量
  • 问题:BI工具连接失败
  • 解决方案:检查BI Connector配置,检查网络连接,检查用户权限

更多视频教程www.fgedu.net.cn

注意事项

  • 按照ODS、DWD、DWS、ADS分层存储数据
  • 为常用查询字段创建索引
  • 使用调度工具定时执行ETL任务
  • 建立数据质量检查机制
  • 监控ETL任务执行情况和数据仓库性能
  • 定期备份数据仓库数据
  • 控制数据访问权限,保护敏感数据

本文由风哥教程整理发布,仅用于学习测试使用,转载注明出处:http://www.fgedu.net.cn/10327.html

联系我们

在线咨询:点击这里给我发消息

微信号:itpux-com

工作日:9:30-18:30,节假日休息