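This document, by Feng Ge (风哥), introduces big data integration with the DM database. It covers a big data overview, a data integration overview, integration tools, Hadoop integration, Spark integration, Flume integration, data collection, data processing, data storage, worked cases, and best practices. The tutorial draws on the official DM documentation (DM8 Big Data Integration Guide and DM8 Data Lake Guide) and is aimed at big data integration developers, for both study and production use.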
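Part01-Basic Concepts and Theory
1.1 Big Data Overview
# Definition of big data
Big data refers to data sets that cannot be captured, managed, and processed with conventional software tools within an acceptable time frame.
# Characteristics of big data (5V)
1. Volume
- Very large data volumes
- TB to PB scale
- Continuous growth
2. Velocity
- Data is generated quickly
- Real-time processing
- Streaming data
3. Variety
- Many data types
- Structured data
- Unstructured data
4. Value
- Low value density
- Value has to be mined out
- Supports decision-making
5. Veracity
- Data quality is uneven
- Data cleansing is required
- Data authenticity must be ensured
# Value of big data
- Decision support: provides data to back decisions
- Efficiency: improves the efficiency of business operations
- Cost: lowers the cost of business operations
- Competitiveness: strengthens the competitiveness of the enterprise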
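1.2 Data Integration Overview
# Definition of data integration
Data integration combines data from different sources and in different formats into a single, unified view of the data.
# Types of data integration
1. Batch integration
- Periodic batch processing
- Suited to historical data
- High throughput
2. Real-time integration
- Data is processed as it arrives
- Suited to real-time business scenarios
- Low latency
3. Hybrid integration
- Batch plus real-time
- Flexible
- Widely applicable
# Challenges of data integration
1. Data heterogeneity
- Different data formats
- Different data structures
- Different data semantics
2. Data quality
- Incomplete data
- Inaccurate data
- Inconsistent data
3. Data security
- Data privacy
- Data access permissions
- Data encryption
# Value of data integration
- Consolidation: brings together data from multiple sources
- Sharing: makes data available across systems
- Analysis: supports data analysis
- Decision support: supports decision-making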
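1.3 Integration Tools
Big data integration relies on a set of storage, compute, collection, and scheduling tools, including Hadoop, Spark, and Flume.
# Categories of big data integration tools
1. Storage tools
- HDFS: distributed file system
- HBase: distributed NoSQL database
- Hive: data warehouse
2. Compute tools
- MapReduce: distributed batch computing framework
- Spark: in-memory computing framework
- Flink: stream processing framework
3. Collection tools
- Flume: log collection
- Kafka: message queue
- Sqoop: data import and export between relational databases and Hadoop
4. Scheduling tools
- Oozie: workflow scheduling
- Airflow: workflow scheduling
- Azkaban: workflow scheduling
# Role of the DM database in big data integration
- Data storage: stores structured data
- Data query: fast queries over the data
- Data analysis: supports complex analysis
- Data mining: supports data mining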
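Feng Ge's tip: big data integration is an important part of enterprise digital transformation, and knowing its methods and tools is key to building a big data platform. Choose the integration approach that fits your business requirements and the characteristics of your data; that choice matters a great deal to the success of the project.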
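Part02-Production Environment Planning and Recommendations
2.1 Hadoop Integration
2.1.1 HDFS Integration
# 1. Install HDFS
- Download Hadoop
$ wget https://archive.apache.org/dist/hadoop/common/hadoop-3.3.4/hadoop-3.3.4.tar.gz
- Unpack Hadoop
$ tar -zxvf hadoop-3.3.4.tar.gz
$ mv hadoop-3.3.4 /opt/hadoop
- Configure environment variables
$ vi /etc/profile
export HADOOP_HOME=/opt/hadoop
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
$ source /etc/profile
# 2. Configure HDFS
- Configure core-site.xml
$ vi $HADOOP_HOME/etc/hadoop/core-site.xml
<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://fgedu.localhost:9000</value>
  </property>
  <property>
    <name>hadoop.tmp.dir</name>
    <value>/opt/hadoop/tmp</value>
  </property>
</configuration>
- Configure hdfs-site.xml (a replication factor of 1 is only suitable for a single-node test setup)
$ vi $HADOOP_HOME/etc/hadoop/hdfs-site.xml
<configuration>
  <property>
    <name>dfs.replication</name>
    <value>1</value>
  </property>
  <property>
    <name>dfs.namenode.name.dir</name>
    <value>/opt/hadoop/hdfs/namenode</value>
  </property>
  <property>
    <name>dfs.datanode.data.dir</name>
    <value>/opt/hadoop/hdfs/datanode</value>
  </property>
</configuration>
# 3. Start HDFS
- Format the NameNode (first start only; reformatting wipes the existing HDFS metadata)
$ hdfs namenode -format
- Start HDFS
$ start-dfs.sh
- Verify HDFS: the following processes should appear in the jps output
$ jps
# NameNode
# DataNode
# SecondaryNameNode
# 4. Use HDFS
- Create a directory
$ hdfs dfs -mkdir /fgedu
- Upload a file
$ hdfs dfs -put /tmp/data.txt /fgedu/
- List files
$ hdfs dfs -ls /fgedu/
# Output
# Found 1 items
# -rw-r--r-- 1 root supergroup 1024 2024-01-10 10:00 /fgedu/data.txt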
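The two site files above are ordinary Hadoop XML property files. As a minimal sketch, assuming a single-node test setup, the core-site.xml settings could be generated from a script instead of edited by hand. The helper name write_core_site is hypothetical and not part of Hadoop; the property names and values are the ones used in this section.

```shell
# Sketch: write a minimal core-site.xml for a single-node test setup.
# write_core_site is a hypothetical helper, not a Hadoop command.
write_core_site() {
  local conf_dir="$1" namenode_uri="$2" tmp_dir="$3"
  mkdir -p "$conf_dir"
  # Each setting is one <property> element holding a <name>/<value> pair.
  cat > "$conf_dir/core-site.xml" <<EOF
<?xml version="1.0"?>
<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>${namenode_uri}</value>
  </property>
  <property>
    <name>hadoop.tmp.dir</name>
    <value>${tmp_dir}</value>
  </property>
</configuration>
EOF
}

# Usage: write into a scratch directory first and review the result
# before copying it to $HADOOP_HOME/etc/hadoop/.
demo_dir="$(mktemp -d)"
write_core_site "$demo_dir" "hdfs://fgedu.localhost:9000" "/opt/hadoop/tmp"
cat "$demo_dir/core-site.xml"
```

Generating the file into a scratch directory keeps the live configuration untouched until the output has been checked.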
2.1.2 Hive Integration
# 1. Install Hive
- Download Hive
$ wget https://archive.apache.org/dist/hive/hive-3.1.3/apache-hive-3.1.3-bin.tar.gz
- Unpack Hive
$ tar -zxvf apache-hive-3.1.3-bin.tar.gz
$ mv apache-hive-3.1.3-bin /opt/hive
- Configure environment variables
$ vi /etc/profile
export HIVE_HOME=/opt/hive
export PATH=$PATH:$HIVE_HOME/bin
$ source /etc/profile
# 2. Configure Hive
- Configure hive-site.xml (the metastore is kept in MySQL here; the MySQL JDBC driver jar must be present in $HIVE_HOME/lib)
$ vi $HIVE_HOME/conf/hive-site.xml
<configuration>
  <property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:mysql://fgedu.localhost:3306/metastore</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionDriverName</name>
    <value>com.mysql.jdbc.Driver</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionUserName</name>
    <value>hive</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionPassword</name>
    <value>hive</value>
  </property>
</configuration>
# 3. Initialize the metastore
- Initialize the metastore schema
$ schematool -dbType mysql -initSchema
# 4. Start Hive
- Start the Hive CLI
$ hive
- Create a table
hive> CREATE TABLE fgedu_user (
user_id INT,
user_name STRING,
user_email STRING,
user_status INT
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE;
- Load data
hive> LOAD DATA LOCAL INPATH '/tmp/user.csv' INTO TABLE fgedu_user;
- Query data
hive> SELECT * FROM fgedu_user LIMIT 10;
# Output
# OK
# 1 fgedu_user1 fgedu_user1@fgedu.net.cn 1
# 2 fgedu_user2 fgedu_user2@fgedu.net.cn 1
# 3 user3 user3@fgedu.net.cn 1
2.2 Spark Integration
2.2.1 Spark Installation and Configuration
# 1. Install Spark
- Download Spark
$ wget https://archive.apache.org/dist/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz
- Unpack Spark
$ tar -zxvf spark-3.3.2-bin-hadoop3.tgz
$ mv spark-3.3.2-bin-hadoop3 /opt/spark
- Configure environment variables
$ vi /etc/profile
export SPARK_HOME=/opt/spark
export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin
$ source /etc/profile
# 2. Configure Spark
- Configure spark-env.sh
$ vi $SPARK_HOME/conf/spark-env.sh
export JAVA_HOME=/opt/jdk
export HADOOP_HOME=/opt/hadoop
export SPARK_MASTER_HOST=fgedu.localhost
export SPARK_MASTER_PORT=7077
- Configure the worker list (conf/workers in Spark 3.x; the file was called conf/slaves in older releases)
$ vi $SPARK_HOME/conf/workers
fgedu.localhost
# 3. Start Spark
- Start the Spark Master
$ start-master.sh
- Start the Spark Workers (start-slaves.sh is the deprecated older name of this script)
$ start-workers.sh
- Verify Spark: the following processes should appear in the jps output
$ jps
# Master
# Worker
# 4. Use Spark
- Start the Spark shell
$ spark-shell
- Read data
scala> val data = spark.read.csv("/fgedu/data.txt")
- Show the data
scala> data.show()
# Output
# +---+---+---+
# |_c0|_c1|_c2|
# +---+---+---+
# |  1|  2|  3|
# |  4|  5|  6|
# +---+---+---+
2.2.2 Spark SQL Integration
# 1. Spark SQL basics
- Create a DataFrame
scala> val df = spark.read.json("/fgedu/user.json")
- Show the data
scala> df.show()
- Show the schema (fields inferred from JSON are sorted by name, and integers are inferred as long)
scala> df.printSchema()
# Output
# root
# |-- user_email: string (nullable = true)
# |-- user_id: long (nullable = true)
# |-- user_name: string (nullable = true)
# |-- user_status: long (nullable = true)
# 2. Spark SQL queries
- Register a temporary view
scala> df.createOrReplaceTempView("fgedu_user")
- Run a SQL query
scala> val result = spark.sql("SELECT * FROM fgedu_user WHERE user_status = 1")
- Show the result (show() truncates strings longer than 20 characters by default)
scala> result.show()
# Output
# +--------------------+-------+-----------+-----------+
# |          user_email|user_id|  user_name|user_status|
# +--------------------+-------+-----------+-----------+
# |fgedu_user1@fgedu...|      1|fgedu_user1|          1|
# |fgedu_user2@fgedu...|      2|fgedu_user2|          1|
# +--------------------+-------+-----------+-----------+
# 3. Worked example
- Read CSV data (the file must have a header row naming the columns)
scala> val df = spark.read.option("header", "true").csv("/fgedu/user.csv")
- Register a temporary view
scala> df.createOrReplaceTempView("fgedu_user")
- Run an aggregate query
scala> val result = spark.sql("""
SELECT user_status, COUNT(*) AS user_count
FROM fgedu_user
GROUP BY user_status
ORDER BY user_count DESC
""")
- Show the result
scala> result.show()
# Output
# +-----------+----------+
# |user_status|user_count|
# +-----------+----------+
# |          1|       800|
# |          0|       200|
# +-----------+----------+
2.3 Flume Integration
2.3.1 Flume Installation and Configuration
# 1. Install Flume
- Download Flume
$ wget https://archive.apache.org/dist/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz
- Unpack Flume
$ tar -zxvf apache-flume-1.9.0-bin.tar.gz
$ mv apache-flume-1.9.0-bin /opt/flume
- Configure environment variables
$ vi /etc/profile
export FLUME_HOME=/opt/flume
export PATH=$PATH:$FLUME_HOME/bin
$ source /etc/profile
# 2. Configure Flume
- Create the configuration file
$ vi $FLUME_HOME/conf/fgedu.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/fgedu.log
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://fgedu.localhost:9000/fgedu/logs
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
# 3. Start Flume
- Start the Flume agent
$ flume-ng agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/fgedu.conf --name a1
- Verify Flume: the agent appears as an Application process in the jps output
$ jps
# Application
# 4. Test Flume
- Write a log line
$ echo "test log" >> /var/log/fgedu.log
- Check HDFS
$ hdfs dfs -ls /fgedu/logs
# Output
# Found 1 items
# -rw-r--r-- 1 root supergroup 9 2024-01-10 10:00 /fgedu/logs/FlumeData.1234567890
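A memory channel refuses to start when its transactionCapacity is larger than its capacity, so it can be worth checking the two values before launching the agent. The following is a small sketch of such a check; flume_prop and check_channel are hypothetical helper names, and the parsing assumes simple "key = value" lines like the ones in the configuration above.

```shell
# Hypothetical helper: read one property value from a Flume properties file.
flume_prop() {
  local file="$1" key="$2"
  # Split each line on "=", trim spaces around name and value,
  # and print the value of the last line whose name equals the key.
  awk -F'=' -v k="$key" '
    { name = $1; gsub(/^[ \t]+|[ \t]+$/, "", name) }
    name == k { val = $2; gsub(/^[ \t]+|[ \t]+$/, "", val); found = val }
    END { print found }
  ' "$file"
}

# Hypothetical helper: verify transactionCapacity <= capacity for one channel.
check_channel() {
  local file="$1" agent="$2" channel="$3"
  local cap txn
  cap="$(flume_prop "$file" "$agent.channels.$channel.capacity")"
  txn="$(flume_prop "$file" "$agent.channels.$channel.transactionCapacity")"
  if [ -z "$cap" ] || [ -z "$txn" ]; then
    echo "missing capacity settings for $agent.$channel"
    return 2
  fi
  if [ "$txn" -gt "$cap" ]; then
    echo "transactionCapacity ($txn) exceeds capacity ($cap)"
    return 1
  fi
  echo "ok: capacity=$cap transactionCapacity=$txn"
}

# Usage with a throwaway copy of the channel settings from this section.
conf_file="$(mktemp)"
cat > "$conf_file" <<'EOF'
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
EOF
check_channel "$conf_file" a1 c1
```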
2.3.2 Flume and DM Integration
# 1. Flume to HDFS
- Configure the Flume agent
$ vi $FLUME_HOME/conf/fgedu_hdfs.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/fgedu.log
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://fgedu.localhost:9000/fgedu/logs/%Y%m%d
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
# 2. Flume to Hive
- Configure the Flume agent
$ vi $FLUME_HOME/conf/fgedu_hive.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/fgedu.log
a1.sinks.k1.type = hive
a1.sinks.k1.hive.metastore = thrift://fgedu.localhost:9083
a1.sinks.k1.hive.database = fgedu
a1.sinks.k1.hive.table = logs
a1.sinks.k1.hive.partition = %Y%m%d
# The %Y%m%d escapes need a timestamp; take it from the agent's local clock
a1.sinks.k1.useLocalTimeStamp = true
# The Hive sink requires a serializer (DELIMITED or JSON); serializer.fieldnames must also be set to match the columns of the target table
a1.sinks.k1.serializer = DELIMITED
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
# 3. Worked example
- Start the Flume agent
$ flume-ng agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/fgedu_hdfs.conf --name a1
- Write a log line
$ echo "2024-01-10 10:00:00 INFO test log" >> /var/log/fgedu.log
- Check HDFS
$ hdfs dfs -cat /fgedu/logs/20240110/FlumeData.*
# Output
# 2024-01-10 10:00:00 INFO test log
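The %Y%m%d escape in hdfs.path is expanded from the event timestamp, which is why an event written on 2024-01-10 lands under /fgedu/logs/20240110. For the escapes used in this tutorial (%Y, %m, %d, %H) the date command formats the same way, so a short sketch can preview the directory a given timestamp maps to. The helper name flume_hdfs_dir is hypothetical, and the -d @epoch form assumes GNU date.

```shell
# Hypothetical helper: preview the directory that a Flume path pattern
# expands to for a given epoch second. Assumes GNU date (-d @epoch);
# the third argument is the time zone and defaults to UTC.
flume_hdfs_dir() {
  local pattern="$1" epoch="$2"
  TZ="${3:-UTC}" date -d "@$epoch" +"$pattern"
}

# 1704880800 is 2024-01-10 10:00:00 UTC.
flume_hdfs_dir "/fgedu/logs/%Y%m%d" 1704880800
flume_hdfs_dir "/fgedu/logs/%Y%m%d/%H" 1704880800
```

With useLocalTimeStamp = true the agent uses its own clock and time zone, so pass the agent's time zone as the third argument when previewing paths for a production host.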
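Production recommendation: choose the big data integration approach that fits your business requirements and the characteristics of your data. For Hadoop integration, pay attention to the HDFS replication policy and the Hive partitioning strategy. For Spark integration, pay attention to memory management and task scheduling. For Flume integration, pay attention to the reliability and performance of data collection.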
Part03-Production Project Implementation Plan
3.1 Data Collection
3.1.1 Log Collection
# 1. Flume log collection
- Configure the Flume agent (files roll every 3600 seconds or at 128 MB, whichever comes first; rollCount = 0 disables rolling by event count)
$ vi $FLUME_HOME/conf/fgedu_log.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/fgedu/application.log
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://fgedu.localhost:9000/fgedu/logs/%Y%m%d
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.rollInterval = 3600
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
# 2. Start Flume
- Start the Flume agent
$ flume-ng agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/fgedu_log.conf --name a1 -Dflume.root.logger=INFO,console
# 3. Verify log collection
- Write a log line
$ echo "2024-01-10 10:00:00 INFO [main] Application started" >> /var/log/fgedu/application.log
- Check HDFS
$ hdfs dfs -ls /fgedu/logs/20240110
# Output
# Found 1 items
# -rw-r--r-- 1 root supergroup 45 2024-01-10 10:00 /fgedu/logs/20240110/FlumeData.1234567890
- Show the log content
$ hdfs dfs -cat /fgedu/logs/20240110/FlumeData.1234567890
# Output
# 2024-01-10 10:00:00 INFO [main] Application started
3.1.2 Database Collection
# 1. Data collection with Sqoop
- Import from the DM database into HDFS (Sqoop has no built-in connector for jdbc:dm URLs, so the DM JDBC driver class is named explicitly with --driver and its jar must be in $SQOOP_HOME/lib)
$ sqoop import \
--connect jdbc:dm://fgedu.localhost:5236/fgedu \
--driver dm.jdbc.driver.DmDriver \
--username fgedu_user \
--password fgedu_password \
--table fgedu_user \
--target-dir /fgedu/user \
-m 1
- List the imported data
$ hdfs dfs -ls /fgedu/user
# Output
# Found 2 items
# -rw-r--r-- 1 root supergroup 0 2024-01-10 10:00 /fgedu/user/_SUCCESS
# -rw-r--r-- 1 root supergroup 1024 2024-01-10 10:00 /fgedu/user/part-m-00000
- Show the data
$ hdfs dfs -cat /fgedu/user/part-m-00000
# Output
# 1,fgedu_user1,fgedu_user1@fgedu.net.cn,1
# 2,fgedu_user2,fgedu_user2@fgedu.net.cn,1
# 3,user3,user3@fgedu.net.cn,1
# 2. Incremental data collection
- Incremental import (only rows whose update_time is later than --last-value are imported)
$ sqoop import \
--connect jdbc:dm://fgedu.localhost:5236/fgedu \
--driver dm.jdbc.driver.DmDriver \
--username fgedu_user \
--password fgedu_password \
--table fgedu_user \
--target-dir /fgedu/user_incremental \
--check-column update_time \
--incremental append \
--last-value "2024-01-01 00:00:00" \
-m 1
# 3. Worked example
- Full import with two mappers (this needs a primary key on fgedu_order, or an explicit --split-by column, so that Sqoop can split the work)
$ sqoop import \
--connect jdbc:dm://fgedu.localhost:5236/fgedu \
--driver dm.jdbc.driver.DmDriver \
--username fgedu_user \
--password fgedu_password \
--table fgedu_order \
--target-dir /fgedu/order \
-m 2
- List the imported data (one part file per mapper)
$ hdfs dfs -ls /fgedu/order
# Output
# Found 3 items
# -rw-r--r-- 1 root supergroup 0 2024-01-10 10:00 /fgedu/order/_SUCCESS
# -rw-r--r-- 1 root supergroup 2048 2024-01-10 10:00 /fgedu/order/part-m-00000
# -rw-r--r-- 1 root supergroup 2048 2024-01-10 10:00 /fgedu/order/part-m-00001
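The imports above pass the database password on the command line, where it shows up in the shell history and the process list. Sqoop also accepts --password-file, which reads the password from a file. The sketch below only prints the command it would run, so it can be reviewed first; build_sqoop_import is a hypothetical helper, the password file path /user/fgedu/.dm_password is an assumed location, and the driver class dm.jdbc.driver.DmDriver is assumed to be the one shipped in the DM JDBC jar.

```shell
# Hypothetical helper: print (not run) a sqoop import command for one table.
# Assumptions: the password file path and the DM driver class name.
build_sqoop_import() {
  local table="$1" target_dir="$2" mappers="${3:-1}"
  printf 'sqoop import --connect %s --driver %s --username %s --password-file %s --table %s --target-dir %s -m %s\n' \
    "jdbc:dm://fgedu.localhost:5236/fgedu" \
    "dm.jdbc.driver.DmDriver" \
    "fgedu_user" \
    "/user/fgedu/.dm_password" \
    "$table" "$target_dir" "$mappers"
}

# Usage: review the printed commands, then run them once they look right.
build_sqoop_import fgedu_user /fgedu/user
build_sqoop_import fgedu_order /fgedu/order 2
```

Printing the command instead of executing it keeps the sketch safe to run on a machine without Sqoop installed.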
3.2 Data Processing
3.2.1 Batch Processing
# 1. Processing with MapReduce
- Write the Mapper
$ vi WordCountMapper.java
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
// Input: (byte offset, line of text); output: (word, 1)
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] words = line.split(" ");
for (String w : words) {
word.set(w);
context.write(word, one);
}
}
}
- Write the Reducer
$ vi WordCountReducer.java
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
// Input: (word, [1, 1, ...]); output: (word, total count)
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
context.write(key, new IntWritable(sum));
}
}
# 2. Processing with Spark
- Read data
scala> val data = spark.read.text("/fgedu/data.txt")
- Process the data (getString(0) returns the line itself; Row.toString would wrap it in brackets)
scala> val words = data.rdd.flatMap(line => line.getString(0).split(" "))
scala> val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
- Show the result
scala> wordCounts.collect()
# Output
# res0: Array[(String, Int)] = Array((hello,3), (world,2), (fgedu,1))
# 3. Worked example
- Read the user data (files imported by Sqoop have no header row, so the column names are assigned explicitly)
scala> val userDF = spark.read.csv("/fgedu/user").toDF("user_id", "user_name", "user_email", "user_status")
- Count the users
scala> val userCount = userDF.count()
# Output
# userCount: Long = 1000
- Count users by status
scala> val statusDF = userDF.groupBy("user_status").count()
scala> statusDF.show()
# Output
# +-----------+-----+
# |user_status|count|
# +-----------+-----+
# |          1|  800|
# |          0|  200|
# +-----------+-----+
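The word count above follows the usual map, shuffle, reduce pattern. As a rough local analogy only (not how Hadoop or Spark execute the job), the same three phases can be tried on a small sample with standard Unix tools before running anything on the cluster. The function name word_count is hypothetical.

```shell
# Local sketch of the word-count phases, for small sample files only.
word_count() {
  # map:     tr emits one word per line (split on spaces, as in the Mapper)
  # shuffle: sort brings identical words next to each other
  # reduce:  uniq -c counts each group; awk prints "word<TAB>count"
  tr -s ' ' '\n' | grep -v '^$' | LC_ALL=C sort | uniq -c | awk '{ print $2 "\t" $1 }'
}

# Usage with two sample lines.
printf 'hello world fgedu\nhello world\n' | word_count
```

Running the pipeline on a few lines of real input is a cheap way to confirm the splitting rule before submitting the distributed job.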
3.2.2 Real-Time Processing
# 1. Processing with Spark Streaming
- Create the StreamingContext (10-second batches)
scala> import org.apache.spark.streaming.{Seconds, StreamingContext}
scala> val ssc = new StreamingContext(spark.sparkContext, Seconds(10))
- Create the DStream
scala> val lines = ssc.socketTextStream("fgedu.localhost", 9999)
- Process the data
scala> val words = lines.flatMap(line => line.split(" "))
scala> val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
- Print the result
scala> wordCounts.print()
- Start streaming
scala> ssc.start()
scala> ssc.awaitTermination()
# 2. Worked example
- Read data from Kafka (start spark-shell with --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.2 so the Kafka source is available)
scala> val kafkaDF = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "fgedu.localhost:9092")
.option("subscribe", "fgedu_topic")
.load()
- Process the data
scala> val processedDF = kafkaDF.selectExpr("CAST(value AS STRING)")
.as[String]
.flatMap(line => line.split(" "))
.groupBy("value")
.count()
- Write the result to the console
scala> val query = processedDF.writeStream
.outputMode("complete")
.format("console")
.start()
- Check the result
# Batch: 0
# +-----+-----+
# |value|count|
# +-----+-----+
# |hello|    3|
# |world|    2|
# +-----+-----+
3.3 Data Storage
3.3.1 HDFS Storage
# 1. Basic HDFS operations
- Create a directory (-p also creates missing parent directories)
$ hdfs dfs -mkdir -p /fgedu/data
- Upload a file
$ hdfs dfs -put /tmp/data.txt /fgedu/data/
- List files
$ hdfs dfs -ls /fgedu/data/
# Output
# Found 1 items
# -rw-r--r-- 1 root supergroup 1024 2024-01-10 10:00 /fgedu/data/data.txt
- Show the file content
$ hdfs dfs -cat /fgedu/data/data.txt
# Output
# hello world fgedu
# hello world
# 2. HDFS replica management
- Set the replication factor (-w waits until replication completes, so it needs at least three DataNodes here; on a single-DataNode setup the command never returns)
$ hdfs dfs -setrep -w 3 /fgedu/data/data.txt
# Output
# Replication 3 set: /fgedu/data/data.txt
- Show the replication factor
$ hdfs dfs -stat %r /fgedu/data/data.txt
# Output
# 3
# 3. Worked example
- Upload files in bulk
$ hdfs dfs -put /tmp/*.csv /fgedu/data/
- List the files
$ hdfs dfs -ls /fgedu/data/
# Output
# Found 3 items
# -rw-r--r-- 1 root supergroup 1024 2024-01-10 10:00 /fgedu/data/user.csv
# -rw-r--r-- 1 root supergroup 1024 2024-01-10 10:00 /fgedu/data/order.csv
# -rw-r--r-- 1 root supergroup 1024 2024-01-10 10:00 /fgedu/data/product.csv
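Raising the replication factor multiplies the disk space a file occupies across the cluster: every block is stored once per replica. A back-of-the-envelope sketch, ignoring block metadata and other overhead, is shown below; hdfs_raw_bytes is a hypothetical helper, not an HDFS command.

```shell
# Hypothetical helper: approximate raw bytes used across all DataNodes
# for one file, as file size multiplied by the replication factor.
hdfs_raw_bytes() {
  local size_bytes="$1" replication="$2"
  echo $(( size_bytes * replication ))
}

# The 1024-byte data.txt from this section at replication factor 3.
hdfs_raw_bytes 1024 3
```

The same arithmetic applies when sizing a cluster: usable capacity is roughly raw capacity divided by the replication factor.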
3.3.2 Hive Storage
# 1. Create Hive tables
- Create a managed (internal) table
hive> CREATE TABLE fgedu_user (
user_id INT,
user_name STRING,
user_email STRING,
user_status INT
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE;
- Create an external table (dropping it removes only the metadata; the files under LOCATION stay in place)
hive> CREATE EXTERNAL TABLE fgedu_order (
order_id INT,
user_id INT,
product_id INT,
amount DECIMAL(10,2),
order_date STRING
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION '/fgedu/order';
# 2. Load data
- Load data into the managed table (LOCAL reads from the local file system)
hive> LOAD DATA LOCAL INPATH '/tmp/user.csv' INTO TABLE fgedu_user;
- Load data into the external table (without LOCAL the path is an HDFS path, and the file is moved into the table location)
hive> LOAD DATA INPATH '/tmp/order.csv' INTO TABLE fgedu_order;
# 3. Query data
- Query the user data
hive> SELECT * FROM fgedu_user LIMIT 10;
# Output
# OK
# 1 fgedu_user1 fgedu_user1@fgedu.net.cn 1
# 2 fgedu_user2 fgedu_user2@fgedu.net.cn 1
# 3 user3 user3@fgedu.net.cn 1
- Query the order data
hive> SELECT user_id, COUNT(*) AS order_count, SUM(amount) AS total_amount
FROM fgedu_order
GROUP BY user_id
LIMIT 10;
# Output
# OK
# 1 10 1000.00
# 2 5 500.00
# 3 3 300.00
# 4. Worked example
- Create a partitioned table
hive> CREATE TABLE fgedu_log (
log_time STRING,
log_level STRING,
log_message STRING
)
PARTITIONED BY (log_date STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE;
- Load data into a partition
hive> LOAD DATA INPATH '/fgedu/logs/20240110' INTO TABLE fgedu_log PARTITION (log_date='20240110');
- Query the partition
hive> SELECT * FROM fgedu_log WHERE log_date = '20240110' LIMIT 10;
# Output
# OK
# 2024-01-10 10:00:00 INFO Application started
# 2024-01-10 10:00:01 INFO User login
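When Flume writes one HDFS directory per day, each day needs its own LOAD DATA statement with a matching partition value. As a small sketch, the statements can be generated per date rather than typed by hand. The helper name hive_load_partition_sql is hypothetical; the table name and paths are the ones used in this section.

```shell
# Hypothetical helper: print the LOAD DATA statement for one day's logs.
hive_load_partition_sql() {
  local day="$1"
  printf "LOAD DATA INPATH '/fgedu/logs/%s' INTO TABLE fgedu_log PARTITION (log_date='%s');\n" \
    "$day" "$day"
}

# Usage: generate statements for two days.
for day in 20240110 20240111; do
  hive_load_partition_sql "$day"
done
```

The generated statements can be saved to a file and run with hive -f, or passed one at a time to hive -e.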
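Feng Ge's tip: big data integration is an ongoing process that has to be adjusted and tuned as business requirements and data change. A thorough monitoring setup is key to keeping the big data platform running reliably.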
Part04-生产案例与实战讲解
4.1 日志数据采集案例
4.1.1 案例描述
某企业需要采集应用日志数据,存储到HDFS,用于后续的数据分析和挖掘。
4.1.2 实施步骤
# 1. 需求分析
– 业务需求
– 实时采集应用日志
– 存储到HDFS
– 支持数据分析
– 技术需求
– 使用Flume采集日志
– 存储到HDFS
– 按日期分区存储
# 2. 方案设计
– Flume Agent配置
– Source:Exec Source
– Channel:Memory Channel
– Sink:HDFS Sink
– HDFS目录结构
– /fgedu/logs/yyyyMMdd/
– 按日期分区
– 按小时滚动
# 3. 实施步骤
– 配置Flume Agent
$ vi $FLUME_HOME/conf/fgedu_log.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/fgedu/application.log
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://fgedu.localhost:9000/fgedu/logs/%Y%m%d/%H
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.rollInterval = 3600
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
– 启动Flume Agent
$ flume-ng agent –conf $FLUME_HOME/conf –conf-file $FLUME_HOME/conf/fgedu_log.conf –name a1 -Dflume.root.logger=INFO,console
# 4. Verify the result
– Write a log line
$ echo "2024-01-10 10:00:00 INFO [main] Application started" >> /var/log/fgedu/application.log
– List the HDFS directory
$ hdfs dfs -ls /fgedu/logs/20240110/10
# Output
# Found 1 items
# -rw-r--r-- 1 root supergroup 45 2024-01-10 10:00 /fgedu/logs/20240110/10/FlumeData.1234567890
– View the log content
$ hdfs dfs -cat /fgedu/logs/20240110/10/FlumeData.1234567890
# Output
# 2024-01-10 10:00:00 INFO [main] Application started
# 5. Outcome
– Log collection succeeded
– Data is stored correctly
– Partitioned storage works as designed
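Once the lines are in HDFS, later analysis usually starts by splitting each line into fields. The sketch below parses the log format used in the verification step (timestamp, level, thread in brackets, message). The regular expression and the `LogRecord` type are assumptions based on that single sample line, so real application logs may need a different pattern.

```python
import re
from datetime import datetime
from typing import NamedTuple, Optional

# Pattern derived from the sample line "2024-01-10 10:00:00 INFO [main] Application started".
LOG_PATTERN = re.compile(
    r"^(?P<ts>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) "
    r"(?P<level>[A-Z]+) "
    r"\[(?P<thread>[^\]]+)\] "
    r"(?P<message>.*)$"
)


class LogRecord(NamedTuple):
    timestamp: datetime
    level: str
    thread: str
    message: str


def parse_log_line(line: str) -> Optional[LogRecord]:
    """Return the parsed record, or None when the line does not match the format."""
    match = LOG_PATTERN.match(line.rstrip("\n"))
    if match is None:
        return None
    return LogRecord(
        timestamp=datetime.strptime(match["ts"], "%Y-%m-%d %H:%M:%S"),
        level=match["level"],
        thread=match["thread"],
        message=match["message"],
    )


record = parse_log_line("2024-01-10 10:00:00 INFO [main] Application started")
```

Returning `None` for lines that do not match keeps stack traces and other multi-line output from breaking the parse.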
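4.2 Real-Time Data Processing Case
4.2.1 Case Description
A company needs to process order data in real time and compute live order statistics for real-time monitoring and decision-making.
4.2.2 Implementation Steps
# 1. Requirements analysis
– Business requirements
  – Process order data in real time
  – Compute order statistics
  – Support real-time monitoring and decisions
– Technical requirements
  – Use Kafka to receive order data
  – Use Spark Streaming to process the data
  – Store the results in the DM database
# 2. Solution design
– Kafka Topic
  – Topic name: fgedu_order
  – Partitions: 3
  – Replication factor: 2
– Spark Streaming
  – Batch interval: 10 seconds
  – Processing logic: sum order amounts per user in each batch
  – Output: write to the DM database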
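A 10-second batch interval means that records are grouped by the window in which they arrive, and each window is processed as one small batch. The sketch below models only that grouping, in plain Python rather than Spark. The helper names, the arrival times, and the fourth record are hypothetical.

```python
from collections import defaultdict
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)
BATCH_INTERVAL = timedelta(seconds=10)


def batch_start(arrival: datetime, interval: timedelta = BATCH_INTERVAL) -> datetime:
    """Start of the batch window that contains the given arrival time."""
    whole_intervals = (arrival - EPOCH) // interval
    return EPOCH + whole_intervals * interval


def group_into_batches(arrivals):
    """Group (arrival_time, record) pairs by batch window."""
    batches = defaultdict(list)
    for arrival, record in arrivals:
        batches[batch_start(arrival)].append(record)
    return dict(batches)


arrivals = [
    (datetime(2024, 1, 10, 10, 0, 0), "1,1,1,100.00,2024-01-10 10:00:00"),
    (datetime(2024, 1, 10, 10, 0, 1), "1,2,2,200.00,2024-01-10 10:00:01"),
    (datetime(2024, 1, 10, 10, 0, 2), "2,3,3,300.00,2024-01-10 10:00:02"),
    (datetime(2024, 1, 10, 10, 0, 12), "3,4,4,50.00,2024-01-10 10:00:12"),  # hypothetical
]
batches = group_into_batches(arrivals)
```

The first three records fall into the window that starts at 10:00:00, and the fourth opens the next window.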
# 3. Implementation
– Create the Kafka Topic
$ kafka-topics.sh --create --topic fgedu_order --partitions 3 --replication-factor 2 --bootstrap-server fgedu.localhost:9092
– Spark Streaming processing code
$ vi OrderStreaming.scala
import java.sql.DriverManager
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
object OrderStreaming {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("OrderStreaming")
    // One micro-batch every 10 seconds
    val ssc = new StreamingContext(conf, Seconds(10))
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "fgedu.localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "fgedu_group",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val topics = Array("fgedu_order")
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )
    // Each record is a CSV line: field 0 is the user ID, field 3 is the amount
    val orderStats = stream
      .map(record => record.value)
      .map { order =>
        val fields = order.split(",")
        (fields(0).toInt, fields(3).toDouble)
      }
      .reduceByKey(_ + _)
    orderStats.foreachRDD { rdd =>
      rdd.foreachPartition { partition =>
        // Open one connection and one prepared statement per partition, not per row
        val conn = DriverManager.getConnection("jdbc:dm://fgedu.localhost:5236/fgedu", "fgedu_user", "fgedu_password")
        try {
          val stmt = conn.prepareStatement("INSERT INTO fgedu_order_stats (user_id, total_amount, stat_time) VALUES (?, ?, SYSDATE)")
          try {
            partition.foreach { case (userId, totalAmount) =>
              stmt.setInt(1, userId)
              stmt.setDouble(2, totalAmount)
              stmt.executeUpdate()
            }
          } finally stmt.close()
        } finally conn.close()
      }
    }
    ssc.start()
    ssc.awaitTermination()
  }
}
– Build and run
$ spark-submit --class OrderStreaming --master local[2] OrderStreaming.jar
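The core of the streaming job is the map-then-reduceByKey step: each CSV line becomes a (user ID, amount) pair, and pairs with the same key are summed. The sketch below reproduces that logic in plain Python on the three sample records from the verification step of this case. Skipping malformed lines is an addition for illustration; the Scala job itself does not do this.

```python
def parse_order(line: str):
    """Map a CSV order line to (user_id, amount), or None if it is malformed."""
    fields = line.split(",")
    try:
        return int(fields[0]), float(fields[3])
    except (IndexError, ValueError):
        return None


def reduce_by_key(pairs):
    """Sum the values of all pairs that share a key, like reduceByKey(_ + _)."""
    totals = {}
    for key, value in pairs:
        totals[key] = totals.get(key, 0.0) + value
    return totals


batch = [
    "1,1,1,100.00,2024-01-10 10:00:00",
    "1,2,2,200.00,2024-01-10 10:00:01",
    "2,3,3,300.00,2024-01-10 10:00:02",
]
parsed = [parse_order(line) for line in batch]
totals = reduce_by_key(pair for pair in parsed if pair is not None)
```

For this batch, user 1 and user 2 each end up with a total of 300.0, which matches the rows shown in the query result of this case.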
# 4. Verify the result
– Send order data
$ kafka-console-producer.sh --broker-list fgedu.localhost:9092 --topic fgedu_order
> 1,1,1,100.00,2024-01-10 10:00:00
> 1,2,2,200.00,2024-01-10 10:00:01
> 2,3,3,300.00,2024-01-10 10:00:02
– Query the statistics
SQL> SELECT * FROM fgedu_order_stats;
# Output
# USER_ID TOTAL_AMOUNT STAT_TIME
# -------- ------------- -------------------
# 1 300.00 2024-01-10 10:00:10
# 2 300.00 2024-01-10 10:00:10
# 5. Outcome
– Real-time processing succeeded
– Statistics are computed correctly
– Data is stored correctly
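Writing each partition's results through one connection and one parameterized statement keeps the database load predictable. The sketch below shows that pattern with Python's built-in sqlite3 module as a stand-in for the DM database: the table layout follows `fgedu_order_stats`, while `datetime('now')` replaces `SYSDATE` and the `write_partition` helper is a hypothetical name.

```python
import sqlite3

INSERT_SQL = (
    "INSERT INTO fgedu_order_stats (user_id, total_amount, stat_time) "
    "VALUES (?, ?, datetime('now'))"
)


def write_partition(conn: sqlite3.Connection, partition) -> None:
    """Insert all (user_id, total_amount) pairs of one partition in one transaction."""
    with conn:  # commits on success, rolls back if an insert fails
        conn.executemany(INSERT_SQL, list(partition))


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE fgedu_order_stats (user_id INTEGER, total_amount REAL, stat_time TEXT)"
)
write_partition(conn, [(1, 300.0), (2, 300.0)])
rows = conn.execute(
    "SELECT user_id, total_amount FROM fgedu_order_stats ORDER BY user_id"
).fetchall()
```

Wrapping the batch in a single transaction means a failed insert leaves no partial results behind for that partition.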
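4.3 Data Lake Construction Case
4.3.1 Case Description
A company needs to build a data lake that consolidates data from multiple sources and supports data analysis and machine learning.
4.3.2 Implementation Steps
# 1. Requirements analysis
– Business requirements
  – Consolidate multiple data sources
  – Support data analysis
  – Support machine learning
– Technical requirements
  – Use HDFS to store the data
  – Use Hive to manage the data
  – Use Spark to process the data
# 2. Solution design
– Data lake architecture
  – Raw data layer (Raw)
  – Cleaned data layer (Cleaned)
  – Aggregated data layer (Aggregated)
– Data processing flow
  – Data collection: Flume + Sqoop
  – Data processing: Spark
  – Data storage: HDFS + Hive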
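A three-layer lake is easier to keep consistent when every job derives its paths from one helper instead of hard-coding them. The following sketch assumes the `/fgedu/lake` root and the `load_date` partition column used in this case. The `Layer` enum and `dataset_location` function are hypothetical names.

```python
from enum import Enum
from typing import Optional


class Layer(Enum):
    RAW = "raw"
    CLEANED = "cleaned"
    AGGREGATED = "aggregated"


LAKE_ROOT = "/fgedu/lake"


def dataset_location(layer: Layer, dataset: str, load_date: Optional[str] = None) -> str:
    """HDFS path of a dataset in a layer, optionally narrowed to one load_date partition."""
    path = f"{LAKE_ROOT}/{layer.value}/{dataset}"
    if load_date is not None:
        path += f"/load_date={load_date}"
    return path


raw_user = dataset_location(Layer.RAW, "user")
cleaned_user_day = dataset_location(Layer.CLEANED, "user", "20240110")
stats = dataset_location(Layer.AGGREGATED, "user_stats")
```

Using an enum for the layer means a typo such as "clean" fails immediately instead of silently creating a fourth directory.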
# 3. Implementation
– Create the HDFS directories
$ hdfs dfs -mkdir -p /fgedu/lake/raw
$ hdfs dfs -mkdir -p /fgedu/lake/cleaned
$ hdfs dfs -mkdir -p /fgedu/lake/aggregated
– Create the Hive database
hive> CREATE DATABASE fgedu_lake;
– Create the raw data table
hive> CREATE TABLE fgedu_lake.raw_user (
  user_id INT,
  user_name STRING,
  user_email STRING,
  user_status INT,
  create_time STRING,
  update_time STRING
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION '/fgedu/lake/raw/user';
– Create the cleaned data table
hive> CREATE TABLE fgedu_lake.cleaned_user (
  user_id INT,
  user_name STRING,
  user_email STRING,
  user_status INT,
  create_date DATE,
  update_date DATE
)
PARTITIONED BY (load_date STRING)
STORED AS PARQUET
LOCATION '/fgedu/lake/cleaned/user';
– Create the aggregated data table
hive> CREATE TABLE fgedu_lake.aggregated_user_stats (
  user_id INT,
  order_count INT,
  total_amount DECIMAL(10,2),
  avg_amount DECIMAL(10,2),
  stat_date DATE
)
PARTITIONED BY (load_date STRING)
STORED AS PARQUET
LOCATION '/fgedu/lake/aggregated/user_stats';
# 4. Data processing
– Data cleaning
$ spark-submit --class DataCleaning --master yarn DataCleaning.jar
– Data aggregation
$ spark-submit --class DataAggregation --master yarn DataAggregation.jar
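The source of `DataCleaning.jar` is not shown, so the following is only a guess at what a cleaning step between `raw_user` and `cleaned_user` might do: parse each CSV row, convert the time strings to dates, and drop rows that cannot be converted. The timestamp format, the sample rows, and all names in the sketch are assumptions.

```python
import csv
from datetime import date, datetime
from typing import Iterable, Iterator, NamedTuple

TIME_FORMAT = "%Y-%m-%d %H:%M:%S"  # assumed format of create_time / update_time


class CleanedUser(NamedTuple):
    user_id: int
    user_name: str
    user_email: str
    user_status: int
    create_date: date
    update_date: date


def clean_users(lines: Iterable[str]) -> Iterator[CleanedUser]:
    """Yield typed rows; skip rows with a wrong field count or unparseable values."""
    for fields in csv.reader(lines):
        if len(fields) != 6:
            continue
        try:
            user = CleanedUser(
                user_id=int(fields[0]),
                user_name=fields[1].strip(),
                user_email=fields[2].strip().lower(),
                user_status=int(fields[3]),
                create_date=datetime.strptime(fields[4], TIME_FORMAT).date(),
                update_date=datetime.strptime(fields[5], TIME_FORMAT).date(),
            )
        except ValueError:
            continue
        yield user


# Hypothetical raw rows: one valid, one truncated, one with a non-numeric ID.
raw_rows = [
    "1,fgedu_user1,fgedu_user1@fgedu.net.cn,1,2024-01-01 08:00:00,2024-01-10 09:30:00",
    "2,fgedu_user2",
    "x,user3,user3@fgedu.net.cn,1,2024-01-03 08:00:00,2024-01-10 09:30:00",
]
cleaned = list(clean_users(raw_rows))
```

In a real job the rejected rows would normally be counted or written to a side location so that data loss stays visible.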
# 5. Verify the result
– Query the cleaned data
hive> SELECT * FROM fgedu_lake.cleaned_user WHERE load_date = '20240110' LIMIT 10;
# Output
# OK
# 1 fgedu_user1 fgedu_user1@fgedu.net.cn 1 2024-01-01 2024-01-10
# 2 fgedu_user2 fgedu_user2@fgedu.net.cn 1 2024-01-02 2024-01-10
# 3 user3 user3@fgedu.net.cn 1 2024-01-03 2024-01-10
– Query the aggregated data
hive> SELECT * FROM fgedu_lake.aggregated_user_stats WHERE load_date = '20240110' LIMIT 10;
# Output
# OK
# 1 10 1000.00 100.00 2024-01-10
# 2 5 500.00 100.00 2024-01-10
# 3 3 300.00 100.00 2024-01-10
# 6. Outcome
– The data lake was built successfully
– Data consolidation is complete
– Data processing works as expected
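Each aggregated row in the verification step holds an order count, a total, and an average per user. The source of `DataAggregation.jar` is not shown, so the sketch below is an assumed equivalent in plain Python. It uses `Decimal` with two decimal places to mirror the `DECIMAL(10,2)` columns. The function name and the input rows are hypothetical.

```python
from collections import defaultdict
from decimal import Decimal, ROUND_HALF_UP

CENT = Decimal("0.01")


def aggregate_orders(orders):
    """Map user_id -> (order_count, total_amount, avg_amount), rounded to two decimals."""
    acc = defaultdict(lambda: [0, Decimal("0")])
    for user_id, amount in orders:
        acc[user_id][0] += 1
        acc[user_id][1] += Decimal(amount)
    return {
        user_id: (
            count,
            total.quantize(CENT, ROUND_HALF_UP),
            (total / count).quantize(CENT, ROUND_HALF_UP),
        )
        for user_id, (count, total) in sorted(acc.items())
    }


# Hypothetical input: three orders of 100.00 for user 3, as (user_id, amount string).
orders = [(3, "100.00"), (3, "100.00"), (3, "100.00")]
user_stats = aggregate_orders(orders)
```

Passing amounts as strings avoids the binary rounding that would creep in if they were converted through `float` first.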
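Production recommendation: After a big data integration project is completed, test it thoroughly to confirm that the system is reliable and stable. Set up comprehensive monitoring so that problems are detected and resolved promptly, and perform regular maintenance to keep the system running stably.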
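Part05-风哥's Lessons Learned
5.1 Big Data Integration Best Practices
Best practices for DM database big data integration:
- Plan thoroughly: before building, complete the requirements analysis, architecture design, and technology selection
- Layered design: use a layered architecture with raw, cleaned, and aggregated data layers
- Data quality: safeguard it through data cleaning, validation, and monitoring
- Performance tuning: improve processing performance through parallelism, memory management, and job scheduling
- Reliability: keep processing reliable with fault tolerance, data backups, and failure recovery
- Scalability: design an architecture that can grow with the business and the data
- Monitoring and alerting: set up comprehensive monitoring and alerting so that problems are detected and resolved promptly
- Documentation: record the build process and results to ease later maintenance
- Teamwork: work with the team to complete the build
- Accumulate experience: capture lessons learned to make future builds more efficient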
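One common building block for the fault tolerance mentioned above is retrying a failed step with an increasing delay. This is a generic sketch rather than anything specific to DM, Flume, or Spark. The `retry` helper and its parameters are hypothetical, and the `sleep` argument exists so the delay can be replaced in tests.

```python
import time


def retry(operation, attempts: int = 3, base_delay: float = 0.5, sleep=time.sleep):
    """Call operation(); on failure wait base_delay * 2**n and try again."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    last_error = None
    for attempt in range(attempts):
        try:
            return operation()
        except Exception as exc:  # a real job would catch narrower exception types
            last_error = exc
            if attempt < attempts - 1:
                sleep(base_delay * 2 ** attempt)
    raise last_error


# Simulated step that fails twice before succeeding.
calls = {"count": 0}


def flaky_step():
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary failure")
    return "ok"


recorded_delays = []
result = retry(flaky_step, attempts=3, base_delay=0.5, sleep=recorded_delays.append)
```

Retries only help with transient failures such as a brief network interruption, so the final exception is re-raised once the attempts are used up.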
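5.2 Common Problems and Solutions
# 1. Data collection problems
– Symptoms: data collection fails or data is lost
– Causes: Flume misconfiguration, network problems, insufficient disk space
– Solutions: check the Flume configuration, check network connectivity, free up disk space
# 2. Data processing problems
– Symptoms: processing is slow or jobs fail
– Causes: large data volume, insufficient resources, poorly tuned job configuration
– Solutions: tune the job configuration, add resources, optimize the processing logic
# 3. Data storage problems
– Symptoms: writes fail or data is lost
– Causes: HDFS misconfiguration, NameNode failure, disk failure
– Solutions: check the HDFS configuration, repair the NameNode, replace the failed disk
# 4. Performance problems
– Symptoms: poor system performance, slow responses
– Causes: insufficient resources, poorly tuned configuration, data skew
– Solutions: add resources, tune the configuration, resolve the data skew
# 5. Scalability problems
– Symptoms: the system scales poorly and cannot keep up with business growth
– Causes: unsuitable architecture, poor technology choices
– Solutions: redesign the architecture, revisit the technology choices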
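Data skew, listed above as a cause of performance problems, can often be confirmed by counting records per key and comparing the largest keys with the average. The sketch below shows one simple way to do that on a sample of keys. The `skew_report` helper, the threshold of twice the mean, and the sample data are assumptions.

```python
from collections import Counter


def skew_report(keys, threshold: float = 2.0):
    """Return (mean records per key, {key: count} for keys above threshold * mean)."""
    counts = Counter(keys)
    if not counts:
        return 0.0, {}
    mean = sum(counts.values()) / len(counts)
    hot_keys = {key: count for key, count in counts.items() if count > threshold * mean}
    return mean, hot_keys


# Hypothetical sample: user 1 produces 90 of 100 records.
sample_keys = [1] * 90 + [2] * 5 + [3] * 5
mean_per_key, hot_keys = skew_report(sample_keys)
```

A key flagged here is a candidate for techniques such as salting the key or processing it in a separate job.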
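5.3 Big Data Integration Checklist
Checklist for DM database big data integration:
- Requirements analysis: is the analysis thorough, and are the business requirements clear?
- Architecture design: is the design sound, and are the layers clearly separated?
- Data collection: is collection running normally, and is the data complete?
- Data processing: is processing running normally, and are the results correct?
- Data storage: is storage working normally, and is the data secure?
- Performance: does performance meet the requirements, and is tuning finished?
- Reliability: is reliability ensured, and are the fault tolerance mechanisms in place?
- Scalability: does scalability meet the requirements, and can the architecture be extended?
- Documentation: is the build process recorded, and is the documentation complete?
- Teamwork: is collaboration smooth, and are responsibilities clearly assigned?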
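The data completeness item in the checklist can be partly automated by comparing record counts between the source and the target, for example per load date. The sketch below shows such a reconciliation. The `reconcile` helper and the counts are hypothetical, and matching counts alone do not prove that the content is identical.

```python
def reconcile(source_counts: dict, target_counts: dict) -> dict:
    """Return {key: (source_count, target_count)} for every key whose counts differ."""
    mismatches = {}
    for key in sorted(set(source_counts) | set(target_counts)):
        source = source_counts.get(key, 0)
        target = target_counts.get(key, 0)
        if source != target:
            mismatches[key] = (source, target)
    return mismatches


# Hypothetical record counts per load date.
source_counts = {"20240109": 100, "20240110": 120}
target_counts = {"20240109": 100, "20240110": 118}
mismatches = reconcile(source_counts, target_counts)
```

An empty result means every load date has the same number of records on both sides.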
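Continuous improvement: Big data integration is an ongoing process that has to be adjusted and tuned as business requirements and data change. A comprehensive monitoring setup is key to keeping the big data platform running stably, and regular maintenance keeps the system stable over time.
This article was compiled and published by 风哥教程 for learning and testing purposes only. When reposting, please credit the source: http://www.fgedu.net.cn/10327.html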
