本文详细介绍Hadoop实时数据服务层建设实战,包括服务层架构设计、API服务、缓存层、网关层、监控告警等内容,风哥教程参考Kafka、Spark、Redis等官方文档,适合大数据架构师和工程师在生产环境中使用。学习交流加群风哥微信: itpux-com
Part01-基础概念与理论知识
1.1 实时数据服务层概述
实时数据服务层是连接大数据存储和上层应用的关键组件,提供低延迟、高并发的数据查询服务。更多视频教程www.fgedu.net.cn
- 低延迟查询:毫秒级响应
- 高并发支持:支持数万QPS
- 统一数据出口:标准化API
- 安全认证:权限控制
- 监控告警:实时监控服务状态
- 弹性伸缩:根据负载自动扩缩容
1.2 实时数据服务架构
实时数据服务层典型架构:
接入层 → 网关层 → API服务层 → 缓存层 → 数据存储层
# 各层职责
接入层:
– Nginx/HAProxy:负载均衡
– TLS/SSL:加密传输
– 限流熔断:防止系统过载
网关层:
– API Gateway:统一入口
– 路由转发:请求分发
– 认证鉴权:权限验证
– 日志记录:访问日志
API服务层:
– RESTful API:标准化接口
– GraphQL:灵活查询
– gRPC:高性能RPC
缓存层:
– Redis:热点数据缓存
– Caffeine:本地缓存
– 多级缓存:提升性能
数据存储层:
– HBase:实时查询
– ClickHouse/Doris:OLAP查询
– Hudi/Iceberg:数据湖查询
– ElasticSearch:全文检索
1.3 核心服务组件
实时数据服务层核心组件:
- Nginx/HAProxy:负载均衡和反向代理
- Spring Cloud Gateway:API网关
- Spring Boot:微服务框架
- Redis Cluster:分布式缓存
- Prometheus/Grafana:监控告警
- ELK:日志收集分析
Part02-生产环境规划与建议
2.1 服务层架构规划
服务层架构规划要点:
接入层:
– 服务器数量:2-4台
– 配置:8核16GB
– 组件:Nginx + Keepalived
网关层:
– 服务器数量:3-5台
– 配置:16核32GB
– 组件:Spring Cloud Gateway
API服务层:
– 服务器数量:5-10台
– 配置:16核32GB
– 组件:Spring Boot应用
缓存层:
– Redis Cluster:3主3从
– 配置:16核64GB
– 内存:每实例32GB
# 网络规划
– 接入层公网IP,其他层内网
– 服务间通信使用内网
– 网络带宽:千兆或万兆
– 网络延迟:<2ms
2.2 资源配置建议
资源配置建议:
# JVM options (JDK 9+). -XX:+PrintGCDateStamps no longer exists on JDK 9+ and makes
# the JVM refuse to start; GC logging goes through unified logging (-Xlog) instead.
-Xms8g -Xmx8g
-XX:+UseG1GC
-XX:MaxGCPauseMillis=200
-XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=/bigdata/fgdata/heapdump
# GC events with wall-clock and uptime stamps, rotated as 10 files x 100 MB
-Xlog:gc*:file=/bigdata/fgdata/logs/gc.log:time,uptime:filecount=10,filesize=100M
# JDK 8 only, instead of the -Xlog line above:
#   -XX:+PrintGCDetails -XX:+PrintGCDateStamps -Xloggc:/bigdata/fgdata/logs/gc.log
# 线程池配置
corePoolSize: 200
maxPoolSize: 500
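queueCapacity: 10000
# 注意:线程池只有在队列已满时才会创建超过corePoolSize的线程;队列容量为10000时,线程数基本停留在200,maxPoolSize=500几乎不会生效。若希望高峰期扩展到500个线程,应调小queueCapacity(如500~1000)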
keepAliveSeconds: 60
# 连接池配置
maxTotal: 200
maxIdle: 50
minIdle: 10
maxWaitMillis: 3000
# 超时配置
连接超时: 3s
读取超时: 10s
写入超时: 10s
2.3 SLA设计与保障
SLA设计与保障:
- 可用性:99.9%(全年累计停机时间 < 8.76小时,即8760小时 × 0.1%)
- 响应时间:P99 < 200ms,P95 < 100ms
- QPS:支持10000+ QPS
- 错误率:< 0.1%
- 数据一致性:最终一致或强一致
更多学习教程公众号风哥教程itpux_com
Part03-生产环境项目实施方案
3.1 API服务层建设
3.1.1 使用Spring Boot构建API服务
# Create the project directory for the data-service application and enter it
$ mkdir -p /bigdata/app/fgedu-data-service
$ cd /bigdata/app/fgedu-data-service
# pom.xml配置
<?xml version="1.0" encoding="UTF-8"?>
<!-- Build descriptor for the data-service API application (Spring Boot 2.7.x). -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.18</version>
    </parent>
    <groupId>cn.fgedu</groupId>
    <artifactId>fgedu-data-service</artifactId>
    <version>1.0.0</version>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <!-- Required for spring.redis.lettuce.pool.*: without commons-pool2 on the
             classpath Spring Boot 2.x leaves Lettuce connection pooling disabled. -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>2.4.9</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <!-- The parent POM only manages this plugin; it must be declared so that
                 "mvn package" repackages target/*.jar into an executable jar. -->
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
# application.yml配置
# Data-service application.yml (Spring Boot 2.x property names; 3.x renamed
# spring.redis.* to spring.data.redis.*).
# NOTE(review): the gateway routes to lb://FGEDU-DATA-SERVICE through Eureka, so this
# service also needs the Eureka client dependency and eureka.client.service-url.* set.
server:
  port: 8080
spring:
  application:
    name: fgedu-data-service
  redis:
    cluster:
      # Seed nodes; the client discovers the rest of the cluster topology from them
      nodes: 192.168.1.100:7001,192.168.1.101:7001,192.168.1.102:7001
    timeout: 3000
    lettuce:
      pool:
        max-active: 200
        max-idle: 50
        min-idle: 10
        # Fail fast when the pool is exhausted (matches maxWaitMillis: 3000);
        # the default -1 blocks the caller indefinitely.
        max-wait: 3000ms
# 创建UserController
@RestController
@RequestMapping(“/api/v1/user”)
public class UserController {
@Autowired
private UserService userService;
@GetMapping(“/profile/{userId}”)
public ResponseEntity<UserProfile> getUserProfile(@PathVariable String userId) {
UserProfile profile = userService.getUserProfile(userId);
return ResponseEntity.ok(profile);
}
}
# 创建UserService
@Service
public class UserService {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private HBaseTemplate hBaseTemplate;
private static final String USER_PROFILE_CACHE_KEY = “user:profile:”;
public UserProfile getUserProfile(String userId) {
String cacheKey = USER_PROFILE_CACHE_KEY + userId;
UserProfile profile = (UserProfile) redisTemplate.opsForValue().get(cacheKey);
if (profile != null) {
return profile;
}
profile = hBaseTemplate.queryUserProfile(userId);
if (profile != null) {
redisTemplate.opsForValue().set(cacheKey, profile, 1, TimeUnit.HOURS);
}
return profile;
}
}
# 启动服务
$ mvn clean package
# spring-boot-maven-plugin writes the executable jar into target/
$ java -jar target/fgedu-data-service-1.0.0.jar
3.2 缓存层建设
3.2.1 Redis Cluster部署
# install_redis_cluster.sh
# from:www.itpux.com.qq113257174.wx:itpux-com
# web: http://www.fgedu.net.cn
# Run steps 1-4 on each of the three hosts (192.168.1.100/101/102), then step 5 once
# from any one of them.
# Layout: 2 instances per host x 3 hosts = 6 nodes -> 3 masters + 3 replicas, which
# matches the "3主3从, 64GB host, 32GB per instance" plan.
# Security: no password is configured and protected mode is off, so restrict ports
# 7001-7002 and the cluster bus ports 17001-17002 to the internal network (firewall),
# or add requirepass/masterauth and set spring.redis.password in the applications.
# 1. Download and build Redis
cd /bigdata/app
wget https://download.redis.io/releases/redis-7.0.12.tar.gz
tar -zxvf redis-7.0.12.tar.gz
cd redis-7.0.12
make
make install
# 2. Create one directory per node
mkdir -p /bigdata/fgdata/redis/{7001,7002}
# 3. Node configuration (quoted 'EOF' so the heredoc is written literally)
cat > /bigdata/fgdata/redis/7001/redis.conf << 'EOF'
port 7001
cluster-enabled yes
cluster-config-file nodes-7001.conf
cluster-node-timeout 5000
appendonly yes
dir /bigdata/fgdata/redis/7001
bind 0.0.0.0
protected-mode no
maxmemory 32gb
maxmemory-policy allkeys-lru
EOF
# Derive the second node's config (port, nodes file and dir all contain "7001")
sed 's/7001/7002/g' /bigdata/fgdata/redis/7001/redis.conf > /bigdata/fgdata/redis/7002/redis.conf
# 4. Start the nodes
redis-server /bigdata/fgdata/redis/7001/redis.conf &
redis-server /bigdata/fgdata/redis/7002/redis.conf &
# 5. Create the cluster: --cluster-replicas 1 needs at least 6 nodes
#    (3 masters + 1 replica each); redis-cli avoids pairing a master with a replica on its own host
redis-cli --cluster create \
192.168.1.100:7001 \
192.168.1.101:7001 \
192.168.1.102:7001 \
192.168.1.100:7002 \
192.168.1.101:7002 \
192.168.1.102:7002 \
--cluster-replicas 1
# 6. Verify the cluster
redis-cli -c -p 7001 cluster info
redis-cli -c -p 7001 cluster nodes
# 7. Test a write and a read
redis-cli -c -p 7001 set fgedu:test "hello world"
redis-cli -c -p 7001 get fgedu:test
3.3 网关层建设
3.3.1 Spring Cloud Gateway部署
<!-- Gateway is its own Spring Boot application built on WebFlux: do not add
     spring-boot-starter-web to it. Versions come from the Spring Cloud BOM
     (spring-cloud-dependencies 2021.0.x for Spring Boot 2.7) imported in
     dependencyManagement. -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<!-- Reactive Redis is required by the RequestRateLimiter filter's redis-rate-limiter. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis-reactive</artifactId>
</dependency>
# application.yml
# Gateway application's application.yml
spring:
  redis:
    # Backing store for redis-rate-limiter token buckets
    cluster:
      nodes: 192.168.1.100:7001,192.168.1.101:7001,192.168.1.102:7001
  cloud:
    gateway:
      routes:
        - id: user-service
          uri: lb://FGEDU-DATA-SERVICE
          predicates:
            - Path=/api/v1/user/**
          filters:
            - StripPrefix=0
            - name: RequestRateLimiter
              args:
                redis-rate-limiter.replenishRate: 100
                redis-rate-limiter.burstCapacity: 200
                # Without an explicit key-resolver the default PrincipalNameKeyResolver
                # returns an empty key for requests with no authenticated Principal,
                # and the limiter rejects them (403 by default). The bean must exist
                # in the gateway, e.g. limiting per client IP forwarded by Nginx:
                #   @Bean
                #   KeyResolver ipKeyResolver() {
                #       return exchange -> Mono.justOrEmpty(
                #               exchange.getRequest().getHeaders().getFirst("X-Real-IP"));
                #   }
                key-resolver: "#{@ipKeyResolver}"
        - id: order-service
          uri: lb://FGEDU-ORDER-SERVICE
          predicates:
            - Path=/api/v1/order/**
          filters:
            - StripPrefix=0
      discovery:
        locator:
          enabled: true
# 全局过滤器
@Component
public class AuthFilter implements GlobalFilter, Ordered {
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
ServerHttpRequest request = exchange.getRequest();
String token = request.getHeaders().getFirst(“Authorization”);
if (token == null || !validateToken(token)) {
exchange.getResponse().setStatusCode(HttpStatus.UNAUTHORIZED);
return exchange.getResponse().setComplete();
}
return chain.filter(exchange);
}
@Override
public int getOrder() {
return -100;
}
}
# Nginx配置
# Layer-7 load balancing in front of the Spring Cloud Gateway nodes
upstream gateway_cluster {
    server 192.168.1.100:8080;
    server 192.168.1.101:8080;
    server 192.168.1.102:8080;
    # Idle keep-alive connections to the upstreams cached per worker process
    keepalive 32;
}
server {
    listen 80;
    server_name api.fgedu.net.cn;
    location / {
        proxy_pass http://gateway_cluster;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        # HTTP/1.1 plus an empty Connection header are required for upstream keepalive
        proxy_http_version 1.1;
        proxy_set_header Connection "";
        # Timeouts aligned with the service-level settings (connect 3s, read 10s, write 10s);
        # nginx otherwise waits 60s on each.
        proxy_connect_timeout 3s;
        proxy_read_timeout 10s;
        proxy_send_timeout 10s;
    }
}
Part04-生产案例与实战讲解
4.1 用户画像查询服务实战
4.1.1 用户画像服务实现
-- Hive table mapped onto the HBase table fgedu_user_profile (read by HBaseTemplate).
-- Hive's HBase handler stores every value as its string form by default; the #b
-- suffix stores age (INT) and update_time (BIGINT) as binary 4-/8-byte values so
-- that Java clients can decode them with Bytes.toInt / Bytes.toLong.
CREATE TABLE fgedu_user_profile (
  user_id STRING,
  name STRING,
  age INT,
  gender STRING,
  city STRING,
  tags ARRAY<STRING>,
  update_time BIGINT
)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES (
  'hbase.columns.mapping' = ':key,info:name,info:age#b,info:gender,info:city,profile:tags,profile:update_time#b'
)
TBLPROPERTIES ('hbase.table.name' = 'fgedu_user_profile');
# HBase查询实现
public class HBaseTemplate {
private Connection connection;
public UserProfile queryUserProfile(String userId) {
try (Table table = connection.getTable(TableName.valueOf(“fgedu_user_profile”))) {
Get get = new Get(Bytes.toBytes(userId));
Result result = table.get(get);
if (result.isEmpty()) {
return null;
}
UserProfile profile = new UserProfile();
profile.setUserId(userId);
profile.setName(Bytes.toString(result.getValue(Bytes.toBytes(“info”), Bytes.toBytes(“name”))));
profile.setAge(Bytes.toInt(result.getValue(Bytes.toBytes(“info”), Bytes.toBytes(“age”))));
profile.setGender(Bytes.toString(result.getValue(Bytes.toBytes(“info”), Bytes.toBytes(“gender”))));
profile.setCity(Bytes.toString(result.getValue(Bytes.toBytes(“info”), Bytes.toBytes(“city”))));
return profile;
} catch (IOException e) {
throw new RuntimeException(“HBase query error”, e);
}
}
}
# API测试
$ curl -X GET http://api.fgedu.net.cn/api/v1/user/profile/fgedu001 \
-H "Authorization: Bearer your-token-here"
{
  "userId": "fgedu001",
  "name": "风哥",
  "age": 28,
  "gender": "男",
  "city": "北京",
  "tags": ["大数据", "Java", "架构师"],
  "updateTime": 1712345678901
}
# 性能测试
# The gateway's AuthFilter answers 401 to requests without a bearer token, and ab counts
# those only as "Non-2xx responses" (not "Failed requests"), so send the header.
$ ab -n 10000 -c 100 -H "Authorization: Bearer your-token-here" http://api.fgedu.net.cn/api/v1/user/profile/fgedu001
Concurrency Level: 100
Time taken for tests: 10.567 seconds
Complete requests: 10000
Failed requests: 0
Requests per second: 946.34 [#/sec] (mean)
Time per request: 105.670 [ms] (mean)
Time per request: 1.057 [ms] (mean, across all concurrent requests)
Transfer rate: 256.78 [Kbytes/sec] received
4.2 实时指标查询服务实战
4.2.1 实时指标查询实现
-- Partitioned on the date of ts: the metrics API filters on "ts >= ? AND ts <= ?",
-- and ClickHouse can only prune partitions through columns used in the partition
-- key. With PARTITION BY dt, a ts range cannot skip any partition.
-- dt is kept unchanged for existing writers and queries.
CREATE TABLE fgedu_realtime_metrics (
    dt Date,
    ts DateTime,
    metric_name String,
    metric_value Float64,
    dimension1 String,
    dimension2 String,
    dimension3 String
)
ENGINE = MergeTree()
PARTITION BY toDate(ts)
ORDER BY (metric_name, ts)
SETTINGS index_granularity = 8192;
# 实时指标查询服务
@RestController
@RequestMapping(“/api/v1/metrics”)
public class MetricsController {
@Autowired
private MetricsService metricsService;
@GetMapping(“/query”)
public ResponseEntity<List<MetricData>> queryMetrics(
@RequestParam String metricName,
@RequestParam String startTime,
@RequestParam String endTime,
@RequestParam(required = false) String dimension) {
List<MetricData> data = metricsService.queryMetrics(
metricName, startTime, endTime, dimension);
return ResponseEntity.ok(data);
}
}
@Service
public class MetricsService {
@Autowired
private JdbcTemplate clickHouseJdbcTemplate;
@Autowired
private RedisTemplate<String, Object> redisTemplate;
public List<MetricData> queryMetrics(String metricName,
String startTime, String endTime, String dimension) {
String cacheKey = String.format(“metrics:%s:%s:%s:%s”,
metricName, startTime, endTime, dimension);
@SuppressWarnings(“unchecked”)
List<MetricData> cachedData = (List<MetricData>) redisTemplate.opsForValue().get(cacheKey);
if (cachedData != null) {
return cachedData;
}
String sql = “SELECT ts, metric_value FROM fgedu_realtime_metrics ” +
“WHERE metric_name = ? AND ts >= ? AND ts <= ?”;
if (dimension != null) {
sql += ” AND dimension1 = ?”;
}
List<MetricData> data = clickHouseJdbcTemplate.query(sql,
(rs, rowNum) -> {
MetricData md = new MetricData();
md.setTimestamp(rs.getTimestamp(“ts”).getTime());
md.setValue(rs.getDouble(“metric_value”));
return md;
},
metricName, startTime, endTime);
redisTemplate.opsForValue().set(cacheKey, data, 5, TimeUnit.MINUTES);
return data;
}
}
# 查询示例
# Quote the URL: an unquoted "&" would send the command to the background.
$ curl -X GET "http://api.fgedu.net.cn/api/v1/metrics/query?metricName=pv&startTime=2024-04-01%2000:00:00&endTime=2024-04-08%2023:59:59" \
-H "Authorization: Bearer your-token-here"
[
  {"timestamp": 1711929600000, "value": 1024567},
  {"timestamp": 1712016000000, "value": 1156789},
  {"timestamp": 1712102400000, "value": 987654}
]
4.3 服务监控与告警实战
4.3.1 Prometheus+Grafana监控
<!-- Actuator provides the /actuator/* endpoints; the Micrometer Prometheus registry
     adds /actuator/prometheus. No version is given: these rely on the Spring Boot
     parent's dependency management. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
</dependency>
# application.yml配置
management:
  endpoints:
    web:
      exposure:
        include: health,info,prometheus,metrics
  metrics:
    tags:
      application: ${spring.application.name}
    distribution:
      # Publish http_server_requests_seconds_bucket; without histogram buckets the
      # histogram_quantile()-based P99 alert has no data and never fires.
      percentiles-histogram:
        http.server.requests: true
  # Spring Boot 2.7 name; management.metrics.export.prometheus.* is deprecated
  prometheus:
    metrics:
      export:
        enabled: true
# Prometheus配置
global:
  scrape_interval: 15s
  evaluation_interval: 15s
# Rules are only evaluated when their file is listed here (path is an example: the
# file holding the service_alerts group).
rule_files:
  - 'rules/service_alerts.yml'
# Where fired alerts are sent; adjust to the actual Alertmanager address.
alerting:
  alertmanagers:
    - static_configs:
        - targets: ['localhost:9093']
scrape_configs:
  - job_name: 'fgedu-data-service'
    metrics_path: '/actuator/prometheus'
    static_configs:
      - targets: ['192.168.1.100:8080', '192.168.1.101:8080']
  - job_name: 'redis'
    # Assumes redis_exporter listening on 9121
    static_configs:
      - targets: ['192.168.1.100:9121']
# 告警规则
groups:
  - name: service_alerts
    rules:
      - alert: ServiceDown
        expr: up == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Instance {{ $labels.instance }} down"
          description: "{{ $labels.instance }} of job {{ $labels.job }} has been down for more than 1 minute."
      # Aggregate before dividing: dividing the raw series matches each 5xx series with
      # itself (identical labels), which yields 1 whenever any 5xx occurred.
      - alert: HighErrorRate
        expr: |
          sum by (job, instance) (rate(http_server_requests_seconds_count{status=~"5.."}[5m]))
            /
          sum by (job, instance) (rate(http_server_requests_seconds_count[5m]))
            > 0.01
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High error rate on {{ $labels.instance }}"
          description: "Error rate is {{ $value | humanizePercentage }} on {{ $labels.instance }}"
      # Per-instance P99 across all endpoints; needs the percentiles-histogram setting
      # for http.server.requests so that the _bucket series exist. The SLA target is
      # P99 < 200ms; 1s is kept here as the paging threshold.
      - alert: HighResponseTime
        expr: |
          histogram_quantile(0.99,
            sum by (job, instance, le) (rate(http_server_requests_seconds_bucket[5m]))) > 1
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High response time on {{ $labels.instance }}"
          description: "P99 response time is {{ $value }}s on {{ $labels.instance }}"
# Grafana Dashboard配置
# 导入JVM Micrometer Dashboard
# 导入Spring Boot Statistics Dashboard
# 自定义关键指标面板:QPS、响应时间、错误率、缓存命中率
Part05-风哥经验总结与分享
5.1 实时数据服务最佳实践
实时数据服务最佳实践:
- 多级缓存:本地缓存+分布式缓存,减少底层压力
- 限流熔断:防止系统过载,保护下游服务
- 异步处理:非核心逻辑异步执行,提升响应速度
- 数据预加载:热点数据预热,减少冷启动
- 读写分离:查询走从库,写入走主库
- 容量规划:根据业务增长预留资源
5.2 常见问题处理
# 常见问题1:接口响应慢
– 检查缓存命中率
– 优化SQL查询
– 增加服务实例
– 检查网络延迟
# 常见问题2:缓存雪崩
– 缓存时间随机化
– 使用多级缓存
– 缓存预热
– 限流保护
# 常见问题3:服务雪崩
– 配置熔断机制
– 服务降级
– 快速失败
– 资源隔离
# 常见问题4:数据不一致
– 使用分布式锁
– 最终一致设计
– 数据校验
– 补偿机制
# 常见问题5:OOM问题
– 调整JVM参数
– 优化内存使用
– 使用堆外内存
– 流量控制
5.3 运维检查清单
– [ ] 服务健康检查
– [ ] 响应时间监控
– [ ] QPS监控
– [ ] 错误率监控
– [ ] 缓存命中率
– [ ] JVM内存使用
– [ ] GC频率和耗时
– [ ] 连接池状态
– [ ] 线程池状态
– [ ] CPU使用率
– [ ] 内存使用率
– [ ] 磁盘使用率
– [ ] 网络流量
– [ ] 日志异常检查
– [ ] 告警规则检查
– [ ] 数据一致性校验
# 日常巡检流程
1. 检查Grafana监控面板
2. 查看告警记录
3. 检查错误日志
4. 检查服务状态
5. 性能指标分析
6. 容量规划评估
7. 安全漏洞扫描
8. 备份状态检查
本文由风哥教程整理发布,仅用于学习测试使用,转载注明出处:http://www.fgedu.net.cn/10327.html
