1. DataWorks简介与版本说明
阿里云DataWorks是一站式大数据开发治理平台,提供数据集成、数据开发、数据治理、数据服务等全链路能力。更多学习教程www.fgedu.net.cn。DataWorks基于MaxCompute构建,支持多种计算引擎,是企业数据中台建设的核心平台。
DataWorks提供可视化的数据开发界面,支持SQL、Shell、Python等多种开发语言。学习交流加群风哥微信: itpux-com。它具有强大的调度能力、完善的权限管理、丰富的数据治理功能,帮助企业实现数据资产化管理。
DataWorks核心特性:
– 数据开发:可视化SQL开发、Shell脚本、Python开发
– 任务调度:强大的工作流调度引擎
– 数据治理:数据质量、数据血缘、数据标准
– 数据服务:快速构建数据API服务
– 安全管理:细粒度的权限控制
– 运维中心:任务监控、告警通知
– 数据资产:数据地图、数据目录
– 协同开发:多人协作、版本管理
– 开放生态:支持开源引擎和自定义插件
DataWorks架构组件:
DataStudio 数据开发工作室,可视化开发环境
DataIntegration 数据集成服务,数据同步
Scheduler 调度系统,任务编排
DataGovernance 数据治理,质量监控
DataService 数据服务,API发布
SecurityCenter 安全中心,权限管理
OperationCenter 运维中心,监控告警
DataAssets 数据资产,数据地图
2. DataWorks版本选择与获取方式
DataWorks是阿里云的云服务产品,无需下载安装包,通过阿里云控制台直接使用。同时提供客户端工具用于本地开发。
DataWorks版本状态:
基础版 免费版,适合个人学习和小团队
标准版 企业版,适合中型企业
专业版 高级版,适合大型企业
企业版 旗舰版,适合超大型企业
版本功能对比:
功能 基础版 标准版 专业版 企业版
数据开发 支持 支持 支持 支持
任务调度 基础 标准 高级 企业级
数据集成 限量 标准 高级 无限
数据治理 无 基础 标准 高级
数据服务 无 基础 标准 高级
安全审计 无 基础 标准 高级
官方访问地址:
控制台地址:https://workbench.data.aliyun.com/
文档中心:https://help.aliyun.com/product/72772.html
API文档:https://help.aliyun.com/document_detail/173534.html
开发者工具:https://developer.aliyun.com/tool
3. DataWorks客户端工具下载
方式一:DataWorks Studio客户端
访问:https://help.aliyun.com/document_detail/137930.html
Windows版本:
$ cd /fgeudb/software
$ wget https://dataworks-public.oss-cn-hangzhou.aliyuncs.com/tools/DataWorks-Studio-3.2.0-win-x64.zip
输出示例如下:
–2026-04-04 10:00:00– https://dataworks-public.oss-cn-hangzhou.aliyuncs.com/tools/DataWorks-Studio-3.2.0-win-x64.zip
Resolving dataworks-public.oss-cn-hangzhou.aliyuncs.com…
Connecting to dataworks-public.oss-cn-hangzhou.aliyuncs.com… connected.
HTTP request sent, awaiting response… 200 OK
Length: 123456789 (118M) [application/zip]
Saving to: ‘DataWorks-Studio-3.2.0-win-x64.zip’
macOS版本:
$ wget https://dataworks-public.oss-cn-hangzhou.aliyuncs.com/tools/DataWorks-Studio-3.2.0-mac.dmg
Linux版本:
$ wget https://dataworks-public.oss-cn-hangzhou.aliyuncs.com/tools/DataWorks-Studio-3.2.0-linux-x64.tar.gz
解压安装:
$ unzip DataWorks-Studio-3.2.0-win-x64.zip -d /fgeudb/dataworks/
方式二:DataX数据同步工具
$ cd /fgeudb/software
$ wget https://datax-opensource.oss-cn-hangzhou.aliyuncs.com/202303/datax.tar.gz
输出示例如下:
–2026-04-04 10:00:00– https://datax-opensource.oss-cn-hangzhou.aliyuncs.com/202303/datax.tar.gz
Length: 456789012 (435M) [application/octet-stream]
Saving to: ‘datax.tar.gz’
解压安装:
$ tar -zxvf datax.tar.gz -C /fgeudb/
验证安装:
$ cd /fgeudb/datax
$ python bin/datax.py job/job.json
输出示例如下:
DataX (DATAX-OPENSOURCE-3.0.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.
2026-04-04 10:05:00.123 [main] INFO Engine – the machine info =>
…
任务启动时刻 : 2026-04-04 10:05:00
任务结束时刻 : 2026-04-04 10:05:01
任务总计耗时 : 1s
方式三:PyODPS Python SDK
$ pip install pyodps
输出示例如下:
Collecting pyodps
Downloading pyodps-0.12.0-cp310-cp310-manylinux.whl (15.6 MB)
…
Successfully installed pyodps-0.12.0
安装DataWorks Python SDK:
$ pip install aliyun-python-sdk-dataworks-public
验证安装:
$ python -c “from odps import ODPS; print(‘PyODPS installed successfully’)”
输出示例如下:
PyODPS installed successfully
方式四:阿里云CLI工具
Linux:
$ wget https://aliyuncli.alicdn.com/aliyun-cli-linux-latest-amd64.tgz
$ tar -zxvf aliyun-cli-linux-latest-amd64.tgz -C /usr/local/bin/
macOS:
$ brew install aliyun-cli
Windows:
下载地址:https://aliyuncli.alicdn.com/aliyun-cli-windows-latest-amd64.zip
配置阿里云CLI:
$ aliyun configure
输出示例如下:
Configuring profile ‘default’ in ‘AK’ authenticate mode…
Access Key Id [*********************]: your_access_key_id
Access Key Secret [*********************]: your_access_key_secret
Default Region Id [cn-hangzhou]: cn-hangzhou
Default Output Format [json]: json
Default Language [zh]: zh
验证配置:
$ aliyun dataworks ListProjects –RegionId cn-hangzhou
输出示例如下:
{
“RequestId”: “abc123-def456”,
“ProjectList”: {
“Project”: [
{
“ProjectId”: 12345,
“ProjectName”: “fgedu_project”,
“ProjectStatus”: 1
}
]
}
}
4. DataWorks环境配置实战
步骤1:创建阿里云账号和AccessKey
1. 访问:https://www.aliyun.com/
2. 点击”免费注册”
3. 完成账号注册和实名认证
创建AccessKey:
1. 登录阿里云控制台
2. 点击右上角头像 -> “AccessKey管理”
3. 点击”创建AccessKey”
4. 选择使用方式:
– 使用主账号AccessKey(不推荐)
– 创建RAM子账号AccessKey(推荐)
5. 保存AccessKey ID和AccessKey Secret
创建RAM用户:
1. 访问RAM控制台:https://ram.console.aliyun.com/
2. 点击”用户” -> “创建用户”
3. 填写用户信息:
– 用户名:dataworks_user
– 访问方式:编程访问
4. 授权:
– AliyunDataWorksFullAccess
– AliyunMaxComputeFullAccess
5. 创建AccessKey
步骤2:开通DataWorks服务
1. 访问:https://www.aliyun.com/product/bigdata/ide
2. 点击”立即购买”
3. 选择规格:
– 版本:标准版/专业版/企业版
– 计算引擎:MaxCompute
– 区域:华东1(杭州)
4. 配置MaxCompute:
– 项目名称:fgedu_project
– 计费模式:按量付费/包年包月
5. 确认订单并支付
创建工作空间:
1. 登录DataWorks控制台
2. 点击”工作空间” -> “创建工作空间”
3. 填写基本信息:
– 工作空间名称:fgedu_workspace
– 模式:标准模式/简单模式
4. 配置计算引擎:
– MaxCompute项目
– 绑定MaxCompute项目
5. 点击”创建工作空间”
步骤3:配置本地开发环境
# yum install -y python3 python3-pip
创建虚拟环境:
$ python3 -m venv /fgeudb/dataworks/venv
$ source /fgeudb/dataworks/venv/bin/activate
安装依赖:
(venv) $ pip install pyodps aliyun-python-sdk-dataworks-public pandas
创建配置文件:
$ vi /fgeudb/dataworks/odps_config.py
from odps import ODPS
配置信息:
ACCESS_ID = ‘your_access_key_id’
ACCESS_KEY = ‘your_access_key_secret’
ODPS_ENDPOINT = ‘http://service.cn-hangzhou.maxcompute.aliyun.com/api’
PROJECT = ‘fgedu_project’
创建ODPS连接:
odps = ODPS(ACCESS_ID, ACCESS_KEY, PROJECT, ODPS_ENDPOINT)
测试连接:
$ python /fgeudb/dataworks/odps_config.py
输出示例如下:
Connected to MaxCompute project: fgedu_project
步骤4:配置DataX数据同步
$ vi /fgeudb/datax/job/mysql_to_odps.json
{
“job”: {
“setting”: {
“speed”: {
“channel”: 3
}
},
“content”: [
{
“reader”: {
“name”: “mysqlreader”,
“parameter”: {
“username”: “root”,
“password”: “password123”,
“column”: [“id”, “name”, “sales_amount”, “sales_date”],
“connection”: [
{
“table”: [“sales_data”],
“jdbcUrl”: [“jdbc:mysql://192.168.1.51:3306/fgedu_db”]
}
]
}
},
“writer”: {
“name”: “odpswriter”,
“parameter”: {
“accessId”: “your_access_key_id”,
“accessKey”: “your_access_key_secret”,
“project”: “fgedu_project”,
“table”: “sales_data”,
“odpsServer”: “http://service.cn-hangzhou.maxcompute.aliyun.com/api”,
“column”: [“id”, “name”, “sales_amount”, “sales_date”],
“partition”: “dt=’20260404′”
}
}
}
]
}
}
执行同步作业:
$ cd /fgeudb/datax
$ python bin/datax.py job/mysql_to_odps.json
输出示例如下:
2026-04-04 10:10:00.123 [job-0] INFO JobContainer – 任务启动时刻 : 2026-04-04 10:10:00
2026-04-04 10:10:05.456 [job-0] INFO JobContainer – 任务结束时刻 : 2026-04-04 10:10:05
2026-04-04 10:10:05.456 [job-0] INFO JobContainer – 任务总计耗时 : 5s
2026-04-04 10:10:05.456 [job-0] INFO JobContainer – 平均流量 : 1.23MB/s
2026-04-04 10:10:05.456 [job-0] INFO JobContainer – 记录写入速度 : 10000rec/s
5. DataWorks配置详解
PyODPS配置
from odps import ODPS
from odps import options
odps = ODPS(
access_id=’your_access_key_id’,
secret_access_key=’your_access_key_secret’,
project=’fgedu_project’,
endpoint=’http://service.cn-hangzhou.maxcompute.aliyun.com/api’
)
全局配置:
options.sql.settings = {
‘odps.sql.reducer.instances’: 100,
‘odps.sql.joiner.instances’: 100,
‘odps.sql.mapper.split.size’: 256
}
资源配置:
options.lifecycle = 365
options.table.lifecycle = 365
日志配置:
import logging
logging.basicConfig(level=logging.INFO)
DataWorks工作流配置
1. 登录DataWorks控制台
2. 进入DataStudio
3. 点击”新建” -> “业务流程”
4. 输入业务流程名称:sales_analysis
5. 拖拽节点到画布:
– 虚拟节点(开始节点)
– SQL节点
– Shell节点
– 数据集成节点
节点配置示例:
SQL节点配置:
— 节点名称:ods_sales_data
— 调度配置:
— 调度周期:天
— 生效时间:2026-01-01 ~ 2099-12-31
— 重跑属性:支持重跑
SQL内容:
INSERT OVERWRITE TABLE ods_sales_data
SELECT
id,
name,
sales_amount,
sales_date,
DATE_FORMAT(sales_date, ‘yyyyMMdd’) as dt
FROM source_sales_data
WHERE dt = ‘${bizdate}’;
依赖配置:
虚拟节点 -> ods_sales_data -> dwd_sales_detail -> ads_sales_summary
6. DataWorks数据开发实战
步骤1:创建表结构
CREATE TABLE IF NOT EXISTS ods_sales_data (
id BIGINT COMMENT ‘ID’,
name STRING COMMENT ‘名称’,
sales_amount DECIMAL(18,2) COMMENT ‘销售金额’,
sales_date DATE COMMENT ‘销售日期’
)
PARTITIONED BY (dt STRING COMMENT ‘日期分区’)
LIFECYCLE 365
COMMENT ‘销售数据ODS层’;
创建DWD层表:
CREATE TABLE IF NOT EXISTS dwd_sales_detail (
id BIGINT COMMENT ‘ID’,
name STRING COMMENT ‘名称’,
sales_amount DECIMAL(18,2) COMMENT ‘销售金额’,
sales_date DATE COMMENT ‘销售日期’,
region STRING COMMENT ‘地区’,
product_category STRING COMMENT ‘产品类别’
)
PARTITIONED BY (dt STRING COMMENT ‘日期分区’)
LIFECYCLE 365
COMMENT ‘销售明细DWD层’;
创建ADS层表:
CREATE TABLE IF NOT EXISTS ads_sales_summary (
stat_date DATE COMMENT ‘统计日期’,
region STRING COMMENT ‘地区’,
total_sales DECIMAL(18,2) COMMENT ‘销售总额’,
order_count BIGINT COMMENT ‘订单数量’,
avg_sales DECIMAL(18,2) COMMENT ‘平均销售额’
)
LIFECYCLE 365
COMMENT ‘销售汇总ADS层’;
步骤2:数据开发SQL
INSERT OVERWRITE TABLE ods_sales_data PARTITION(dt=’${bizdate}’)
SELECT
id,
name,
sales_amount,
sales_date
FROM source_sales_data
WHERE dt = ‘${bizdate}’;
DWD层数据处理:
INSERT OVERWRITE TABLE dwd_sales_detail PARTITION(dt=’${bizdate}’)
SELECT
t1.id,
t1.name,
t1.sales_amount,
t1.sales_date,
t2.region,
t3.product_category
FROM ods_sales_data t1
LEFT JOIN dim_region t2 ON t1.region_id = t2.id
LEFT JOIN dim_product t3 ON t1.product_id = t3.id
WHERE t1.dt = ‘${bizdate}’;
ADS层数据汇总:
INSERT OVERWRITE TABLE ads_sales_summary
SELECT
sales_date as stat_date,
region,
SUM(sales_amount) as total_sales,
COUNT(*) as order_count,
AVG(sales_amount) as avg_sales
FROM dwd_sales_detail
WHERE dt = ‘${bizdate}’
GROUP BY sales_date, region;
步骤3:使用PyODPS开发
from odps import ODPS
import pandas as pd
连接MaxCompute:
odps = ODPS(‘access_id’, ‘access_key’, ‘fgedu_project’, ‘endpoint’)
执行SQL:
result = odps.execute_sql(”’
SELECT
region,
SUM(sales_amount) as total_sales
FROM dwd_sales_detail
WHERE dt = ‘20260404’
GROUP BY region
”’)
读取结果:
with result.open_reader() as reader:
for record in reader:
print(record)
转换为DataFrame:
df = result.to_pandas()
print(df.head())
写入数据:
df.to_sql(
‘ads_sales_summary’,
odps,
if_exists=’append’,
index=False
)
7. 环境验证与测试
验证DataWorks连接
$ python -c ”
from odps import ODPS
odps = ODPS(‘access_id’, ‘access_key’, ‘fgedu_project’, ‘endpoint’)
print(‘Project:’, odps.project)
print(‘Tables:’, list(odps.list_tables())[:5])
”
输出示例如下:
Project: fgedu_project
Tables: [‘ods_sales_data’, ‘dwd_sales_detail’, ‘ads_sales_summary’]
验证DataX:
$ python /fgeudb/datax/bin/datax.py –help
输出示例如下:
DataX (DATAX-OPENSOURCE-3.0.0), From Alibaba !
Usage: datax.py [options] job-configuration-location
验证阿里云CLI:
$ aliyun dataworks GetProject –ProjectId 12345
输出示例如下:
{
“RequestId”: “abc123”,
“Project”: {
“ProjectId”: 12345,
“ProjectName”: “fgedu_project”,
“ProjectStatus”: 1
}
}
功能测试
from odps import ODPS
odps = ODPS(‘access_id’, ‘access_key’, ‘fgedu_project’, ‘endpoint’)
执行测试SQL:
result = odps.execute_sql(‘SELECT COUNT(*) FROM ods_sales_data’)
print(result)
测试数据同步:
$ python /fgeudb/datax/bin/datax.py /fgeudb/datax/job/test_job.json
测试工作流:
1. 登录DataWorks控制台
2. 进入DataStudio
3. 运行测试工作流
4. 验证执行结果
8. 常见问题与解决方案
问题1:AccessKey权限不足
解决方案:
1. 检查AccessKey是否正确
2. 检查RAM用户权限:
– AliyunDataWorksFullAccess
– AliyunMaxComputeFullAccess
3. 检查项目权限:
在MaxCompute控制台添加用户权限
授权命令:
GRANT Project Read TO USER aliyun$your_ram_user;
GRANT Table Read, Write ON TABLE * TO USER aliyun$your_ram_user;
问题2:数据同步失败
解决方案:
1. 检查网络连通性:
确保能访问源数据库和MaxCompute
2. 检查配置文件:
验证JSON配置正确
3. 检查数据类型:
确保源和目标类型兼容
4. 查看详细日志:
$ cat /fgeudb/datax/log/job.log
5. 增加并发:
“speed”: {“channel”: 5}
问题3:SQL执行超时
解决方案:
1. 优化SQL:
– 添加分区过滤
– 减少数据扫描量
– 使用合适的JOIN策略
2. 增加资源:
options.sql.settings = {
‘odps.sql.reducer.instances’: 200,
‘odps.sql.mapper.split.size’: 128
}
3. 使用索引:
创建聚簇表或索引表
4. 分批处理:
将大任务拆分为小任务
DataWorks管理命令
from odps import ODPS
创建表:
odps.create_table(‘table_name’, ‘col1 string, col2 bigint’, partition=’pt string’)
删除表:
odps.delete_table(‘table_name’)
列出表:
for table in odps.list_tables():
print(table.name)
执行SQL:
result = odps.execute_sql(‘SELECT * FROM table_name’)
读取数据:
table = odps.get_table(‘table_name’)
with table.open_reader() as reader:
for record in reader:
print(record)
写入数据:
table = odps.get_table(‘table_name’)
with table.open_writer() as writer:
writer.write([[‘value1’, 1], [‘value2’, 2]])
阿里云CLI命令:
列出项目:
$ aliyun dataworks ListProjects –RegionId cn-hangzhou
获取项目详情:
$ aliyun dataworks GetProject –ProjectId 12345
列出工作流:
$ aliyun dataworks ListFiles –ProjectId 12345 –FileFolderPath /
触发调度:
$ aliyun dataworks TriggerNode –NodeId xxx –BizDate 20260404
1. 使用RAM子账号管理权限;2. 开启DataWorks专业版或企业版;3. 合理规划分层架构(ODS/DWD/DWS/ADS);4. 配置数据质量监控;5. 设置任务告警通知;6. 定期备份元数据;7. 使用DataWorks数据治理功能;8. 合理设置调度周期和资源;9. 监控成本和资源使用;10. 定期优化SQL性能。
本文由风哥教程整理发布,仅用于学习测试使用,转载注明出处:http://www.fgedu.net.cn/10327.html
