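Table of Contents
Part01-Basic Concepts and Theory
1.1 Flume Architecture Overview
1.2 Core Components
1.3 Data Flow Model
Part02-Production Environment Planning and Recommendations
2.1 Deployment Architecture Planning
2.2 Component Selection Planning
2.3 Performance Optimization Planning
Part03-Production Implementation Plan
3.1 Flume Installation and Configuration
3.2 Source Configuration in Practice
3.3 Channel Configuration in Practice
3.4 Sink Configuration in Practice
Part04-Production Cases and Walkthroughs
4.1 Log Collection to HDFS
4.2 Log Collection to Kafka
4.3 Multiplexing
Part05-Fengge's Lessons Learned
5.1 Flume Best Practices
5.2 Log Collection Lessons Learned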
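Part01-Basic Concepts and Theory
1.1 Flume Architecture Overview
Flume is an Apache top-level project for collecting, aggregating, and moving large volumes of log data.
1.2 Core Components
Flume's core components are the Source, the Channel, and the Sink. They run inside an Agent and pass Events to one another.
– Source: the entry point; receives data
– Channel: the buffer; holds events between Source and Sink
– Sink: the exit point; delivers events to the destination
– Agent: the JVM process that hosts the components
– Event: the unit of data, made up of headers and a body
1.3 Data Flow Model
Flume's data flow model is a Source-Channel-Sink chain: a Source writes events into a Channel, and a Sink takes them out and delivers them downstream.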
# Check the Flume version
flume-ng version
# Show Flume help
flume-ng help
# Sample output: version
Flume 1.9.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: 1234567890abcdef
# Sample output: help
Usage: flume-ng
commands:
help display this help text
agent run a Flume agent
avro-client run an Avro client
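The Source-Channel-Sink chain described above can be sketched as a minimal agent definition. The agent name `a1`, the component names, the netcat source on port 44444, and the `CONF_DIR` location are illustrative assumptions, not part of the original setup.

```shell
# Hypothetical output directory; on a real host point CONF_DIR at $FLUME_HOME/conf.
CONF_DIR="${CONF_DIR:-/tmp/flume-sketch}"
mkdir -p "$CONF_DIR"

cat > "$CONF_DIR/minimal-chain.conf" << 'EOF'
# Declare the components that belong to agent a1
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# Source: accepts newline-terminated text on a TCP port
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# A source can feed several channels, hence the plural key "channels"
a1.sources.r1.channels = c1
# Channel: buffers events between the source and the sink
a1.channels.c1.type = memory
# Sink: drains exactly one channel, hence the singular key "channel"
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
EOF

echo "wrote $CONF_DIR/minimal-chain.conf"
```

The asymmetry between `channels` on the source and `channel` on the sink is a common source of silent misconfiguration, which is why the sketch calls it out.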
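Part02-Production Environment Planning and Recommendations
2.1 Deployment Architecture Planning
A Flume deployment architecture should be sized to the data volume and the reliability you need.
– Single tier: for simple scenarios; agents write straight to the destination
– Two tiers: Agents collect, Collectors aggregate
– Multiple tiers: for large-scale scenarios, with several levels of aggregation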
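As a rough illustration of the two-tier layout, the sketch below writes one config for a per-host agent and one for a collector, linked by an Avro sink and an Avro source. The agent names `agent` and `collector`, the component names, the collector host `fgedu01`, port 4141, and `CONF_DIR` are assumptions chosen for the example.

```shell
# Hypothetical output directory; on a real host point CONF_DIR at $FLUME_HOME/conf.
CONF_DIR="${CONF_DIR:-/tmp/flume-sketch}"
mkdir -p "$CONF_DIR"

# Tier 1: runs on each application host and forwards events over Avro RPC
cat > "$CONF_DIR/tier1-agent.conf" << 'EOF'
agent.sources = src1
agent.channels = ch1
agent.sinks = toCollector
agent.sources.src1.type = exec
agent.sources.src1.command = tail -F /bigdata/logs/app.log
agent.sources.src1.channels = ch1
agent.channels.ch1.type = memory
# The Avro sink connects to the collector's Avro source
agent.sinks.toCollector.type = avro
agent.sinks.toCollector.hostname = fgedu01
agent.sinks.toCollector.port = 4141
agent.sinks.toCollector.channel = ch1
EOF

# Tier 2: receives from many agents and aggregates before the final sink
cat > "$CONF_DIR/tier2-collector.conf" << 'EOF'
collector.sources = fromAgents
collector.channels = ch1
collector.sinks = sink1
# The port must match the port of the tier-1 Avro sinks
collector.sources.fromAgents.type = avro
collector.sources.fromAgents.bind = 0.0.0.0
collector.sources.fromAgents.port = 4141
collector.sources.fromAgents.channels = ch1
# A durable channel on the collector limits loss if the final sink stalls
collector.channels.ch1.type = file
# Placeholder sink; replace with hdfs or Kafka in a real deployment
collector.sinks.sink1.type = logger
collector.sinks.sink1.channel = ch1
EOF
```

A multi-tier layout repeats the same Avro sink to Avro source hop between successive collector levels.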
2.2 Component Selection Planning
Component selection affects both the performance and the reliability of the pipeline.
# List available Sources
ls /bigdata/app/flume/lib/ | grep -i source
# List available Channels
ls /bigdata/app/flume/lib/ | grep -i channel
# List available Sinks
ls /bigdata/app/flume/lib/ | grep -i sink
# Sample output (abridged; jar names vary by Flume version)
# Source list
flume-kafka-source-1.9.0.jar
# Channel list
flume-file-channel-1.9.0.jar
flume-memory-channel-1.9.0.jar
flume-jdbc-channel-1.9.0.jar
# Sink list
flume-hdfs-sink-1.9.0.jar
flume-kafka-sink-1.9.0.jar
flume-hbase-sink-1.9.0.jar
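2.3 Performance Optimization Planning
Performance tuning has to be approached from several angles. Fengge's tip: a well-sized Channel configuration is the key to Flume performance.
– Choose the right Channel type
– Tune batch sizes
– Set the transaction capacity
– Monitor Channel fill percentage
– Set sensible timeouts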
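The batch size and transaction capacity items above are linked: a channel's `transactionCapacity` cannot exceed its `capacity`, and a source or sink batch larger than `transactionCapacity` will not fit in one transaction. The helper below is a small sketch of that sanity check; the function name `check_channel_sizing` is hypothetical and not a Flume tool.

```shell
# check_channel_sizing CAPACITY TRANSACTION_CAPACITY BATCH_SIZE
# Prints "ok" and returns 0 when batch <= transactionCapacity <= capacity,
# otherwise prints the violated rule and returns 1.
check_channel_sizing() {
    local capacity=$1 txn=$2 batch=$3
    if [ "$txn" -gt "$capacity" ]; then
        echo "transactionCapacity ($txn) exceeds capacity ($capacity)"
        return 1
    fi
    if [ "$batch" -gt "$txn" ]; then
        echo "batch size ($batch) exceeds transactionCapacity ($txn)"
        return 1
    fi
    echo "ok"
}

# Values taken from the memory channel and Kafka sink examples in this article
check_channel_sizing 10000 1000 100
# A batch of 5000 would not fit into a 1000-event transaction
check_channel_sizing 10000 1000 5000
```

Running a check like this before deploying a config is cheaper than finding the mismatch in the agent log at runtime.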
Part03-Production Implementation Plan
3.1 Flume Installation and Configuration
3.1.1 Installing Flume
# Download and unpack (older releases such as 1.9.0 are served from the Apache archive)
wget https://archive.apache.org/dist/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz
tar -xzf apache-flume-1.9.0-bin.tar.gz -C /bigdata/app/
ln -s /bigdata/app/apache-flume-1.9.0-bin /bigdata/app/flume
# Set environment variables
export FLUME_HOME=/bigdata/app/flume
export PATH=$PATH:$FLUME_HOME/bin
# Configure flume-env.sh
cat > /bigdata/app/flume/conf/flume-env.sh << 'EOF'
export JAVA_HOME=/bigdata/app/java
export JAVA_OPTS="-Xms1024m -Xmx2048m -Dflume.monitoring.type=http -Dflume.monitoring.port=34545"
EOF
# Verify the installation
flume-ng version
# Sample output
Flume 1.9.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: 1234567890abcdef
# Flume installed successfully
3.2 Source Configuration in Practice
3.2.1 Avro Source Configuration
cat > /bigdata/app/flume/conf/avro-source.conf << 'EOF'
agent1.sources = src1
agent1.channels = ch1
agent1.sinks = sink1
agent1.sources.src1.type = avro
agent1.sources.src1.bind = fgedu01
agent1.sources.src1.port = 4141
agent1.sources.src1.channels = ch1
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 10000
agent1.channels.ch1.transactionCapacity = 1000
agent1.sinks.sink1.type = logger
agent1.sinks.sink1.channel = ch1
EOF
# Start the agent (an absolute -c path works from any working directory)
flume-ng agent -n agent1 -c /bigdata/app/flume/conf -f /bigdata/app/flume/conf/avro-source.conf -Dflume.root.logger=INFO,console
# Sample output
Info: Sourcing environment configuration script…
Info: Including Hadoop libraries found via…
Component type: SOURCE, name: src1 started
Component type: CHANNEL, name: ch1 started
Component type: SINK, name: sink1 started
# Agent started successfully
3.2.2 Exec Source Configuration
cat > /bigdata/app/flume/conf/exec-source.conf << 'EOF'
agent1.sources = src1
agent1.channels = ch1
agent1.sinks = sink1
agent1.sources.src1.type = exec
agent1.sources.src1.command = tail -F /bigdata/logs/app.log
agent1.sources.src1.shell = /bin/bash -c
agent1.sources.src1.channels = ch1
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 10000
agent1.channels.ch1.transactionCapacity = 1000
agent1.sinks.sink1.type = logger
agent1.sinks.sink1.channel = ch1
EOF
# Start the agent
flume-ng agent -n agent1 -c /bigdata/app/flume/conf -f /bigdata/app/flume/conf/exec-source.conf -Dflume.root.logger=INFO,console
# Sample output
Info: Sourcing environment configuration script…
Component type: SOURCE, name: src1 started
Event: { headers:{} body: 74657374206C6F67206C696E65 test log line }
Event: { headers:{} body: 74657374206C6F67206C696E65 test log line }
# Log lines collected successfully
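An Exec Source running `tail -F` does not remember how far it has read, so lines can be lost or re-read when the agent restarts. Where that matters, the Taildir Source is a commonly used alternative because it records read offsets in a position file. The sketch below is one possible equivalent of the config above; the position file path and `CONF_DIR` are assumptions.

```shell
# Hypothetical output directory; on a real host point CONF_DIR at $FLUME_HOME/conf.
CONF_DIR="${CONF_DIR:-/tmp/flume-sketch}"
mkdir -p "$CONF_DIR"

cat > "$CONF_DIR/taildir-source.conf" << 'EOF'
agent1.sources = src1
agent1.channels = ch1
agent1.sinks = sink1
agent1.sources.src1.type = TAILDIR
# Assumed location of the JSON file that stores per-file read offsets
agent1.sources.src1.positionFile = /bigdata/flume/taildir_position.json
# One file group named f1 that follows the same log as the exec example
agent1.sources.src1.filegroups = f1
agent1.sources.src1.filegroups.f1 = /bigdata/logs/app.log
agent1.sources.src1.channels = ch1
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 10000
agent1.channels.ch1.transactionCapacity = 1000
agent1.sinks.sink1.type = logger
agent1.sinks.sink1.channel = ch1
EOF
```

The agent is started the same way as before, with `-f` pointing at this file.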
3.2.3 Spooling Directory Source Configuration
cat > /bigdata/app/flume/conf/spool-source.conf << 'EOF'
agent1.sources = src1
agent1.channels = ch1
agent1.sinks = sink1
agent1.sources.src1.type = spooldir
agent1.sources.src1.spoolDir = /bigdata/logs/flume-spool
agent1.sources.src1.fileSuffix = .completed
agent1.sources.src1.deletePolicy = never
agent1.sources.src1.channels = ch1
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 10000
agent1.channels.ch1.transactionCapacity = 1000
agent1.sinks.sink1.type = logger
agent1.sinks.sink1.channel = ch1
EOF
# Create the spool directory
mkdir -p /bigdata/logs/flume-spool
# Start the agent
flume-ng agent -n agent1 -c /bigdata/app/flume/conf -f /bigdata/app/flume/conf/spool-source.conf -Dflume.root.logger=INFO,console
# Sample output
Info: Sourcing environment configuration script…
Component type: SOURCE, name: src1 started
Spooling Directory Source: Monitoring directory: /bigdata/logs/flume-spool
Event: { headers:{} body: 746573742064617461 test data }
# Directory monitoring works
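The Spooling Directory Source expects files to be complete and uniquely named when they appear in the spool directory. One way to satisfy both conditions is to copy each file into a staging directory first and then rename it into place. The helper name `spool_drop` and the `.staging` sibling directory are assumptions for this sketch, and the rename is only atomic when both directories are on the same filesystem.

```shell
# spool_drop SRC SPOOL_DIR
# Copies SRC into a staging directory, then renames it into SPOOL_DIR
# under a timestamp-suffixed name so that file names are not reused.
spool_drop() {
    local src=$1 spool_dir=$2
    local staging="${spool_dir}.staging"
    local name
    name="$(basename "$src").$(date +%s)"
    mkdir -p "$spool_dir" "$staging" || return 1
    # Copy fully into staging so the spool directory never sees a partial file
    cp "$src" "$staging/$name" || return 1
    # Rename into place; atomic when both directories share a filesystem
    mv "$staging/$name" "$spool_dir/$name"
}

# Demo against a throwaway directory instead of /bigdata/logs/flume-spool
demo_root="$(mktemp -d)"
printf 'test data\n' > "$demo_root/input.log"
spool_drop "$demo_root/input.log" "$demo_root/spool"
ls "$demo_root/spool"
```

On a real host the second argument would be the `spoolDir` from the config above.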
3.3 Channel Configuration in Practice
3.3.1 Memory Channel Configuration
cat > /bigdata/app/flume/conf/memory-channel.conf << 'EOF'
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 100000
agent1.channels.ch1.transactionCapacity = 10000
agent1.channels.ch1.keep-alive = 30
agent1.channels.ch1.byteCapacityBufferPercentage = 20
agent1.channels.ch1.byteCapacity = 800000000
EOF
# Show the configuration
cat /bigdata/app/flume/conf/memory-channel.conf
# Output
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 100000
agent1.channels.ch1.transactionCapacity = 10000
agent1.channels.ch1.keep-alive = 30
agent1.channels.ch1.byteCapacityBufferPercentage = 20
agent1.channels.ch1.byteCapacity = 800000000
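`byteCapacity` counts event bodies only, and `byteCapacityBufferPercentage` reserves a share of it as headroom for headers. The arithmetic below is a small sketch of what the values above leave for event bodies; the variable names are illustrative.

```shell
# Values from the memory channel configuration above
byte_capacity=800000000
buffer_pct=20
capacity_events=100000

# Bytes left for event bodies once the header buffer is set aside
usable_bytes=$(( byte_capacity * (100 - buffer_pct) / 100 ))
# Average body size at which the byte limit and the event limit are hit together
avg_body_limit=$(( usable_bytes / capacity_events ))

echo "usable bytes for event bodies: $usable_bytes"
echo "average body size at the crossover: $avg_body_limit"
```

With these settings the script reports 640000000 usable bytes and a crossover of 6400 bytes per event: if average bodies are larger than that, the byte limit fills before the 100000-event limit does.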
3.3.2 File Channel Configuration
cat > /bigdata/app/flume/conf/file-channel.conf << 'EOF'
agent1.channels.ch1.type = file
agent1.channels.ch1.checkpointDir = /bigdata/flume/checkpoint
agent1.channels.ch1.dataDirs = /bigdata/flume/data
agent1.channels.ch1.capacity = 1000000
agent1.channels.ch1.transactionCapacity = 10000
agent1.channels.ch1.checkpointInterval = 30000
agent1.channels.ch1.maxFileSize = 2146435071
agent1.channels.ch1.minimumRequiredSpace = 524288000
EOF
# Create the checkpoint and data directories
mkdir -p /bigdata/flume/checkpoint
mkdir -p /bigdata/flume/data
# Show the configuration
cat /bigdata/app/flume/conf/file-channel.conf
# Output
agent1.channels.ch1.type = file
agent1.channels.ch1.checkpointDir = /bigdata/flume/checkpoint
agent1.channels.ch1.dataDirs = /bigdata/flume/data
agent1.channels.ch1.capacity = 1000000
agent1.channels.ch1.transactionCapacity = 10000
agent1.channels.ch1.checkpointInterval = 30000
agent1.channels.ch1.maxFileSize = 2146435071
agent1.channels.ch1.minimumRequiredSpace = 524288000
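The File Channel stops accepting events when free space in its data directory falls below `minimumRequiredSpace`, so it is worth checking free space before starting the agent. The helper below is a sketch of such a check; the name `has_min_free_space` is hypothetical.

```shell
# has_min_free_space DIR MIN_BYTES
# Returns 0 if the filesystem holding DIR has at least MIN_BYTES available,
# 1 if it has less, and 2 if the free space could not be determined.
has_min_free_space() {
    local dir=$1 min_bytes=$2
    local avail_kb
    # df -Pk prints one POSIX-format line per filesystem in 1024-byte blocks;
    # column 4 is the available space
    avail_kb=$(df -Pk "$dir" 2>/dev/null | awk 'NR == 2 { print $4 }')
    [ -n "$avail_kb" ] || return 2
    [ $(( avail_kb * 1024 )) -ge "$min_bytes" ]
}

# 524288000 is the minimumRequiredSpace value from the configuration above;
# /tmp stands in for /bigdata/flume/data so the sketch runs anywhere
if has_min_free_space /tmp 524288000; then
    echo "enough free space"
else
    echo "free space below minimumRequiredSpace"
fi
```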
3.4 Sink Configuration in Practice
3.4.1 HDFS Sink Configuration
cat > /bigdata/app/flume/conf/hdfs-sink.conf << 'EOF'
agent1.sources = src1
agent1.channels = ch1
agent1.sinks = sink1
agent1.sources.src1.type = exec
agent1.sources.src1.command = tail -F /bigdata/logs/app.log
agent1.sources.src1.channels = ch1
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 10000
agent1.channels.ch1.transactionCapacity = 1000
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path = hdfs://fgedu01:9000/bigdata/logs/%Y%m%d
agent1.sinks.sink1.hdfs.filePrefix = app-log
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.writeFormat = Text
agent1.sinks.sink1.hdfs.rollInterval = 3600
agent1.sinks.sink1.hdfs.rollSize = 134217728
agent1.sinks.sink1.hdfs.rollCount = 0
agent1.sinks.sink1.hdfs.useLocalTimeStamp = true
agent1.sinks.sink1.channel = ch1
EOF
# Start the agent
flume-ng agent -n agent1 -c /bigdata/app/flume/conf -f /bigdata/app/flume/conf/hdfs-sink.conf
# Sample output
Info: Sourcing environment configuration script…
Component type: SOURCE, name: src1 started
Component type: CHANNEL, name: ch1 started
Component type: SINK, name: sink1 started
# HDFS Sink started successfully
3.4.2 Kafka Sink Configuration
cat > /bigdata/app/flume/conf/kafka-sink.conf << 'EOF'
agent1.sources = src1
agent1.channels = ch1
agent1.sinks = sink1
agent1.sources.src1.type = exec
agent1.sources.src1.command = tail -F /bigdata/logs/app.log
agent1.sources.src1.channels = ch1
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 10000
agent1.channels.ch1.transactionCapacity = 1000
agent1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.sink1.kafka.topic = fgedu-logs
agent1.sinks.sink1.kafka.bootstrap.servers = fgedu01:9092,fgedu02:9092,fgedu03:9092
agent1.sinks.sink1.kafka.flumeBatchSize = 100
agent1.sinks.sink1.kafka.producer.acks = all
agent1.sinks.sink1.kafka.producer.compression.type = snappy
agent1.sinks.sink1.channel = ch1
EOF
# Start the agent
flume-ng agent -n agent1 -c /bigdata/app/flume/conf -f /bigdata/app/flume/conf/kafka-sink.conf
# Sample output
Info: Sourcing environment configuration script…
Component type: SOURCE, name: src1 started
Component type: CHANNEL, name: ch1 started
Component type: SINK, name: sink1 started
# Kafka Sink started successfully
Part04-Production Cases and Walkthroughs
4.1 Log Collection to HDFS
Collecting logs into HDFS is the classic Flume use case.
#!/bin/bash
# flume_to_hdfs.sh
echo "=== Flume to HDFS ==="
echo "Date: $(date)"
# Create the configuration file
cat > /bigdata/app/flume/conf/fgedu-hdfs.conf << 'EOF'
fgedu.sources = src1
fgedu.channels = ch1
fgedu.sinks = sink1
fgedu.sources.src1.type = exec
fgedu.sources.src1.command = tail -F /bigdata/logs/fgedu-app.log
fgedu.sources.src1.channels = ch1
fgedu.channels.ch1.type = file
fgedu.channels.ch1.checkpointDir = /bigdata/flume/checkpoint
fgedu.channels.ch1.dataDirs = /bigdata/flume/data
fgedu.channels.ch1.capacity = 1000000
fgedu.sinks.sink1.type = hdfs
fgedu.sinks.sink1.hdfs.path = hdfs://fgedu01:9000/bigdata/logs/fgedu/%Y%m%d
fgedu.sinks.sink1.hdfs.filePrefix = fgedu-log
fgedu.sinks.sink1.hdfs.fileType = DataStream
fgedu.sinks.sink1.hdfs.writeFormat = Text
fgedu.sinks.sink1.hdfs.rollInterval = 600
fgedu.sinks.sink1.hdfs.rollSize = 134217728
fgedu.sinks.sink1.hdfs.rollCount = 0
fgedu.sinks.sink1.hdfs.useLocalTimeStamp = true
fgedu.sinks.sink1.channel = ch1
EOF
# Start Flume in the background
nohup flume-ng agent -n fgedu -c /bigdata/app/flume/conf -f /bigdata/app/flume/conf/fgedu-hdfs.conf > /bigdata/logs/flume-hdfs.log 2>&1 &
# Verify
sleep 10
hdfs dfs -ls /bigdata/logs/fgedu/$(date +%Y%m%d)/
echo "=== Flume Started ==="
# Run the script
./flume_to_hdfs.sh
# Sample output
=== Flume to HDFS ===
Date: Fri Jan 19 14:00:00 CST 2024
Found 2 items
-rw-r--r-- 3 fgedu fgedu 10240 2024-01-19 14:00 /bigdata/logs/fgedu/20240119/fgedu-log.1705543200000
-rw-r--r-- 3 fgedu fgedu 10240 2024-01-19 14:01 /bigdata/logs/fgedu/20240119/fgedu-log.1705543260000
=== Flume Started ===
4.2 Log Collection to Kafka
Collecting logs into Kafka suits real-time processing scenarios.
#!/bin/bash
# flume_to_kafka.sh
echo "=== Flume to Kafka ==="
echo "Date: $(date)"
# Create the configuration file
cat > /bigdata/app/flume/conf/fgedu-kafka.conf << 'EOF'
fgedu.sources = src1
fgedu.channels = ch1
fgedu.sinks = sink1
fgedu.sources.src1.type = exec
fgedu.sources.src1.command = tail -F /bigdata/logs/fgedu-app.log
fgedu.sources.src1.channels = ch1
fgedu.channels.ch1.type = memory
fgedu.channels.ch1.capacity = 100000
fgedu.channels.ch1.transactionCapacity = 10000
fgedu.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
fgedu.sinks.sink1.kafka.topic = fgedu-realtime-logs
fgedu.sinks.sink1.kafka.bootstrap.servers = fgedu01:9092,fgedu02:9092,fgedu03:9092
fgedu.sinks.sink1.kafka.flumeBatchSize = 1000
fgedu.sinks.sink1.kafka.producer.acks = all
fgedu.sinks.sink1.channel = ch1
EOF
# Create the Kafka topic
kafka-topics.sh --bootstrap-server fgedu01:9092 --create --topic fgedu-realtime-logs --partitions 6 --replication-factor 3
# Start Flume in the background
nohup flume-ng agent -n fgedu -c /bigdata/app/flume/conf -f /bigdata/app/flume/conf/fgedu-kafka.conf > /bigdata/logs/flume-kafka.log 2>&1 &
# Verify
sleep 10
kafka-console-consumer.sh --bootstrap-server fgedu01:9092 --topic fgedu-realtime-logs --from-beginning --max-messages 5
echo "=== Flume Started ==="
# Run the script
./flume_to_kafka.sh
# Sample output
=== Flume to Kafka ===
Date: Fri Jan 19 14:30:00 CST 2024
Created topic fgedu-realtime-logs.
{"timestamp":"2024-01-19T14:30:00","level":"INFO","message":"Application started"}
{"timestamp":"2024-01-19T14:30:01","level":"DEBUG","message":"Processing request"}
{"timestamp":"2024-01-19T14:30:02","level":"INFO","message":"Request completed"}
{"timestamp":"2024-01-19T14:30:03","level":"WARN","message":"High memory usage"}
{"timestamp":"2024-01-19T14:30:04","level":"INFO","message":"Cache refreshed"}
Processed a total of 5 messages
=== Flume Started ===
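4.3 Multiplexing
4.3.1 Multiple-Sink Configuration
Note that this configuration uses the replicating channel selector, so every event is copied to both ch1 and ch2 and reaches both HDFS and Kafka.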
cat > /bigdata/app/flume/conf/multiplexing.conf << 'EOF'
agent1.sources = src1
agent1.channels = ch1 ch2
agent1.sinks = sink1 sink2
agent1.sources.src1.type = exec
agent1.sources.src1.command = tail -F /bigdata/logs/app.log
agent1.sources.src1.selector.type = replicating
agent1.sources.src1.channels = ch1 ch2
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 10000
agent1.channels.ch2.type = memory
agent1.channels.ch2.capacity = 10000
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path = hdfs://fgedu01:9000/bigdata/logs/app
agent1.sinks.sink1.channel = ch1
agent1.sinks.sink2.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.sink2.kafka.topic = fgedu-logs
agent1.sinks.sink2.kafka.bootstrap.servers = fgedu01:9092
agent1.sinks.sink2.channel = ch2
EOF
# Start the agent
flume-ng agent -n agent1 -c /bigdata/app/flume/conf -f /bigdata/app/flume/conf/multiplexing.conf
# Sample output
Info: Sourcing environment configuration script…
Component type: SOURCE, name: src1 started
Component type: CHANNEL, name: ch1 started
Component type: CHANNEL, name: ch2 started
Component type: SINK, name: sink1 started
Component type: SINK, name: sink2 started
# Multi-sink agent started successfully
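The configuration above sets `selector.type = replicating`, which copies every event to both channels. Routing different events to different channels needs the multiplexing selector instead, which picks a channel from the value of an event header. The sketch below assumes that events arrive with a `level` header already set by an upstream agent or interceptor; that header, the Avro source, the logger sinks, and `CONF_DIR` are illustrative assumptions.

```shell
# Hypothetical output directory; on a real host point CONF_DIR at $FLUME_HOME/conf.
CONF_DIR="${CONF_DIR:-/tmp/flume-sketch}"
mkdir -p "$CONF_DIR"

cat > "$CONF_DIR/multiplexing-selector.conf" << 'EOF'
agent1.sources = src1
agent1.channels = ch1 ch2
agent1.sinks = sink1 sink2
agent1.sources.src1.type = avro
agent1.sources.src1.bind = fgedu01
agent1.sources.src1.port = 4141
agent1.sources.src1.channels = ch1 ch2
# Route on the value of the (assumed) "level" header
agent1.sources.src1.selector.type = multiplexing
agent1.sources.src1.selector.header = level
# Events whose level header equals ERROR go to ch1
agent1.sources.src1.selector.mapping.ERROR = ch1
# Any other value, or a missing header, falls through to ch2
agent1.sources.src1.selector.default = ch2
agent1.channels.ch1.type = memory
agent1.channels.ch2.type = memory
# Placeholder sinks; replace with hdfs and Kafka as in the config above
agent1.sinks.sink1.type = logger
agent1.sinks.sink1.channel = ch1
agent1.sinks.sink2.type = logger
agent1.sinks.sink2.channel = ch2
EOF
```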
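Part05-Fengge's Lessons Learned
5.1 Flume Best Practices
When running Flume in production, keep the following points in mind:
1. Choose the right Channel type
2. Set batch sizes sensibly
3. Monitor Channel fill percentage
4. Configure failover and recovery
5. Rotate logs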
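For item 4, one option Flume offers is a sink group with the failover processor: two sinks drain the same channel, and the lower-priority sink takes over when the higher-priority one fails. The sketch below is one possible layout; the sink names `primary` and `backup`, the collector hosts and port, and `CONF_DIR` are assumptions.

```shell
# Hypothetical output directory; on a real host point CONF_DIR at $FLUME_HOME/conf.
CONF_DIR="${CONF_DIR:-/tmp/flume-sketch}"
mkdir -p "$CONF_DIR"

cat > "$CONF_DIR/failover-sinks.conf" << 'EOF'
agent1.sources = src1
agent1.channels = ch1
agent1.sinks = primary backup
agent1.sinkgroups = g1
agent1.sources.src1.type = exec
agent1.sources.src1.command = tail -F /bigdata/logs/app.log
agent1.sources.src1.channels = ch1
agent1.channels.ch1.type = file
# Both sinks read the same channel; the processor decides which one is active
agent1.sinks.primary.type = avro
agent1.sinks.primary.hostname = fgedu01
agent1.sinks.primary.port = 4141
agent1.sinks.primary.channel = ch1
agent1.sinks.backup.type = avro
agent1.sinks.backup.hostname = fgedu02
agent1.sinks.backup.port = 4141
agent1.sinks.backup.channel = ch1
agent1.sinkgroups.g1.sinks = primary backup
agent1.sinkgroups.g1.processor.type = failover
# The sink with the higher priority value is preferred
agent1.sinkgroups.g1.processor.priority.primary = 10
agent1.sinkgroups.g1.processor.priority.backup = 5
# Upper bound in milliseconds on the back-off applied to a failed sink
agent1.sinkgroups.g1.processor.maxpenalty = 10000
EOF
```

Pairing this with a File Channel, as here, keeps buffered events on disk while both collectors are unreachable.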
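5.2 Log Collection Lessons Learned
5.2.1 Log Collection Recommendations
– Estimate log volume up front
– Choose a suitable Source
– Configure a reliable Channel
– Monitor collection status
– Plan capacity
5.2.2 Flume Maintenance Script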
#!/bin/bash
# flume_maintenance.sh
echo "=== Flume Maintenance ==="
echo "Date: $(date)"
# 1. Check the Flume process
echo "=== Check Flume Process ==="
ps -ef | grep flume | grep -v grep
# 2. Check Channel status through the HTTP monitoring endpoint
echo "=== Check Channel Status ==="
curl -s http://localhost:34545/metrics | python -m json.tool | grep -A5 "CHANNEL"
# 3. Delete Flume logs older than 7 days
echo "=== Clean Old Logs ==="
find /bigdata/logs -name "flume-*.log" -mtime +7 -delete
# 4. Check disk space
echo "=== Check Disk Space ==="
df -h /bigdata/flume/
echo "=== Maintenance Completed ==="
# Run the script
./flume_maintenance.sh
# Sample output
=== Flume Maintenance ===
Date: Fri Jan 19 15:00:00 CST 2024
=== Check Flume Process ===
root 12345 1 0 14:00 ? 00:01:00 /bigdata/app/java/bin/java -Xms1024m -Xmx2048m -Dflume.monitoring.type=http -Dflume.monitoring.port=34545 -cp /bigdata/app/flume/conf… org.apache.flume.node.Application -n fgedu -f /bigdata/app/flume/conf/fgedu-hdfs.conf
=== Check Channel Status ===
"CHANNEL.ch1": {
"ChannelCapacity": "1000000",
"ChannelFillPercentage": "5.0",
"Type": "CHANNEL",
"EventPutSuccessCount": "10000",
"EventTakeSuccessCount": "9500"
}
=== Clean Old Logs ===
=== Check Disk Space ===
Filesystem Size Used Avail Use% Mounted on
/dev/sda1 50G 10G 40G 20% /bigdata
=== Maintenance Completed ===
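The channel check in the script only prints the metrics. To turn it into an alert, the `ChannelFillPercentage` values can be compared against a threshold. The function below is a sketch that reads the metrics JSON on standard input and uses only grep, sed, and awk; the name `channel_fill_alerts` and the 80 percent threshold are assumptions.

```shell
# channel_fill_alerts THRESHOLD
# Reads Flume metrics JSON on stdin. Prints one ALERT line per channel whose
# ChannelFillPercentage is above THRESHOLD and returns 1 if any was found.
channel_fill_alerts() {
    local threshold=$1
    # Extract every "ChannelFillPercentage": "N" pair from the JSON text
    grep -o '"ChannelFillPercentage" *: *"[0-9.eE+-]*"' |
        # Keep only the number between the last pair of quotes
        sed 's/.*"\([0-9.eE+-]*\)"$/\1/' |
        # Compare numerically and remember whether anything crossed the threshold
        awk -v t="$threshold" '$1 + 0 > t + 0 { print "ALERT fill=" $1 "%"; bad = 1 } END { exit bad + 0 }'
}

# Demo with inline sample data shaped like the output shown above
printf '%s\n' '{"CHANNEL.ch1":{"ChannelCapacity":"1000000","ChannelFillPercentage":"92.5","Type":"CHANNEL"}}' |
    channel_fill_alerts 80
```

On a live agent the input would come from the monitoring endpoint instead, for example `curl -s http://localhost:34545/metrics | channel_fill_alerts 80`.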
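This article was compiled and published by Fengge Tutorials (风哥教程) for study and testing purposes only. Please credit the source when reposting: http://www.fgedu.net.cn/10327.html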
