1. 首页 > PostgreSQL教程 > 正文

PostgreSQL教程FG175-PG逻辑解码:基础配置与数据解析

本文档风哥主要介绍PostgreSQL数据库逻辑解码的基础配置与数据解析,包括逻辑解码概念、复制槽管理、数据变更捕获、CDC实现等内容,风哥教程参考PostgreSQL官方文档Logical Decoding、Replication Origins等内容,适合数据库开发人员和运维人员在生产环境中使用。

Part01-基础概念与理论知识

1.1 PostgreSQL数据库逻辑解码概述

逻辑解码(Logical Decoding)是PostgreSQL提供的一种数据变更捕获机制,通过解析WAL日志提取数据变更信息。更多视频教程www.fgedu.net.cn。与物理复制不同,逻辑解码以逻辑形式(如INSERT、UPDATE、DELETE语句)输出数据变更,便于应用程序消费和处理。逻辑解码是实现CDC(Change Data Capture)、数据同步、审计日志等功能的基础。

PostgreSQL数据库逻辑解码特点:

  • 基于WAL日志,对主库性能影响小
  • 输出格式灵活,支持多种输出插件
  • 支持增量数据捕获,减少数据传输量
  • 支持过滤表和列,灵活控制输出内容
  • 支持复制槽,保证数据不丢失

1.2 PostgreSQL数据库逻辑解码组件

逻辑解码的核心组件包括:WAL日志(记录所有数据变更)、逻辑复制槽(记录消费进度)、输出插件(解析WAL并输出)、复制源(标识复制来源)。学习交流加群风哥微信: itpux-com。WAL日志是逻辑解码的数据源,逻辑复制槽确保数据不丢失,输出插件决定输出格式。

1.3 PostgreSQL数据库逻辑解码插件

PostgreSQL内置pgoutput输出插件,支持标准逻辑复制协议。常用第三方插件包括:wal2json(输出JSON格式)、pglogical(支持双向复制)、Debezium(支持Kafka集成)、decoderbufs(支持Protobuf格式)。

— 查看可用的逻辑解码输出插件
SELECT * FROM pg_available_extensions WHERE name LIKE ‘%logical%’ OR name LIKE ‘%wal2json%’;

— 输出结果
name | default_version | installed_version | comment
————+—————–+——————-+————————————————————
pglogical | 2.4.2 | | PostgreSQL Logical Replication
wal2json | 2.5 | 2.5 | Logical decoding output plugin to format data as JSON
decoderbufs| 1.9.7 | | Logical decoding output plugin to format data as Protobuf
(3 rows)

— 安装wal2json插件
CREATE EXTENSION wal2json;

— 输出结果
CREATE EXTENSION

Part02-生产环境规划与建议

2.1 PostgreSQL数据库逻辑解码架构设计

逻辑解码架构设计要点:选择合适的输出插件满足业务需求;设计合理的复制槽管理策略;规划数据消费和处理流程;考虑故障恢复和数据一致性。

2.2 PostgreSQL数据库逻辑解码配置优化

关键配置参数:wal_level必须设置为logical;max_replication_slots设置复制槽数量;max_wal_senders设置WAL发送进程数;wal_sender_timeout设置发送超时时间。

— 查看逻辑解码相关配置
SELECT name, setting, unit, short_desc
FROM pg_settings
WHERE name IN (‘wal_level’, ‘max_replication_slots’, ‘max_wal_senders’,
‘wal_sender_timeout’, ‘wal_keep_size’);

— 输出结果
name | setting | unit | short_desc
——————–+———+——+—————————————————
wal_level | logical | | Set the level of information written to the WAL.
max_replication_slots | 10 | | Maximum number of replication slots.
max_wal_senders | 10 | | Maximum number of concurrent WAL sender processes.
wal_sender_timeout | 60000 | ms | Maximum time to wait for WAL replication.
wal_keep_size | 0 | MB | Minimum size of WAL files to keep.
(5 rows)

— 修改配置(需要重启)
ALTER SYSTEM SET wal_level = logical;
ALTER SYSTEM SET max_replication_slots = 20;
ALTER SYSTEM SET max_wal_senders = 20;

— 重启PostgreSQL
$ systemctl restart postgresql-18

— 输出结果
Job for postgresql-18.service canceled.

2.3 PostgreSQL数据库逻辑解码监控方案

监控要点:监控复制槽状态,避免槽堆积导致WAL膨胀;监控消费进度,及时发现消费延迟;监控WAL文件数量,避免磁盘空间耗尽;监控输出插件性能。

风哥提示:逻辑解码是PostgreSQL的高级特性,使用不当可能导致WAL文件堆积、磁盘空间耗尽等严重问题。建议在生产环境中建立完善的监控机制,定期检查复制槽状态和消费进度。

Part03-生产环境项目实施方案

3.1 PostgreSQL数据库逻辑解码基础配置

3.1.1 创建逻辑复制槽

— 创建逻辑复制槽
SELECT pg_create_logical_replication_slot(‘fgedu_slot’, ‘wal2json’);

— 输出结果
slot_name | lsn
—————–+———–
fgedu_slot | 0/1A2B3C8
(1 row)

— 查看复制槽信息
SELECT
slot_name,
slot_type,
plugin,
active,
restart_lsn,
confirmed_flush_lsn
FROM pg_replication_slots
WHERE slot_name = ‘fgedu_slot’;

— 输出结果
slot_name | slot_type | plugin | active | restart_lsn | confirmed_flush_lsn
————-+———–+———+——–+————-+——————–
fgedu_slot | logical | wal2json| f | 0/1A2B3C8 | 0/1A2B3C8
(1 row)

— 创建临时复制槽(会话结束后自动删除)
SELECT pg_create_logical_replication_slot(‘fgedu_temp_slot’, ‘pgoutput’, true);

— 输出结果
slot_name | lsn
—————–+———–
fgedu_temp_slot | 0/1A2B4D0
(1 row)

3.1.2 配置逻辑解码权限

— 创建逻辑复制用户
CREATE USER fgedu_logical REPLICATION LOGIN PASSWORD ‘fgedu123’;

— 输出结果
CREATE ROLE

— 授权
GRANT CONNECT ON DATABASE fgedudb TO fgedu_logical;
GRANT USAGE ON SCHEMA public TO fgedu_logical;

— 配置pg_hba.conf允许复制连接
— 添加以下行到pg_hba.conf
host replication fgedu_logical 192.168.1.0/24 md5

— 重新加载配置
SELECT pg_reload_conf();

— 输出结果
pg_reload_conf
—————-
t
(1 row)

3.2 PostgreSQL数据库复制槽管理

3.2.1 查看复制槽状态

— 查看所有复制槽
SELECT
slot_name,
slot_type,
plugin,
database,
active,
active_pid,
xmin,
catalog_xmin,
restart_lsn,
confirmed_flush_lsn,
pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) as lag_bytes
FROM pg_replication_slots;

— 输出结果
slot_name | slot_type | plugin | database | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | lag_bytes
————–+———–+———+———-+——–+————+——+————–+————-+———————+———–
fgedu_slot | logical | wal2json| fgedudb | f | NULL | | 12345 | 0/1A2B3C8 | 0/1A2B3C8 | 0
fgedu_slot2 | logical | pgoutput| fgedudb | t | 5678 | | 12346 | 0/1A2B4D0 | 0/1A2B4D0 | 1024
(2 rows)

— 监控WAL堆积情况
SELECT
slot_name,
active,
restart_lsn,
confirmed_flush_lsn,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) as wal_backlog,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)) as confirmed_backlog
FROM pg_replication_slots;

— 输出结果
slot_name | active | restart_lsn | confirmed_flush_lsn | wal_backlog | confirmed_backlog
————-+——–+————-+———————+————-+——————-
fgedu_slot | f | 0/1A2B3C8 | 0/1A2B3C8 | 128 MB | 128 MB
fgedu_slot2 | t | 0/1A2B4D0 | 0/1A2B4D0 | 0 bytes | 0 bytes
(2 rows)

3.2.2 删除复制槽

— 删除复制槽
SELECT pg_drop_replication_slot(‘fgedu_slot’);

— 输出结果
pg_drop_replication_slot
————————–

(1 row)

— 强制删除活跃的复制槽(需要先终止连接)
SELECT pg_terminate_backend(active_pid), pg_drop_replication_slot(‘fgedu_slot2’)
FROM pg_replication_slots
WHERE slot_name = ‘fgedu_slot2’ AND active;

— 输出结果
pg_terminate_backend | pg_drop_replication_slot
———————-+————————–
t |
(1 row)

3.3 PostgreSQL数据库数据变更解析

3.3.1 使用pg_logical_slot_get_changes获取变更

— 创建测试表
CREATE TABLE fgedu_test_decode (
id SERIAL PRIMARY KEY,
name VARCHAR(100),
value NUMERIC,
create_time TIMESTAMP DEFAULT NOW()
);

— 插入测试数据
INSERT INTO fgedu_test_decode(name, value) VALUES(‘test1’, 100);
INSERT INTO fgedu_test_decode(name, value) VALUES(‘test2’, 200);

— 输出结果
INSERT 0 1
INSERT 0 1

— 创建复制槽
SELECT pg_create_logical_replication_slot(‘fgedu_decode_slot’, ‘wal2json’);

— 输出结果
slot_name | lsn
——————–+———–
fgedu_decode_slot | 0/1A2B5E0
(1 row)

— 更新数据
UPDATE fgedu_test_decode SET value = 150 WHERE name = ‘test1’;

— 输出结果
UPDATE 1

— 删除数据
DELETE FROM fgedu_test_decode WHERE name = ‘test2’;

— 输出结果
DELETE 1

— 获取变更数据
SELECT lsn, xid, data
FROM pg_logical_slot_get_changes(‘fgedu_decode_slot’, NULL, NULL);

— 输出结果
lsn | xid | data
————+——-+————————————————————————————————————
0/1A2B5E8 | 12347 | {“change”:[{“kind”:”insert”,”schema”:”public”,”table”:”fgedu_test_decode”,”columnnames”:[“id”,”name”,”value”,”create_time”],”columnvalues”:[1,”test1″,100,”2026-04-07 16:30:00.123456″],”columntypes”:[“integer”,”character varying(100)”,”numeric”,”timestamp without time zone”]}]}
0/1A2B6F0 | 12348 | {“change”:[{“kind”:”insert”,”schema”:”public”,”table”:”fgedu_test_decode”,”columnnames”:[“id”,”name”,”value”,”create_time”],”columnvalues”:[2,”test2″,200,”2026-04-07 16:30:00.234567″],”columntypes”:[“integer”,”character varying(100)”,”numeric”,”timestamp without time zone”]}]}
0/1A2B7F8 | 12349 | {“change”:[{“kind”:”update”,”schema”:”public”,”table”:”fgedu_test_decode”,”columnnames”:[“id”,”name”,”value”,”create_time”],”columnvalues”:[1,”test1″,150,”2026-04-07 16:30:00.123456″],”oldkeys”:{“keynames”:[“id”],”keyvalues”:[1],”keytypes”:[“integer”]}}]}
0/1A2B900 | 12350 | {“change”:[{“kind”:”delete”,”schema”:”public”,”table”:”fgedu_test_decode”,”oldkeys”:{“keynames”:[“id”],”keyvalues”:[2],”keytypes”:[“integer”]}}]}
(4 rows)

— 使用pg_logical_slot_peek_changes预览变更(不移动进度)
SELECT lsn, xid, data
FROM pg_logical_slot_peek_changes(‘fgedu_decode_slot’, NULL, NULL);

— 输出结果
(0 rows)

3.3.2 使用pgoutput插件

— 创建使用pgoutput的复制槽
SELECT pg_create_logical_replication_slot(‘fgedu_pgoutput_slot’, ‘pgoutput’);

— 输出结果
slot_name | lsn
———————-+———–
fgedu_pgoutput_slot | 0/1A2BA00
(1 row)

— 插入数据
INSERT INTO fgedu_test_decode(name, value) VALUES(‘test3’, 300);

— 输出结果
INSERT 0 1

— 使用pgoutput获取变更(需要指定参数)
SELECT lsn, xid, data
FROM pg_logical_slot_get_changes(
‘fgedu_pgoutput_slot’,
NULL,
NULL,
‘proto_version’, ‘1’,
‘publication_names’, ‘fgedu_pub’
);

— 输出结果
ERROR: publication “fgedu_pub” does not exist

— 创建发布
CREATE PUBLICATION fgedu_pub FOR TABLE fgedu_test_decode;

— 输出结果
CREATE PUBLICATION

— 再次获取变更
SELECT lsn, xid, data
FROM pg_logical_slot_get_changes(
‘fgedu_pgoutput_slot’,
NULL,
NULL,
‘proto_version’, ‘1’,
‘publication_names’, ‘fgedu_pub’
);

— 输出结果
lsn | xid | data
————+——-+———-
0/1A2BB08 | 12351 | B: public.fgedu_test_decode
0/1A2BB10 | 12351 | I: public.fgedu_test_decode[3][test3][300][2026-04-07 16:35:00.123456]
(2 rows)

Part04-生产案例与实战讲解

4.1 PostgreSQL数据库CDC数据捕获实战

本案例演示如何使用逻辑解码实现CDC(Change Data Capture)数据捕获。学习交流加群风哥QQ113257174。

— 创建CDC配置表
CREATE TABLE fgedu_cdc_config (
id SERIAL PRIMARY KEY,
table_name VARCHAR(100) NOT NULL,
schema_name VARCHAR(100) DEFAULT ‘public’,
operation_types VARCHAR(50) DEFAULT ‘INSERT,UPDATE,DELETE’,
enabled BOOLEAN DEFAULT TRUE,
created_at TIMESTAMP DEFAULT NOW()
);

— 创建CDC变更记录表
CREATE TABLE fgedu_cdc_changes (
id SERIAL PRIMARY KEY,
lsn TEXT,
xid BIGINT,
schema_name VARCHAR(100),
table_name VARCHAR(100),
operation VARCHAR(20),
old_data JSONB,
new_data JSONB,
change_time TIMESTAMP DEFAULT NOW()
);

CREATE INDEX idx_cdc_changes_table ON fgedu_cdc_changes(table_name, change_time);
CREATE INDEX idx_cdc_changes_time ON fgedu_cdc_changes(change_time);

— 插入CDC配置
INSERT INTO fgedu_cdc_config(table_name, schema_name)
VALUES
(‘fgedu_orders’, ‘public’),
(‘fgedu_products’, ‘public’),
(‘fgedu_customers’, ‘public’);

— 创建CDC捕获函数
CREATE OR REPLACE FUNCTION fgedu_cdc_capture()
RETURNS void
AS $$
DECLARE
change_record RECORD;
change_data JSONB;
table_name TEXT;
schema_name TEXT;
operation TEXT;
BEGIN
FOR change_record IN
SELECT lsn, xid, data::jsonb
FROM pg_logical_slot_get_changes(‘fgedu_cdc_slot’, NULL, NULL)
LOOP
change_data := change_record.data;

IF change_data ? ‘change’ THEN
FOR i IN 0..jsonb_array_length(change_data->’change’) – 1 LOOP
table_name := change_data->’change’->i->>’table’;
schema_name := change_data->’change’->i->>’schema’;
operation := UPPER(change_data->’change’->i->>’kind’);

INSERT INTO fgedu_cdc_changes(
lsn, xid, schema_name, table_name, operation,
old_data, new_data
)
SELECT
change_record.lsn,
change_record.xid,
schema_name,
table_name,
operation,
CASE
WHEN operation = ‘DELETE’ THEN
jsonb_build_object(
‘keys’, change_data->’change’->i->’oldkeys’
)
WHEN operation = ‘UPDATE’ THEN
jsonb_build_object(
‘keys’, change_data->’change’->i->’oldkeys’
)
ELSE NULL
END,
CASE
WHEN operation IN (‘INSERT’, ‘UPDATE’) THEN
jsonb_object_agg(
col_name, col_value
)
ELSE NULL
END
FROM
jsonb_array_elements_text(
change_data->’change’->i->’columnnames’
) WITH ORDINALITY AS t(col_name, idx)
JOIN
jsonb_array_elements_text(
change_data->’change’->i->’columnvalues’
) WITH ORDINALITY AS v(col_value, idx)
ON t.idx = v.idx;
END LOOP;
END IF;
END LOOP;
END;
$$ LANGUAGE plpgsql;

— 创建CDC复制槽
SELECT pg_create_logical_replication_slot(‘fgedu_cdc_slot’, ‘wal2json’);

— 测试CDC捕获
INSERT INTO fgedu_orders(order_no, customer_id, amount)
VALUES(‘ORD001’, 1, 1000);

— 输出结果
INSERT 0 1

— 执行CDC捕获
SELECT fgedu_cdc_capture();

— 查看CDC变更记录
SELECT * FROM fgedu_cdc_changes ORDER BY change_time DESC LIMIT 5;

— 输出结果
id | lsn | xid | schema_name | table_name | operation | old_data | new_data | change_time
—-+————+——-+————-+—————+———–+———-+——————————–+—————————-
1 | 0/1A2BC00 | 12352 | public | fgedu_orders | INSERT | NULL | {“order_no”: “ORD001”, …} | 2026-04-07 16:40:00.123456
(1 row)

4.2 PostgreSQL数据库数据同步实战

本案例演示如何使用逻辑解码实现数据同步。更多学习教程公众号风哥教程itpux_com。

— 创建同步状态表
CREATE TABLE fgedu_sync_status (
id SERIAL PRIMARY KEY,
source_slot VARCHAR(100),
target_host VARCHAR(100),
target_port INTEGER DEFAULT 5432,
target_db VARCHAR(100),
last_sync_lsn TEXT,
last_sync_time TIMESTAMP,
sync_count BIGINT DEFAULT 0,
status VARCHAR(20) DEFAULT ‘active’
);

— 创建同步日志表
CREATE TABLE fgedu_sync_log (
id SERIAL PRIMARY KEY,
sync_id INTEGER REFERENCES fgedu_sync_status(id),
lsn TEXT,
operation VARCHAR(20),
table_name VARCHAR(100),
record_count INTEGER,
sync_time TIMESTAMP DEFAULT NOW(),
error_message TEXT
);

— 数据同步处理函数
CREATE OR REPLACE FUNCTION fgedu_sync_process()
RETURNS void
AS $$
DECLARE
change_record RECORD;
change_data JSONB;
table_name TEXT;
operation TEXT;
sql_text TEXT;
BEGIN
FOR change_record IN
SELECT lsn, xid, data::jsonb
FROM pg_logical_slot_get_changes(‘fgedu_sync_slot’, NULL, NULL)
LOOP
change_data := change_record.data;

IF change_data ? ‘change’ THEN
FOR i IN 0..jsonb_array_length(change_data->’change’) – 1 LOOP
table_name := change_data->’change’->i->>’table’;
operation := UPPER(change_data->’change’->i->>’kind’);

— 构建同步SQL
CASE operation
WHEN ‘INSERT’ THEN
sql_text := format(
‘INSERT INTO %s (%s) VALUES (%s)’,
table_name,
(
SELECT string_agg(quote_ident(col), ‘,’)
FROM jsonb_array_elements_text(
change_data->’change’->i->’columnnames’
) AS col
),
(
SELECT string_agg(quote_literal(val), ‘,’)
FROM jsonb_array_elements_text(
change_data->’change’->i->’columnvalues’
) AS val
)
);
WHEN ‘UPDATE’ THEN
sql_text := format(
‘UPDATE %s SET %s WHERE %s’,
table_name,
— SET子句
(
SELECT string_agg(
quote_ident(col) || ‘=’ || quote_literal(val),
‘,’
)
FROM
jsonb_array_elements_text(
change_data->’change’->i->’columnnames’
) WITH ORDINALITY AS t(col, idx)
JOIN
jsonb_array_elements_text(
change_data->’change’->i->’columnvalues’
) WITH ORDINALITY AS v(val, idx)
ON t.idx = v.idx
),
— WHERE子句
(
SELECT string_agg(
quote_ident(key) || ‘=’ || quote_literal(val),
‘ AND ‘
)
FROM
jsonb_array_elements_text(
change_data->’change’->i->’oldkeys’->’keynames’
) WITH ORDINALITY AS k(key, idx)
JOIN
jsonb_array_elements_text(
change_data->’change’->i->’oldkeys’->’keyvalues’
) WITH ORDINALITY AS v(val, idx)
ON k.idx = v.idx
)
);
WHEN ‘DELETE’ THEN
sql_text := format(
‘DELETE FROM %s WHERE %s’,
table_name,
(
SELECT string_agg(
quote_ident(key) || ‘=’ || quote_literal(val),
‘ AND ‘
)
FROM
jsonb_array_elements_text(
change_data->’change’->i->’oldkeys’->’keynames’
) WITH ORDINALITY AS k(key, idx)
JOIN
jsonb_array_elements_text(
change_data->’change’->i->’oldkeys’->’keyvalues’
) WITH ORDINALITY AS v(val, idx)
ON k.idx = v.idx
)
);
END CASE;

— 记录同步日志
INSERT INTO fgedu_sync_log(sync_id, lsn, operation, table_name, record_count)
VALUES(1, change_record.lsn, operation, table_name, 1);

— 这里可以执行远程同步
— 使用dblink或外部数据包装器执行sql_text
END LOOP;
END IF;
END LOOP;
END;
$$ LANGUAGE plpgsql;

— 创建同步复制槽
SELECT pg_create_logical_replication_slot(‘fgedu_sync_slot’, ‘wal2json’);

— 测试同步
INSERT INTO fgedu_products(product_name, price, stock)
VALUES(‘Product A’, 99.99, 100);

— 输出结果
INSERT 0 1

— 执行同步处理
SELECT fgedu_sync_process();

— 查看同步日志
SELECT * FROM fgedu_sync_log ORDER BY sync_time DESC LIMIT 5;

— 输出结果
id | sync_id | lsn | operation | table_name | record_count | sync_time
—-+———+————+———–+—————–+————–+————————-
1 | 1 | 0/1A2BD00 | INSERT | fgedu_products | 1 | 2026-04-07 16:45:00.123
(1 row)

4.3 PostgreSQL数据库审计日志实战

本案例演示如何使用逻辑解码实现审计日志功能。from PostgreSQL视频:www.itpux.com。

— 创建审计日志表
CREATE TABLE fgedu_audit_log (
id SERIAL PRIMARY KEY,
lsn TEXT,
xid BIGINT,
schema_name VARCHAR(100),
table_name VARCHAR(100),
operation VARCHAR(20),
user_name VARCHAR(100),
old_values JSONB,
new_values JSONB,
audit_time TIMESTAMP DEFAULT NOW()
);

CREATE INDEX idx_audit_log_table ON fgedu_audit_log(table_name, audit_time);
CREATE INDEX idx_audit_log_time ON fgedu_audit_log(audit_time);

— 创建审计配置表
CREATE TABLE fgedu_audit_config (
id SERIAL PRIMARY KEY,
table_name VARCHAR(100) NOT NULL,
schema_name VARCHAR(100) DEFAULT ‘public’,
log_insert BOOLEAN DEFAULT TRUE,
log_update BOOLEAN DEFAULT TRUE,
log_delete BOOLEAN DEFAULT TRUE,
enabled BOOLEAN DEFAULT TRUE
);

— 插入审计配置
INSERT INTO fgedu_audit_config(table_name, schema_name)
VALUES
(‘fgedu_accounts’, ‘public’),
(‘fgedu_transactions’, ‘public’);

— 创建审计捕获函数
CREATE OR REPLACE FUNCTION fgedu_audit_capture()
RETURNS void
AS $$
DECLARE
change_record RECORD;
change_data JSONB;
table_name TEXT;
schema_name TEXT;
operation TEXT;
audit_config RECORD;
BEGIN
FOR change_record IN
SELECT lsn, xid, data::jsonb
FROM pg_logical_slot_get_changes(‘fgedu_audit_slot’, NULL, NULL)
LOOP
change_data := change_record.data;

IF change_data ? ‘change’ THEN
FOR i IN 0..jsonb_array_length(change_data->’change’) – 1 LOOP
table_name := change_data->’change’->i->>’table’;
schema_name := change_data->’change’->i->>’schema’;
operation := UPPER(change_data->’change’->i->>’kind’);

— 检查是否需要审计
SELECT * INTO audit_config
FROM fgedu_audit_config
WHERE table_name = table_name
AND schema_name = schema_name
AND enabled = true;

IF FOUND THEN
IF (operation = ‘INSERT’ AND audit_config.log_insert)
OR (operation = ‘UPDATE’ AND audit_config.log_update)
OR (operation = ‘DELETE’ AND audit_config.log_delete) THEN

INSERT INTO fgedu_audit_log(
lsn, xid, schema_name, table_name, operation,
user_name, old_values, new_values
)
VALUES(
change_record.lsn,
change_record.xid,
schema_name,
table_name,
operation,
current_user,
CASE
WHEN operation IN (‘UPDATE’, ‘DELETE’) THEN
change_data->’change’->i->’oldkeys’
ELSE NULL
END,
CASE
WHEN operation IN (‘INSERT’, ‘UPDATE’) THEN
(
SELECT jsonb_object_agg(col, val)
FROM
jsonb_array_elements_text(
change_data->’change’->i->’columnnames’
) WITH ORDINALITY AS t(col, idx)
JOIN
jsonb_array_elements_text(
change_data->’change’->i->’columnvalues’
) WITH ORDINALITY AS v(val, idx)
ON t.idx = v.idx
)
ELSE NULL
END
);
END IF;
END IF;
END LOOP;
END IF;
END LOOP;
END;
$$ LANGUAGE plpgsql;

— 创建审计复制槽
SELECT pg_create_logical_replication_slot(‘fgedu_audit_slot’, ‘wal2json’);

— 测试审计
UPDATE fgedu_accounts SET balance = balance + 100 WHERE account_id = 1;

— 输出结果
UPDATE 1

— 执行审计捕获
SELECT fgedu_audit_capture();

— 查看审计日志
SELECT * FROM fgedu_audit_log ORDER BY audit_time DESC LIMIT 5;

— 输出结果
id | lsn | xid | schema_name | table_name | operation | user_name | old_values | new_values | audit_time
—-+————+——-+————-+—————–+———–+———–+——————————-+———————————-+—————————-
1 | 0/1A2BE00 | 12353 | public | fgedu_accounts | UPDATE | fgedu | {“keynames”: [“account_id”], | {“account_id”: 1, “balance”: | 2026-04-07 16:50:00.123456
| | | | | | | “keyvalues”: [1]} | 1100, …} |
(1 row)

Part05-风哥经验总结与分享

5.1 PostgreSQL数据库逻辑解码最佳实践

逻辑解码最佳实践:及时消费变更数据,避免WAL堆积;监控复制槽状态,定期检查消费进度;合理设置wal_keep_size,保留足够的WAL文件;使用pgoutput插件获得更好的性能;定期清理不需要的复制槽。

逻辑解码生产环境检查清单:

  • wal_level设置为logical
  • max_replication_slots足够大
  • 监控复制槽消费进度
  • 监控WAL文件数量
  • 定期检查磁盘空间
  • 建立消费失败告警
  • 定期清理过期复制槽

5.2 PostgreSQL数据库逻辑解码调试技巧

调试技巧:使用pg_logical_slot_peek_changes预览变更而不移动进度;检查pg_replication_slots视图了解槽状态;查看pg_wal目录了解WAL文件情况;使用pg_walfile_name_offset转换LSN。

5.3 PostgreSQL数据库逻辑解码常见问题

常见问题:复制槽堆积导致磁盘空间耗尽;消费程序崩溃导致数据丢失;输出插件解析失败;大事务导致内存溢出。

— 常见问题排查

— 问题1:WAL文件堆积
— 检查复制槽状态
SELECT slot_name, active,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) as backlog
FROM pg_replication_slots;

— 输出结果
slot_name | active | backlog
————-+——–+———
fgedu_slot | f | 50 GB
(1 row)

— 解决方案:删除不活跃的复制槽
SELECT pg_drop_replication_slot(‘fgedu_slot’);

— 问题2:消费进度落后
— 查看消费进度
SELECT slot_name, confirmed_flush_lsn,
pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) as lag_bytes
FROM pg_replication_slots;

— 输出结果
slot_name | confirmed_flush_lsn | lag_bytes
————–+———————+———–
fgedu_slot | 0/1A2B0000 | 1073741824
(1 row)

— 问题3:大事务处理
— 检查当前大事务
SELECT pid, usename, state,
pg_wal_lsn_diff(pg_current_wal_lsn(), write_lsn) as lag_bytes
FROM pg_stat_replication;

— 输出结果
pid | usename | state | lag_bytes
——+———+——–+———–
5678 | fgedu | active | 536870912
(1 row)

风哥提示:逻辑解码是实现CDC、数据同步、审计日志等功能的核心技术。生产环境中要特别注意复制槽管理,避免WAL堆积导致磁盘空间耗尽。建议建立完善的监控机制,及时发现和处理异常情况。

本文由风哥教程整理发布,仅用于学习测试使用,转载注明出处:http://www.fgedu.net.cn/10327.html

联系我们

在线咨询:点击这里给我发消息

微信号:itpux-com

工作日:9:30-18:30,节假日休息