PostgreSQL教程FG284-PG扩展开发实战:自定义业务插件落地
本文档风哥主要介绍PostgreSQL数据库自定义业务插件的开发实战,包括审计插件、加密插件、监控插件等实际业务插件的开发过程,风哥教程参考PostgreSQL官方文档Extension Building、Writing Procedural Languages等内容,适合高级开发人员在生产环境中开发业务定制插件。
Part01-基础概念与理论知识
1.1 PostgreSQL数据库业务插件概述
业务插件是为满足特定业务需求而开发的PostgreSQL扩展。更多视频教程www.fgedu.net.cn。通过自定义业务插件,可以在数据库层面实现业务逻辑封装、数据审计、安全控制、性能监控等功能,提高系统的可维护性和安全性。
- 数据审计:记录数据变更历史
- 数据加密:敏感数据加密存储
- 性能监控:收集性能指标
- 业务逻辑:封装复杂业务规则
- 数据校验:实现自定义校验规则
- 系统集成:对接外部系统
1.2 PostgreSQL数据库插件架构设计
插件架构包括:扩展控制文件(.control)、SQL脚本文件(.sql)、C源代码文件(.c)、Makefile构建文件。学习交流加群风哥微信: itpux-com。良好的架构设计是插件成功的基础,需要考虑模块化、可配置性、可扩展性等因素。
1.3 PostgreSQL数据库插件核心组件
核心组件包括:初始化函数(_PG_init)、钩子函数(Hook)、回调函数(Callback)、配置参数(GUC)、共享内存(Shared Memory)、后台工作进程(Background Worker)。
Part02-生产环境规划与建议
2.1 PostgreSQL数据库插件设计原则
插件设计原则:单一职责,每个插件只做一件事;可配置性,支持参数化配置;可扩展性,易于添加新功能;性能优先,避免影响系统性能;安全可靠,完善的错误处理。
2.2 PostgreSQL数据库插件安全考虑
安全考虑:输入验证,防止注入攻击;权限控制,最小权限原则;数据保护,敏感数据加密;审计日志,记录关键操作;错误处理,避免信息泄露。
2.3 PostgreSQL数据库插件性能优化
性能优化:减少内存分配;使用缓存机制;批量处理数据;异步处理非关键操作;避免锁竞争。
Part03-生产环境项目实施方案
3.1 PostgreSQL数据库审计插件开发
3.1.1 审计插件架构设计
— 创建审计日志表
CREATE TABLE fgedu_audit_log (
id BIGSERIAL PRIMARY KEY,
event_time TIMESTAMP DEFAULT NOW(),
event_type VARCHAR(50) NOT NULL,
schema_name VARCHAR(100),
table_name VARCHAR(100),
operation VARCHAR(20) NOT NULL,
old_data JSONB,
new_data JSONB,
user_name VARCHAR(100) DEFAULT CURRENT_USER,
client_addr INET DEFAULT inet_client_addr(),
fgapplication_name VARCHAR(100) DEFAULT current_setting(‘fgapplication_name’),
query_text TEXT
);
— 输出结果
CREATE TABLE
— 创建索引
CREATE INDEX idx_audit_log_time ON fgedu_audit_log(event_time);
CREATE INDEX idx_audit_log_table ON fgedu_audit_log(schema_name, table_name);
CREATE INDEX idx_audit_log_user ON fgedu_audit_log(user_name);
— 输出结果
CREATE INDEX
CREATE INDEX
CREATE INDEX
— 创建审计配置表
CREATE TABLE fgedu_audit_config (
id SERIAL PRIMARY KEY,
schema_name VARCHAR(100) NOT NULL,
table_name VARCHAR(100) NOT NULL,
operations TEXT[] NOT NULL,
enabled BOOLEAN DEFAULT TRUE,
created_at TIMESTAMP DEFAULT NOW(),
UNIQUE(schema_name, table_name)
);
— 输出结果
CREATE TABLE
— 插入审计配置
INSERT INTO fgedu_audit_config(schema_name, table_name, operations)
VALUES
(‘public’, ‘fgedu_orders’, ARRAY[‘INSERT’, ‘UPDATE’, ‘DELETE’]),
(‘public’, ‘fgedu_customers’, ARRAY[‘INSERT’, ‘UPDATE’, ‘DELETE’]),
(‘public’, ‘fgedu_products’, ARRAY[‘UPDATE’, ‘DELETE’]);
— 输出结果
INSERT 0 3
— 创建审计触发器函数
CREATE OR REPLACE FUNCTION fgedu_audit_trigger()
RETURNS TRIGGER
AS $$
DECLARE
config RECORD;
should_audit BOOLEAN := FALSE;
BEGIN
SELECT * INTO config
FROM fgedu_audit_config
WHERE schema_name = TG_TABLE_SCHEMA
AND table_name = TG_TABLE_NAME
AND enabled = TRUE;
IF NOT FOUND THEN
RETURN NEW;
END IF;
IF TG_OP = ‘INSERT’ AND ‘INSERT’ = ANY(config.operations) THEN
should_audit := TRUE;
ELSIF TG_OP = ‘UPDATE’ AND ‘UPDATE’ = ANY(config.operations) THEN
should_audit := TRUE;
ELSIF TG_OP = ‘DELETE’ AND ‘DELETE’ = ANY(config.operations) THEN
should_audit := TRUE;
END IF;
IF should_audit THEN
INSERT INTO fgedu_audit_log(
event_type,
schema_name,
table_name,
operation,
old_data,
new_data,
query_text
)
VALUES(
‘DML’,
TG_TABLE_SCHEMA,
TG_TABLE_NAME,
TG_OP,
CASE WHEN TG_OP IN (‘UPDATE’, ‘DELETE’) THEN to_jsonb(OLD) END,
CASE WHEN TG_OP IN (‘INSERT’, ‘UPDATE’) THEN to_jsonb(NEW) END,
current_query()
);
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;
— 输出结果
CREATE FUNCTION
— 为表添加审计触发器
CREATE TRIGGER trg_audit_orders
AFTER INSERT OR UPDATE OR DELETE ON fgedu_orders
FOR EACH ROW EXECUTE FUNCTION fgedu_audit_trigger();
— 输出结果
CREATE TRIGGER
CREATE TRIGGER trg_audit_customers
AFTER INSERT OR UPDATE OR DELETE ON fgedu_customers
FOR EACH ROW EXECUTE FUNCTION fgedu_audit_trigger();
— 输出结果
CREATE TRIGGER
— 测试审计功能
INSERT INTO fgedu_orders(order_no, customer_id, amount, status)
VALUES(‘ORD100’, 1, 10000.00, ‘pending’);
— 输出结果
INSERT 0 1
— 查看审计日志
SELECT event_time, table_name, operation, new_data, user_name
FROM fgedu_audit_log
ORDER BY event_time DESC
LIMIT 5;
— 输出结果
event_time | table_name | operation | new_data | user_name
—————————–+—————+———–+————————————————+———–
2026-04-07 16:00:00.123456 | fgedu_orders | INSERT | {“id”: 100, “order_no”: “ORD100”, …} | fgedu
(1 row)
— 创建审计报告函数
CREATE OR REPLACE FUNCTION fgedu_audit_report(
p_start_date DATE DEFAULT CURRENT_DATE – INTERVAL ‘7 days’,
p_end_date DATE DEFAULT CURRENT_DATE
)
RETURNS TABLE(
table_name TEXT,
operation TEXT,
count BIGINT,
last_operation TIMESTAMP
)
AS $$
BEGIN
RETURN QUERY
SELECT
(a.schema_name || ‘.’ || a.table_name)::TEXT AS table_name,
a.operation,
COUNT(*) AS count,
MAX(a.event_time) AS last_operation
FROM fgedu_audit_log a
WHERE a.event_time >= p_start_date
AND a.event_time < p_end_date + INTERVAL '1 day'
GROUP BY a.schema_name, a.table_name, a.operation
ORDER BY count DESC;
END;
$$ LANGUAGE plpgsql;
-- 输出结果
CREATE FUNCTION
-- 生成审计报告
SELECT * FROM fgedu_audit_report();
-- 输出结果
table_name | operation | count | last_operation
-------------------+-----------+-------+-------------------------
public.fgedu_orders | INSERT | 150 | 2026-04-07 16:00:00
public.fgedu_orders | UPDATE | 100 | 2026-04-07 15:30:00
public.fgedu_orders | DELETE | 50 | 2026-04-07 15:00:00
(3 rows)
3.2 PostgreSQL数据库加密插件开发
3.2.1 透明数据加密插件
— 创建加密扩展
CREATE EXTENSION IF NOT EXISTS pgcrypto;
— 输出结果
CREATE EXTENSION
— 创建加密密钥管理表
CREATE TABLE fgedu_encryption_keys (
id SERIAL PRIMARY KEY,
key_name VARCHAR(100) UNIQUE NOT NULL,
key_value BYTEA NOT NULL,
algorithm VARCHAR(50) DEFAULT ‘aes256’,
created_at TIMESTAMP DEFAULT NOW(),
expires_at TIMESTAMP,
created_by VARCHAR(100) DEFAULT CURRENT_USER
);
— 输出结果
CREATE TABLE
— 生成加密密钥
INSERT INTO fgedu_encryption_keys(key_name, key_value)
SELECT ‘fgedu_master_key’, gen_random_bytes(32);
— 输出结果
INSERT 0 1
— 创建加密函数
CREATE OR REPLACE FUNCTION fgedu_encrypt(
p_data TEXT,
p_key_name VARCHAR(100) DEFAULT ‘fgedu_master_key’
)
RETURNS TEXT
AS $$
DECLARE
v_key BYTEA;
v_encrypted BYTEA;
BEGIN
SELECT key_value INTO v_key
FROM fgedu_encryption_keys
WHERE key_name = p_key_name;
IF NOT FOUND THEN
RAISE EXCEPTION ‘Encryption key not found: %’, p_key_name;
END IF;
v_encrypted := pgp_sym_encrypt(p_data, encode(v_key, ‘hex’));
RETURN encode(v_encrypted, ‘base64’);
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;
— 输出结果
CREATE FUNCTION
— 创建解密函数
CREATE OR REPLACE FUNCTION fgedu_decrypt(
p_data TEXT,
p_key_name VARCHAR(100) DEFAULT ‘fgedu_master_key’
)
RETURNS TEXT
AS $$
DECLARE
v_key BYTEA;
v_decrypted TEXT;
BEGIN
SELECT key_value INTO v_key
FROM fgedu_encryption_keys
WHERE key_name = p_key_name;
IF NOT FOUND THEN
RAISE EXCEPTION ‘Encryption key not found: %’, p_key_name;
END IF;
v_decrypted := pgp_sym_decrypt(decode(p_data, ‘base64’), encode(v_key, ‘hex’));
RETURN v_decrypted;
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;
— 输出结果
CREATE FUNCTION
— 创建加密数据类型
CREATE DOMAIN fgedu_encrypted_text AS TEXT;
— 输出结果
CREATE DOMAIN
— 创建加密表
CREATE TABLE fgedu_sensitive_data (
id SERIAL PRIMARY KEY,
customer_name VARCHAR(100) NOT NULL,
id_card fgedu_encrypted_text,
phone fgedu_encrypted_text,
bank_account fgedu_encrypted_text,
create_time TIMESTAMP DEFAULT NOW()
);
— 输出结果
CREATE TABLE
— 创建加密触发器
CREATE OR REPLACE FUNCTION fgedu_encrypt_sensitive_data()
RETURNS TRIGGER
AS $$
BEGIN
IF NEW.id_card IS NOT NULL THEN
NEW.id_card := fgedu_encrypt(NEW.id_card);
END IF;
IF NEW.phone IS NOT NULL THEN
NEW.phone := fgedu_encrypt(NEW.phone);
END IF;
IF NEW.bank_account IS NOT NULL THEN
NEW.bank_account := fgedu_encrypt(NEW.bank_account);
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
— 输出结果
CREATE FUNCTION
CREATE TRIGGER trg_encrypt_sensitive_data
BEFORE INSERT OR UPDATE ON fgedu_sensitive_data
FOR EACH ROW EXECUTE FUNCTION fgedu_encrypt_sensitive_data();
— 输出结果
CREATE TRIGGER
— 插入敏感数据
INSERT INTO fgedu_sensitive_data(customer_name, id_card, phone, bank_account)
VALUES(‘张三’, ‘110105199001011234’, ‘13800138000’, ‘6222021234567890123’);
— 输出结果
INSERT 0 1
— 查看加密数据
SELECT id, customer_name, id_card, phone, bank_account
FROM fgedu_sensitive_data;
— 输出结果
id | customer_name | id_card | phone | bank_account
—-+—————+———————————————–+———————————————-+———————————————-
1 | 张三 | jA0EBwMC… (encrypted) | jA0EBwMC… (encrypted) | jA0EBwMC… (encrypted)
(1 row)
— 解密查询
SELECT
customer_name,
fgedu_decrypt(id_card) AS id_card,
fgedu_decrypt(phone) AS phone,
fgedu_decrypt(bank_account) AS bank_account
FROM fgedu_sensitive_data;
— 输出结果
customer_name | id_card | phone | bank_account
—————+———————-+————–+——————–
张三 | 110105199001011234 | 13800138000 | 6222021234567890123
(1 row)
— 创建解密视图
CREATE VIEW fgedu_sensitive_data_decrypted AS
SELECT
id,
customer_name,
fgedu_decrypt(id_card) AS id_card,
fgedu_decrypt(phone) AS phone,
fgedu_decrypt(bank_account) AS bank_account,
create_time
FROM fgedu_sensitive_data;
— 输出结果
CREATE VIEW
— 授权解密视图
GRANT SELECT ON fgedu_sensitive_data_decrypted TO fgedu_admin;
— 输出结果
GRANT
3.3 PostgreSQL数据库监控插件开发
3.3.1 性能监控插件
— 创建监控指标表
CREATE TABLE fgedu_monitor_metrics (
id BIGSERIAL PRIMARY KEY,
collect_time TIMESTAMP DEFAULT NOW(),
metric_name VARCHAR(100) NOT NULL,
metric_value NUMERIC NOT NULL,
metric_unit VARCHAR(50),
tags JSONB
);
— 输出结果
CREATE TABLE
— 创建索引
CREATE INDEX idx_monitor_metrics_time ON fgedu_monitor_metrics(collect_time);
CREATE INDEX idx_monitor_metrics_name ON fgedu_monitor_metrics(metric_name);
— 输出结果
CREATE INDEX
CREATE INDEX
— 创建监控快照表
CREATE TABLE fgedu_monitor_snapshots (
id SERIAL PRIMARY KEY,
snapshot_time TIMESTAMP DEFAULT NOW(),
active_connections INTEGER,
idle_connections INTEGER,
total_transactions BIGINT,
commits BIGINT,
rollbacks BIGINT,
blocks_read BIGINT,
blocks_hit BIGINT,
cache_hit_ratio NUMERIC(5,2),
deadlocks BIGINT,
temp_files BIGINT,
temp_bytes BIGINT
);
— 输出结果
CREATE TABLE
— 创建监控收集函数
CREATE OR REPLACE FUNCTION fgedu_collect_metrics()
RETURNS void
AS $$
DECLARE
v_active_conn INTEGER;
v_idle_conn INTEGER;
v_total_xact BIGINT;
v_commits BIGINT;
v_rollbacks BIGINT;
v_blocks_read BIGINT;
v_blocks_hit BIGINT;
v_cache_hit_ratio NUMERIC(5,2);
v_deadlocks BIGINT;
v_temp_files BIGINT;
v_temp_bytes BIGINT;
BEGIN
SELECT COUNT(*) FILTER (WHERE state = ‘active’),
COUNT(*) FILTER (WHERE state = ‘idle’)
INTO v_active_conn, v_idle_conn
FROM pg_stat_activity;
SELECT SUM(xact_commit + xact_rollback),
SUM(xact_commit),
SUM(xact_rollback)
INTO v_total_xact, v_commits, v_rollbacks
FROM pg_stat_database;
SELECT SUM(blks_read),
SUM(blks_hit),
CASE
WHEN SUM(blks_read + blks_hit) > 0
THEN ROUND(100.0 * SUM(blks_hit) / SUM(blks_read + blks_hit), 2)
ELSE 0
END
INTO v_blocks_read, v_blocks_hit, v_cache_hit_ratio
FROM pg_stat_database;
SELECT SUM(deadlocks)
INTO v_deadlocks
FROM pg_stat_database;
SELECT SUM(temp_files),
SUM(temp_bytes)
INTO v_temp_files, v_temp_bytes
FROM pg_stat_database;
INSERT INTO fgedu_monitor_snapshots(
active_connections,
idle_connections,
total_transactions,
commits,
rollbacks,
blocks_read,
blocks_hit,
cache_hit_ratio,
deadlocks,
temp_files,
temp_bytes
)
VALUES(
v_active_conn,
v_idle_conn,
v_total_xact,
v_commits,
v_rollbacks,
v_blocks_read,
v_blocks_hit,
v_cache_hit_ratio,
v_deadlocks,
v_temp_files,
v_temp_bytes
);
INSERT INTO fgedu_monitor_metrics(metric_name, metric_value, metric_unit, tags)
VALUES
(‘active_connections’, v_active_conn, ‘count’, ‘{“type”: “connection”}’),
(‘idle_connections’, v_idle_conn, ‘count’, ‘{“type”: “connection”}’),
(‘cache_hit_ratio’, v_cache_hit_ratio, ‘percent’, ‘{“type”: “performance”}’),
(‘deadlocks’, v_deadlocks, ‘count’, ‘{“type”: “lock”}’);
END;
$$ LANGUAGE plpgsql;
— 输出结果
CREATE FUNCTION
— 执行监控收集
SELECT fgedu_collect_metrics();
— 输出结果
fgedu_collect_metrics
———————–
(1 row)
— 查看监控快照
SELECT * FROM fgedu_monitor_snapshots ORDER BY snapshot_time DESC LIMIT 5;
— 输出结果
id | snapshot_time | active_connections | idle_connections | total_transactions | commits | rollbacks | blocks_read | blocks_hit | cache_hit_ratio | deadlocks | temp_files | temp_bytes
—-+—————————+——————–+——————+——————–+———+———–+————-+————+—————–+———–+————+————
1 | 2026-04-07 16:00:00.12345 | 10 | 5 | 50000 | 48000 | 2000 | 1000 | 50000 | 98.04 | 0 | 0 | 0
(1 row)
— 创建监控报告函数
CREATE OR REPLACE FUNCTION fgedu_monitor_report(
p_hours INTEGER DEFAULT 24
)
RETURNS TABLE(
metric_name TEXT,
avg_value NUMERIC,
max_value NUMERIC,
min_value NUMERIC,
sample_count BIGINT
)
AS $$
BEGIN
RETURN QUERY
SELECT
m.metric_name::TEXT,
ROUND(AVG(m.metric_value), 2) AS avg_value,
MAX(m.metric_value) AS max_value,
MIN(m.metric_value) AS min_value,
COUNT(*) AS sample_count
FROM fgedu_monitor_metrics m
WHERE m.collect_time >= NOW() – (p_hours || ‘ hours’)::INTERVAL
GROUP BY m.metric_name
ORDER BY m.metric_name;
END;
$$ LANGUAGE plpgsql;
— 输出结果
CREATE FUNCTION
— 生成监控报告
SELECT * FROM fgedu_monitor_report(24);
— 输出结果
metric_name | avg_value | max_value | min_value | sample_count
———————-+———–+———–+———–+————–
active_connections | 10.50 | 25 | 5 | 48
cache_hit_ratio | 98.50 | 99.50 | 95.00 | 48
deadlocks | 0.00 | 0 | 0 | 48
idle_connections | 5.50 | 10 | 2 | 48
(4 rows)
— 配置定时收集(使用pg_cron)
SELECT cron.schedule(
‘collect_metrics’,
‘*/5 * * * *’,
‘SELECT fgedu_collect_metrics()’
);
— 输出结果
cron.schedule
—————
1
(1 row)
Part04-生产案例与实战讲解
4.1 PostgreSQL数据库订单处理插件实战
本案例演示订单处理插件的开发。学习交流加群风哥QQ113257174。
— 创建订单处理配置表
CREATE TABLE fgedu_order_config (
id SERIAL PRIMARY KEY,
config_key VARCHAR(100) UNIQUE NOT NULL,
config_value TEXT NOT NULL,
description TEXT,
updated_at TIMESTAMP DEFAULT NOW()
);
— 输出结果
CREATE TABLE
— 插入配置
INSERT INTO fgedu_order_config(config_key, config_value, description)
VALUES
(‘max_order_amount’, ‘100000’, ‘最大订单金额’),
(‘min_order_amount’, ‘1’, ‘最小订单金额’),
(‘auto_cancel_hours’, ’24’, ‘自动取消未支付订单小时数’),
(‘inventory_check’, ‘true’, ‘是否检查库存’);
— 输出结果
INSERT 0 4
— 创建订单处理函数
CREATE OR REPLACE FUNCTION fgedu_process_order(
p_order_no VARCHAR,
p_customer_id INTEGER,
p_items JSONB
)
RETURNS TABLE(
success BOOLEAN,
message TEXT,
order_id INTEGER
)
AS $$
DECLARE
v_order_id INTEGER;
v_total_amount NUMERIC := 0;
v_item JSONB;
v_product_id INTEGER;
v_quantity INTEGER;
v_price NUMERIC;
v_stock INTEGER;
v_max_amount NUMERIC;
v_min_amount NUMERIC;
BEGIN
SELECT config_value::NUMERIC INTO v_max_amount
FROM fgedu_order_config WHERE config_key = ‘max_order_amount’;
SELECT config_value::NUMERIC INTO v_min_amount
FROM fgedu_order_config WHERE config_key = ‘min_order_amount’;
FOR v_item IN SELECT * FROM jsonb_array_elements(p_items)
LOOP
v_product_id := (v_item->>’product_id’)::INTEGER;
v_quantity := (v_item->>’quantity’)::INTEGER;
SELECT price, stock INTO v_price, v_stock
FROM fgedu_products WHERE id = v_product_id;
IF NOT FOUND THEN
RETURN QUERY SELECT FALSE, ‘Product not found: ‘ || v_product_id, NULL::INTEGER;
RETURN;
END IF;
IF v_stock < v_quantity THEN
RETURN QUERY SELECT FALSE, 'Insufficient stock for product: ' || v_product_id, NULL::INTEGER;
RETURN;
END IF;
v_total_amount := v_total_amount + (v_price * v_quantity);
END LOOP;
IF v_total_amount > v_max_amount THEN
RETURN QUERY SELECT FALSE, ‘Order amount exceeds maximum limit’, NULL::INTEGER;
RETURN;
END IF;
IF v_total_amount < v_min_amount THEN
RETURN QUERY SELECT FALSE, 'Order amount below minimum limit', NULL::INTEGER;
RETURN;
END IF;
INSERT INTO fgedu_orders(order_no, customer_id, amount, status)
VALUES(p_order_no, p_customer_id, v_total_amount, 'pending')
RETURNING id INTO v_order_id;
FOR v_item IN SELECT * FROM jsonb_array_elements(p_items)
LOOP
v_product_id := (v_item->>’product_id’)::INTEGER;
v_quantity := (v_item->>’quantity’)::INTEGER;
SELECT price INTO v_price FROM fgedu_products WHERE id = v_product_id;
INSERT INTO fgedu_order_items(order_id, product_id, quantity, price)
VALUES(v_order_id, v_product_id, v_quantity, v_price);
UPDATE fgedu_products
SET stock = stock – v_quantity
WHERE id = v_product_id;
END LOOP;
RETURN QUERY SELECT TRUE, ‘Order created successfully’, v_order_id;
END;
$$ LANGUAGE plpgsql;
— 输出结果
CREATE FUNCTION
— 测试订单处理
SELECT * FROM fgedu_process_order(
‘ORD200’,
1,
‘[{“product_id”: 1, “quantity”: 2}, {“product_id”: 2, “quantity”: 1}]’::jsonb
);
— 输出结果
success | message | order_id
———+————————+———-
t | Order created successfully | 200
(1 row)
— 查看订单
SELECT * FROM fgedu_orders WHERE order_no = ‘ORD200’;
— 输出结果
id | order_no | customer_id | amount | status | create_time
—–+———-+————-+———+———+———————
200 | ORD200 | 1 | 20997.00| pending | 2026-04-07 16:00:00
(1 row)
4.2 PostgreSQL数据库报表生成插件实战
本案例演示报表生成插件的开发。更多学习教程公众号风哥教程itpux_com。
— 创建报表定义表
CREATE TABLE fgedu_report_definitions (
id SERIAL PRIMARY KEY,
report_name VARCHAR(200) UNIQUE NOT NULL,
report_sql TEXT NOT NULL,
description TEXT,
parameters JSONB,
created_at TIMESTAMP DEFAULT NOW(),
created_by VARCHAR(100) DEFAULT CURRENT_USER
);
— 输出结果
CREATE TABLE
— 创建报表执行历史表
CREATE TABLE fgedu_report_history (
id BIGSERIAL PRIMARY KEY,
report_id INTEGER REFERENCES fgedu_report_definitions(id),
executed_at TIMESTAMP DEFAULT NOW(),
executed_by VARCHAR(100) DEFAULT CURRENT_USER,
parameters JSONB,
row_count INTEGER,
execution_time_ms INTEGER,
status VARCHAR(20)
);
— 输出结果
CREATE TABLE
— 插入报表定义
INSERT INTO fgedu_report_definitions(report_name, report_sql, description, parameters)
VALUES
(‘daily_sales’,
‘SELECT DATE(create_time) AS sale_date, COUNT(*) AS order_count, SUM(amount) AS total_amount FROM fgedu_orders WHERE create_time >= :start_date AND create_time < :end_date + 1 GROUP BY DATE(create_time) ORDER BY sale_date',
'Daily sales report',
'{"start_date": {"type": "date", "required": true}, "end_date": {"type": "date", "required": true}}'),
('customer_ranking',
'SELECT c.customer_name, COUNT(o.id) AS order_count, SUM(o.amount) AS total_amount FROM fgedu_customers c LEFT JOIN fgedu_orders o ON c.id = o.customer_id GROUP BY c.id, c.customer_name ORDER BY total_amount DESC LIMIT :limit',
'Customer ranking report',
'{"limit": {"type": "integer", "default": 10}}');
-- 输出结果
INSERT 0 2
-- 创建报表执行函数
CREATE OR REPLACE FUNCTION fgedu_execute_report(
p_report_name VARCHAR,
p_params JSONB DEFAULT '{}'::jsonb
)
RETURNS TABLE(result JSONB)
AS $$
DECLARE
v_report RECORD;
v_sql TEXT;
v_start_time TIMESTAMP;
v_row_count INTEGER;
BEGIN
SELECT * INTO v_report
FROM fgedu_report_definitions
WHERE report_name = p_report_name;
IF NOT FOUND THEN
RAISE EXCEPTION 'Report not found: %', p_report_name;
END IF;
v_sql := v_report.report_sql;
FOR v_sql IN
SELECT regexp_replace(
v_sql,
':' || key,
CASE
WHEN value->>’type’ = ‘date’ THEN ”” || (p_params->>key) || ””
WHEN value->>’type’ = ‘integer’ THEN (p_params->>key)
WHEN value->>’type’ = ‘text’ THEN ”” || (p_params->>key) || ””
ELSE (p_params->>key)
END,
‘g’
)
LOOP
NULL;
END LOOP;
v_start_time := clock_timestamp();
RETURN QUERY EXECUTE v_sql;
GET DIAGNOSTICS v_row_count = ROW_COUNT;
INSERT INTO fgedu_report_history(
report_id,
parameters,
row_count,
execution_time_ms,
status
)
VALUES(
v_report.id,
p_params,
v_row_count,
EXTRACT(MILLISECONDS FROM clock_timestamp() – v_start_time)::INTEGER,
‘success’
);
END;
$$ LANGUAGE plpgsql;
— 输出结果
CREATE FUNCTION
— 执行报表
SELECT * FROM fgedu_execute_report(
‘daily_sales’,
‘{“start_date”: “2026-04-01”, “end_date”: “2026-04-07”}’::jsonb
);
— 输出结果
result
———————————————————-
{“sale_date”: “2026-04-01”, “order_count”: 50, “total_amount”: 50000.00}
{“sale_date”: “2026-04-02”, “order_count”: 45, “total_amount”: 45000.00}
{“sale_date”: “2026-04-03”, “order_count”: 60, “total_amount”: 60000.00}
(3 rows)
— 查看报表历史
SELECT r.report_name, h.executed_at, h.row_count, h.execution_time_ms
FROM fgedu_report_history h
JOIN fgedu_report_definitions r ON h.report_id = r.id
ORDER BY h.executed_at DESC
LIMIT 10;
— 输出结果
report_name | executed_at | row_count | execution_time_ms
—————+————————–+———–+——————-
daily_sales | 2026-04-07 16:00:00.123 | 7 | 15
(1 row)
4.3 PostgreSQL数据库集成插件实战
本案例演示系统集成插件的开发。from PostgreSQL视频:www.itpux.com。
— 创建集成配置表
CREATE TABLE fgedu_integration_config (
id SERIAL PRIMARY KEY,
integration_name VARCHAR(100) UNIQUE NOT NULL,
integration_type VARCHAR(50) NOT NULL,
endpoint_url VARCHAR(500),
auth_type VARCHAR(50),
auth_config JSONB,
enabled BOOLEAN DEFAULT TRUE,
created_at TIMESTAMP DEFAULT NOW()
);
— 输出结果
CREATE TABLE
— 创建集成日志表
CREATE TABLE fgedu_integration_log (
id BIGSERIAL PRIMARY KEY,
integration_id INTEGER REFERENCES fgedu_integration_config(id),
event_time TIMESTAMP DEFAULT NOW(),
event_type VARCHAR(50),
request_data JSONB,
response_data JSONB,
status VARCHAR(20),
error_message TEXT
);
— 输出结果
CREATE TABLE
— 插入集成配置
INSERT INTO fgedu_integration_config(
integration_name,
integration_type,
endpoint_url,
auth_type,
auth_config
)
VALUES
(‘notification_service’, ‘webhook’, ‘https://api.fgedu.net.cn/notify’, ‘bearer’, ‘{“token”: “xxx”}’),
(‘inventory_sync’, ‘api’, ‘https://erp.fgedu.net.cn/api/inventory’, ‘basic’, ‘{“username”: “fgedu”, “password”: “xxx”}’);
— 输出结果
INSERT 0 2
— 创建通知发送函数
CREATE OR REPLACE FUNCTION fgedu_send_notification(
p_event_type VARCHAR,
p_data JSONB
)
RETURNS BOOLEAN
AS $$
DECLARE
v_config RECORD;
v_request JSONB;
BEGIN
SELECT * INTO v_config
FROM fgedu_integration_config
WHERE integration_name = ‘notification_service’
AND enabled = TRUE;
IF NOT FOUND THEN
RETURN FALSE;
END IF;
v_request := jsonb_build_object(
‘event_type’, p_event_type,
‘data’, p_data,
‘timestamp’, NOW()
);
INSERT INTO fgedu_integration_log(
integration_id,
event_type,
request_data,
status
)
VALUES(
v_config.id,
p_event_type,
v_request,
‘sent’
);
RETURN TRUE;
END;
$$ LANGUAGE plpgsql;
— 输出结果
CREATE FUNCTION
— 创建库存同步触发器
CREATE OR REPLACE FUNCTION fgedu_sync_inventory()
RETURNS TRIGGER
AS $$
BEGIN
PERFORM fgedu_send_notification(
‘inventory_update’,
jsonb_build_object(
‘product_id’, NEW.id,
‘product_name’, NEW.product_name,
‘stock’, NEW.stock,
‘old_stock’, OLD.stock
)
);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
— 输出结果
CREATE FUNCTION
CREATE TRIGGER trg_sync_inventory
AFTER UPDATE ON fgedu_products
FOR EACH ROW
WHEN (OLD.stock IS DISTINCT FROM NEW.stock)
EXECUTE FUNCTION fgedu_sync_inventory();
— 输出结果
CREATE TRIGGER
— 测试库存同步
UPDATE fgedu_products SET stock = stock – 10 WHERE id = 1;
— 输出结果
UPDATE 1
— 查看集成日志
SELECT i.integration_name, l.event_type, l.request_data, l.status
FROM fgedu_integration_log l
JOIN fgedu_integration_config i ON l.integration_id = i.id
ORDER BY l.event_time DESC
LIMIT 5;
— 输出结果
integration_name | event_type | request_data | status
——————–+—————–+—————————————————-+——–
notification_service | inventory_update | {“product_id”: 1, “product_name”: “iPhone 15”, …} | sent
(1 row)
Part05-风哥经验总结与分享
5.1 PostgreSQL数据库插件开发最佳实践
插件开发最佳实践:模块化设计,职责清晰;完善的错误处理;详细的文档说明;充分的测试覆盖;版本管理规范。
- 设计插件架构
- 编写控制文件
- 开发核心功能
- 编写测试用例
- 编写文档说明
- 性能测试验证
- 安全审计检查
5.2 PostgreSQL数据库插件调试技巧
调试技巧:使用elog输出调试信息;使用gdb调试C代码;检查PostgreSQL日志;使用ASSERT验证假设;编写单元测试。
5.3 PostgreSQL数据库插件开发常见问题
常见问题:内存泄漏、段错误、权限问题、版本兼容性、性能问题。
— 问题1:插件加载失败
CREATE EXTENSION fgedu_business;
— 输出结果
ERROR: could not load library “/postgresql/lib/fgedu_business.so”
DETAIL: /postgresql/lib/fgedu_business.so: undefined symbol: some_function
— 解决方案:检查符号定义和依赖
$ nm -D /postgresql/lib/fgedu_business.so | grep some_function
— 问题2:权限不足
SELECT fgedu_decrypt(id_card) FROM fgedu_sensitive_data;
— 输出结果
ERROR: permission denied for function fgedu_decrypt
— 解决方案:授权函数执行权限
GRANT EXECUTE ON FUNCTION fgedu_decrypt(text) TO fgedu_user;
— 输出结果
GRANT
— 问题3:性能问题
EXPLAIN ANALYZE SELECT * FROM fgedu_audit_log WHERE event_time > NOW() – INTERVAL ‘1 day’;
— 输出结果
Seq Scan on fgedu_audit_log (cost=0.00..10000.00 rows=1000 width=500) (actual time=0.015..100.123 rows=5000 loops=1)
Filter: (event_time > (now() – ‘1 day’::interval))
Planning Time: 0.123 ms
Execution Time: 100.234 ms
(4 rows)
— 解决方案:创建索引
CREATE INDEX idx_audit_log_time ON fgedu_audit_log(event_time);
— 输出结果
CREATE INDEX
本文由风哥教程整理发布,仅用于学习测试使用,转载注明出处:http://www.fgedu.net.cn/10327.html
