
Big Data Tutorial FG152 - Building a Real-Time Data Service Layer for Hadoop in Practice

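This article walks through building a real-time data service layer on top of Hadoop in practice, covering service-layer architecture design, API services, the cache layer, the gateway layer, and monitoring and alerting. Feng Ge Tutorials draws on the official Kafka, Spark, and Redis documentation, and the article is aimed at big data architects and engineers working in production environments. For discussion, join Feng Ge's study group via WeChat: itpux-com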

Part01-Basic Concepts and Theory

1.1 Overview of the Real-Time Data Service Layer

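The real-time data service layer is the key component that connects big data storage to upper-layer applications, providing low-latency, high-concurrency data query services.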

Core value of the real-time data service layer:

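  • Low-latency queries: millisecond-level responses
  • High concurrency: tens of thousands of QPS
  • Unified data access point: standardized APIs
  • Security and authentication: access control
  • Monitoring and alerting: real-time visibility into service health
  • Elastic scaling: automatic scale-out and scale-in based on load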

1.2 Real-Time Data Service Architecture

Typical architecture of a real-time data service layer:

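# Layers of the real-time data service architecture
Access layer → Gateway layer → API service layer → Cache layer → Data storage layer

# Responsibilities of each layer
Access layer:
– Nginx/HAProxy: load balancing
– TLS/SSL: encrypted transport
– Rate limiting and circuit breaking: protect the system from overload

Gateway layer:
– API Gateway: single entry point
– Routing: dispatch requests to backend services
– Authentication and authorization: permission checks
– Logging: access logs

API service layer:
– RESTful API: standardized interfaces
– GraphQL: flexible queries
– gRPC: high-performance RPC

Cache layer:
– Redis: hot-data cache
– Caffeine: local (in-process) cache
– Multi-level caching: better performance

Data storage layer:
– HBase: real-time queries
– ClickHouse/Doris: OLAP queries
– Hudi/Iceberg: data lake queries
– Elasticsearch: full-text search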

1.3 Core Service Components

Core components of the real-time data service layer:

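  • Nginx/HAProxy: load balancing and reverse proxy
  • Spring Cloud Gateway: API gateway
  • Spring Boot: microservice framework
  • Redis Cluster: distributed cache
  • Prometheus/Grafana: monitoring and alerting
  • ELK: log collection and analysis
Feng Ge's tip: Choose components based on your business requirements. For simple scenarios, start with a simplified architecture and iterate from there.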

Part02-Production Environment Planning and Recommendations

2.1 Service Layer Architecture Planning

Key points for planning the service layer architecture:

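# Server planning
Access layer:
– Servers: 2-4
– Spec: 8 cores, 16 GB RAM
– Components: Nginx + Keepalived

Gateway layer:
– Servers: 3-5
– Spec: 16 cores, 32 GB RAM
– Components: Spring Cloud Gateway

API service layer:
– Servers: 5-10
– Spec: 16 cores, 32 GB RAM
– Components: Spring Boot applications

Cache layer:
– Redis Cluster: 3 masters, 3 replicas
– Spec: 16 cores, 64 GB RAM
– Memory: 32 GB per instance

# Network planning
– Public IPs only on the access layer; all other layers stay on the internal network
– Service-to-service traffic goes over the internal network
– Bandwidth: 1 Gbps or 10 Gbps
– Latency: < 2 ms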

2.2 Resource Configuration Recommendations

Recommended resource settings:

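# JVM settings (the three GC logging flags at the end are JDK 8 syntax)
-Xms8g -Xmx8g
-XX:+UseG1GC
-XX:MaxGCPauseMillis=200
-XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=/bigdata/fgdata/heapdump
-XX:+PrintGCDetails
-XX:+PrintGCDateStamps
-Xloggc:/bigdata/fgdata/logs/gc.log
# On JDK 9+ the JVM refuses to start with -XX:+PrintGCDateStamps; replace the last three flags with:
# -Xlog:gc*:file=/bigdata/fgdata/logs/gc.log:time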

# Thread pool settings
corePoolSize: 200
maxPoolSize: 500
queueCapacity: 10000
keepAliveSeconds: 60

# Connection pool settings
maxTotal: 200
maxIdle: 50
minIdle: 10
maxWaitMillis: 3000

# Timeout settings
Connect timeout: 3s
Read timeout: 10s
Write timeout: 10s
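The thread-pool names above match the properties of Spring's ThreadPoolTaskExecutor, which delegates to the JDK's ThreadPoolExecutor. One behavior is easy to miss: with a bounded queue, threads beyond corePoolSize are created only after the queue is full, so with queueCapacity 10000 the pool stays at 200 threads until 10,000 tasks are waiting. The following is a minimal sketch using the JDK class directly; the class name ApiThreadPool and the CallerRunsPolicy rejection handler are illustrative choices, not settings from the original.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class ApiThreadPool {

    /** Builds a pool with the shape from section 2.2: core threads, max threads, bounded queue. */
    static ThreadPoolExecutor create(int core, int max, int queueCapacity, long keepAliveSeconds) {
        return new ThreadPoolExecutor(
                core, max, keepAliveSeconds, TimeUnit.SECONDS,
                // Bounded queue: threads beyond `core` are only added once this queue is full
                new ArrayBlockingQueue<>(queueCapacity),
                // Queue full and max threads busy: run the task on the caller's thread
                // (simple back-pressure) instead of throwing RejectedExecutionException
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = create(200, 500, 10_000, 60);
        for (int i = 0; i < 1_000; i++) {
            pool.execute(() -> { /* handle one request */ });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        System.out.println("largest pool size: " + pool.getLargestPoolSize());
    }
}
```

If requests should scale out to maxPoolSize before queueing, a much smaller queue is needed; with 10,000 slots the extra 300 threads mostly act as a last-resort buffer.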

2.3 SLA Design and Assurance

SLA metrics:

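  • Availability: 99.9% (annual downtime < 8.76 hours)
  • Response time: P99 < 200 ms, P95 < 100 ms
  • Throughput: 10,000+ QPS
  • Error rate: < 0.1%
  • Data consistency: eventual or strong consistency, depending on the use case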
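The availability and latency targets translate directly into numbers you can check. A small sketch (class and method names are hypothetical) that computes the downtime budget for an availability target and a nearest-rank percentile over a latency sample: 99.9% over 8,760 hours leaves 8.76 hours a year, or about 43 minutes per 30-day month.

```java
import java.util.Arrays;

final class SlaMath {

    /** Maximum downtime (hours) allowed in a period for an availability such as 0.999. */
    static double allowedDowntimeHours(double availability, double periodHours) {
        return (1.0 - availability) * periodHours;
    }

    /** Nearest-rank percentile, p in (0, 100], over latencies in milliseconds. */
    static long percentile(long[] latenciesMs, double p) {
        if (latenciesMs.length == 0) {
            throw new IllegalArgumentException("empty sample");
        }
        long[] sorted = latenciesMs.clone();
        Arrays.sort(sorted);
        int rank = (int) Math.ceil(p / 100.0 * sorted.length); // 1-based rank
        return sorted[Math.max(rank, 1) - 1];
    }

    public static void main(String[] args) {
        System.out.printf("99.9%% per year: %.2f h%n", allowedDowntimeHours(0.999, 365 * 24));
        System.out.printf("99.9%% per 30 days: %.1f min%n", allowedDowntimeHours(0.999, 30 * 24) * 60);
    }
}
```

In production the percentiles normally come from the monitoring system rather than raw samples; the function only shows what "P99 < 200 ms" means.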


Part03-Production Implementation Plan

3.1 Building the API Service Layer

3.1.1 Building the API Service with Spring Boot

# Create the Spring Boot project directory (standard Maven layout with src/main/java)
$ mkdir -p /bigdata/app/fgedu-data-service
$ cd /bigdata/app/fgedu-data-service

# pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
                             http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.18</version>
    </parent>

    <groupId>cn.fgedu</groupId>
    <artifactId>fgedu-data-service</artifactId>
    <version>1.0.0</version>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <!-- Without commons-pool2 the spring.redis.lettuce.pool.* settings are ignored -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>2.4.9</version>
        </dependency>
    </dependencies>

    <!-- Repackages the jar as an executable fat jar so that "java -jar" works -->
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

# application.yml
server:
  port: 8080
spring:
  application:
    name: fgedu-data-service
  redis:
    cluster:
      nodes: 192.168.1.100:7001,192.168.1.101:7001,192.168.1.102:7001
    timeout: 3000
    lettuce:
      pool:
        max-active: 200
        max-idle: 50
        min-idle: 10

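# UserController
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/api/v1/user")
public class UserController {

    @Autowired
    private UserService userService;

    @GetMapping("/profile/{userId}")
    public ResponseEntity<UserProfile> getUserProfile(@PathVariable String userId) {
        UserProfile profile = userService.getUserProfile(userId);
        // Return 404 rather than "200 with an empty body" when the user does not exist
        return profile != null ? ResponseEntity.ok(profile) : ResponseEntity.notFound().build();
    }
}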

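# UserService
import java.util.concurrent.TimeUnit;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

@Service
public class UserService {

    // Assumes a RedisTemplate<String, Object> bean is declared (Spring Boot itself only
    // auto-configures RedisTemplate<Object, Object> and StringRedisTemplate). With the default
    // JDK serializer, UserProfile must implement java.io.Serializable.
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    // HBaseTemplate is the custom helper class from section 4.1.1, not a Spring library class
    @Autowired
    private HBaseTemplate hBaseTemplate;

    private static final String USER_PROFILE_CACHE_KEY = "user:profile:";

    public UserProfile getUserProfile(String userId) {
        String cacheKey = USER_PROFILE_CACHE_KEY + userId;

        // 1. Try the cache first
        UserProfile profile = (UserProfile) redisTemplate.opsForValue().get(cacheKey);
        if (profile != null) {
            return profile;
        }

        // 2. Cache miss: read HBase, then cache the result for one hour
        profile = hBaseTemplate.queryUserProfile(userId);
        if (profile != null) {
            redisTemplate.opsForValue().set(cacheKey, profile, 1, TimeUnit.HOURS);
        }

        return profile;
    }
}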
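UserService follows the cache-aside pattern: read Redis, fall back to HBase on a miss, then write the result back. One gap is that a user ID that does not exist is never cached, so repeated requests for it always reach HBase (cache penetration). Below is a JDK-only sketch of the same read path with that gap closed; the in-memory map stands in for Redis and the loader function for HBaseTemplate, and all names are hypothetical.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Cache-aside read path; "not found" results are cached too (guards against cache penetration). */
final class CacheAsideSketch<K, V> {
    private final Map<K, Optional<V>> cache = new ConcurrentHashMap<>(); // stands in for Redis
    private final Function<K, V> loader;                                  // stands in for HBase

    CacheAsideSketch(Function<K, V> loader) {
        this.loader = loader;
    }

    V get(K key) {
        Optional<V> cached = cache.get(key);
        if (cached != null) {
            return cached.orElse(null);               // hit, possibly a cached "not found"
        }
        // Miss. Not atomic: concurrent misses for the same key may each call the loader.
        V value = loader.apply(key);
        cache.put(key, Optional.ofNullable(value));   // remember the result, even if absent
        return value;
    }
}
```

In Redis the "not found" marker would normally get a much shorter TTL than real entries, so that a user created later becomes visible quickly.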

# Build and start the service
$ mvn clean package
$ java -jar target/fgedu-data-service-1.0.0.jar

3.2 Building the Cache Layer

3.2.1 Deploying Redis Cluster

#!/bin/bash
# install_redis_cluster.sh
# Run steps 1-4 on each of the three hosts (192.168.1.100/101/102); run steps 5-7 once.
# Two instances per host (7001, 7002) give the 6 nodes that "3 masters + 3 replicas" requires.

# 1. Download and build Redis
cd /bigdata/app
wget https://download.redis.io/releases/redis-7.0.12.tar.gz
tar -zxvf redis-7.0.12.tar.gz
cd redis-7.0.12
make
make install          # installs redis-server/redis-cli into /usr/local/bin (needs root)

# 2. Create per-node directories
mkdir -p /bigdata/fgdata/redis/{7001,7002}

# 3. Configuration file
cat > /bigdata/fgdata/redis/7001/redis.conf << 'EOF'
port 7001
daemonize yes
cluster-enabled yes
cluster-config-file nodes-7001.conf
cluster-node-timeout 5000
appendonly yes
dir /bigdata/fgdata/redis/7001
bind 0.0.0.0
# Disabling protected mode is only acceptable on an isolated internal network;
# otherwise also set requirepass/masterauth and pass -a to redis-cli
protected-mode no
# 2 x 32gb fills a 64 GB host; lower this to leave headroom for the OS and for
# fork during AOF rewrites
maxmemory 32gb
maxmemory-policy allkeys-lru
EOF

# Derive the second node's configuration
sed 's/7001/7002/g' /bigdata/fgdata/redis/7001/redis.conf > /bigdata/fgdata/redis/7002/redis.conf

# 4. Start the Redis nodes (daemonize yes runs them in the background)
redis-server /bigdata/fgdata/redis/7001/redis.conf
redis-server /bigdata/fgdata/redis/7002/redis.conf

# 5. Create the cluster (once, from any host): 6 nodes -> 3 masters + 3 replicas
redis-cli --cluster create \
  192.168.1.100:7001 192.168.1.101:7001 192.168.1.102:7001 \
  192.168.1.100:7002 192.168.1.101:7002 192.168.1.102:7002 \
  --cluster-replicas 1

# 6. Verify the cluster
redis-cli -c -p 7001 cluster info
redis-cli -c -p 7001 cluster nodes

# 7. Test a write and a read (-c follows MOVED redirects to the owning node)
redis-cli -c -p 7001 set fgedu:test "hello world"
redis-cli -c -p 7001 get fgedu:test
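Redis Cluster spreads keys over 16,384 hash slots by taking CRC16 of the key modulo 16384, and `redis-cli -c` follows the MOVED redirect when a key lives on another node. Multi-key commands only work when all keys fall into the same slot, which is what hash tags (`{...}`) are for. The sketch below reproduces the slot calculation in plain Java (the class name is hypothetical); `CLUSTER KEYSLOT <key>` on a live node should return the same number.

```java
import java.nio.charset.StandardCharsets;

final class RedisSlot {

    /** CRC16-CCITT (XMODEM variant: polynomial 0x1021, initial value 0), as used by Redis Cluster. */
    static int crc16(byte[] bytes) {
        int crc = 0;
        for (byte b : bytes) {
            crc ^= (b & 0xFF) << 8;
            for (int i = 0; i < 8; i++) {
                crc = (crc & 0x8000) != 0 ? (crc << 1) ^ 0x1021 : crc << 1;
                crc &= 0xFFFF;
            }
        }
        return crc;
    }

    /** Hash slot of a key, honoring hash tags. */
    static int slot(String key) {
        // If the key has "{...}" with a non-empty body, only the body is hashed, so
        // user:{fgedu001}:profile and user:{fgedu001}:tags land in the same slot.
        int start = key.indexOf('{');
        if (start >= 0) {
            int end = key.indexOf('}', start + 1);
            if (end > start + 1) {
                key = key.substring(start + 1, end);
            }
        }
        return crc16(key.getBytes(StandardCharsets.UTF_8)) % 16384;
    }
}
```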

3.3 Building the Gateway Layer

3.3.1 Deploying Spring Cloud Gateway

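# Gateway project dependencies (versions come from the Spring Cloud BOM imported in
# <dependencyManagement>; the 2021.0.x release train matches Spring Boot 2.7.x)
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<!-- The Redis rate limiter behind RequestRateLimiter needs the reactive Redis starter -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis-reactive</artifactId>
</dependency>

# application.yml
# lb:// URIs assume the backend services register in Eureka under these names, so they
# also need the Eureka client dependency and eureka.client.service-url.defaultZone
spring:
  redis:
    cluster:
      nodes: 192.168.1.100:7001,192.168.1.101:7001,192.168.1.102:7001
  cloud:
    gateway:
      routes:
        - id: data-service
          uri: lb://FGEDU-DATA-SERVICE
          predicates:
            # Covers both controllers of the data service (sections 3.1.1 and 4.2.1)
            - Path=/api/v1/user/**,/api/v1/metrics/**
          filters:
            - StripPrefix=0
            - name: RequestRateLimiter
              args:
                redis-rate-limiter.replenishRate: 100
                redis-rate-limiter.burstCapacity: 200
                # The default resolver keys on the authenticated principal and rejects
                # requests without one, so key on the client IP instead
                key-resolver: "#{@ipKeyResolver}"
        - id: order-service
          uri: lb://FGEDU-ORDER-SERVICE
          predicates:
            - Path=/api/v1/order/**
          filters:
            - StripPrefix=0
      discovery:
        locator:
          enabled: true

# Global authentication filter and rate-limit key resolver (two source files; imports shown once)
import java.net.InetSocketAddress;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.cloud.gateway.filter.ratelimit.KeyResolver;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.Ordered;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;

@Component
public class AuthFilter implements GlobalFilter, Ordered {

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        ServerHttpRequest request = exchange.getRequest();
        String token = request.getHeaders().getFirst(HttpHeaders.AUTHORIZATION);

        if (token == null || !validateToken(token)) {
            exchange.getResponse().setStatusCode(HttpStatus.UNAUTHORIZED);
            return exchange.getResponse().setComplete();
        }

        return chain.filter(exchange);
    }

    // Placeholder: the real token check (JWT, OAuth2 introspection, ...) is not shown here
    private boolean validateToken(String token) {
        return token.startsWith("Bearer ") && token.length() > "Bearer ".length();
    }

    @Override
    public int getOrder() {
        return -100; // run before most other global filters
    }
}

// Rate-limit key = client IP, taken from the X-Real-IP header set by the Nginx config below
@Configuration
public class RateLimitConfig {

    @Bean
    public KeyResolver ipKeyResolver() {
        return exchange -> {
            String ip = exchange.getRequest().getHeaders().getFirst("X-Real-IP");
            InetSocketAddress remote = exchange.getRequest().getRemoteAddress();
            if (ip == null && remote != null) {
                ip = remote.getAddress().getHostAddress();
            }
            return Mono.just(ip != null ? ip : "unknown");
        };
    }
}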

# Nginx configuration (gateway instances; IPs are examples)
upstream gateway_cluster {
    server 192.168.1.100:8080;
    server 192.168.1.101:8080;
    server 192.168.1.102:8080;
    keepalive 32;
}

server {
    listen 80;    # terminate TLS here in production (listen 443 ssl plus certificates)
    server_name api.fgedu.net.cn;

    location / {
        proxy_pass http://gateway_cluster;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        # HTTP/1.1 plus an empty Connection header enables upstream keepalive
        proxy_http_version 1.1;
        proxy_set_header Connection "";
    }
}

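Feng Ge's tip: The gateway is the system's front door, so rate limiting, circuit breaking, and authentication must be in place there. Prefer a mature API gateway component over building your own.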
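The RequestRateLimiter settings in 3.3.1 describe a token bucket: replenishRate is how many tokens are added per second and burstCapacity is the bucket size, so a client can burst to 200 requests and then sustain 100 per second. The gateway keeps one bucket per rate-limit key in Redis, updated by a Lua script so that all gateway nodes share it. The single-process sketch below only models that arithmetic, with the caller supplying the clock so the behavior is easy to test; class and method names are hypothetical.

```java
/** In-memory token bucket; models replenishRate / burstCapacity, not the gateway's Redis script. */
final class TokenBucket {
    private final double replenishPerSecond; // tokens added per second
    private final double capacity;           // maximum tokens held (burst size)
    private double tokens;
    private long lastNanos;

    TokenBucket(double replenishPerSecond, double capacity, long nowNanos) {
        this.replenishPerSecond = replenishPerSecond;
        this.capacity = capacity;
        this.tokens = capacity;              // start full, so an initial burst is allowed
        this.lastNanos = nowNanos;
    }

    /** Returns true if one request may proceed at time nowNanos. */
    synchronized boolean tryAcquire(long nowNanos) {
        double elapsedSeconds = (nowNanos - lastNanos) / 1_000_000_000.0;
        tokens = Math.min(capacity, tokens + elapsedSeconds * replenishPerSecond); // refill
        lastNanos = nowNanos;
        if (tokens >= 1.0) {
            tokens -= 1.0;                   // spend one token per request
            return true;
        }
        return false;                        // the gateway would answer HTTP 429 here
    }
}
```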

Part04-Production Cases and Hands-On Walkthroughs

4.1 Hands-On: User Profile Query Service

4.1.1 Implementing the User Profile Service

# User profile data model (Hive table mapped onto HBase; with Hive's default string storage,
# INT values such as age are written to HBase as text)
CREATE TABLE fgedu_user_profile (
  user_id STRING,
  name STRING,
  age INT,
  gender STRING,
  city STRING,
  tags ARRAY<STRING>,
  update_time BIGINT
)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES (
  'hbase.columns.mapping' = ':key,info:name,info:age,info:gender,info:city,profile:tags,profile:update_time'
)
TBLPROPERTIES ('hbase.table.name' = 'fgedu_user_profile');

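# HBase query implementation (a custom helper class, unrelated to Spring's HbaseTemplate)
import java.io.IOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.springframework.stereotype.Component;

@Component
public class HBaseTemplate implements AutoCloseable {

    private static final byte[] INFO = Bytes.toBytes("info");

    // Heavyweight and thread-safe: create once and share (reads hbase-site.xml from the classpath)
    private final Connection connection;

    public HBaseTemplate() throws IOException {
        this.connection = ConnectionFactory.createConnection(HBaseConfiguration.create());
    }

    @Override
    public void close() throws IOException {
        connection.close(); // Spring calls close() on AutoCloseable beans at shutdown
    }

    public UserProfile queryUserProfile(String userId) {
        try (Table table = connection.getTable(TableName.valueOf("fgedu_user_profile"))) {
            Result result = table.get(new Get(Bytes.toBytes(userId)));
            if (result.isEmpty()) {
                return null;
            }

            UserProfile profile = new UserProfile();
            profile.setUserId(userId);
            profile.setName(Bytes.toString(result.getValue(INFO, Bytes.toBytes("name"))));
            // Hive's default string storage writes age as text, so parse it instead of Bytes.toInt
            String age = Bytes.toString(result.getValue(INFO, Bytes.toBytes("age")));
            if (age != null) {
                profile.setAge(Integer.parseInt(age));
            }
            profile.setGender(Bytes.toString(result.getValue(INFO, Bytes.toBytes("gender"))));
            profile.setCity(Bytes.toString(result.getValue(INFO, Bytes.toBytes("city"))));
            // tags and update_time (family "profile") are read the same way; their decoding
            // depends on how the writer serialized them
            return profile;
        } catch (IOException e) {
            throw new RuntimeException("HBase query error", e);
        }
    }
}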
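Reading age with Bytes.toInt only works if the value was written as a binary int. Hive's HBase storage handler defaults to string storage, so the age cell holds the text "28" (two bytes), while HBase's Bytes.toInt expects exactly four big-endian bytes. A JDK-only sketch of the two decodings (class and method names are hypothetical); ByteBuffer's default big-endian order matches the layout HBase's Bytes.toBytes(int) writes.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class CellDecoding {

    /** Decodes a cell written with Hive's default string storage, e.g. the bytes of "28". */
    static Integer fromStringStorage(byte[] cell) {
        return cell == null ? null : Integer.valueOf(new String(cell, StandardCharsets.UTF_8));
    }

    /** Decodes a 4-byte big-endian int, the layout Bytes.toBytes(int) produces. */
    static Integer fromBinaryStorage(byte[] cell) {
        if (cell == null) {
            return null;                                   // column absent in this row
        }
        if (cell.length != Integer.BYTES) {
            throw new IllegalArgumentException("expected 4 bytes, got " + cell.length);
        }
        return ByteBuffer.wrap(cell).getInt();
    }
}
```

Whichever format the writer uses, the reader has to use the matching decoder; mixing them either throws or silently returns a wrong number.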

# API test
$ curl -X GET http://api.fgedu.net.cn/api/v1/user/profile/fgedu001 \
  -H "Authorization: Bearer your-token-here"

{
  "userId": "fgedu001",
  "name": "风哥",
  "age": 28,
  "gender": "男",
  "city": "北京",
  "tags": ["大数据", "Java", "架构师"],
  "updateTime": 1712345678901
}

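# Load test (send the token, otherwise the gateway answers 401)
$ ab -n 10000 -c 100 -H "Authorization: Bearer your-token-here" http://api.fgedu.net.cn/api/v1/user/profile/fgedu001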

Concurrency Level: 100
Time taken for tests: 10.567 seconds
Complete requests: 10000
Failed requests: 0
Requests per second: 946.34 [#/sec] (mean)
Time per request: 105.670 [ms] (mean)
Time per request: 1.057 [ms] (mean, across all concurrent requests)
Transfer rate: 256.78 [Kbytes/sec] received
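These numbers are consistent with Little's law: with 100 requests in flight and a mean of about 105.7 ms per request, throughput is 100 / 0.10567 s ≈ 946 requests per second. The same relation shows what the 10,000+ QPS target in 2.3 implies: at this latency it would need roughly 1,057 requests in flight across the cluster. A tiny sketch of the arithmetic (names are hypothetical):

```java
/** Little's law (L = λ × W) applied to the ab results. */
final class LittlesLaw {

    /** Measured throughput λ = completed requests / elapsed seconds. */
    static double throughput(long completed, double elapsedSeconds) {
        return completed / elapsedSeconds;
    }

    /** Throughput implied by concurrency L and mean latency W: λ = L / W. */
    static double throughputFromConcurrency(int concurrency, double meanLatencyMs) {
        return concurrency / (meanLatencyMs / 1000.0);
    }

    /** Requests that must be in flight to sustain a target rate at a given latency: L = λ × W. */
    static double requiredConcurrency(double targetQps, double meanLatencyMs) {
        return targetQps * meanLatencyMs / 1000.0;
    }

    public static void main(String[] args) {
        System.out.printf("measured:    %.1f req/s%n", throughput(10_000, 10.567));
        System.out.printf("from L/W:    %.1f req/s%n", throughputFromConcurrency(100, 105.670));
        System.out.printf("for 10k QPS: %.0f in flight%n", requiredConcurrency(10_000, 105.670));
    }
}
```

Lowering latency (for example through a higher cache hit rate) raises throughput at the same concurrency, which is usually cheaper than adding clients and instances.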

4.2 Hands-On: Real-Time Metrics Query Service

4.2.1 Implementing Real-Time Metrics Queries

# ClickHouse real-time metrics table
CREATE TABLE fgedu_realtime_metrics (
dt Date,
ts DateTime,
metric_name String,
metric_value Float64,
dimension1 String,
dimension2 String,
dimension3 String
)
ENGINE = MergeTree()
PARTITION BY dt
ORDER BY (metric_name, ts)
SETTINGS index_granularity = 8192;

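# Real-time metrics query service
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.http.ResponseEntity;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/api/v1/metrics")
public class MetricsController {

    @Autowired
    private MetricsService metricsService;

    @GetMapping("/query")
    public ResponseEntity<List<MetricData>> queryMetrics(
            @RequestParam String metricName,
            @RequestParam String startTime,
            @RequestParam String endTime,
            @RequestParam(required = false) String dimension) {

        List<MetricData> data = metricsService.queryMetrics(
                metricName, startTime, endTime, dimension);
        return ResponseEntity.ok(data);
    }
}

@Service
public class MetricsService {

    // Assumes a JdbcTemplate bean named clickHouseJdbcTemplate backed by a ClickHouse DataSource
    @Autowired
    private JdbcTemplate clickHouseJdbcTemplate;

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    public List<MetricData> queryMetrics(String metricName,
            String startTime, String endTime, String dimension) {

        String cacheKey = String.format("metrics:%s:%s:%s:%s",
                metricName, startTime, endTime, dimension);

        @SuppressWarnings("unchecked")
        List<MetricData> cachedData = (List<MetricData>) redisTemplate.opsForValue().get(cacheKey);
        if (cachedData != null) {
            return cachedData;
        }

        // Build the SQL and its bind arguments together so placeholders and values always match
        StringBuilder sql = new StringBuilder(
                "SELECT ts, metric_value FROM fgedu_realtime_metrics "
                + "WHERE metric_name = ? AND ts >= ? AND ts <= ?");
        List<Object> args = new ArrayList<>();
        args.add(metricName);
        args.add(startTime);
        args.add(endTime);

        if (dimension != null) {
            sql.append(" AND dimension1 = ?");
            args.add(dimension);
        }
        sql.append(" ORDER BY ts");

        List<MetricData> data = clickHouseJdbcTemplate.query(sql.toString(),
                (rs, rowNum) -> {
                    MetricData md = new MetricData();
                    md.setTimestamp(rs.getTimestamp("ts").getTime());
                    md.setValue(rs.getDouble("metric_value"));
                    return md;
                },
                args.toArray());

        // Short TTL: real-time data goes stale quickly
        redisTemplate.opsForValue().set(cacheKey, data, 5, TimeUnit.MINUTES);

        return data;
    }
}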

# Query example
$ curl -X GET "http://api.fgedu.net.cn/api/v1/metrics/query?metricName=pv&startTime=2024-04-01%2000:00:00&endTime=2024-04-08%2023:59:59" \
  -H "Authorization: Bearer your-token-here"

[
  {"timestamp": 1711929600000, "value": 1024567},
  {"timestamp": 1712016000000, "value": 1156789},
  {"timestamp": 1712102400000, "value": 987654}
]

4.3 Hands-On: Service Monitoring and Alerting

4.3.1 Monitoring with Prometheus + Grafana

# Add the Actuator and Prometheus registry dependencies
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
    <groupId>io.micrometer</groupId>
    <artifactId>micrometer-registry-prometheus</artifactId>
</dependency>

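# application.yml
management:
  endpoints:
    web:
      exposure:
        include: health,info,prometheus,metrics
  metrics:
    tags:
      application: ${spring.application.name}
    # Publish histogram buckets; the P99 alert below reads http_server_requests_seconds_bucket
    distribution:
      percentiles-histogram:
        http.server.requests: true
    export:
      prometheus:
        enabled: true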

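# Prometheus configuration (prometheus.yml)
global:
  scrape_interval: 15s
  evaluation_interval: 15s

# Load the alert rules from a separate file (the file name is an example)
rule_files:
  - "fgedu_alert_rules.yml"

scrape_configs:
  - job_name: 'fgedu-data-service'
    metrics_path: '/actuator/prometheus'
    static_configs:
      - targets: ['192.168.1.100:8080', '192.168.1.101:8080']
  - job_name: 'redis'
    # redis_exporter, default port 9121
    static_configs:
      - targets: ['192.168.1.100:9121']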

# Alert rules (a separate rules file loaded via rule_files in prometheus.yml)
groups:
  - name: service_alerts
    rules:
      - alert: ServiceDown
        expr: up == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Instance {{ $labels.instance }} down"
          description: "{{ $labels.instance }} of job {{ $labels.job }} has been down for more than 1 minute."

      - alert: HighErrorRate
        # Aggregate per instance before dividing: dividing the raw series matches each 5xx
        # series with itself, which yields 1 whenever there is any 5xx traffic
        expr: |
          sum by (instance) (rate(http_server_requests_seconds_count{status=~"5.."}[5m]))
            / sum by (instance) (rate(http_server_requests_seconds_count[5m])) > 0.01
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High error rate on {{ $labels.instance }}"
          description: "Error rate is {{ $value | humanizePercentage }} on {{ $labels.instance }}"

      - alert: HighResponseTime
        # Needs percentiles-histogram enabled for http.server.requests in the application
        expr: histogram_quantile(0.99, sum by (le, instance) (rate(http_server_requests_seconds_bucket[5m]))) > 1
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High response time on {{ $labels.instance }}"
          description: "P99 response time is {{ $value }}s on {{ $labels.instance }}"
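The `for:` clause is what keeps short spikes from paging anyone: Prometheus marks the alert pending when the expression first becomes true and fires it only if the expression stays true for the whole duration, while a single false evaluation resets it. A simplified model of that state machine (the class name is hypothetical; it ignores labels, notifications, and resolution handling):

```java
/** Simplified model of a Prometheus alerting rule's `for:` behavior. */
final class AlertState {
    enum State { INACTIVE, PENDING, FIRING }

    private final long forMillis;
    private long activeSince = -1;   // evaluation time when the expression first became true
    private State state = State.INACTIVE;

    AlertState(long forMillis) {
        this.forMillis = forMillis;
    }

    /** Called once per evaluation_interval with the result of the rule's expression. */
    State evaluate(boolean exprTrue, long nowMillis) {
        if (!exprTrue) {
            activeSince = -1;                       // any false evaluation resets the timer
            state = State.INACTIVE;
        } else {
            if (activeSince < 0) {
                activeSince = nowMillis;
            }
            state = (nowMillis - activeSince >= forMillis) ? State.FIRING : State.PENDING;
        }
        return state;
    }
}
```

With `for: 5m` and a 15s evaluation interval, an error spike that lasts four minutes never leaves the pending state.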

# Grafana dashboards
# Import a JVM (Micrometer) dashboard
# Import a Spring Boot statistics dashboard
# Build custom panels for the key metrics: QPS, response time, error rate, cache hit rate

Production recommendation: Monitoring and alerting are key to keeping the service stable. Monitor at several levels: infrastructure, application, and business metrics.

Part05-Feng Ge's Lessons Learned

5.1 Best Practices for Real-Time Data Services

Best practices for real-time data services:

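  • Multi-level caching: local cache plus distributed cache, to reduce load on the storage layer
  • Rate limiting and circuit breaking: prevent overload and protect downstream services
  • Asynchronous processing: run non-critical logic asynchronously to speed up responses
  • Data preloading: warm up hot data to avoid cold starts
  • Read/write splitting: send queries to replicas and writes to the primary
  • Capacity planning: reserve resources for business growth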
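The multi-level cache in the first item can be sketched as a read-through chain: a small per-process LRU (the role Caffeine plays) in front of a shared cache (the role Redis plays) in front of the store. Both caches are plain maps here and all names are hypothetical; a real L1 also needs a short expiry (for example Caffeine's expireAfterWrite), because one instance cannot see another instance's updates.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/** Two-level read-through cache: per-process LRU (L1) -> shared cache (L2) -> store. */
final class TwoLevelCache<K, V> {
    private final Map<K, V> local;        // L1: bounded LRU inside this JVM
    private final Map<K, V> shared;       // L2: stands in for Redis
    private final Function<K, V> store;   // stands in for HBase / ClickHouse

    TwoLevelCache(int localMaxEntries, Map<K, V> shared, Function<K, V> store) {
        // accessOrder = true turns LinkedHashMap into an LRU when combined with removeEldestEntry
        this.local = new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                return size() > localMaxEntries;
            }
        };
        this.shared = shared;
        this.store = store;
    }

    synchronized V get(K key) {
        V value = local.get(key);
        if (value != null) {
            return value;                 // L1 hit: no network round trip
        }
        value = shared.get(key);
        if (value == null) {
            value = store.apply(key);     // both caches missed: query the store
            if (value == null) {
                return null;
            }
            shared.put(key, value);       // populate L2 for the other instances
        }
        local.put(key, value);            // populate L1 for this instance
        return value;
    }
}
```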

5.2 Handling Common Problems

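# Common problem 1: Slow responses
– Check the cache hit rate
– Optimize SQL queries
– Add service instances
– Check network latency

# Common problem 2: Cache avalanche
– Randomize cache expiration times
– Use multi-level caching
– Warm up the cache
– Protect with rate limiting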
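Randomizing expiration times means adding a random offset to each TTL so that keys written in the same batch (for example during cache warm-up) do not all expire in the same second. A minimal sketch with hypothetical names:

```java
import java.time.Duration;
import java.util.concurrent.ThreadLocalRandom;

final class TtlJitter {

    /** Returns base plus a random offset in [0, maxJitter), so keys written together expire apart. */
    static Duration withJitter(Duration base, Duration maxJitter) {
        long bound = maxJitter.toMillis();
        long jitterMillis = bound <= 0 ? 0 : ThreadLocalRandom.current().nextLong(bound);
        return base.plusMillis(jitterMillis);
    }
}
```

The result can be passed to `redisTemplate.opsForValue().set(key, value, ttl)`, which accepts a `java.time.Duration` in Spring Data Redis 2.1 and later, for example a one-hour base with up to ten minutes of jitter.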

# Common problem 3: Cascading failure (service avalanche)
– Configure circuit breakers
– Degrade gracefully
– Fail fast
– Isolate resources
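Circuit breaking, degradation, and fail-fast fit together: after repeated failures the breaker opens and calls go straight to a fallback without touching the struggling dependency; after a cool-down, trial calls are let through to test recovery. A minimal count-based sketch with hypothetical names; production code would normally rely on a library such as Resilience4j (which Spring Cloud Gateway's CircuitBreaker filter can use) rather than this class.

```java
import java.util.function.Supplier;

final class CircuitBreaker {
    enum State { CLOSED, OPEN, HALF_OPEN }

    private final int failureThreshold;   // consecutive failures that open the breaker
    private final long openMillis;        // how long to stay open before allowing a trial call
    private State state = State.CLOSED;
    private int consecutiveFailures;
    private long openedAt;

    CircuitBreaker(int failureThreshold, long openMillis) {
        this.failureThreshold = failureThreshold;
        this.openMillis = openMillis;
    }

    /** Runs the action when allowed; on rejection or failure returns the fallback (degrade). */
    <T> T call(Supplier<T> action, Supplier<T> fallback, long nowMillis) {
        if (!allowRequest(nowMillis)) {
            return fallback.get();                    // fail fast: dependency is not touched
        }
        try {
            T result = action.get();
            onSuccess();
            return result;
        } catch (RuntimeException e) {
            onFailure(nowMillis);
            return fallback.get();
        }
    }

    synchronized State state() {
        return state;
    }

    private synchronized boolean allowRequest(long nowMillis) {
        if (state == State.OPEN && nowMillis - openedAt >= openMillis) {
            state = State.HALF_OPEN;                  // cool-down over: allow trial calls
        }
        return state != State.OPEN;
    }

    private synchronized void onSuccess() {
        consecutiveFailures = 0;
        state = State.CLOSED;
    }

    private synchronized void onFailure(long nowMillis) {
        consecutiveFailures++;
        if (state == State.HALF_OPEN || consecutiveFailures >= failureThreshold) {
            state = State.OPEN;                       // trip, or re-trip after a failed trial
            openedAt = nowMillis;
        }
    }
}
```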

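# Common problem 4: Data inconsistency
– Use distributed locks
– Design for eventual consistency
– Validate data
– Add compensation mechanisms

# Common problem 5: Out-of-memory (OOM) errors
– Tune JVM parameters
– Optimize memory usage
– Use off-heap memory
– Control incoming traffic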

5.3 Operations Checklist

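# Real-time data service operations checklist
– [ ] Service health checks
– [ ] Response time monitoring
– [ ] QPS monitoring
– [ ] Error rate monitoring
– [ ] Cache hit rate
– [ ] JVM memory usage
– [ ] GC frequency and pause times
– [ ] Connection pool status
– [ ] Thread pool status
– [ ] CPU utilization
– [ ] Memory utilization
– [ ] Disk utilization
– [ ] Network traffic
– [ ] Error log review
– [ ] Alert rule review
– [ ] Data consistency checks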

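# Daily inspection routine
1. Check the Grafana dashboards
2. Review alert history
3. Check error logs
4. Check service status
5. Analyze performance metrics
6. Review capacity planning
7. Scan for security vulnerabilities
8. Check backup status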

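Feng Ge's tip: Building a real-time data service layer is a process of continuous optimization. Meet the core requirements first, then iterate. Put a complete monitoring and alerting system in place so problems are detected and handled promptly.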

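This article was compiled and published by Feng Ge Tutorials (风哥教程) for study and testing purposes only. If you repost it, please credit the source: http://www.fgedu.net.cn/10327.html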
