Big Data Tutorial FG214: Hadoop Dual-Write Consistency in Practice

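Part01-Basic Concepts and Theory

1.1 Dual-Write Consistency: Concept and Principle

Dual-write consistency means that data is written to two different storage systems during processing and that both systems end up holding the same data. In the Hadoop ecosystem, common dual-write scenarios include:

  • Dual writes from a relational database (such as MySQL) to HBase
  • Dual writes from Kafka to HDFS
  • Dual writes from an application to multiple data stores

The core principle is atomicity across the two systems: the two writes either both succeed or both fail, so the systems never diverge.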

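1.2 Challenges of Dual-Write Consistency

The main challenges are:

  • Network failures: a network fault during a dual write can cause one of the two writes to fail
  • System failures: an outage of a target storage system can cause its write to fail
  • Concurrency conflicts: concurrent operations on the same data can conflict with each other
  • Performance impact: writing twice adds latency and load
  • Consistency verification: the data in the two systems has to be checked for agreement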

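1.3 Solutions for Dual-Write Consistency

Common solutions are:

  • Transactions: use a distributed transaction to make the two writes atomic
  • Message queues: use a message queue such as Kafka as middleware so that data is delivered reliably
  • Compensation: when one of the writes fails, run a compensating operation to restore consistency
  • Eventual consistency: accept temporary divergence and converge through periodic synchronization
  • Monitoring and alerting: build monitoring that detects failed dual writes promptly so they can be handled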
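
The compensation approach in the list above can be sketched as follows. This is only a minimal illustration under stated assumptions: the two stores are in-memory maps standing in for systems such as MySQL and HBase, and the names `CompensatingDualWriter`, `primary`, `secondary`, and `failSecondary` are hypothetical, not part of any library.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch of a dual write with compensation; both stores are in-memory stand-ins. */
final class CompensatingDualWriter {
    final Map<String, String> primary = new HashMap<>();   // stands in for e.g. MySQL
    final Map<String, String> secondary = new HashMap<>(); // stands in for e.g. HBase
    boolean failSecondary = false;                         // hook to simulate an outage

    /** Writes to both stores; returns true only if both writes succeeded. */
    boolean write(String key, String value) {
        boolean hadPrevious = primary.containsKey(key);
        String previous = primary.get(key);
        primary.put(key, value);                // step 1: first write
        try {
            writeSecondary(key, value);         // step 2: second write
            return true;
        } catch (RuntimeException e) {
            // Compensation: undo step 1 so the two stores do not diverge.
            if (hadPrevious) {
                primary.put(key, previous);
            } else {
                primary.remove(key);
            }
            return false;
        }
    }

    private void writeSecondary(String key, String value) {
        if (failSecondary) {
            throw new IllegalStateException("secondary store unavailable");
        }
        secondary.put(key, value);
    }
}
```

In a real system the compensating step can itself fail, so it usually needs its own retry or a log that a repair job can replay later.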

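Part02-Production Planning and Recommendations

2.1 Dual-Write Consistency Architecture Design

Fengge's tip: a dual-write architecture has to balance reliability, performance, and maintainability, so choose the pattern that fits the system.

Architecture recommendations:

  • Synchronous dual write: the application layer writes to both systems in the same operation, keeping the data consistent in real time
  • Asynchronous dual write: the second system is written asynchronously through a message queue, which improves performance
  • Hybrid mode: critical data uses synchronous dual writes, non-critical data uses asynchronous dual writes
  • Primary-replica mode: one system is the primary and the other is a replica that is kept consistent through replication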
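
The asynchronous pattern above can be sketched with an in-process queue. This is a simplified illustration: the in-memory maps stand in for the two storage systems, the `LinkedBlockingQueue` stands in for a message queue such as Kafka, and `AsyncDualWriter`, `drain`, and `pending` are hypothetical names. In a real deployment the draining step would run in a separate consumer thread or process.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

/** Sketch of an asynchronous dual write: primary first, secondary via a queue. */
final class AsyncDualWriter {
    final Map<String, String> primary = new ConcurrentHashMap<>();
    final Map<String, String> secondary = new ConcurrentHashMap<>();
    private final BlockingQueue<String[]> queue = new LinkedBlockingQueue<>();

    /** Synchronous path: write the primary store, then enqueue the change. */
    void write(String key, String value) {
        primary.put(key, value);
        queue.add(new String[] {key, value});
    }

    /** Asynchronous path: apply every queued change to the secondary store. */
    int drain() {
        int applied = 0;
        String[] change;
        while ((change = queue.poll()) != null) {
            secondary.put(change[0], change[1]);
            applied++;
        }
        return applied;
    }

    /** Number of changes not yet applied to the secondary store. */
    int pending() {
        return queue.size();
    }
}
```

Until `drain()` runs, the secondary store lags behind the primary, which is the latency-for-consistency trade-off of the asynchronous mode. Note also that the primary write and the enqueue are still two separate steps; capturing changes from the database log, as Canal does with the MySQL binlog, is one way to avoid that gap.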

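2.2 Technology Selection and Recommended Tools

Technology choices for dual-write consistency:

Scenario | Recommended tools | Advantages
MySQL-to-HBase dual write | Canal + Kafka + HBase | Real-time synchronization, high reliability
Kafka-to-HDFS dual write | Flume + Kafka + HDFS | High throughput, good fault tolerance
Application-to-multiple-stores dual write | Spring Cloud Stream + Kafka | Easy to integrate, highly scalable
Cross-cluster dual write | DistCp + Kafka | Suited to large-scale data synchronization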

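2.3 Deployment and Configuration Recommendations

Deployment and configuration recommendations:

  • High availability: configure every system involved in the dual write for high availability
  • Network: provision enough bandwidth and keep network latency low
  • Monitoring: configure detailed monitoring and alerting
  • Fault tolerance: configure suitable retry and error-handling policies
  • Performance tuning: adjust system parameters to the actual workload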
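
The retry policy mentioned under fault tolerance could look like the following sketch of retry with exponential backoff. The class name `Retry`, the method `withBackoff`, and its parameters are hypothetical and chosen only for illustration; real clients (for example Kafka producers) usually ship their own retry settings, which should be preferred where available.

```java
import java.util.concurrent.Callable;

/** Sketch of a retry helper with exponential backoff. */
final class Retry {
    private Retry() {}

    /** Runs the task up to maxAttempts times, doubling the delay after each failure. */
    static <T> T withBackoff(Callable<T> task, int maxAttempts, long initialDelayMs)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        long delay = initialDelayMs;
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return task.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(delay); // wait before the next attempt
                    delay *= 2;          // exponential backoff
                }
            }
        }
        throw last; // every attempt failed: surface the last error
    }
}
```

A caller would wrap the second write, for example `Retry.withBackoff(() -> writeToSecondStore(record), 5, 200)`, where `writeToSecondStore` is a placeholder for the actual write. Retries are only safe when the write is idempotent, otherwise a retry after a lost acknowledgement can duplicate data.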

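Part03-Production Implementation Plan

3.1 Implementation Steps for Dual-Write Consistency

Implementation steps:

  1. Design the dual-write architecture and the data model
  2. Choose a suitable technology stack and tools
  3. Implement the dual-write logic and error handling
  4. Configure monitoring and alerting
  5. Test and optimize performance
  6. Deploy to production
  7. Verify data consistency on a regular schedule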
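
Step 7 can be automated with a comparison job. The sketch below assumes that a snapshot of each system has already been loaded into a map from row key to a value or checksum; how those snapshots are produced (for example a MySQL query and an HBase scan) is outside the sketch, and `ConsistencyChecker` and `diff` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;

/** Sketch of a consistency check between two key/value snapshots. */
final class ConsistencyChecker {
    private ConsistencyChecker() {}

    /** Returns one line per key that differs between the snapshots, sorted by key. */
    static List<String> diff(Map<String, String> source, Map<String, String> target) {
        Set<String> keys = new TreeSet<>(source.keySet());
        keys.addAll(target.keySet());
        List<String> problems = new ArrayList<>();
        for (String key : keys) {
            if (!target.containsKey(key)) {
                problems.add("missing in target: " + key);
            } else if (!source.containsKey(key)) {
                problems.add("missing in source: " + key);
            } else if (!Objects.equals(source.get(key), target.get(key))) {
                problems.add("value mismatch: " + key);
            }
        }
        return problems;
    }
}
```

An empty result means the two snapshots agree. For large tables, comparing per-partition row counts or checksums first and drilling into individual keys only where they differ keeps the job cheap.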

3.2 Code Implementation and Configuration

Example implementation and configuration for dual-write consistency:

# MySQL-to-HBase dual-write configuration (Canal + Kafka + HBase)
# Note: for Canal to publish to Kafka, canal.serverMode must also be set to kafka in conf/canal.properties

$ vi /bigdata/app/canal/conf/example/instance.properties

## mysql serverId (must be unique among the replicas of this MySQL instance)
canal.instance.mysql.slaveId = 1234

## position info
canal.instance.master.address = 192.168.1.100:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
canal.instance.gtid =

## rds oss binlog
canal.instance.rds.accesskey =
canal.instance.rds.secretkey =
canal.instance.rds.instanceId =

## table meta tsdb info
canal.instance.tsdb.enable = true

## username/password
canal.instance.dbUsername = fgedu
canal.instance.dbPassword = fgedu123
canal.instance.connectionCharset = UTF-8
canal.instance.defaultDatabaseName = fgedudb
## the backslash is doubled because a .properties file treats a single backslash as an escape
canal.instance.filter.regex = fgedudb\\.fgedu_.*

## mq config
canal.mq.topic = fgedu_mysql_binlog
canal.mq.partition = 0

# Kafka-to-HDFS dual-write configuration (Flume)

$ vi /bigdata/app/flume/conf/kafka_to_hdfs.conf

# Name the components on this agent
a1.sources = kafka-source
a1.sinks = hdfs-sink
a1.channels = memory-channel

# Describe/configure the source
a1.sources.kafka-source.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.kafka-source.kafka.bootstrap.servers = 192.168.1.101:9092,192.168.1.102:9092,192.168.1.103:9092
a1.sources.kafka-source.kafka.topics = fgedu_events
a1.sources.kafka-source.kafka.consumer.group.id = flume-consumer

# Describe the sink
a1.sinks.hdfs-sink.type = hdfs
a1.sinks.hdfs-sink.hdfs.path = hdfs://fgedu.net.cn:9000/bigdata/fgdata/kafka/%Y/%m/%d
a1.sinks.hdfs-sink.hdfs.filePrefix = events-
a1.sinks.hdfs-sink.hdfs.fileType = DataStream
a1.sinks.hdfs-sink.hdfs.writeFormat = Text
# Roll a new file every hour or at 128 MB, whichever comes first; never roll by event count
a1.sinks.hdfs-sink.hdfs.rollInterval = 3600
a1.sinks.hdfs-sink.hdfs.rollSize = 134217728
a1.sinks.hdfs-sink.hdfs.rollCount = 0
a1.sinks.hdfs-sink.hdfs.batchSize = 1000
a1.sinks.hdfs-sink.hdfs.useLocalTimeStamp = true

# Use a channel which buffers events in memory
# (fast, but buffered events are lost if the agent crashes; a file channel is the durable alternative)
a1.channels.memory-channel.type = memory
a1.channels.memory-channel.capacity = 10000
a1.channels.memory-channel.transactionCapacity = 1000

# Bind the source and sink to the channel
a1.sources.kafka-source.channels = memory-channel
a1.sinks.hdfs-sink.channel = memory-channel

3.3 Monitoring and Alerting

Monitoring and alerting configuration for dual-write consistency:

# Prometheus scrape configuration

$ vi /bigdata/app/prometheus/prometheus.yml

scrape_configs:
  - job_name: 'canal'
    static_configs:
      - targets: ['192.168.1.101:9100']

  - job_name: 'kafka'
    static_configs:
      - targets: ['192.168.1.101:9308', '192.168.1.102:9308', '192.168.1.103:9308']

  - job_name: 'hbase'
    static_configs:
      - targets: ['192.168.1.101:9100', '192.168.1.102:9100', '192.168.1.103:9100']

  - job_name: 'hdfs'
    static_configs:
      - targets: ['192.168.1.101:9100', '192.168.1.102:9100']

# Grafana dashboard and alert threshold configuration

$ vi /bigdata/app/grafana/dashboards/dual-write-dashboard.json

{
  "annotations": {
    "list": [
      {
        "builtIn": 1,
        "datasource": "-- Grafana --",
        "enable": true,
        "hide": true,
        "iconColor": "rgba(0, 211, 255, 1)",
        "name": "Annotations & Alerts",
        "type": "dashboard"
      }
    ]
  },
  "editable": true,
  "gnetId": null,
  "graphTooltip": 0,
  "id": 1,
  "links": [],
  "panels": [
    {
      "aliasColors": {},
      "bars": false,
      "dashLength": 10,
      "dashes": false,
      "datasource": "Prometheus",
      "fieldConfig": {
        "defaults": {
          "custom": {}
        },
        "overrides": []
      },
      "fill": 1,
      "fillGradient": 0,
      "gridPos": {
        "h": 8,
        "w": 12,
        "x": 0,
        "y": 0
      },
      "hiddenSeries": false,
      "id": 2,
      "legend": {
        "avg": false,
        "current": false,
        "max": false,
        "min": false,
        "show": true,
        "total": false,
        "values": false
      },
      "lines": true,
      "linewidth": 1,
      "nullPointMode": "null",
      "options": {
        "alertThreshold": true
      },
      "percentage": false,
      "pluginVersion": "7.5.7",
      "pointradius": 2,
      "points": false,
      "renderer": "flot",
      "seriesOverrides": [],
      "spaceLength": 10,
      "stack": false,
      "steppedLine": false,
      "targets": [
        {
          "expr": "rate(canal_binlog_processed_total[5m])",
          "interval": "",
          "legendFormat": "Canal Binlog Processed",
          "refId": "A"
        }
      ],
      "thresholds": [
        {
          "colorMode": "critical",
          "fill": true,
          "line": true,
          "op": "lt",
          "value": 1,
          "visible": true
        }
      ],
      "timeFrom": null,
      "timeRegions": [],
      "timeShift": null,
      "title": "Canal Binlog Processing Rate",
      "tooltip": {
        "shared": true,
        "sort": 0,
        "value_type": "individual"
      },
      "type": "graph",
      "xaxis": {
        "buckets": null,
        "mode": "time",
        "name": null,
        "show": true,
        "values": []
      },
      "yaxes": [
        {
          "format": "short",
          "label": null,
          "logBase": 1,
          "max": null,
          "min": "0",
          "show": true
        },
        {
          "format": "short",
          "label": null,
          "logBase": 1,
          "max": null,
          "min": null,
          "show": true
        }
      ],
      "yaxis": {
        "align": false,
        "alignLevel": null
      }
    }
  ],
  "schemaVersion": 26,
  "style": "dark",
  "tags": [],
  "templating": {
    "list": []
  },
  "time": {
    "from": "now-6h",
    "to": "now"
  },
  "timepicker": {},
  "timezone": "",
  "title": "Dual Write Monitoring",
  "uid": "dual-write-dashboard",
  "version": 1
}

Part04-Production Cases and Hands-On Walkthroughs

4.1 MySQL-to-HBase Dual Write in Practice

MySQL-to-HBase dual write, step by step:

# Start the Canal server

$ cd /bigdata/app/canal/bin
$ ./startup.sh
cd to /bigdata/app/canal/bin for workaround relative path
LOG CONFIGURATION : /bigdata/app/canal/conf/logback.xml
canal server is starting ...
2023-07-25 12:00:00.000 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2023-07-25 12:00:00.100 [main] INFO com.alibaba.otter.canal.deployer.CanalController - start canal server[192.168.1.101:11111]
2023-07-25 12:00:00.200 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......

# List the Kafka topics

$ kafka-topics.sh --bootstrap-server 192.168.1.101:9092 --list
fgedu_mysql_binlog
fgedu_events

# Start the HBase consumer

$ spark-submit --class com.fgedu.hbase.HBaseConsumer --master yarn /bigdata/app/jars/hbase-consumer-1.0.jar
2023-07-25 12:05:00.000 INFO SparkContext:54 - Running Spark version 3.2.1
2023-07-25 12:05:00.100 INFO SparkContext:54 - Submitted application: HBase Consumer
2023-07-25 12:05:01.200 INFO YarnClientSchedulerBackend:54 - Connected to ResourceManager at fgedu.net.cn:8032
2023-07-25 12:05:02.300 INFO HBaseConsumer:54 - Starting HBase consumer...
2023-07-25 12:05:03.400 INFO HBaseConsumer:54 - Subscribed to topic: fgedu_mysql_binlog
2023-07-25 12:05:04.500 INFO HBaseConsumer:54 - Consumer started successfully

4.2 Kafka-to-HDFS Dual Write in Practice

Kafka-to-HDFS dual write, step by step:

# Start the Flume agent (run from the Flume home directory so that the relative conf paths resolve)

$ cd /bigdata/app/flume
$ bin/flume-ng agent --name a1 --conf conf --conf-file conf/kafka_to_hdfs.conf -Dflume.root.logger=INFO,console
2023-07-25 12:10:00.000 INFO node.PollingPropertiesFileConfigurationProvider:114 - Configuration provider starting
2023-07-25 12:10:00.100 INFO node.PollingPropertiesFileConfigurationProvider:162 - Reloading configuration file: conf/kafka_to_hdfs.conf
2023-07-25 12:10:00.200 INFO conf.FlumeConfiguration:1016 - Processing:hdfs-sink
2023-07-25 12:10:00.300 INFO conf.FlumeConfiguration:1016 - Processing:hdfs-sink
2023-07-25 12:10:00.400 INFO conf.FlumeConfiguration:1016 - Processing:hdfs-sink
2023-07-25 12:10:00.500 INFO conf.FlumeConfiguration:1016 - Processing:hdfs-sink
2023-07-25 12:10:00.600 INFO conf.FlumeConfiguration:1016 - Processing:hdfs-sink
2023-07-25 12:10:00.700 INFO conf.FlumeConfiguration:1016 - Processing:hdfs-sink
2023-07-25 12:10:00.800 INFO conf.FlumeConfiguration:1016 - Processing:hdfs-sink
2023-07-25 12:10:00.900 INFO conf.FlumeConfiguration:1016 - Processing:hdfs-sink
2023-07-25 12:10:01.000 INFO conf.FlumeConfiguration:1016 - Processing:hdfs-sink
2023-07-25 12:10:01.100 INFO conf.FlumeConfiguration:1016 - Processing:hdfs-sink
2023-07-25 12:10:01.200 INFO conf.FlumeConfiguration:1016 - Processing:hdfs-sink
2023-07-25 12:10:01.300 INFO conf.FlumeConfiguration:1016 - Processing:kafka-source
2023-07-25 12:10:01.400 INFO conf.FlumeConfiguration:1016 - Processing:kafka-source
2023-07-25 12:10:01.500 INFO conf.FlumeConfiguration:1016 - Processing:kafka-source
2023-07-25 12:10:01.600 INFO conf.FlumeConfiguration:1016 - Processing:kafka-source
2023-07-25 12:10:01.700 INFO conf.FlumeConfiguration:1016 - Processing:kafka-source
2023-07-25 12:10:01.800 INFO conf.FlumeConfiguration:1016 - Processing:memory-channel
2023-07-25 12:10:01.900 INFO conf.FlumeConfiguration:1016 - Processing:memory-channel
2023-07-25 12:10:02.000 INFO conf.FlumeConfiguration:1016 - Processing:memory-channel
2023-07-25 12:10:02.100 INFO node.AbstractConfigurationProvider:150 - Configuration provider Reloading configuration file: conf/kafka_to_hdfs.conf
2023-07-25 12:10:02.200 INFO node.Application:201 - Starting new configuration:{ sourceRunners:{kafka-source=EventDrivenSourceRunner: { source:org.apache.flume.source.kafka.KafkaSource{name:kafka-source,state:IDLE} }} sinkRunners:{hdfs-sink=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@4a4a4a4a counterGroup:{ name:null counters:{} } }} channels:{memory-channel=org.apache.flume.channel.MemoryChannel{name: memory-channel}}
2023-07-25 12:10:02.300 INFO node.Application:215 - Starting Channel memory-channel
2023-07-25 12:10:02.400 INFO node.Application:231 - Starting Sink hdfs-sink
2023-07-25 12:10:02.500 INFO node.Application:246 - Starting Source kafka-source
2023-07-25 12:10:02.600 INFO kafka.KafkaSource:165 - Kafka source kafka-source started.

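# Inspect the data in HDFS

$ hdfs dfs -ls /bigdata/fgdata/kafka/2023/07/25/
Found 2 items
-rw-r--r-- 3 hdfs supergroup 1024 2023-07-25 12:15 /bigdata/fgdata/kafka/2023/07/25/events-1690272900000
-rw-r--r-- 3 hdfs supergroup 2048 2023-07-25 12:20 /bigdata/fgdata/kafka/2023/07/25/events-1690273200000

Fengge's tip: Flume can implement the Kafka-to-HDFS dual write and move data from Kafka into HDFS. Where stronger delivery guarantees are needed, a durable channel such as Flume's file channel can replace the memory channel used here.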

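4.3 Application-to-Multiple-Stores Dual Write in Practice

Application-to-multiple-stores dual write, step by step:

# Application code example (Spring Boot + Kafka)

$ vi /bigdata/app/applications/spring-boot-dual-write/src/main/java/com/fgedu/DualWriteService.java

package com.fgedu;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.concurrent.TimeUnit;

@Service
public class DualWriteService {

    // JdbcTemplate joins the Spring-managed transaction; a connection taken
    // directly from the DataSource would auto-commit outside of it.
    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // Roll back the MySQL insert on any exception, including checked ones.
    @Transactional(rollbackFor = Exception.class)
    public void dualWrite(String data) throws Exception {
        // Write to MySQL
        jdbcTemplate.update("INSERT INTO fgedu_data (data) VALUES (?)", data);

        // Write to Kafka. send() is asynchronous, so wait for the broker
        // acknowledgement; a failure here rolls back the MySQL insert.
        // The 10-second timeout is an illustrative value.
        kafkaTemplate.send("fgedu_events", data).get(10, TimeUnit.SECONDS);

        // Caveat: if the MySQL commit fails after Kafka has acknowledged,
        // the message cannot be recalled, so consumers should be idempotent.
    }
}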

# Test the dual write

$ curl -X POST http://localhost:8080/api/dual-write -H "Content-Type: application/json" -d '{"data": "test data"}'
{"status": "success", "message": "Dual write completed successfully"}

# Verify the data in MySQL

$ mysql -u fgedu -p fgedudb -e "SELECT * FROM fgedu_data"
Enter password:
+----+-----------+---------------------+
| id | data      | created_at          |
+----+-----------+---------------------+
|  1 | test data | 2023-07-25 12:25:00 |
+----+-----------+---------------------+

# Verify the data in Kafka

$ kafka-console-consumer.sh --bootstrap-server 192.168.1.101:9092 --topic fgedu_events --from-beginning
test data

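Part05-Fengge's Lessons Learned

5.1 Best Practices for Dual-Write Consistency

  • Choose the right dual-write mode: pick synchronous or asynchronous dual writes according to business requirements
  • Use a reliable message queue: a queue such as Kafka ensures that data is delivered
  • Implement error handling: handle every failure that can occur during a dual write
  • Set up monitoring and alerting: watch dual-write status in real time so problems are found early
  • Verify consistency regularly: check on a schedule that the data in the two systems agrees
  • Optimize performance: tune system parameters to the actual workload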

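5.2 Common Problems and Solutions

Problem | Solution
Network failure during a dual write | Implement retries and use a message queue for reliable delivery
Write to the target system fails | Implement compensation and synchronize the data periodically
Data consistency is hard to verify | Write automated verification scripts and run them on a schedule
Dual-write performance bottleneck | Tune system configuration and use asynchronous dual writes
Concurrency conflicts | Use optimistic or pessimistic locking to avoid conflicting writes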
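
The optimistic locking mentioned in the last row can be sketched as a version check on every update. The example below is an in-memory illustration with hypothetical names (`VersionedStore`, `Versioned`); against a relational database the same idea is usually expressed as an `UPDATE ... SET version = version + 1 WHERE id = ? AND version = ?` statement whose affected-row count tells the caller whether it won.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Sketch of optimistic locking: an update succeeds only if the version is unchanged. */
final class VersionedStore {
    /** Immutable value plus version counter. */
    static final class Versioned {
        final String value;
        final long version;

        Versioned(String value, long version) {
            this.value = value;
            this.version = version;
        }
    }

    private final Map<String, Versioned> rows = new ConcurrentHashMap<>();

    Versioned read(String key) {
        return rows.get(key);
    }

    /** Inserts the row at version 1; returns false if the key already exists. */
    boolean insert(String key, String value) {
        return rows.putIfAbsent(key, new Versioned(value, 1L)) == null;
    }

    /** Applies the update only if the stored version still equals expectedVersion. */
    boolean update(String key, String newValue, long expectedVersion) {
        Versioned current = rows.get(key);
        if (current == null || current.version != expectedVersion) {
            return false; // someone else updated the row first
        }
        // Atomic compare-and-swap on the exact object that was read.
        return rows.replace(key, current, new Versioned(newValue, expectedVersion + 1));
    }
}
```

A writer that receives `false` re-reads the row and decides whether to retry with the new version or to give up, so no lock is held while the caller is thinking.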

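5.3 Performance Optimization Recommendations

  • Batching: write in batches to reduce network overhead and system load
  • Parallelism: use multiple threads or distributed processing to speed up dual writes
  • Caching: use a cache to reduce direct access to the storage systems
  • Compression: compress data in transit to reduce network traffic
  • Storage tuning: adjust the storage systems' configuration parameters to the actual workload
  • Monitoring and tuning: use monitoring to find bottlenecks and optimize them specifically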
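
Batching, the first item above, can be sketched as a small buffer that hands records to the target system in groups. `BatchingWriter` and its `sink` callback are hypothetical names; the sink stands in for whatever bulk write the target offers. The sketch is not thread-safe and flushes only by size, whereas production writers typically also flush on a timer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Sketch of size-based batching in front of a bulk write. */
final class BatchingWriter<T> {
    private final int batchSize;
    private final Consumer<List<T>> sink;
    private final List<T> buffer = new ArrayList<>();

    BatchingWriter(int batchSize, Consumer<List<T>> sink) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be >= 1");
        }
        this.batchSize = batchSize;
        this.sink = sink;
    }

    /** Buffers one record and flushes automatically once the batch is full. */
    void add(T record) {
        buffer.add(record);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    /** Sends whatever is buffered; call this on shutdown so no records are left behind. */
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        sink.accept(new ArrayList<>(buffer)); // hand over a copy, then reuse the buffer
        buffer.clear();
    }
}
```

Larger batches reduce per-record overhead but increase the amount of data that is unwritten at any moment, so the batch size is a trade-off between throughput and how far the second system may lag.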

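Fengge's tip: dual-write consistency is an important way to keep data reliable, but it also adds complexity and overhead. In a real project, choose the dual-write strategy and technology according to business requirements and system characteristics, so that consistency is achieved without hurting performance or availability.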

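This article was compiled and published by Fengge Tutorials (风哥教程) for learning and testing purposes only. Please credit the source when reposting: http://www.fgedu.net.cn/10327.html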