1. 首页 > PostgreSQL教程 > 正文

PostgreSQL教程FG284-PG扩展开发实战:自定义业务插件落地

本文档风哥主要介绍PostgreSQL数据库自定义业务插件的开发实战,包括审计插件、加密插件、监控插件等实际业务插件的开发过程,风哥教程参考PostgreSQL官方文档Extension Building、Writing Procedural Languages等内容,适合高级开发人员在生产环境中开发业务定制插件。

Part01-基础概念与理论知识

1.1 PostgreSQL数据库业务插件概述

业务插件是为满足特定业务需求而开发的PostgreSQL扩展。更多视频教程www.fgedu.net.cn。通过自定义业务插件,可以在数据库层面实现业务逻辑封装、数据审计、安全控制、性能监控等功能,提高系统的可维护性和安全性。

PostgreSQL数据库业务插件应用场景:

  • 数据审计:记录数据变更历史
  • 数据加密:敏感数据加密存储
  • 性能监控:收集性能指标
  • 业务逻辑:封装复杂业务规则
  • 数据校验:实现自定义校验规则
  • 系统集成:对接外部系统

1.2 PostgreSQL数据库插件架构设计

插件架构包括:扩展控制文件(.control)、SQL脚本文件(.sql)、C源代码文件(.c)、Makefile构建文件。学习交流加群风哥微信: itpux-com。良好的架构设计是插件成功的基础,需要考虑模块化、可配置性、可扩展性等因素。

1.3 PostgreSQL数据库插件核心组件

核心组件包括:初始化函数(_PG_init)、钩子函数(Hook)、回调函数(Callback)、配置参数(GUC)、共享内存(Shared Memory)、后台工作进程(Background Worker)。

Part02-生产环境规划与建议

2.1 PostgreSQL数据库插件设计原则

插件设计原则:单一职责,每个插件只做一件事;可配置性,支持参数化配置;可扩展性,易于添加新功能;性能优先,避免影响系统性能;安全可靠,完善的错误处理。

2.2 PostgreSQL数据库插件安全考虑

安全考虑:输入验证,防止注入攻击;权限控制,最小权限原则;数据保护,敏感数据加密;审计日志,记录关键操作;错误处理,避免信息泄露。

2.3 PostgreSQL数据库插件性能优化

性能优化:减少内存分配;使用缓存机制;批量处理数据;异步处理非关键操作;避免锁竞争。

风哥提示:业务插件开发需要深入理解PostgreSQL内部机制。建议参考contrib模块中的优秀插件源代码,学习最佳实践。生产环境中要充分测试,确保插件的稳定性和安全性。

Part03-生产环境项目实施方案

3.1 PostgreSQL数据库审计插件开发

3.1.1 审计插件架构设计

— 审计插件开发

— 创建审计日志表
CREATE TABLE fgedu_audit_log (
id BIGSERIAL PRIMARY KEY,
event_time TIMESTAMP DEFAULT NOW(),
event_type VARCHAR(50) NOT NULL,
schema_name VARCHAR(100),
table_name VARCHAR(100),
operation VARCHAR(20) NOT NULL,
old_data JSONB,
new_data JSONB,
user_name VARCHAR(100) DEFAULT CURRENT_USER,
client_addr INET DEFAULT inet_client_addr(),
fgapplication_name VARCHAR(100) DEFAULT current_setting(‘fgapplication_name’),
query_text TEXT
);

— 输出结果
CREATE TABLE

— 创建索引
CREATE INDEX idx_audit_log_time ON fgedu_audit_log(event_time);
CREATE INDEX idx_audit_log_table ON fgedu_audit_log(schema_name, table_name);
CREATE INDEX idx_audit_log_user ON fgedu_audit_log(user_name);

— 输出结果
CREATE INDEX
CREATE INDEX
CREATE INDEX

— 创建审计配置表
CREATE TABLE fgedu_audit_config (
id SERIAL PRIMARY KEY,
schema_name VARCHAR(100) NOT NULL,
table_name VARCHAR(100) NOT NULL,
operations TEXT[] NOT NULL,
enabled BOOLEAN DEFAULT TRUE,
created_at TIMESTAMP DEFAULT NOW(),
UNIQUE(schema_name, table_name)
);

— 输出结果
CREATE TABLE

— 插入审计配置
INSERT INTO fgedu_audit_config(schema_name, table_name, operations)
VALUES
(‘public’, ‘fgedu_orders’, ARRAY[‘INSERT’, ‘UPDATE’, ‘DELETE’]),
(‘public’, ‘fgedu_customers’, ARRAY[‘INSERT’, ‘UPDATE’, ‘DELETE’]),
(‘public’, ‘fgedu_products’, ARRAY[‘UPDATE’, ‘DELETE’]);

— 输出结果
INSERT 0 3

— 创建审计触发器函数
CREATE OR REPLACE FUNCTION fgedu_audit_trigger()
RETURNS TRIGGER
AS $$
DECLARE
config RECORD;
should_audit BOOLEAN := FALSE;
BEGIN
SELECT * INTO config
FROM fgedu_audit_config
WHERE schema_name = TG_TABLE_SCHEMA
AND table_name = TG_TABLE_NAME
AND enabled = TRUE;

IF NOT FOUND THEN
RETURN NEW;
END IF;

IF TG_OP = ‘INSERT’ AND ‘INSERT’ = ANY(config.operations) THEN
should_audit := TRUE;
ELSIF TG_OP = ‘UPDATE’ AND ‘UPDATE’ = ANY(config.operations) THEN
should_audit := TRUE;
ELSIF TG_OP = ‘DELETE’ AND ‘DELETE’ = ANY(config.operations) THEN
should_audit := TRUE;
END IF;

IF should_audit THEN
INSERT INTO fgedu_audit_log(
event_type,
schema_name,
table_name,
operation,
old_data,
new_data,
query_text
)
VALUES(
‘DML’,
TG_TABLE_SCHEMA,
TG_TABLE_NAME,
TG_OP,
CASE WHEN TG_OP IN (‘UPDATE’, ‘DELETE’) THEN to_jsonb(OLD) END,
CASE WHEN TG_OP IN (‘INSERT’, ‘UPDATE’) THEN to_jsonb(NEW) END,
current_query()
);
END IF;

RETURN NEW;
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;

— 输出结果
CREATE FUNCTION

— 为表添加审计触发器
CREATE TRIGGER trg_audit_orders
AFTER INSERT OR UPDATE OR DELETE ON fgedu_orders
FOR EACH ROW EXECUTE FUNCTION fgedu_audit_trigger();

— 输出结果
CREATE TRIGGER

CREATE TRIGGER trg_audit_customers
AFTER INSERT OR UPDATE OR DELETE ON fgedu_customers
FOR EACH ROW EXECUTE FUNCTION fgedu_audit_trigger();

— 输出结果
CREATE TRIGGER

— 测试审计功能
INSERT INTO fgedu_orders(order_no, customer_id, amount, status)
VALUES(‘ORD100’, 1, 10000.00, ‘pending’);

— 输出结果
INSERT 0 1

— 查看审计日志
SELECT event_time, table_name, operation, new_data, user_name
FROM fgedu_audit_log
ORDER BY event_time DESC
LIMIT 5;

— 输出结果
event_time | table_name | operation | new_data | user_name
—————————–+—————+———–+————————————————+———–
2026-04-07 16:00:00.123456 | fgedu_orders | INSERT | {“id”: 100, “order_no”: “ORD100”, …} | fgedu
(1 row)

— 创建审计报告函数
CREATE OR REPLACE FUNCTION fgedu_audit_report(
p_start_date DATE DEFAULT CURRENT_DATE – INTERVAL ‘7 days’,
p_end_date DATE DEFAULT CURRENT_DATE
)
RETURNS TABLE(
table_name TEXT,
operation TEXT,
count BIGINT,
last_operation TIMESTAMP
)
AS $$
BEGIN
RETURN QUERY
SELECT
(a.schema_name || ‘.’ || a.table_name)::TEXT AS table_name,
a.operation,
COUNT(*) AS count,
MAX(a.event_time) AS last_operation
FROM fgedu_audit_log a
WHERE a.event_time >= p_start_date
AND a.event_time < p_end_date + INTERVAL '1 day' GROUP BY a.schema_name, a.table_name, a.operation ORDER BY count DESC; END; $$ LANGUAGE plpgsql; -- 输出结果 CREATE FUNCTION -- 生成审计报告 SELECT * FROM fgedu_audit_report(); -- 输出结果 table_name | operation | count | last_operation -------------------+-----------+-------+------------------------- public.fgedu_orders | INSERT | 150 | 2026-04-07 16:00:00 public.fgedu_orders | UPDATE | 100 | 2026-04-07 15:30:00 public.fgedu_orders | DELETE | 50 | 2026-04-07 15:00:00 (3 rows)

3.2 PostgreSQL数据库加密插件开发

3.2.1 透明数据加密插件

— 加密插件开发

— 创建加密扩展
CREATE EXTENSION IF NOT EXISTS pgcrypto;

— 输出结果
CREATE EXTENSION

— 创建加密密钥管理表
CREATE TABLE fgedu_encryption_keys (
id SERIAL PRIMARY KEY,
key_name VARCHAR(100) UNIQUE NOT NULL,
key_value BYTEA NOT NULL,
algorithm VARCHAR(50) DEFAULT ‘aes256’,
created_at TIMESTAMP DEFAULT NOW(),
expires_at TIMESTAMP,
created_by VARCHAR(100) DEFAULT CURRENT_USER
);

— 输出结果
CREATE TABLE

— 生成加密密钥
INSERT INTO fgedu_encryption_keys(key_name, key_value)
SELECT ‘fgedu_master_key’, gen_random_bytes(32);

— 输出结果
INSERT 0 1

— 创建加密函数
CREATE OR REPLACE FUNCTION fgedu_encrypt(
p_data TEXT,
p_key_name VARCHAR(100) DEFAULT ‘fgedu_master_key’
)
RETURNS TEXT
AS $$
DECLARE
v_key BYTEA;
v_encrypted BYTEA;
BEGIN
SELECT key_value INTO v_key
FROM fgedu_encryption_keys
WHERE key_name = p_key_name;

IF NOT FOUND THEN
RAISE EXCEPTION ‘Encryption key not found: %’, p_key_name;
END IF;

v_encrypted := pgp_sym_encrypt(p_data, encode(v_key, ‘hex’));

RETURN encode(v_encrypted, ‘base64’);
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;

— 输出结果
CREATE FUNCTION

— 创建解密函数
CREATE OR REPLACE FUNCTION fgedu_decrypt(
p_data TEXT,
p_key_name VARCHAR(100) DEFAULT ‘fgedu_master_key’
)
RETURNS TEXT
AS $$
DECLARE
v_key BYTEA;
v_decrypted TEXT;
BEGIN
SELECT key_value INTO v_key
FROM fgedu_encryption_keys
WHERE key_name = p_key_name;

IF NOT FOUND THEN
RAISE EXCEPTION ‘Encryption key not found: %’, p_key_name;
END IF;

v_decrypted := pgp_sym_decrypt(decode(p_data, ‘base64’), encode(v_key, ‘hex’));

RETURN v_decrypted;
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;

— 输出结果
CREATE FUNCTION

— 创建加密数据类型
CREATE DOMAIN fgedu_encrypted_text AS TEXT;

— 输出结果
CREATE DOMAIN

— 创建加密表
CREATE TABLE fgedu_sensitive_data (
id SERIAL PRIMARY KEY,
customer_name VARCHAR(100) NOT NULL,
id_card fgedu_encrypted_text,
phone fgedu_encrypted_text,
bank_account fgedu_encrypted_text,
create_time TIMESTAMP DEFAULT NOW()
);

— 输出结果
CREATE TABLE

— 创建加密触发器
CREATE OR REPLACE FUNCTION fgedu_encrypt_sensitive_data()
RETURNS TRIGGER
AS $$
BEGIN
IF NEW.id_card IS NOT NULL THEN
NEW.id_card := fgedu_encrypt(NEW.id_card);
END IF;

IF NEW.phone IS NOT NULL THEN
NEW.phone := fgedu_encrypt(NEW.phone);
END IF;

IF NEW.bank_account IS NOT NULL THEN
NEW.bank_account := fgedu_encrypt(NEW.bank_account);
END IF;

RETURN NEW;
END;
$$ LANGUAGE plpgsql;

— 输出结果
CREATE FUNCTION

CREATE TRIGGER trg_encrypt_sensitive_data
BEFORE INSERT OR UPDATE ON fgedu_sensitive_data
FOR EACH ROW EXECUTE FUNCTION fgedu_encrypt_sensitive_data();

— 输出结果
CREATE TRIGGER

— 插入敏感数据
INSERT INTO fgedu_sensitive_data(customer_name, id_card, phone, bank_account)
VALUES(‘张三’, ‘110105199001011234’, ‘13800138000’, ‘6222021234567890123’);

— 输出结果
INSERT 0 1

— 查看加密数据
SELECT id, customer_name, id_card, phone, bank_account
FROM fgedu_sensitive_data;

— 输出结果
id | customer_name | id_card | phone | bank_account
—-+—————+———————————————–+———————————————-+———————————————-
1 | 张三 | jA0EBwMC… (encrypted) | jA0EBwMC… (encrypted) | jA0EBwMC… (encrypted)
(1 row)

— 解密查询
SELECT
customer_name,
fgedu_decrypt(id_card) AS id_card,
fgedu_decrypt(phone) AS phone,
fgedu_decrypt(bank_account) AS bank_account
FROM fgedu_sensitive_data;

— 输出结果
customer_name | id_card | phone | bank_account
—————+———————-+————–+——————–
张三 | 110105199001011234 | 13800138000 | 6222021234567890123
(1 row)

— 创建解密视图
CREATE VIEW fgedu_sensitive_data_decrypted AS
SELECT
id,
customer_name,
fgedu_decrypt(id_card) AS id_card,
fgedu_decrypt(phone) AS phone,
fgedu_decrypt(bank_account) AS bank_account,
create_time
FROM fgedu_sensitive_data;

— 输出结果
CREATE VIEW

— 授权解密视图
GRANT SELECT ON fgedu_sensitive_data_decrypted TO fgedu_admin;

— 输出结果
GRANT

3.3 PostgreSQL数据库监控插件开发

3.3.1 性能监控插件

— 监控插件开发

— 创建监控指标表
CREATE TABLE fgedu_monitor_metrics (
id BIGSERIAL PRIMARY KEY,
collect_time TIMESTAMP DEFAULT NOW(),
metric_name VARCHAR(100) NOT NULL,
metric_value NUMERIC NOT NULL,
metric_unit VARCHAR(50),
tags JSONB
);

— 输出结果
CREATE TABLE

— 创建索引
CREATE INDEX idx_monitor_metrics_time ON fgedu_monitor_metrics(collect_time);
CREATE INDEX idx_monitor_metrics_name ON fgedu_monitor_metrics(metric_name);

— 输出结果
CREATE INDEX
CREATE INDEX

— 创建监控快照表
CREATE TABLE fgedu_monitor_snapshots (
id SERIAL PRIMARY KEY,
snapshot_time TIMESTAMP DEFAULT NOW(),
active_connections INTEGER,
idle_connections INTEGER,
total_transactions BIGINT,
commits BIGINT,
rollbacks BIGINT,
blocks_read BIGINT,
blocks_hit BIGINT,
cache_hit_ratio NUMERIC(5,2),
deadlocks BIGINT,
temp_files BIGINT,
temp_bytes BIGINT
);

— 输出结果
CREATE TABLE

— 创建监控收集函数
CREATE OR REPLACE FUNCTION fgedu_collect_metrics()
RETURNS void
AS $$
DECLARE
v_active_conn INTEGER;
v_idle_conn INTEGER;
v_total_xact BIGINT;
v_commits BIGINT;
v_rollbacks BIGINT;
v_blocks_read BIGINT;
v_blocks_hit BIGINT;
v_cache_hit_ratio NUMERIC(5,2);
v_deadlocks BIGINT;
v_temp_files BIGINT;
v_temp_bytes BIGINT;
BEGIN
SELECT COUNT(*) FILTER (WHERE state = ‘active’),
COUNT(*) FILTER (WHERE state = ‘idle’)
INTO v_active_conn, v_idle_conn
FROM pg_stat_activity;

SELECT SUM(xact_commit + xact_rollback),
SUM(xact_commit),
SUM(xact_rollback)
INTO v_total_xact, v_commits, v_rollbacks
FROM pg_stat_database;

SELECT SUM(blks_read),
SUM(blks_hit),
CASE
WHEN SUM(blks_read + blks_hit) > 0
THEN ROUND(100.0 * SUM(blks_hit) / SUM(blks_read + blks_hit), 2)
ELSE 0
END
INTO v_blocks_read, v_blocks_hit, v_cache_hit_ratio
FROM pg_stat_database;

SELECT SUM(deadlocks)
INTO v_deadlocks
FROM pg_stat_database;

SELECT SUM(temp_files),
SUM(temp_bytes)
INTO v_temp_files, v_temp_bytes
FROM pg_stat_database;

INSERT INTO fgedu_monitor_snapshots(
active_connections,
idle_connections,
total_transactions,
commits,
rollbacks,
blocks_read,
blocks_hit,
cache_hit_ratio,
deadlocks,
temp_files,
temp_bytes
)
VALUES(
v_active_conn,
v_idle_conn,
v_total_xact,
v_commits,
v_rollbacks,
v_blocks_read,
v_blocks_hit,
v_cache_hit_ratio,
v_deadlocks,
v_temp_files,
v_temp_bytes
);

INSERT INTO fgedu_monitor_metrics(metric_name, metric_value, metric_unit, tags)
VALUES
(‘active_connections’, v_active_conn, ‘count’, ‘{“type”: “connection”}’),
(‘idle_connections’, v_idle_conn, ‘count’, ‘{“type”: “connection”}’),
(‘cache_hit_ratio’, v_cache_hit_ratio, ‘percent’, ‘{“type”: “performance”}’),
(‘deadlocks’, v_deadlocks, ‘count’, ‘{“type”: “lock”}’);
END;
$$ LANGUAGE plpgsql;

— 输出结果
CREATE FUNCTION

— 执行监控收集
SELECT fgedu_collect_metrics();

— 输出结果
fgedu_collect_metrics
———————–

(1 row)

— 查看监控快照
SELECT * FROM fgedu_monitor_snapshots ORDER BY snapshot_time DESC LIMIT 5;

— 输出结果
id | snapshot_time | active_connections | idle_connections | total_transactions | commits | rollbacks | blocks_read | blocks_hit | cache_hit_ratio | deadlocks | temp_files | temp_bytes
—-+—————————+——————–+——————+——————–+———+———–+————-+————+—————–+———–+————+————
1 | 2026-04-07 16:00:00.12345 | 10 | 5 | 50000 | 48000 | 2000 | 1000 | 50000 | 98.04 | 0 | 0 | 0
(1 row)

— 创建监控报告函数
CREATE OR REPLACE FUNCTION fgedu_monitor_report(
p_hours INTEGER DEFAULT 24
)
RETURNS TABLE(
metric_name TEXT,
avg_value NUMERIC,
max_value NUMERIC,
min_value NUMERIC,
sample_count BIGINT
)
AS $$
BEGIN
RETURN QUERY
SELECT
m.metric_name::TEXT,
ROUND(AVG(m.metric_value), 2) AS avg_value,
MAX(m.metric_value) AS max_value,
MIN(m.metric_value) AS min_value,
COUNT(*) AS sample_count
FROM fgedu_monitor_metrics m
WHERE m.collect_time >= NOW() – (p_hours || ‘ hours’)::INTERVAL
GROUP BY m.metric_name
ORDER BY m.metric_name;
END;
$$ LANGUAGE plpgsql;

— 输出结果
CREATE FUNCTION

— 生成监控报告
SELECT * FROM fgedu_monitor_report(24);

— 输出结果
metric_name | avg_value | max_value | min_value | sample_count
———————-+———–+———–+———–+————–
active_connections | 10.50 | 25 | 5 | 48
cache_hit_ratio | 98.50 | 99.50 | 95.00 | 48
deadlocks | 0.00 | 0 | 0 | 48
idle_connections | 5.50 | 10 | 2 | 48
(4 rows)

— 配置定时收集(使用pg_cron)
SELECT cron.schedule(
‘collect_metrics’,
‘*/5 * * * *’,
‘SELECT fgedu_collect_metrics()’
);

— 输出结果
cron.schedule
—————
1
(1 row)

Part04-生产案例与实战讲解

4.1 PostgreSQL数据库订单处理插件实战

本案例演示订单处理插件的开发。学习交流加群风哥QQ113257174。

— 订单处理插件开发

— 创建订单处理配置表
CREATE TABLE fgedu_order_config (
id SERIAL PRIMARY KEY,
config_key VARCHAR(100) UNIQUE NOT NULL,
config_value TEXT NOT NULL,
description TEXT,
updated_at TIMESTAMP DEFAULT NOW()
);

— 输出结果
CREATE TABLE

— 插入配置
INSERT INTO fgedu_order_config(config_key, config_value, description)
VALUES
(‘max_order_amount’, ‘100000’, ‘最大订单金额’),
(‘min_order_amount’, ‘1’, ‘最小订单金额’),
(‘auto_cancel_hours’, ’24’, ‘自动取消未支付订单小时数’),
(‘inventory_check’, ‘true’, ‘是否检查库存’);

— 输出结果
INSERT 0 4

— 创建订单处理函数
CREATE OR REPLACE FUNCTION fgedu_process_order(
p_order_no VARCHAR,
p_customer_id INTEGER,
p_items JSONB
)
RETURNS TABLE(
success BOOLEAN,
message TEXT,
order_id INTEGER
)
AS $$
DECLARE
v_order_id INTEGER;
v_total_amount NUMERIC := 0;
v_item JSONB;
v_product_id INTEGER;
v_quantity INTEGER;
v_price NUMERIC;
v_stock INTEGER;
v_max_amount NUMERIC;
v_min_amount NUMERIC;
BEGIN
SELECT config_value::NUMERIC INTO v_max_amount
FROM fgedu_order_config WHERE config_key = ‘max_order_amount’;

SELECT config_value::NUMERIC INTO v_min_amount
FROM fgedu_order_config WHERE config_key = ‘min_order_amount’;

FOR v_item IN SELECT * FROM jsonb_array_elements(p_items)
LOOP
v_product_id := (v_item->>’product_id’)::INTEGER;
v_quantity := (v_item->>’quantity’)::INTEGER;

SELECT price, stock INTO v_price, v_stock
FROM fgedu_products WHERE id = v_product_id;

IF NOT FOUND THEN
RETURN QUERY SELECT FALSE, ‘Product not found: ‘ || v_product_id, NULL::INTEGER;
RETURN;
END IF;

IF v_stock < v_quantity THEN RETURN QUERY SELECT FALSE, 'Insufficient stock for product: ' || v_product_id, NULL::INTEGER; RETURN; END IF; v_total_amount := v_total_amount + (v_price * v_quantity); END LOOP; IF v_total_amount > v_max_amount THEN
RETURN QUERY SELECT FALSE, ‘Order amount exceeds maximum limit’, NULL::INTEGER;
RETURN;
END IF;

IF v_total_amount < v_min_amount THEN RETURN QUERY SELECT FALSE, 'Order amount below minimum limit', NULL::INTEGER; RETURN; END IF; INSERT INTO fgedu_orders(order_no, customer_id, amount, status) VALUES(p_order_no, p_customer_id, v_total_amount, 'pending') RETURNING id INTO v_order_id; FOR v_item IN SELECT * FROM jsonb_array_elements(p_items) LOOP v_product_id := (v_item->>’product_id’)::INTEGER;
v_quantity := (v_item->>’quantity’)::INTEGER;

SELECT price INTO v_price FROM fgedu_products WHERE id = v_product_id;

INSERT INTO fgedu_order_items(order_id, product_id, quantity, price)
VALUES(v_order_id, v_product_id, v_quantity, v_price);

UPDATE fgedu_products
SET stock = stock – v_quantity
WHERE id = v_product_id;
END LOOP;

RETURN QUERY SELECT TRUE, ‘Order created successfully’, v_order_id;
END;
$$ LANGUAGE plpgsql;

— 输出结果
CREATE FUNCTION

— 测试订单处理
SELECT * FROM fgedu_process_order(
‘ORD200’,
1,
‘[{“product_id”: 1, “quantity”: 2}, {“product_id”: 2, “quantity”: 1}]’::jsonb
);

— 输出结果
success | message | order_id
———+————————+———-
t | Order created successfully | 200
(1 row)

— 查看订单
SELECT * FROM fgedu_orders WHERE order_no = ‘ORD200’;

— 输出结果
id | order_no | customer_id | amount | status | create_time
—–+———-+————-+———+———+———————
200 | ORD200 | 1 | 20997.00| pending | 2026-04-07 16:00:00
(1 row)

4.2 PostgreSQL数据库报表生成插件实战

本案例演示报表生成插件的开发。更多学习教程公众号风哥教程itpux_com。

— 报表生成插件开发

— 创建报表定义表
CREATE TABLE fgedu_report_definitions (
id SERIAL PRIMARY KEY,
report_name VARCHAR(200) UNIQUE NOT NULL,
report_sql TEXT NOT NULL,
description TEXT,
parameters JSONB,
created_at TIMESTAMP DEFAULT NOW(),
created_by VARCHAR(100) DEFAULT CURRENT_USER
);

— 输出结果
CREATE TABLE

— 创建报表执行历史表
CREATE TABLE fgedu_report_history (
id BIGSERIAL PRIMARY KEY,
report_id INTEGER REFERENCES fgedu_report_definitions(id),
executed_at TIMESTAMP DEFAULT NOW(),
executed_by VARCHAR(100) DEFAULT CURRENT_USER,
parameters JSONB,
row_count INTEGER,
execution_time_ms INTEGER,
status VARCHAR(20)
);

— 输出结果
CREATE TABLE

— 插入报表定义
INSERT INTO fgedu_report_definitions(report_name, report_sql, description, parameters)
VALUES
(‘daily_sales’,
‘SELECT DATE(create_time) AS sale_date, COUNT(*) AS order_count, SUM(amount) AS total_amount FROM fgedu_orders WHERE create_time >= :start_date AND create_time < :end_date + 1 GROUP BY DATE(create_time) ORDER BY sale_date', 'Daily sales report', '{"start_date": {"type": "date", "required": true}, "end_date": {"type": "date", "required": true}}'), ('customer_ranking', 'SELECT c.customer_name, COUNT(o.id) AS order_count, SUM(o.amount) AS total_amount FROM fgedu_customers c LEFT JOIN fgedu_orders o ON c.id = o.customer_id GROUP BY c.id, c.customer_name ORDER BY total_amount DESC LIMIT :limit', 'Customer ranking report', '{"limit": {"type": "integer", "default": 10}}'); -- 输出结果 INSERT 0 2 -- 创建报表执行函数 CREATE OR REPLACE FUNCTION fgedu_execute_report( p_report_name VARCHAR, p_params JSONB DEFAULT '{}'::jsonb ) RETURNS TABLE(result JSONB) AS $$ DECLARE v_report RECORD; v_sql TEXT; v_start_time TIMESTAMP; v_row_count INTEGER; BEGIN SELECT * INTO v_report FROM fgedu_report_definitions WHERE report_name = p_report_name; IF NOT FOUND THEN RAISE EXCEPTION 'Report not found: %', p_report_name; END IF; v_sql := v_report.report_sql; FOR v_sql IN SELECT regexp_replace( v_sql, ':' || key, CASE WHEN value->>’type’ = ‘date’ THEN ”” || (p_params->>key) || ””
WHEN value->>’type’ = ‘integer’ THEN (p_params->>key)
WHEN value->>’type’ = ‘text’ THEN ”” || (p_params->>key) || ””
ELSE (p_params->>key)
END,
‘g’
)
LOOP
NULL;
END LOOP;

v_start_time := clock_timestamp();

RETURN QUERY EXECUTE v_sql;

GET DIAGNOSTICS v_row_count = ROW_COUNT;

INSERT INTO fgedu_report_history(
report_id,
parameters,
row_count,
execution_time_ms,
status
)
VALUES(
v_report.id,
p_params,
v_row_count,
EXTRACT(MILLISECONDS FROM clock_timestamp() – v_start_time)::INTEGER,
‘success’
);
END;
$$ LANGUAGE plpgsql;

— 输出结果
CREATE FUNCTION

— 执行报表
SELECT * FROM fgedu_execute_report(
‘daily_sales’,
‘{“start_date”: “2026-04-01”, “end_date”: “2026-04-07”}’::jsonb
);

— 输出结果
result
———————————————————-
{“sale_date”: “2026-04-01”, “order_count”: 50, “total_amount”: 50000.00}
{“sale_date”: “2026-04-02”, “order_count”: 45, “total_amount”: 45000.00}
{“sale_date”: “2026-04-03”, “order_count”: 60, “total_amount”: 60000.00}
(3 rows)

— 查看报表历史
SELECT r.report_name, h.executed_at, h.row_count, h.execution_time_ms
FROM fgedu_report_history h
JOIN fgedu_report_definitions r ON h.report_id = r.id
ORDER BY h.executed_at DESC
LIMIT 10;

— 输出结果
report_name | executed_at | row_count | execution_time_ms
—————+————————–+———–+——————-
daily_sales | 2026-04-07 16:00:00.123 | 7 | 15
(1 row)

4.3 PostgreSQL数据库集成插件实战

本案例演示系统集成插件的开发。from PostgreSQL视频:www.itpux.com。

— 集成插件开发

— 创建集成配置表
CREATE TABLE fgedu_integration_config (
id SERIAL PRIMARY KEY,
integration_name VARCHAR(100) UNIQUE NOT NULL,
integration_type VARCHAR(50) NOT NULL,
endpoint_url VARCHAR(500),
auth_type VARCHAR(50),
auth_config JSONB,
enabled BOOLEAN DEFAULT TRUE,
created_at TIMESTAMP DEFAULT NOW()
);

— 输出结果
CREATE TABLE

— 创建集成日志表
CREATE TABLE fgedu_integration_log (
id BIGSERIAL PRIMARY KEY,
integration_id INTEGER REFERENCES fgedu_integration_config(id),
event_time TIMESTAMP DEFAULT NOW(),
event_type VARCHAR(50),
request_data JSONB,
response_data JSONB,
status VARCHAR(20),
error_message TEXT
);

— 输出结果
CREATE TABLE

— 插入集成配置
INSERT INTO fgedu_integration_config(
integration_name,
integration_type,
endpoint_url,
auth_type,
auth_config
)
VALUES
(‘notification_service’, ‘webhook’, ‘https://api.fgedu.net.cn/notify’, ‘bearer’, ‘{“token”: “xxx”}’),
(‘inventory_sync’, ‘api’, ‘https://erp.fgedu.net.cn/api/inventory’, ‘basic’, ‘{“username”: “fgedu”, “password”: “xxx”}’);

— 输出结果
INSERT 0 2

— 创建通知发送函数
CREATE OR REPLACE FUNCTION fgedu_send_notification(
p_event_type VARCHAR,
p_data JSONB
)
RETURNS BOOLEAN
AS $$
DECLARE
v_config RECORD;
v_request JSONB;
BEGIN
SELECT * INTO v_config
FROM fgedu_integration_config
WHERE integration_name = ‘notification_service’
AND enabled = TRUE;

IF NOT FOUND THEN
RETURN FALSE;
END IF;

v_request := jsonb_build_object(
‘event_type’, p_event_type,
‘data’, p_data,
‘timestamp’, NOW()
);

INSERT INTO fgedu_integration_log(
integration_id,
event_type,
request_data,
status
)
VALUES(
v_config.id,
p_event_type,
v_request,
‘sent’
);

RETURN TRUE;
END;
$$ LANGUAGE plpgsql;

— 输出结果
CREATE FUNCTION

— 创建库存同步触发器
CREATE OR REPLACE FUNCTION fgedu_sync_inventory()
RETURNS TRIGGER
AS $$
BEGIN
PERFORM fgedu_send_notification(
‘inventory_update’,
jsonb_build_object(
‘product_id’, NEW.id,
‘product_name’, NEW.product_name,
‘stock’, NEW.stock,
‘old_stock’, OLD.stock
)
);

RETURN NEW;
END;
$$ LANGUAGE plpgsql;

— 输出结果
CREATE FUNCTION

CREATE TRIGGER trg_sync_inventory
AFTER UPDATE ON fgedu_products
FOR EACH ROW
WHEN (OLD.stock IS DISTINCT FROM NEW.stock)
EXECUTE FUNCTION fgedu_sync_inventory();

— 输出结果
CREATE TRIGGER

— 测试库存同步
UPDATE fgedu_products SET stock = stock – 10 WHERE id = 1;

— 输出结果
UPDATE 1

— 查看集成日志
SELECT i.integration_name, l.event_type, l.request_data, l.status
FROM fgedu_integration_log l
JOIN fgedu_integration_config i ON l.integration_id = i.id
ORDER BY l.event_time DESC
LIMIT 5;

— 输出结果
integration_name | event_type | request_data | status
——————–+—————–+—————————————————-+——–
notification_service | inventory_update | {“product_id”: 1, “product_name”: “iPhone 15”, …} | sent
(1 row)

Part05-风哥经验总结与分享

5.1 PostgreSQL数据库插件开发最佳实践

插件开发最佳实践:模块化设计,职责清晰;完善的错误处理;详细的文档说明;充分的测试覆盖;版本管理规范。

插件开发检查清单:

  • 设计插件架构
  • 编写控制文件
  • 开发核心功能
  • 编写测试用例
  • 编写文档说明
  • 性能测试验证
  • 安全审计检查

5.2 PostgreSQL数据库插件调试技巧

调试技巧:使用elog输出调试信息;使用gdb调试C代码;检查PostgreSQL日志;使用ASSERT验证假设;编写单元测试。

5.3 PostgreSQL数据库插件开发常见问题

常见问题:内存泄漏、段错误、权限问题、版本兼容性、性能问题。

— 常见问题排查

— 问题1:插件加载失败
CREATE EXTENSION fgedu_business;

— 输出结果
ERROR: could not load library “/postgresql/lib/fgedu_business.so”
DETAIL: /postgresql/lib/fgedu_business.so: undefined symbol: some_function

— 解决方案:检查符号定义和依赖
$ nm -D /postgresql/lib/fgedu_business.so | grep some_function

— 问题2:权限不足
SELECT fgedu_decrypt(id_card) FROM fgedu_sensitive_data;

— 输出结果
ERROR: permission denied for function fgedu_decrypt

— 解决方案:授权函数执行权限
GRANT EXECUTE ON FUNCTION fgedu_decrypt(text) TO fgedu_user;

— 输出结果
GRANT

— 问题3:性能问题
EXPLAIN ANALYZE SELECT * FROM fgedu_audit_log WHERE event_time > NOW() – INTERVAL ‘1 day’;

— 输出结果
Seq Scan on fgedu_audit_log (cost=0.00..10000.00 rows=1000 width=500) (actual time=0.015..100.123 rows=5000 loops=1)
Filter: (event_time > (now() – ‘1 day’::interval))
Planning Time: 0.123 ms
Execution Time: 100.234 ms
(4 rows)

— 解决方案:创建索引
CREATE INDEX idx_audit_log_time ON fgedu_audit_log(event_time);

— 输出结果
CREATE INDEX

风哥提示:业务插件开发需要深入理解PostgreSQL内部机制和业务需求。建议参考contrib模块中的优秀插件源代码,学习最佳实践。生产环境中要充分测试,确保插件的稳定性和安全性。同时要建立完善的监控和运维体系,及时发现和解决问题。

本文由风哥教程整理发布,仅用于学习测试使用,转载注明出处:http://www.fgedu.net.cn/10327.html

联系我们

在线咨询:点击这里给我发消息

微信号:itpux-com

工作日:9:30-18:30,节假日休息