PostgreSQL Tutorial FG180 - Logical Decoding in Practice: Real-Time Data Synchronization
This document, by Fengge (风哥), introduces the practical use of PostgreSQL logical decoding for real-time data synchronization, covering synchronization architecture design, program development, and deployment and operations. It draws on the PostgreSQL official documentation for Logical Decoding and Replication Origins, and is intended for database developers and operations staff implementing data synchronization in production environments.
Part01 - Fundamental Concepts and Theory
1.1 Overview of Real-Time Data Synchronization in PostgreSQL
Real-time data synchronization captures data changes on a source database as they occur and propagates them to target systems. PostgreSQL's logical decoding feature provides powerful CDC (Change Data Capture) capability: it can capture INSERT, UPDATE, and DELETE changes in real time, enabling synchronization between heterogeneous databases, real-time data warehouse ETL, search engine index updates, and similar scenarios. Typical use cases include:
- Heterogeneous database synchronization (PostgreSQL to MySQL, Oracle, etc.)
- Real-time data warehouse ETL
- Search engine index updates
- Automatic cache refresh
- Data auditing and change tracking
- Data distribution across microservices
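Logical decoding surfaces these changes as structured records. As a minimal illustration (assuming the wal2json output plugin used later in this tutorial; the sample payload below is hand-written for demonstration, not captured output), each decoded transaction carries a "change" array that can be parsed into (kind, table, record) tuples:

```python
import json

# Hand-written sample of a wal2json transaction payload (illustrative only):
# each entry in "change" describes one row-level change.
SAMPLE_WAL2JSON = json.dumps({
    "change": [
        {
            "kind": "insert",
            "schema": "public",
            "table": "fgedu_orders",
            "columnnames": ["id", "order_no", "amount"],
            "columnvalues": [1, "ORD001", 1000.00],
        }
    ]
})

def parse_changes(payload: str):
    """Turn a wal2json transaction payload into (kind, table, record) tuples."""
    doc = json.loads(payload)
    parsed = []
    for item in doc.get("change", []):
        # Pair column names with values to rebuild the row as a dict.
        record = dict(zip(item.get("columnnames", []),
                          item.get("columnvalues", [])))
        parsed.append((item["kind"], item["table"], record))
    return parsed

print(parse_changes(SAMPLE_WAL2JSON))
# → [('insert', 'fgedu_orders', {'id': 1, 'order_no': 'ORD001', 'amount': 1000.0})]
```

The same columnnames/columnvalues pairing reappears in the full sync program in Part03.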
1.2 Data Synchronization Architecture
A synchronization architecture consists of: the source database (the PostgreSQL primary), a logical decoding component (change capture), an optional message queue (change buffering), the sync program (processing and forwarding), and the target systems (databases, search engines, and so on). The design must account for data consistency, latency control, and failure recovery.
1.3 Data Synchronization Challenges
Data synchronization faces several challenges: guaranteeing data consistency, handling DDL changes, processing large transactions, recovering from network failures, ensuring target-system compatibility, and limiting the performance impact on the source.
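Of these challenges, network failure recovery is the most mechanical to address: the sync program can retry transient failures with exponential backoff before giving up. A minimal sketch (the helper name and parameters are illustrative, not from any library):

```python
import time

def with_retry(fn, max_attempts=5, base_delay=1.0, sleep=time.sleep):
    """Call fn, retrying with exponential backoff; re-raise after max_attempts."""
    for attempt in range(max_attempts):
        try:
            return fn()
        except OSError:
            if attempt == max_attempts - 1:
                raise  # exhausted all attempts
            delay = base_delay * (2 ** attempt)  # 1s, 2s, 4s, ...
            sleep(delay)

# Example: a flaky call that succeeds on the third attempt.
calls = {'n': 0}
def flaky():
    calls['n'] += 1
    if calls['n'] < 3:
        raise OSError("connection reset")
    return "ok"

print(with_retry(flaky, sleep=lambda d: None))  # → ok
```

In a real sync loop, the retried function would be the reconnect-and-resume step; persistent failures should still surface as alerts.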
Part02 - Production Planning and Recommendations
2.1 Real-Time Synchronization Solution Design
Key design points: choose an appropriate output plugin (wal2json or pgoutput); define a sound replication-slot management strategy; plan data transformation and mapping rules; design a failure-recovery mechanism; and consider data filtering and routing strategies.
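Filtering and routing can be driven entirely by configuration data. A minimal sketch, assuming config rows shaped like the fgedu_sync_config table created in Part03 (the function name is illustrative):

```python
def route_change(sync_config, table_name):
    """Return the enabled target configs for a source table (data routing)."""
    return [row for row in sync_config
            if row['source_table'] == table_name and row['enabled']]

# Rows mirroring fgedu_sync_config (trimmed to the routing-relevant columns).
SYNC_CONFIG = [
    {'source_table': 'fgedu_orders',   'target_system': 'mysql',         'enabled': True},
    {'source_table': 'fgedu_orders',   'target_system': 'kafka',         'enabled': False},
    {'source_table': 'fgedu_products', 'target_system': 'elasticsearch', 'enabled': True},
]

print([t['target_system'] for t in route_change(SYNC_CONFIG, 'fgedu_orders')])
# → ['mysql']
```

Disabled rows are skipped, so a target can be paused without deleting its configuration.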
2.2 Synchronization Configuration Tuning
Key tuning points: set wal_level and max_replication_slots appropriately; configure a sufficient wal_keep_size; tune network transfer parameters; and set sensible alert thresholds for synchronization lag.
-- Check the current settings
SELECT name, setting, unit
FROM pg_settings
WHERE name IN (
    'wal_level',
    'max_replication_slots',
    'max_wal_senders',
    'wal_keep_size',
    'wal_sender_timeout'
);
-- Output
         name          | setting | unit
-----------------------+---------+------
 wal_level             | logical |
 max_replication_slots | 20      |
 max_wal_senders       | 20      |
 wal_keep_size         | 1024    | MB
 wal_sender_timeout    | 60000   | ms
(5 rows)
-- Create the sync user
CREATE USER fgedu_sync REPLICATION LOGIN PASSWORD 'fgedu_sync_2026';
-- Grant privileges
GRANT CONNECT ON DATABASE fgedudb TO fgedu_sync;
GRANT USAGE ON SCHEMA public TO fgedu_sync;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO fgedu_sync;
-- Configure pg_hba.conf: add the following line
host    replication    fgedu_sync    192.168.1.0/24    md5
-- Reload the configuration
SELECT pg_reload_conf();
-- Output
 pg_reload_conf
----------------
 t
(1 row)
2.3 Synchronization Monitoring
Monitoring plan: track replication-slot status and consumption progress; track synchronization lag; watch for WAL file accumulation; monitor the sync program's health; and configure alerts for anomalies.
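Slot lag, obtained via pg_wal_lsn_diff() as in the monitoring queries of Part04 and Part05, can be mapped to alert levels with a small pure function. A sketch with illustrative thresholds (tune WARN_BYTES and CRIT_BYTES for your workload):

```python
# Illustrative thresholds, not recommendations: 64 MB warns, 1 GB is critical.
WARN_BYTES = 64 * 1024 * 1024
CRIT_BYTES = 1024 * 1024 * 1024

def classify_lag(lag_bytes):
    """Map replication-slot lag (in bytes) to an alert level."""
    if lag_bytes >= CRIT_BYTES:
        return 'critical'
    if lag_bytes >= WARN_BYTES:
        return 'warning'
    return 'ok'

print(classify_lag(1024))               # → ok
print(classify_lag(200 * 1024 * 1024))  # → warning
print(classify_lag(2 * 1024**3))        # → critical
```

A cron job or the sync program itself can run the lag query periodically and raise alerts on 'warning' and 'critical'.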
Part03 - Production Implementation
3.1 Setting Up the Real-Time Synchronization Environment
3.1.1 Creating the Sync Configuration Tables
-- Sync configuration table
CREATE TABLE fgedu_sync_config (
    id SERIAL PRIMARY KEY,
    source_table VARCHAR(200) NOT NULL,
    target_table VARCHAR(200) NOT NULL,
    target_system VARCHAR(50) NOT NULL,
    target_host VARCHAR(200),
    target_port INTEGER,
    target_db VARCHAR(100),
    column_mapping JSONB,
    filter_condition TEXT,
    enabled BOOLEAN DEFAULT TRUE,
    created_at TIMESTAMP DEFAULT NOW()
);
-- Sync status table
CREATE TABLE fgedu_sync_status (
    id SERIAL PRIMARY KEY,
    slot_name VARCHAR(100) UNIQUE NOT NULL,
    last_lsn TEXT,
    last_txid BIGINT,
    last_sync_time TIMESTAMP,
    total_changes BIGINT DEFAULT 0,
    status VARCHAR(20) DEFAULT 'active',
    error_message TEXT
);
-- Sync log table
CREATE TABLE fgedu_sync_log (
    id SERIAL PRIMARY KEY,
    sync_id INTEGER REFERENCES fgedu_sync_status(id),
    lsn TEXT,
    txid BIGINT,
    operation VARCHAR(20),
    table_name VARCHAR(200),
    change_data JSONB,
    sync_time TIMESTAMP DEFAULT NOW(),
    status VARCHAR(20),
    error_message TEXT
);
CREATE INDEX idx_sync_log_time ON fgedu_sync_log(sync_time);
CREATE INDEX idx_sync_log_table ON fgedu_sync_log(table_name);
-- Insert the sync configuration
INSERT INTO fgedu_sync_config(
    source_table, target_table, target_system,
    target_host, target_port, target_db, enabled
)
VALUES
('fgedu_orders', 'orders', 'mysql', '192.168.1.200', 3306, 'fgedu_db', TRUE),
('fgedu_products', 'products', 'elasticsearch', '192.168.1.201', 9200, 'fgedu_index', TRUE),
('fgedu_customers', 'customers', 'kafka', '192.168.1.202', 9092, 'fgedu_topic', TRUE);
-- Output
INSERT 0 3
3.1.2 Creating the Sync Management Functions
CREATE OR REPLACE FUNCTION fgedu_create_sync_slot(
    p_slot_name VARCHAR
)
RETURNS BOOLEAN
AS $$
DECLARE
    v_slot_exists INTEGER;
BEGIN
    SELECT COUNT(*) INTO v_slot_exists
    FROM pg_replication_slots
    WHERE slot_name = p_slot_name;
    IF v_slot_exists > 0 THEN
        RAISE NOTICE 'Slot % already exists', p_slot_name;
        RETURN FALSE;
    END IF;
    PERFORM pg_create_logical_replication_slot(p_slot_name, 'wal2json');
    INSERT INTO fgedu_sync_status(slot_name, status)
    VALUES(p_slot_name, 'active');
    RETURN TRUE;
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;
-- Sync progress update function
CREATE OR REPLACE FUNCTION fgedu_update_sync_progress(
    p_slot_name VARCHAR,
    p_lsn TEXT,
    p_txid BIGINT,
    p_changes BIGINT DEFAULT 1
)
RETURNS void
AS $$
BEGIN
    UPDATE fgedu_sync_status
    SET last_lsn = p_lsn,
        last_txid = p_txid,
        last_sync_time = NOW(),
        total_changes = total_changes + p_changes,
        status = 'active',
        error_message = NULL
    WHERE slot_name = p_slot_name;
END;
$$ LANGUAGE plpgsql;
-- Sync error recording function
CREATE OR REPLACE FUNCTION fgedu_record_sync_error(
    p_slot_name VARCHAR,
    p_error_message TEXT
)
RETURNS void
AS $$
BEGIN
    UPDATE fgedu_sync_status
    SET status = 'error',
        error_message = p_error_message
    WHERE slot_name = p_slot_name;
END;
$$ LANGUAGE plpgsql;
-- Test slot creation
SELECT fgedu_create_sync_slot('fgedu_sync_slot');
-- Output
 fgedu_create_sync_slot
------------------------
 t
(1 row)
-- Check the slot status
SELECT * FROM fgedu_sync_status;
-- Output
 id |    slot_name    | last_lsn | last_txid | last_sync_time | total_changes | status | error_message
----+-----------------+----------+-----------+----------------+---------------+--------+---------------
  1 | fgedu_sync_slot | NULL     | NULL      | NULL           |             0 | active | NULL
(1 row)
3.2 Developing the Sync Program
3.2.1 Python Sync Program Example
#!/usr/bin/env python3
# fgedu_sync.py
import psycopg2
import psycopg2.extras
import json
import time
import logging
from datetime import datetime
class FgeduSync:
    def __init__(self, config):
        self.config = config
        self.conn = None
        self.slot_name = config['slot_name']
        self.logger = logging.getLogger('FgeduSync')

    def connect(self):
        self.conn = psycopg2.connect(
            host=self.config['host'],
            port=self.config['port'],
            database=self.config['database'],
            user=self.config['user'],
            password=self.config['password']
        )
        self.conn.autocommit = True
        self.logger.info("Connected to PostgreSQL")

    def get_changes(self, limit=1000):
        cursor = self.conn.cursor()
        cursor.execute("""
            SELECT lsn, xid, data
            FROM pg_logical_slot_get_changes(%s, NULL, %s)
        """, (self.slot_name, limit))
        changes = []
        for row in cursor:
            changes.append({
                'lsn': row[0],
                'xid': row[1],
                'data': json.loads(row[2])
            })
        cursor.close()
        return changes

    def process_change(self, change):
        data = change['data']
        if 'change' not in data:
            return
        for item in data['change']:
            table_name = item.get('table')
            kind = item.get('kind')
            if kind == 'insert':
                self.process_insert(table_name, item)
            elif kind == 'update':
                self.process_update(table_name, item)
            elif kind == 'delete':
                self.process_delete(table_name, item)
            # Record a sync log entry for each change
            self.log_sync(change, table_name, kind)

    def process_insert(self, table_name, item):
        columns = item.get('columnnames', [])
        values = item.get('columnvalues', [])
        record = dict(zip(columns, values))
        self.logger.info(f"INSERT {table_name}: {record}")
        # Forward to the target system
        self.send_to_target(table_name, 'insert', record)

    def process_update(self, table_name, item):
        columns = item.get('columnnames', [])
        values = item.get('columnvalues', [])
        keys = item.get('oldkeys', {})
        record = dict(zip(columns, values))
        self.logger.info(f"UPDATE {table_name}: {record}")
        # Forward to the target system
        self.send_to_target(table_name, 'update', record, keys)

    def process_delete(self, table_name, item):
        keys = item.get('oldkeys', {})
        self.logger.info(f"DELETE {table_name}: {keys}")
        # Forward to the target system
        self.send_to_target(table_name, 'delete', None, keys)

    def send_to_target(self, table_name, operation, record=None, keys=None):
        # Implement delivery to the target system in a subclass
        pass

    def log_sync(self, change, table_name, operation):
        cursor = self.conn.cursor()
        cursor.execute("""
            INSERT INTO fgedu_sync_log(
                sync_id, lsn, txid, operation, table_name,
                change_data, status
            )
            SELECT id, %s, %s, %s, %s, %s, 'success'
            FROM fgedu_sync_status
            WHERE slot_name = %s
        """, (
            change['lsn'],
            change['xid'],
            operation,
            table_name,
            json.dumps(change['data']),
            self.slot_name
        ))
        cursor.close()

    def update_progress(self, lsn, txid):
        cursor = self.conn.cursor()
        cursor.execute("""
            SELECT fgedu_update_sync_progress(%s, %s, %s)
        """, (self.slot_name, lsn, txid))
        cursor.close()

    def run(self):
        self.connect()
        while True:
            try:
                changes = self.get_changes()
                for change in changes:
                    self.process_change(change)
                    self.update_progress(change['lsn'], change['xid'])
                if not changes:
                    time.sleep(1)
            except Exception as e:
                self.logger.error(f"Sync error: {e}")
                cursor = self.conn.cursor()
                cursor.execute("""
                    SELECT fgedu_record_sync_error(%s, %s)
                """, (self.slot_name, str(e)))
                cursor.close()
                time.sleep(5)

if __name__ == '__main__':
    config = {
        'host': '192.168.1.100',
        'port': 5432,
        'database': 'fgedudb',
        'user': 'fgedu_sync',
        'password': 'fgedu_sync_2026',
        'slot_name': 'fgedu_sync_slot'
    }
    sync = FgeduSync(config)
    sync.run()
-- Run the sync program
$ python3 fgedu_sync.py
-- Output
2026-04-07 16:00:00 INFO Connected to PostgreSQL
2026-04-07 16:00:01 INFO INSERT fgedu_orders: {'id': 1, 'order_no': 'ORD001', ...}
2026-04-07 16:00:02 INFO UPDATE fgedu_products: {'id': 1, 'name': 'Product A', ...}
3.3 Deploying the Sync System
3.3.1 Managing the Sync Service with systemd
$ cat /etc/systemd/system/fgedu-sync.service
[Unit]
Description=FGEDU PostgreSQL Sync Service
After=network.target postgresql.service
[Service]
Type=simple
User=postgres
Group=postgres
WorkingDirectory=/opt/fgedu/sync
ExecStart=/usr/bin/python3 /opt/fgedu/sync/fgedu_sync.py
Restart=always
RestartSec=10
StandardOutput=syslog
StandardError=syslog
SyslogIdentifier=fgedu-sync
[Install]
WantedBy=multi-user.target
-- Start the service
$ systemctl daemon-reload
$ systemctl enable fgedu-sync
$ systemctl start fgedu-sync
-- Output
Created symlink /etc/systemd/system/multi-user.target.wants/fgedu-sync.service → /etc/systemd/system/fgedu-sync.service.
-- Check the service status
$ systemctl status fgedu-sync
-- Output
● fgedu-sync.service - FGEDU PostgreSQL Sync Service
   Loaded: loaded (/etc/systemd/system/fgedu-sync.service; enabled; vendor preset: disabled)
   Active: active (running) since Mon 2026-04-07 16:00:00 CST; 10s ago
 Main PID: 12345 (python3)
    Tasks: 2 (limit: 4915)
   Memory: 25.6M
   CGroup: /system.slice/fgedu-sync.service
           └─12345 /usr/bin/python3 /opt/fgedu/sync/fgedu_sync.py
-- View the logs
$ journalctl -u fgedu-sync -f
-- Output
Apr 07 16:00:00 fgedu-server fgedu-sync[12345]: Connected to PostgreSQL
Apr 07 16:00:01 fgedu-server fgedu-sync[12345]: INSERT fgedu_orders: {'id': 1, ...}
Part04 - Production Cases and Hands-On Examples
4.1 PostgreSQL to MySQL Real-Time Sync in Practice
This case demonstrates real-time data synchronization from PostgreSQL to MySQL.
-- 1. Create the MySQL target table
CREATE TABLE orders (
id INT PRIMARY KEY,
order_no VARCHAR(50),
customer_id INT,
amount DECIMAL(10,2),
status VARCHAR(20),
created_at DATETIME,
updated_at DATETIME
);
-- 2. Python sync program (MySQL version)
import mysql.connector

class MySQLSync(FgeduSync):
    def __init__(self, pg_config, mysql_config):
        super().__init__(pg_config)
        self.mysql_config = mysql_config
        self.mysql_conn = None

    def connect_mysql(self):
        # Call this after connect() and before run()
        self.mysql_conn = mysql.connector.connect(
            host=self.mysql_config['host'],
            port=self.mysql_config['port'],
            database=self.mysql_config['database'],
            user=self.mysql_config['user'],
            password=self.mysql_config['password']
        )
        self.logger.info("Connected to MySQL")

    def send_to_target(self, table_name, operation, record=None, keys=None):
        if table_name != 'fgedu_orders':
            return
        cursor = self.mysql_conn.cursor()
        if operation == 'insert':
            sql = """
                INSERT INTO orders (id, order_no, customer_id, amount, status, created_at)
                VALUES (%s, %s, %s, %s, %s, %s)
                ON DUPLICATE KEY UPDATE
                    order_no = VALUES(order_no),
                    amount = VALUES(amount),
                    status = VALUES(status)
            """
            cursor.execute(sql, (
                record['id'],
                record['order_no'],
                record['customer_id'],
                record['amount'],
                record['status'],
                record['created_at']  # column name as decoded from fgedu_orders
            ))
        elif operation == 'update':
            sql = """
                UPDATE orders SET
                    order_no = %s,
                    customer_id = %s,
                    amount = %s,
                    status = %s,
                    updated_at = %s
                WHERE id = %s
            """
            cursor.execute(sql, (
                record['order_no'],
                record['customer_id'],
                record['amount'],
                record['status'],
                datetime.now(),
                keys['keyvalues'][0]
            ))
        elif operation == 'delete':
            sql = "DELETE FROM orders WHERE id = %s"
            cursor.execute(sql, (keys['keyvalues'][0],))
        self.mysql_conn.commit()
        cursor.close()
-- 3. Test the sync
-- Insert a row on the PostgreSQL side
INSERT INTO fgedu_orders(order_no, customer_id, amount, status)
VALUES('ORD001', 1, 1000.00, 'pending');
-- Output
INSERT 0 1
-- Check the result in MySQL
SELECT * FROM orders WHERE order_no = 'ORD001';
-- Output
+----+----------+-------------+---------+---------+---------------------+------------+
| id | order_no | customer_id | amount  | status  | created_at          | updated_at |
+----+----------+-------------+---------+---------+---------------------+------------+
|  1 | ORD001   |           1 | 1000.00 | pending | 2026-04-07 16:00:00 | NULL       |
+----+----------+-------------+---------+---------+---------------------+------------+
1 row in set (0.00 sec)
4.2 PostgreSQL to Elasticsearch Real-Time Sync in Practice
This case demonstrates real-time data synchronization from PostgreSQL to Elasticsearch.
-- 1. Create the Elasticsearch index
PUT /fgedu_products
{
  "mappings": {
    "properties": {
      "id": { "type": "integer" },
      "name": { "type": "text" },
      "description": { "type": "text" },
      "price": { "type": "float" },
      "category": { "type": "keyword" },
      "stock": { "type": "integer" }
    }
  }
}
-- 2. Python sync program (Elasticsearch version)
from elasticsearch import Elasticsearch

class ElasticsearchSync(FgeduSync):
    def __init__(self, pg_config, es_config):
        super().__init__(pg_config)
        self.es_config = es_config
        self.es_client = None

    def connect_elasticsearch(self):
        # Call this after connect() and before run()
        self.es_client = Elasticsearch([
            {'host': self.es_config['host'], 'port': self.es_config['port']}
        ])
        self.logger.info("Connected to Elasticsearch")

    def send_to_target(self, table_name, operation, record=None, keys=None):
        if table_name != 'fgedu_products':
            return
        doc_id = record['id'] if record else keys['keyvalues'][0]
        if operation in ('insert', 'update'):
            doc = {
                'id': record['id'],
                'name': record['product_name'],
                'description': record['description'],
                'price': float(record['price']),
                'category': record['category'],
                'stock': record['stock']
            }
            self.es_client.index(
                index='fgedu_products',
                id=doc_id,
                body=doc
            )
        elif operation == 'delete':
            self.es_client.delete(
                index='fgedu_products',
                id=doc_id
            )
-- 3. Test the sync
-- Insert a row on the PostgreSQL side
INSERT INTO fgedu_products(product_name, description, price, category, stock)
VALUES('iPhone 15', 'Apple iPhone 15', 7999.00, 'electronics', 100);
-- Output
INSERT 0 1
-- Check the result in Elasticsearch
GET /fgedu_products/_search
{
  "query": {
    "match": {
      "name": "iPhone"
    }
  }
}
-- Output
{
  "hits": {
    "total": { "value": 1 },
    "hits": [
      {
        "_id": "1",
        "_source": {
          "id": 1,
          "name": "iPhone 15",
          "description": "Apple iPhone 15",
          "price": 7999.0,
          "category": "electronics",
          "stock": 100
        }
      }
    ]
  }
}
4.3 PostgreSQL to Kafka Real-Time Sync in Practice
This case demonstrates real-time data synchronization from PostgreSQL to Kafka.
-- 1. Create the Kafka topic
$ kafka-topics.sh --create \
    --bootstrap-server 192.168.1.202:9092 \
    --topic fgedu_changes \
    --partitions 3 \
    --replication-factor 2
-- Output
Created topic fgedu_changes.
-- 2. Python sync program (Kafka version)
from kafka import KafkaProducer
import json

class KafkaSync(FgeduSync):
    def __init__(self, pg_config, kafka_config):
        super().__init__(pg_config)
        self.kafka_config = kafka_config
        self.producer = None

    def connect_kafka(self):
        # Call this after connect() and before run()
        self.producer = KafkaProducer(
            bootstrap_servers=[f"{self.kafka_config['host']}:{self.kafka_config['port']}"],
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )
        self.logger.info("Connected to Kafka")

    def send_to_target(self, table_name, operation, record=None, keys=None):
        message = {
            'table': table_name,
            'operation': operation,
            'data': record,
            'keys': keys,
            'timestamp': datetime.now().isoformat()
        }
        self.producer.send(
            self.kafka_config['topic'],
            key=table_name.encode('utf-8'),
            value=message
        )
        self.producer.flush()
-- 3. Test the sync
-- Insert a row on the PostgreSQL side
INSERT INTO fgedu_customers(name, email, phone)
VALUES('John Doe', 'john@fgedu.net.cn', '13800138000');
-- Output
INSERT 0 1
-- Consume the Kafka messages
$ kafka-console-consumer.sh \
    --bootstrap-server 192.168.1.202:9092 \
    --topic fgedu_changes \
    --from-beginning
-- Output
{"table": "fgedu_customers", "operation": "insert", "data": {"id": 1, "name": "John Doe", "email": "john@fgedu.net.cn", "phone": "13800138000"}, "keys": null, "timestamp": "2026-04-07T16:00:00.123456"}
-- 4. Monitor sync status
SELECT
    s.slot_name,
    s.status,
    s.total_changes,
    s.last_sync_time,
    pg_wal_lsn_diff(pg_current_wal_lsn(), s.last_lsn::pg_lsn) AS lag_bytes
FROM fgedu_sync_status s;
-- Output
    slot_name    | status | total_changes |     last_sync_time      | lag_bytes
-----------------+--------+---------------+-------------------------+-----------
 fgedu_sync_slot | active |          1250 | 2026-04-07 16:05:00.123 |      1024
(1 row)
Part05 - Experience Summary and Best Practices
5.1 Real-Time Synchronization Best Practices
Best practices: prefer mature synchronization tools (Debezium, DataX) where they fit; build thorough monitoring; implement automatic failure recovery; verify data consistency regularly; and set sensible sync-lag alert thresholds. Implementation checklist:
- Set wal_level to logical
- Create a dedicated sync user
- Create the replication slot
- Deploy the sync program
- Configure monitoring and alerting
- Test failure recovery
- Verify data consistency
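The consistency-verification step above can be sketched as an offline comparison of row digests pulled from source and target (function names are illustrative; for large tables you would compare in batches or use checksum aggregates instead):

```python
import hashlib

def row_digest(row):
    """Stable digest of a row dict, for cheap row-by-row comparison."""
    payload = repr(sorted(row.items())).encode('utf-8')
    return hashlib.md5(payload).hexdigest()

def diff_tables(source_rows, target_rows, pk='id'):
    """Return (missing, extra, mismatched) primary keys between two row sets."""
    src = {r[pk]: row_digest(r) for r in source_rows}
    tgt = {r[pk]: row_digest(r) for r in target_rows}
    missing = sorted(set(src) - set(tgt))       # in source, absent from target
    extra = sorted(set(tgt) - set(src))         # in target, absent from source
    mismatched = sorted(k for k in set(src) & set(tgt) if src[k] != tgt[k])
    return missing, extra, mismatched

# Illustrative row sets standing in for query results from both sides.
source = [{'id': 1, 'amount': 100}, {'id': 2, 'amount': 200}]
target = [{'id': 1, 'amount': 100}, {'id': 2, 'amount': 250}, {'id': 3, 'amount': 300}]
print(diff_tables(source, target))  # → ([], [3], [2])
```

Any non-empty result should trigger investigation of the sync log before repairing the target.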
5.2 Synchronization Troubleshooting Tips
Troubleshooting tips: check replication-slot status; review the sync logs; check network connectivity; verify the target system's health; and check for data-format incompatibilities.
5.3 Common Synchronization Problems
Common problems: replication-slot WAL backlog, excessive synchronization lag, incompatible data formats, sync failures caused by DDL changes, and sync interruptions caused by network outages.
-- Problem 1: replication slot backlog
SELECT slot_name,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS backlog
FROM pg_replication_slots;
-- Output
    slot_name    | backlog
-----------------+---------
 fgedu_sync_slot | 50 GB
(1 row)
-- Fix: check the sync program's status
$ systemctl status fgedu-sync
-- Problem 2: excessive sync lag
SELECT
    slot_name,
    pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) AS lag_bytes
FROM pg_replication_slots;
-- Output
    slot_name    | lag_bytes
-----------------+------------
 fgedu_sync_slot | 1073741824
(1 row)
-- Fix: tune the sync program's performance
-- Problem 3: incompatible data formats
-- Review the sync log
SELECT * FROM fgedu_sync_log
WHERE status = 'error'
ORDER BY sync_time DESC
LIMIT 10;
-- Output
 id | sync_id |    lsn    | txid | operation |  table_name  | status |      error_message
----+---------+-----------+------+-----------+--------------+--------+-------------------------
  1 |       1 | 0/1A2B3C8 | 1234 | insert    | fgedu_orders | error  | Data type not supported
(1 row)
This article was compiled and published by Fengge Tutorials (风哥教程) for learning and testing purposes only. Please credit the source when reposting: http://www.fgedu.net.cn/10327.html
