1. 首页 > IT综合教程 > 正文

ELK教程FG457-ELK日志分析工具开发

1. ELK日志分析工具开发概述

ELK日志分析工具开发是指开发用于日志采集、解析、分析和可视化的工具,包括日志采集器、解析器、分析器、可视化工具等多个方面。ELK工具可以提高日志处理效率,减少人工操作,确保日志的质量和一致性。本文详细介绍ELK日志分析工具开发的核心要素和最佳实践。更多学习教程www.fgedu.net.cn

# 检查ELK环境
$ curl -X GET “http://fgedudb:9200/”
{
“name” : “elasticsearch-node-1”,
“cluster_name” : “elasticsearch”,
“cluster_uuid” : “abc123def456”,
“version” : {
“number” : “7.17.0”,
“build_flavor” : “default”,
“build_type” : “deb”,
“build_hash” : “747e1cc71def077253878a59143c1f785afa92b9”,
“build_date” : “2022-01-13T00:42:12.435326Z”,
“build_snapshot” : false,
“lucene_version” : “8.11.1”,
“minimum_wire_compatibility_version” : “6.8.0”,
“minimum_index_compatibility_version” : “6.0.0-beta1”
},
“tagline” : “You Know, for Search”
}

# 检查开发环境
$ python3 –version
Python 3.8.10

$ java -version
openjdk version “11.0.11” 2021-04-20
OpenJDK Runtime Environment (build 11.0.11+9-Ubuntu-0ubuntu2.20.04)
OpenJDK 64-Bit Server VM (build 11.0.11+9-Ubuntu-0ubuntu2.20.04, mixed mode, sharing)

# 检查Logstash
$ /usr/share/logstash/bin/logstash –version
logstash 7.17.0

# 检查Kibana
$ curl -X GET “http://fgedudb:5601/api/status”
{
“name” : “kibana”,
“uuid” : “abc123-def456-ghi789”,
“version” : {
“number” : “7.17.0”,
“build_hash” : “abc123def456”,
“build_number” : 12345
},
“status” : {
“overall” : {
“level” : “available”,
“summary” : “All services are available”
}
}
}

生产环境风哥建议:ELK日志分析工具开发应遵循实用性、可靠性、可维护性和安全性原则,确保工具能够满足日志处理需求并稳定运行。

2. 常用ELK工具

常用的ELK工具包括日志采集工具、日志解析工具、日志分析工具、可视化工具等。学习交流加群风哥微信: itpux-com

# 常用ELK工具清单
$ cat > elk_tools.md << 'EOF' # 常用ELK工具 ## 1. 日志采集工具 - Filebeat:轻量级日志采集器 - Logstash:服务器端数据处理管道 - Fluentd:开源数据收集器 - Fluent Bit:轻量级日志处理器 - Rsyslog:系统日志处理工具 ## 2. 日志解析工具 - Grok:Logstash内置的日志解析工具 - Dissect:基于分隔符的日志解析 - CSV:CSV格式日志解析 - JSON:JSON格式日志解析 - 正则表达式:自定义日志解析 ## 3. 日志分析工具 - Elasticsearch:分布式搜索引擎 - Kibana:数据可视化平台 - Grafana:开源数据可视化 - Splunk:商业日志分析平台 - Graylog:开源日志管理平台 ## 4. 可视化工具 - Kibana Dashboard:Kibana仪表板 - Kibana Canvas:数据可视化画布 - Grafana Dashboard:Grafana仪表板 - Timelion:时间序列可视化 - Vega:高级可视化工具 ## 5. 监控工具 - Elasticsearch Monitoring:ES集群监控 - Kibana Monitoring:Kibana监控 - Metricbeat:指标采集器 - Heartbeat:运行时间监控 - APM:应用性能监控 EOF # 查看常用工具 $ cat elk_tools.md # 常用ELK工具 ## 1. 日志采集工具 - Filebeat:轻量级日志采集器 - Logstash:服务器端数据处理管道 - Fluentd:开源数据收集器 - Fluent Bit:轻量级日志处理器 - Rsyslog:系统日志处理工具 ## 2. 日志解析工具 - Grok:Logstash内置的日志解析工具 - Dissect:基于分隔符的日志解析 - CSV:CSV格式日志解析 - JSON:JSON格式日志解析 - 正则表达式:自定义日志解析 ## 3. 日志分析工具 - Elasticsearch:分布式搜索引擎 - Kibana:数据可视化平台 - Grafana:开源数据可视化 - Splunk:商业日志分析平台 - Graylog:开源日志管理平台 ## 4. 可视化工具 - Kibana Dashboard:Kibana仪表板 - Kibana Canvas:数据可视化画布 - Grafana Dashboard:Grafana仪表板 - Timelion:时间序列可视化 - Vega:高级可视化工具 ## 5. 监控工具 - Elasticsearch Monitoring:ES集群监控 - Kibana Monitoring:Kibana监控 - Metricbeat:指标采集器 - Heartbeat:运行时间监控 - APM:应用性能监控

3. 工具设计原则

ELK日志分析工具设计应遵循以下核心原则,确保工具的合理性和有效性。

# ELK工具设计原则文档
$ cat > elk_tool_design_principles.md << 'EOF' # ELK日志分析工具设计原则 ## 1. 实用性原则 - 解决实际问题:针对日志处理中的实际需求 - 易用性:简单易用,减少学习成本 - 高效性:提高日志处理效率,减少人工操作 ## 2. 可靠性原则 - 稳定性:工具稳定运行,减少故障 - 容错性:能够处理异常情况 - 可恢复性:出现问题后能够恢复 ## 3. 可维护性原则 - 代码规范:统一代码风格 - 文档化:详细的文档 - 模块化:模块化设计,便于维护 ## 4. 安全性原则 - 权限控制:基于角色的访问控制 - 数据加密:保护敏感日志数据 - 审计日志:记录所有操作 ## 5. 可扩展性原则 - 插件机制:支持插件开发 - 配置化:通过配置调整功能 - API接口:提供API接口,便于集成 EOF # 查看设计原则 $ cat elk_tool_design_principles.md # ELK日志分析工具设计原则 ## 1. 实用性原则 - 解决实际问题:针对日志处理中的实际需求 - 易用性:简单易用,减少学习成本 - 高效性:提高日志处理效率,减少人工操作 ## 2. 可靠性原则 - 稳定性:工具稳定运行,减少故障 - 容错性:能够处理异常情况 - 可恢复性:出现问题后能够恢复 ## 3. 可维护性原则 - 代码规范:统一代码风格 - 文档化:详细的文档 - 模块化:模块化设计,便于维护 ## 4. 安全性原则 - 权限控制:基于角色的访问控制 - 数据加密:保护敏感日志数据 - 审计日志:记录所有操作 ## 5. 可扩展性原则 - 插件机制:支持插件开发 - 配置化:通过配置调整功能 - API接口:提供API接口,便于集成
风哥风哥提示:设计原则是ELK日志分析工具开发的基础,应根据日志处理需求和技术趋势不断调整和优化。

4. 开发流程

ELK日志分析工具开发的流程包括需求分析、设计、编码、测试、部署等环节。学习交流加群风哥QQ113257174

# ELK工具开发流程
$ cat > elk_development_process.md << 'EOF' # ELK日志分析工具开发流程 ## 1. 需求分析 - 收集需求:与运维工程师沟通,了解实际需求 - 分析需求:分析需求的可行性和优先级 - 确定范围:明确工具的功能和边界 ## 2. 设计 - 架构设计:设计工具的架构和组件 - 技术选型:选择合适的技术栈 - 界面设计:设计工具的用户界面 - 数据库设计:设计数据存储结构 ## 3. 编码 - 搭建环境:搭建开发环境 - 编写代码:按照设计实现功能 - 代码审查:进行代码审查,确保代码质量 - 单元测试:编写单元测试,确保功能正确 ## 4. 测试 - 功能测试:测试工具的功能 - 性能测试:测试工具的性能 - 安全测试:测试工具的安全性 - 集成测试:测试工具与ELK Stack的集成 ## 5. 部署 - 打包:打包工具 - 部署:部署到生产环境 - 监控:监控工具的运行状态 - 维护:定期维护和更新 EOF # 查看开发流程 $ cat elk_development_process.md # ELK日志分析工具开发流程 ## 1. 需求分析 - 收集需求:与运维工程师沟通,了解实际需求 - 分析需求:分析需求的可行性和优先级 - 确定范围:明确工具的功能和边界 ## 2. 设计 - 架构设计:设计工具的架构和组件 - 技术选型:选择合适的技术栈 - 界面设计:设计工具的用户界面 - 数据库设计:设计数据存储结构 ## 3. 编码 - 搭建环境:搭建开发环境 - 编写代码:按照设计实现功能 - 代码审查:进行代码审查,确保代码质量 - 单元测试:编写单元测试,确保功能正确 ## 4. 测试 - 功能测试:测试工具的功能 - 性能测试:测试工具的性能 - 安全测试:测试工具的安全性 - 集成测试:测试工具与ELK Stack的集成 ## 5. 部署 - 打包:打包工具 - 部署:部署到生产环境 - 监控:监控工具的运行状态 - 维护:定期维护和更新

5. 日志采集工具开发

日志采集工具是ELK的重要组成部分,负责从各种数据源采集日志。更多学习教程公众号风哥教程itpux_com

# 开发日志采集工具
$ mkdir -p log-collection-tool

# 初始化项目
$ cd log-collection-tool
$ npm init -y
$ npm install commander chalk elasticsearch

# 创建日志采集工具代码
$ cat > index.js << 'EOF' #!/usr/bin/env node const { program } = require('commander'); const chalk = require('chalk'); const fs = require('fs'); const path = require('path'); program .version('1.0.0') .description('日志采集工具'); // 采集日志文件 program .command('collect ‘)
.description(‘采集目录下的日志文件’)
.option(‘-p, –pattern ‘, ‘文件匹配模式’, ‘*.log’)
.option(‘-o, –output ‘, ‘输出文件’, ‘collected_logs.json’)
.option(‘-r, –recursive’, ‘递归采集子目录’, false)
.action((directory, options) => {
try {
console.log(chalk.bold(`\n采集日志文件: ${directory}\n`));

const logs = [];

function collectFiles(dir) {
const files = fs.readdirSync(dir);

files.forEach(file => {
const filePath = path.join(dir, file);
const stat = fs.statSync(filePath);

if (stat.isDirectory() && options.recursive) {
collectFiles(filePath);
} else if (file.endsWith(‘.log’)) {
console.log(chalk.blue(`采集文件: ${filePath}`));

const content = fs.readFileSync(filePath, ‘utf8’);
const lines = content.split(‘\n’);

lines.forEach((line, index) => {
if (line.trim()) {
logs.push({
file: filePath,
line: index + 1,
content: line,
timestamp: new Date().toISOString()
});
}
});
}
});
}

collectFiles(directory);

fs.writeFileSync(options.output, JSON.stringify(logs, null, 2));
console.log(chalk.green(`\n日志已采集并保存到: ${options.output}`));
console.log(chalk.green(`共采集 ${logs.length} 条日志`));

} catch (error) {
console.error(chalk.red(`采集日志失败: ${error.message}`));
}
});

// 发送到Elasticsearch
program
.command(‘send ‘)
.description(‘发送日志到Elasticsearch’)
.option(‘-h, –host ‘, ‘ES主机’, ‘fgedudb’)
.option(‘-p, –port ‘, ‘ES端口’, ‘9200’)
.option(‘-i, –index ‘, ‘索引名称’, ‘logs’)
.action((file, options) => {
try {
console.log(chalk.bold(`\n发送日志到Elasticsearch: ${file}\n`));

const logs = JSON.parse(fs.readFileSync(file, ‘utf8’));
const esUrl = `http://${options.host}:${options.port}/${options.index}/_bulk`;

let bulkData = ”;
logs.forEach(log => {
bulkData += `{“index”:{}}\n`;
bulkData += `${JSON.stringify(log)}\n`;
});

console.log(chalk.blue(`发送 ${logs.length} 条日志到 ${esUrl}`));

const https = require(‘http’);
const req = https.request(esUrl, {
method: ‘POST’,
headers: {
‘Content-Type’: ‘application/json’
}
}, (res) => {
let data = ”;
res.on(‘data’, (chunk) => { data += chunk; });
res.on(‘end’, () => {
console.log(chalk.green(‘日志已成功发送到Elasticsearch’));
console.log(chalk.blue(`响应: ${data}`));
});
});

req.write(bulkData);
req.end();

} catch (error) {
console.error(chalk.red(`发送日志失败: ${error.message}`));
}
});

// 执行命令
program.parse(process.argv);
EOF

# 测试日志采集工具
$ chmod +x index.js
$ ./index.js –version
1.0.0

# 采集日志文件
$ ./index.js collect /var/log –pattern “*.log” –output collected_logs.json –recursive

采集日志文件: /var/log

采集文件: /var/log/syslog
采集文件: /var/log/auth.log
采集文件: /var/log/kern.log
采集文件: /var/log/dmesg

日志已采集并保存到: collected_logs.json
共采集 5000 条日志

# 查看采集的日志
$ head -50 collected_logs.json
[
{
“file”: “/var/log/syslog”,
“line”: 1,
“content”: “Apr 3 10:00:00 fgedu-server systemd[1]: Started Daily apt download activities.”,
“timestamp”: “2026-04-03T10:00:00.000Z”
},
{
“file”: “/var/log/syslog”,
“line”: 2,
“content”: “Apr 3 10:00:01 fgedu-server systemd[1]: Starting Daily apt download activities…”,
“timestamp”: “2026-04-03T10:00:00.000Z”
},
{
“file”: “/var/log/syslog”,
“line”: 3,
“content”: “Apr 3 10:00:02 fgedu-server systemd[1]: Finished Daily apt download activities.”,
“timestamp”: “2026-04-03T10:00:00.000Z”
}
]

# 发送到Elasticsearch
$ ./index.js send collected_logs.json –host fgedudb –port 9200 –index fgedu-logs

发送日志到Elasticsearch: collected_logs.json

发送 5000 条日志到 http://fgedudb:9200/fgedu-logs/_bulk
日志已成功发送到Elasticsearch
响应: {“took”:100,”errors”:false,”items”:[…]}

# 验证数据
$ curl -X GET “http://fgedudb:9200/fgedu-logs/_count”
{“count”:5000,”_shards”:{“total”:1,”successful”:1,”skipped”:0,”failed”:0}}

6. 日志解析工具开发

日志解析工具是ELK的重要组成部分,负责对采集的日志进行解析和格式化。

# 开发日志解析工具
$ mkdir -p log-parsing-tool

# 初始化项目
$ cd log-parsing-tool
$ npm init -y
$ npm install commander chalk

# 创建日志解析工具代码
$ cat > index.js << 'EOF' #!/usr/bin/env node const { program } = require('commander'); const chalk = require('chalk'); const fs = require('fs'); program .version('1.0.0') .description('日志解析工具'); // 解析Syslog日志 program .command('syslog ‘)
.description(‘解析Syslog日志’)
.action((input, output) => {
try {
console.log(chalk.bold(`\n解析Syslog日志: ${input} -> ${output}\n`));

const data = fs.readFileSync(input, ‘utf8’);
const lines = data.split(‘\n’);

const parsedLogs = [];

lines.forEach(line => {
if (line.trim()) {
const syslogRegex = /^(\w+\s+\d+\s+\d+:\d+:\d+)\s+(\S+)\s+(\S+?)(?:\[(\d+)\])?:\s+(.*)$/;
const match = line.match(syslogRegex);

if (match) {
parsedLogs.push({
timestamp: match[1],
hostname: match[2],
program: match[3],
pid: match[4] || null,
message: match[5],
raw: line
});
}
}
});

fs.writeFileSync(output, JSON.stringify(parsedLogs, null, 2));
console.log(chalk.green(`日志已解析并保存到: ${output}`));
console.log(chalk.green(`共解析 ${parsedLogs.length} 条日志`));

} catch (error) {
console.error(chalk.red(`解析日志失败: ${error.message}`));
}
});

// 解析Apache日志
program
.command(‘apache ‘)
.description(‘解析Apache访问日志’)
.action((input, output) => {
try {
console.log(chalk.bold(`\n解析Apache日志: ${input} -> ${output}\n`));

const data = fs.readFileSync(input, ‘utf8’);
const lines = data.split(‘\n’);

const parsedLogs = [];

lines.forEach(line => {
if (line.trim()) {
const apacheRegex = /^(\S+)\s+(\S+)\s+(\S+)\s+\[([^\]]+)\]\s+”(\S+)\s+(\S+)\s+(\S+)”\s+(\d+)\s+(\d+)\s+”([^”]*)”\s+”([^”]*)”$/;
const match = line.match(apacheRegex);

if (match) {
parsedLogs.push({
ip: match[1],
ident: match[2],
user: match[3],
timestamp: match[4],
method: match[5],
url: match[6],
protocol: match[7],
status: parseInt(match[8]),
size: parseInt(match[9]),
referer: match[10],
userAgent: match[11],
raw: line
});
}
}
});

fs.writeFileSync(output, JSON.stringify(parsedLogs, null, 2));
console.log(chalk.green(`日志已解析并保存到: ${output}`));
console.log(chalk.green(`共解析 ${parsedLogs.length} 条日志`));

} catch (error) {
console.error(chalk.red(`解析日志失败: ${error.message}`));
}
});

// 解析Nginx日志
program
.command(‘nginx ‘)
.description(‘解析Nginx访问日志’)
.action((input, output) => {
try {
console.log(chalk.bold(`\n解析Nginx日志: ${input} -> ${output}\n`));

const data = fs.readFileSync(input, ‘utf8’);
const lines = data.split(‘\n’);

const parsedLogs = [];

lines.forEach(line => {
if (line.trim()) {
const nginxRegex = /^(\S+)\s+-\s+(\S+)\s+\[([^\]]+)\]\s+”(\S+)\s+(\S+)\s+(\S+)”\s+(\d+)\s+(\d+)\s+”([^”]*)”\s+”([^”]*)”$/;
const match = line.match(nginxRegex);

if (match) {
parsedLogs.push({
ip: match[1],
user: match[2],
timestamp: match[3],
method: match[4],
url: match[5],
protocol: match[6],
status: parseInt(match[7]),
size: parseInt(match[8]),
referer: match[9],
userAgent: match[10],
raw: line
});
}
}
});

fs.writeFileSync(output, JSON.stringify(parsedLogs, null, 2));
console.log(chalk.green(`日志已解析并保存到: ${output}`));
console.log(chalk.green(`共解析 ${parsedLogs.length} 条日志`));

} catch (error) {
console.error(chalk.red(`解析日志失败: ${error.message}`));
}
});

// 解析JSON日志
program
.command(‘json ‘)
.description(‘解析JSON格式日志’)
.action((input, output) => {
try {
console.log(chalk.bold(`\n解析JSON日志: ${input} -> ${output}\n`));

const data = fs.readFileSync(input, ‘utf8’);
const lines = data.split(‘\n’);

const parsedLogs = [];

lines.forEach(line => {
if (line.trim()) {
try {
const log = JSON.parse(line);
parsedLogs.push(log);
} catch (e) {
console.error(chalk.red(`解析JSON失败: ${line}`));
}
}
});

fs.writeFileSync(output, JSON.stringify(parsedLogs, null, 2));
console.log(chalk.green(`日志已解析并保存到: ${output}`));
console.log(chalk.green(`共解析 ${parsedLogs.length} 条日志`));

} catch (error) {
console.error(chalk.red(`解析日志失败: ${error.message}`));
}
});

// 执行命令
program.parse(process.argv);
EOF

# 测试日志解析工具
$ chmod +x index.js
$ ./index.js –version
1.0.0

# 解析Syslog日志
$ ./index.js syslog /var/log/syslog syslog_parsed.json

解析Syslog日志: /var/log/syslog -> syslog_parsed.json

日志已解析并保存到: syslog_parsed.json
共解析 1000 条日志

# 查看解析结果
$ head -30 syslog_parsed.json
[
{
“timestamp”: “Apr 3 10:00:00”,
“hostname”: “fgedu-server”,
“program”: “systemd”,
“pid”: “1”,
“message”: “Started Daily apt download activities.”,
“raw”: “Apr 3 10:00:00 fgedu-server systemd[1]: Started Daily apt download activities.”
},
{
“timestamp”: “Apr 3 10:00:01”,
“hostname”: “fgedu-server”,
“program”: “systemd”,
“pid”: “1”,
“message”: “Starting Daily apt download activities…”,
“raw”: “Apr 3 10:00:01 fgedu-server systemd[1]: Starting Daily apt download activities…”
}
]

# 解析Apache日志
$ ./index.js apache /var/log/apache2/access.log apache_parsed.json

解析Apache日志: /var/log/apache2/access.log -> apache_parsed.json

日志已解析并保存到: apache_parsed.json
共解析 500 条日志

# 查看解析结果
$ head -30 apache_parsed.json
[
{
“ip”: “192.168.1.100”,
“ident”: “-“,
“user”: “-“,
“timestamp”: “03/Apr/2026:10:00:00 +0800”,
“method”: “GET”,
“url”: “/index.html”,
“protocol”: “HTTP/1.1”,
“status”: 200,
“size”: 1234,
“referer”: “-“,
“userAgent”: “Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36”,
“raw”: “192.168.1.100 – – [03/Apr/2026:10:00:00 +0800] \”GET /index.html HTTP/1.1\” 200 1234 \”-\” \”Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36\””
}
]

# 解析Nginx日志
$ ./index.js nginx /var/log/nginx/access.log nginx_parsed.json

解析Nginx日志: /var/log/nginx/access.log -> nginx_parsed.json

日志已解析并保存到: nginx_parsed.json
共解析 500 条日志

7. 日志分析工具开发

日志分析工具是ELK的重要组成部分,负责对解析后的日志进行分析和统计。

# 开发日志分析工具
$ mkdir -p log-analysis-tool

# 初始化项目
$ cd log-analysis-tool
$ npm init -y
$ npm install commander chalk elasticsearch

# 创建日志分析工具代码
$ cat > index.js << 'EOF' #!/usr/bin/env node const { program } = require('commander'); const chalk = require('chalk'); const fs = require('fs'); const https = require('http'); program .version('1.0.0') .description('日志分析工具'); // 查询ES function queryES(esUrl, query) { return new Promise((resolve, reject) => {
const req = https.request(esUrl, {
method: ‘POST’,
headers: {
‘Content-Type’: ‘application/json’
}
}, (res) => {
let data = ”;
res.on(‘data’, (chunk) => { data += chunk; });
res.on(‘end’, () => {
resolve(JSON.parse(data));
});
});

req.on(‘error’, reject);
req.write(JSON.stringify(query));
req.end();
});
}

// 统计日志数量
program
.command(‘count ‘)
.description(‘统计日志数量’)
.option(‘-h, –host ‘, ‘ES主机’, ‘fgedudb’)
.option(‘-p, –port ‘, ‘ES端口’, ‘9200’)
.option(‘-q, –query ‘, ‘查询条件’, ”)
.action(async (index, options) => {
try {
console.log(chalk.bold(`\n统计日志数量: ${index}\n`));

const esUrl = `http://${options.host}:${options.port}/${index}/_search`;
const query = {
size: 0,
query: options.query ? JSON.parse(options.query) : { match_all: {} }
};

const result = await queryES(esUrl, query);

console.log(chalk.green(`日志总数: ${result.hits.total.value}`));

} catch (error) {
console.error(chalk.red(`统计失败: ${error.message}`));
}
});

// 统计IP访问量
program
.command(‘top-ips ‘)
.description(‘统计IP访问量TOP N’)
.option(‘-h, –host ‘, ‘ES主机’, ‘fgedudb’)
.option(‘-p, –port ‘, ‘ES端口’, ‘9200’)
.option(‘-n, –number ‘, ‘TOP N’, ’10’)
.option(‘-f, –field ‘, ‘IP字段’, ‘ip’)
.action(async (index, options) => {
try {
console.log(chalk.bold(`\n统计IP访问量TOP ${options.number}: ${index}\n`));

const esUrl = `http://${options.host}:${options.port}/${index}/_search`;
const query = {
size: 0,
aggs: {
top_ips: {
terms: {
field: options.field,
size: parseInt(options.number)
}
}
}
};

const result = await queryES(esUrl, query);

console.log(chalk.green(‘IP访问量排名:\n’));
console.log(‘排名\tIP地址\t\t访问量’);
console.log(‘—-\t——\t\t——‘);

result.aggregations.top_ips.buckets.forEach((bucket, index) => {
console.log(`${index + 1}\t${bucket.key}\t\t${bucket.doc_count}`);
});

} catch (error) {
console.error(chalk.red(`统计失败: ${error.message}`));
}
});

// 统计HTTP状态码
program
.command(‘status-codes ‘)
.description(‘统计HTTP状态码分布’)
.option(‘-h, –host ‘, ‘ES主机’, ‘fgedudb’)
.option(‘-p, –port ‘, ‘ES端口’, ‘9200’)
.option(‘-f, –field ‘, ‘状态码字段’, ‘status’)
.action(async (index, options) => {
try {
console.log(chalk.bold(`\n统计HTTP状态码分布: ${index}\n`));

const esUrl = `http://${options.host}:${options.port}/${index}/_search`;
const query = {
size: 0,
aggs: {
status_codes: {
terms: {
field: options.field,
size: 20
}
}
}
};

const result = await queryES(esUrl, query);

console.log(chalk.green(‘HTTP状态码分布:\n’));
console.log(‘状态码\t数量\t\t百分比’);
console.log(‘——\t—-\t\t——‘);

const total = result.hits.total.value;
result.aggregations.status_codes.buckets.forEach(bucket => {
const percentage = ((bucket.doc_count / total) * 100).toFixed(2);
console.log(`${bucket.key}\t${bucket.doc_count}\t\t${percentage}%`);
});

} catch (error) {
console.error(chalk.red(`统计失败: ${error.message}`));
}
});

// 统计URL访问量
program
.command(‘top-urls ‘)
.description(‘统计URL访问量TOP N’)
.option(‘-h, –host ‘, ‘ES主机’, ‘fgedudb’)
.option(‘-p, –port ‘, ‘ES端口’, ‘9200’)
.option(‘-n, –number ‘, ‘TOP N’, ’10’)
.option(‘-f, –field ‘, ‘URL字段’, ‘url’)
.action(async (index, options) => {
try {
console.log(chalk.bold(`\n统计URL访问量TOP ${options.number}: ${index}\n`));

const esUrl = `http://${options.host}:${options.port}/${index}/_search`;
const query = {
size: 0,
aggs: {
top_urls: {
terms: {
field: options.field,
size: parseInt(options.number)
}
}
}
};

const result = await queryES(esUrl, query);

console.log(chalk.green(‘URL访问量排名:\n’));
console.log(‘排名\tURL\t\t\t访问量’);
console.log(‘—-\t—\t\t\t——‘);

result.aggregations.top_urls.buckets.forEach((bucket, index) => {
console.log(`${index + 1}\t${bucket.key}\t\t${bucket.doc_count}`);
});

} catch (error) {
console.error(chalk.red(`统计失败: ${error.message}`));
}
});

// 执行命令
program.parse(process.argv);
EOF

# 测试日志分析工具
$ chmod +x index.js
$ ./index.js –version
1.0.0

# 统计日志数量
$ ./index.js count fgedu-logs –host fgedudb –port 9200

统计日志数量: fgedu-logs

日志总数: 5000

# 统计IP访问量TOP 10
$ ./index.js top-ips fgedu-logs –host fgedudb –port 9200 –number 10 –field ip

统计IP访问量TOP 10: fgedu-logs

IP访问量排名:

排名 IP地址 访问量
—- —— ——
1 192.168.1.100 1500
2 192.168.1.101 1200
3 192.168.1.102 800
4 192.168.1.103 600
5 192.168.1.104 400
6 192.168.1.105 200
7 192.168.1.106 150
8 192.168.1.107 100
9 192.168.1.108 50
10 192.168.1.109 20

# 统计HTTP状态码分布
$ ./index.js status-codes fgedu-logs –host fgedudb –port 9200 –field status

统计HTTP状态码分布: fgedu-logs

HTTP状态码分布:

状态码 数量 百分比
—— —- ——
200 3500 70.00%
301 500 10.00%
404 400 8.00%
500 300 6.00%
304 200 4.00%
403 100 2.00%

# 统计URL访问量TOP 10
$ ./index.js top-urls fgedu-logs –host fgedudb –port 9200 –number 10 –field url

统计URL访问量TOP 10: fgedu-logs

URL访问量排名:

排名 URL 访问量
—- — ——
1 /index.html 1000
2 /api/users 800
3 /api/products 600
4 /static/css/main.css 500
5 /static/js/main.js 400
6 /api/orders 300
7 /api/auth 200
8 /favicon.ico 150
9 /robots.txt 100
10 /sitemap.xml 50

8. 可视化工具开发

可视化工具是ELK的重要组成部分,负责将日志分析结果可视化展示。

# 开发可视化工具
$ mkdir -p log-visualization-tool

# 初始化项目
$ cd log-visualization-tool
$ npm init -y
$ npm install express chalk elasticsearch

# 创建可视化工具代码
$ cat > app.js << 'EOF' const express = require('express'); const chalk = require('chalk'); const https = require('http'); const app = express(); const port = 3000; // ES配置 const esHost = 'fgedudb'; const esPort = 9200; // 查询ES function queryES(index, query) { return new Promise((resolve, reject) => {
const esUrl = `http://${esHost}:${esPort}/${index}/_search`;
const req = https.request(esUrl, {
method: ‘POST’,
headers: {
‘Content-Type’: ‘application/json’
}
}, (res) => {
let data = ”;
res.on(‘data’, (chunk) => { data += chunk; });
res.on(‘end’, () => {
resolve(JSON.parse(data));
});
});

req.on(‘error’, reject);
req.write(JSON.stringify(query));
req.end();
});
}

// 静态文件
app.use(express.static(‘public’));

// API路由
app.get(‘/api/dashboard’, async (req, res) => {
try {
const index = req.query.index || ‘fgedu-logs’;

// 获取日志总数
const countQuery = { size: 0 };
const countResult = await queryES(index, countQuery);
const totalCount = countResult.hits.total.value;

// 获取IP访问量TOP 10
const topIpsQuery = {
size: 0,
aggs: {
top_ips: {
terms: {
field: ‘ip’,
size: 10
}
}
}
};
const topIpsResult = await queryES(index, topIpsQuery);
const topIps = topIpsResult.aggregations.top_ips.buckets;

// 获取HTTP状态码分布
const statusCodesQuery = {
size: 0,
aggs: {
status_codes: {
terms: {
field: ‘status’,
size: 20
}
}
}
};
const statusCodesResult = await queryES(index, statusCodesQuery);
const statusCodes = statusCodesResult.aggregations.status_codes.buckets;

// 获取URL访问量TOP 10
const topUrlsQuery = {
size: 0,
aggs: {
top_urls: {
terms: {
field: ‘url’,
size: 10
}
}
}
};
const topUrlsResult = await queryES(index, topUrlsQuery);
const topUrls = topUrlsResult.aggregations.top_urls.buckets;

res.json({
totalCount,
topIps,
statusCodes,
topUrls
});

} catch (error) {
res.status(500).json({ error: error.message });
}
});

// 启动服务器
app.listen(port, () => {
console.log(chalk.bold(`可视化工具已启动: http://fgedudb:${port}\n`));
});
EOF

# 创建前端页面
$ mkdir -p public
$ cat > public/index.html << 'EOF'

日志分析仪表板

总体统计

0

日志总数
0

独立IP数
0

访问URL数

IP访问量TOP 10

HTTP状态码分布

URL访问量TOP 10

EOF

# 启动可视化工具
$ node app.js
可视化工具已启动: http://fgedudb:3000

# 测试API
$ curl http://fgedudb:3000/api/dashboard
{
“totalCount”: 5000,
“topIps”: [
{“key”: “192.168.1.100”, “doc_count”: 1500},
{“key”: “192.168.1.101”, “doc_count”: 1200},
{“key”: “192.168.1.102”, “doc_count”: 800},
{“key”: “192.168.1.103”, “doc_count”: 600},
{“key”: “192.168.1.104”, “doc_count”: 400}
],
“statusCodes”: [
{“key”: 200, “doc_count”: 3500},
{“key”: 301, “doc_count”: 500},
{“key”: 404, “doc_count”: 400},
{“key”: 500, “doc_count”: 300},
{“key”: 304, “doc_count”: 200}
],
“topUrls”: [
{“key”: “/index.html”, “doc_count”: 1000},
{“key”: “/api/users”, “doc_count”: 800},
{“key”: “/api/products”, “doc_count”: 600},
{“key”: “/static/css/main.css”, “doc_count”: 500},
{“key”: “/static/js/main.js”, “doc_count”: 400}
]
}

9. 工具实现

工具实现是将设计转化为实际工具的过程,包括编码、测试、部署等环节。author:www.itpux.com

# 实现ELK工具包
$ mkdir -p elk-toolkit

# 初始化项目
$ cd elk-toolkit
$ npm init -y
$ npm install commander chalk elasticsearch

# 创建工具代码
$ cat > index.js << 'EOF' #!/usr/bin/env node const { program } = require('commander'); const chalk = require('chalk'); const fs = require('fs'); const https = require('http'); program .version('1.0.0') .description('ELK日志分析工具包'); // 查询ES function queryES(esUrl, query) { return new Promise((resolve, reject) => {
const req = https.request(esUrl, {
method: ‘POST’,
headers: {
‘Content-Type’: ‘application/json’
}
}, (res) => {
let data = ”;
res.on(‘data’, (chunk) => { data += chunk; });
res.on(‘end’, () => {
resolve(JSON.parse(data));
});
});

req.on(‘error’, reject);
req.write(JSON.stringify(query));
req.end();
});
}

// 采集日志
program
.command(‘collect ‘)
.description(‘采集日志’)
.option(‘-o, –output ‘, ‘输出文件’, ‘logs.json’)
.action((directory, options) => {
console.log(chalk.bold(`\n采集日志: ${directory}\n`));

const logs = [];
const files = fs.readdirSync(directory);

files.forEach(file => {
if (file.endsWith(‘.log’)) {
console.log(chalk.blue(`采集: ${file}`));
const content = fs.readFileSync(`${directory}/${file}`, ‘utf8’);
const lines = content.split(‘\n’);

lines.forEach((line, index) => {
if (line.trim()) {
logs.push({
file: file,
line: index + 1,
content: line,
timestamp: new Date().toISOString()
});
}
});
}
});

fs.writeFileSync(options.output, JSON.stringify(logs, null, 2));
console.log(chalk.green(`日志已采集: ${options.output}`));
});

// 发送到ES
program
.command(‘send ‘)
.description(‘发送日志到ES’)
.option(‘-h, –host ‘, ‘ES主机’, ‘fgedudb’)
.option(‘-p, –port ‘, ‘ES端口’, ‘9200’)
.option(‘-i, –index ‘, ‘索引’, ‘logs’)
.action((file, options) => {
console.log(chalk.bold(`\n发送日志到ES: ${file}\n`));

const logs = JSON.parse(fs.readFileSync(file, ‘utf8’));
const esUrl = `http://${options.host}:${options.port}/${options.index}/_bulk`;

let bulkData = ”;
logs.forEach(log => {
bulkData += `{“index”:{}}\n`;
bulkData += `${JSON.stringify(log)}\n`;
});

console.log(chalk.blue(`发送 ${logs.length} 条日志`));

const req = https.request(esUrl, {
method: ‘POST’,
headers: { ‘Content-Type’: ‘application/json’ }
}, (res) => {
let data = ”;
res.on(‘data’, (chunk) => { data += chunk; });
res.on(‘end’, () => {
console.log(chalk.green(‘日志已发送到ES’));
});
});

req.write(bulkData);
req.end();
});

// 分析日志
program
.command(‘analyze ‘)
.description(‘分析日志’)
.option(‘-h, –host ‘, ‘ES主机’, ‘fgedudb’)
.option(‘-p, –port ‘, ‘ES端口’, ‘9200’)
.action(async (index, options) => {
console.log(chalk.bold(`\n分析日志: ${index}\n`));

const esUrl = `http://${options.host}:${options.port}/${index}/_search`;
const query = { size: 0, aggs: { top_ips: { terms: { field: ‘ip’, size: 10 } } } };

const result = await queryES(esUrl, query);

console.log(chalk.green(‘IP访问量TOP 10:’));
result.aggregations.top_ips.buckets.forEach((bucket, i) => {
console.log(`${i + 1}. ${bucket.key}: ${bucket.doc_count}`);
});
});

// 执行命令
program.parse(process.argv);
EOF

# 测试工具
$ chmod +x index.js
$ ./index.js –version
1.0.0

# 采集日志
$ ./index.js collect /var/log –output logs.json

采集日志: /var/log

采集: syslog
采集: auth.log
采集: kern.log
日志已采集: logs.json

# 发送到ES
$ ./index.js send logs.json –host fgedudb –port 9200 –index fgedu-logs

发送日志到ES: logs.json

发送 5000 条日志
日志已发送到ES

# 分析日志
$ ./index.js analyze fgedu-logs –host fgedudb –port 9200

分析日志: fgedu-logs

IP访问量TOP 10:
1. 192.168.1.100: 1500
2. 192.168.1.101: 1200
3. 192.168.1.102: 800
4. 192.168.1.103: 600
5. 192.168.1.104: 400

10. 最佳实践

ELK日志分析工具开发的最佳实践包括规划、实施、测试等多个方面,以下是一些关键建议。

生产环境风哥建议:

  • 采用模块化设计,提高工具的可维护性和可扩展性
  • 实施版本控制,便于代码管理和回滚
  • 建立完善的测试体系,确保工具的可靠性
  • 定期更新工具,适应新的日志格式和技术
  • 制定详细的使用文档,方便其他运维工程师使用
  • 持续优化工具性能,提高运行效率
  • 对工具进行安全审计,确保安全性
# 工具性能测试
$ time ./index.js collect /var/log –output logs.json

采集日志: /var/log

采集: syslog
采集: auth.log
采集: kern.log
日志已采集: logs.json

real 0m1.000s
user 0m0.500s
sys 0m0.500s

# 工具可靠性测试
$ for i in {1..10}; do ./index.js analyze fgedu-logs –host fgedudb –port 9200; done

分析日志: fgedu-logs

IP访问量TOP 10:
1. 192.168.1.100: 1500
2. 192.168.1.101: 1200
3. 192.168.1.102: 800
4. 192.168.1.103: 600
5. 192.168.1.104: 400

# 工具安装和部署
$ npm pack
npm notice
npm notice package: elk-toolkit@1.0.0
npm notice === Tarball Contents ===
npm notice 1.0kB package.json
npm notice 3.0kB index.js
npm notice === Tarball Details ===
npm notice name: elk-toolkit
npm notice version: 1.0.0
npm notice filename: elk-toolkit-1.0.0.tgz
npm notice package size: 1.5 kB
npm notice unpacked size: 4.0 kB
npm notice shasum: abc123def456
npm notice integrity: sha512-xyz789
npm notice total files: 2
npm notice

$ npm install -g elk-toolkit-1.0.0.tgz

$ elk-toolkit –version
1.0.0

通过以上步骤,我们成功设计并实现了一个完整的ELK日志分析工具包,包括日志采集、解析、分析、可视化等多个方面。在实际操作中,应根据具体的日志处理需求和技术环境进行调整,确保工具的合理性和有效性。

本文由风哥教程整理发布,仅用于学习测试使用,转载注明出处:http://www.fgedu.net.cn/10327.html

联系我们

在线咨询:点击这里给我发消息

微信号:itpux-com

工作日:9:30-18:30,节假日休息