
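PostgreSQL Tutorial FG179 - Customizing PG System Functions: Hands-On Development with SPI

In this document Fengge (风哥) covers custom system function development for PostgreSQL: SPI-based scalar functions, set-returning functions, and aggregate functions. The material draws on the Server Programming Interface and C-Language Functions chapters of the official PostgreSQL documentation and is aimed at advanced developers who write custom functions for production environments.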

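Part01-Basic Concepts and Theory

1.1 Overview of PostgreSQL System Function Customization

System function customization here means writing PostgreSQL functions in C, using the SPI (Server Programming Interface) wherever SQL access is needed. Compared with PL/pgSQL functions, C functions typically run faster and with less overhead, which suits heavy computation and bulk data processing. Through SPI a C function can execute SQL statements and access database objects, so it can do what a PL/pgSQL function does, usually at a lower per-call cost.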

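Advantages of C functions in PostgreSQL:

  • High execution efficiency, close to native performance
  • Low resource consumption with fine-grained memory management
  • Access to the server's internal C APIs
  • Portable source that compiles on the platforms PostgreSQL supports
  • Well suited to complex calculations and large data volumes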

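1.2 Key Points of SPI Development

Key points for SPI development: pair every SPI_connect with an SPI_finish; handle errors, using PG_TRY/PG_CATCH where cleanup is required; manage memory with palloc/pfree; check for NULL arguments with PG_ARGISNULL; return results through the PG_RETURN_* macros.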

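1.3 PostgreSQL Function Categories

Function categories: scalar functions (return a single value), set-returning functions (return multiple rows), aggregate functions (implement custom aggregation), window functions (implement window calculations), and trigger functions (implement trigger logic).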
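Of these categories, the aggregate is the least obvious: it is a transition function applied once per input row plus an optional final function that turns the accumulated state into the result. The following is a minimal standalone sketch in plain C (no PostgreSQL headers); the names AvgState, avg_transfn and avg_finalfn are hypothetical and exist only to illustrate the model.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical transition state for an average: running sum and row count. */
typedef struct AvgState
{
    double sum;
    long   count;
} AvgState;

/* Transition step: fold one input value into the state. */
static void
avg_transfn(AvgState *state, double value)
{
    assert(state != NULL);
    state->sum += value;
    state->count++;
}

/*
 * Final step: derive the result from the state.
 * Returns 1 and stores the average, or 0 when no rows were seen
 * (a real aggregate would return SQL NULL in that case).
 */
static int
avg_finalfn(const AvgState *state, double *result)
{
    if (state->count == 0)
        return 0;
    *result = state->sum / (double) state->count;
    return 1;
}
```

A caller starts from a zeroed AvgState, calls avg_transfn once per row, and calls avg_finalfn once at the end. In PostgreSQL the executor plays the caller's role and the state is whatever STYPE the aggregate declares.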

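Part02-Production Planning and Recommendations

2.1 Design Principles for PostgreSQL System Functions

Design principles: single responsibility, so each function does one thing; parameter validation, so every input is checked; error handling that produces clear messages; complete documentation of each function's purpose and parameters; performance first, avoiding unnecessary work.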

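2.2 Secure Function Development

Security points: validate all input parameters; prevent buffer overflows; handle NULL values correctly; use safe string-handling functions; avoid SQL injection when building query strings.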
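To make the SQL injection point concrete, here is a hedged standalone sketch of identifier quoting in plain C: wrap the name in double quotes and double every embedded double quote. The function name quote_ident_sketch is hypothetical. Inside a real extension you would more likely call PostgreSQL's own quote_identifier() and quote_literal_cstr() from utils/builtins.h rather than write this yourself.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/*
 * Return a newly malloc'ed copy of name wrapped in double quotes, with
 * every embedded double quote doubled. The caller frees the result.
 * Returns NULL if allocation fails.
 */
static char *
quote_ident_sketch(const char *name)
{
    size_t      len;
    char       *out;
    char       *dst;
    const char *src;

    assert(name != NULL);
    len = strlen(name);

    /* Worst case: every byte is a quote (2x), plus two wrapping quotes and NUL. */
    out = malloc(2 * len + 3);
    if (out == NULL)
        return NULL;

    dst = out;
    *dst++ = '"';
    for (src = name; *src != '\0'; src++)
    {
        if (*src == '"')
            *dst++ = '"';       /* escape by doubling */
        *dst++ = *src;
    }
    *dst++ = '"';
    *dst = '\0';
    return out;
}
```

With this, a hostile name such as `a"; DROP TABLE x; --` becomes a single quoted identifier instead of terminating the statement early.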

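2.3 Function Performance Optimization

Performance points: use prepared plans to cut parsing overhead; process data in batches to reduce the number of function calls; allocate memory sensibly instead of allocating repeatedly; avoid executing SQL inside loops.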
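The "avoid repeated allocation" advice can be sketched with a buffer that doubles its capacity when it runs out of room, so the number of reallocations grows logarithmically with the final size. This is a standalone illustration in plain C with hypothetical names (GrowBuf, growbuf_init, growbuf_append); inside the server, StringInfo from lib/stringinfo.h offers a comparable growable buffer on top of palloc.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

typedef struct GrowBuf
{
    char   *data;
    size_t  len;        /* bytes used, excluding the trailing NUL */
    size_t  cap;        /* bytes allocated */
    int     reallocs;   /* how many times the buffer had to grow */
} GrowBuf;

/* Allocate the initial buffer; returns 1 on success, 0 on failure. */
static int
growbuf_init(GrowBuf *b, size_t initial_cap)
{
    assert(b != NULL && initial_cap > 0);
    b->data = malloc(initial_cap);
    if (b->data == NULL)
        return 0;
    b->data[0] = '\0';
    b->len = 0;
    b->cap = initial_cap;
    b->reallocs = 0;
    return 1;
}

/* Append s, doubling the capacity until it fits; returns 1 on success. */
static int
growbuf_append(GrowBuf *b, const char *s)
{
    size_t n = strlen(s);

    if (b->len + n + 1 > b->cap)
    {
        size_t  newcap = b->cap;
        char   *p;

        while (b->len + n + 1 > newcap)
            newcap *= 2;
        p = realloc(b->data, newcap);
        if (p == NULL)
            return 0;
        b->data = p;
        b->cap = newcap;
        b->reallocs++;
    }
    memcpy(b->data + b->len, s, n + 1);     /* copies the NUL as well */
    b->len += n;
    return 1;
}
```

Appending a 3-byte string 100 times to a buffer that starts at 8 bytes triggers only a handful of reallocations, where a grow-by-exact-size strategy would reallocate on almost every append.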

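Fengge's tip: C function development requires a solid understanding of PostgreSQL internals. Study the SPI interface and memory management before you start, follow the PostgreSQL coding conventions, and test thoroughly.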

Part03-Production Implementation

3.1 Developing PostgreSQL Scalar Functions

3.1.1 Basic Scalar Functions

-- Scalar function examples

-- C source: fgedu_scalar.c
#include "postgres.h"
#include "fmgr.h"
#include "utils/builtins.h"
#include "utils/date.h"
#include "utils/datetime.h"
#include "utils/timestamp.h"

PG_MODULE_MAGIC;

/* String masking: keep the first 3 and last 4 bytes, replace the rest with '*' */
PG_FUNCTION_INFO_V1(fgedu_string_mask);
Datum
fgedu_string_mask(PG_FUNCTION_ARGS)
{
    text       *input_text;
    text       *result_text;
    char       *input_str;
    char       *result_str;
    int         len;
    int         i;

    if (PG_ARGISNULL(0))
        PG_RETURN_NULL();

    input_text = PG_GETARG_TEXT_PP(0);
    input_str = VARDATA_ANY(input_text);
    len = VARSIZE_ANY_EXHDR(input_text);

    /*
     * The mask works on bytes, so it is only safe for single-byte
     * (for example ASCII) input. Inputs of 7 bytes or fewer come back
     * unchanged.
     */
    result_text = (text *) palloc(VARHDRSZ + len);
    SET_VARSIZE(result_text, VARHDRSZ + len);
    result_str = VARDATA(result_text);

    for (i = 0; i < len; i++)
    {
        if (i < 3 || i >= len - 4)
            result_str[i] = input_str[i];
        else
            result_str[i] = '*';
    }

    PG_RETURN_TEXT_P(result_text);
}

/* Numeric helper: division that returns NULL instead of failing on zero */
PG_FUNCTION_INFO_V1(fgedu_safe_divide);
Datum
fgedu_safe_divide(PG_FUNCTION_ARGS)
{
    float8      dividend;
    float8      divisor;
    float8      result;

    if (PG_ARGISNULL(0) || PG_ARGISNULL(1))
        PG_RETURN_NULL();

    dividend = PG_GETARG_FLOAT8(0);
    divisor = PG_GETARG_FLOAT8(1);

    if (divisor == 0.0)
    {
        ereport(WARNING,
                (errcode(ERRCODE_DIVISION_BY_ZERO),
                 errmsg("division by zero"),
                 errdetail("Returning NULL instead.")));
        PG_RETURN_NULL();
    }

    result = dividend / divisor;
    PG_RETURN_FLOAT8(result);
}

/* Date helper: count Monday-to-Friday days in an inclusive date range */
PG_FUNCTION_INFO_V1(fgedu_workdays_between);
Datum
fgedu_workdays_between(PG_FUNCTION_ARGS)
{
    DateADT     start_date;
    DateADT     end_date;
    DateADT     d;
    int32       workdays = 0;

    if (PG_ARGISNULL(0) || PG_ARGISNULL(1))
        PG_RETURN_NULL();

    start_date = PG_GETARG_DATEADT(0);
    end_date = PG_GETARG_DATEADT(1);

    if (DATE_NOT_FINITE(start_date) || DATE_NOT_FINITE(end_date))
        ereport(ERROR,
                (errcode(ERRCODE_DATETIME_VALUE_OUT_OF_RANGE),
                 errmsg("infinite dates are not supported")));

    if (start_date > end_date)
    {
        DateADT     temp = start_date;

        start_date = end_date;
        end_date = temp;
    }

    for (d = start_date; d <= end_date; d++)
    {
        /*
         * DateADT counts days from 2000-01-01 (a Saturday), so convert to
         * a Julian day first. j2day() returns 0 = Sunday ... 6 = Saturday.
         */
        int         day_of_week = j2day(d + POSTGRES_EPOCH_JDATE);

        if (day_of_week >= 1 && day_of_week <= 5)
            workdays++;
    }

    PG_RETURN_INT32(workdays);
}

-- SQL definitions
CREATE FUNCTION fgedu_string_mask(text)
RETURNS text
AS 'fgedu_scalar', 'fgedu_string_mask'
LANGUAGE C IMMUTABLE STRICT;

CREATE FUNCTION fgedu_safe_divide(float8, float8)
RETURNS float8
AS 'fgedu_scalar', 'fgedu_safe_divide'
LANGUAGE C;

CREATE FUNCTION fgedu_workdays_between(date, date)
RETURNS integer
AS 'fgedu_scalar', 'fgedu_workdays_between'
LANGUAGE C IMMUTABLE STRICT;

-- Tests
SELECT fgedu_string_mask('1234567890123456');
SELECT fgedu_safe_divide(100.0, 3.0);
SELECT fgedu_workdays_between('2026-04-01', '2026-04-30');

-- Output
 fgedu_string_mask
-------------------
 123*********3456
(1 row)

 fgedu_safe_divide
--------------------
 33.333333333333336
(1 row)

 fgedu_workdays_between
------------------------
                     22
(1 row)
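The weekday arithmetic behind fgedu_workdays_between is easy to get wrong, so here is a hedged standalone sketch in plain C that can be checked without a server. It assumes dates are expressed as day offsets from 2000-01-01, which was a Saturday; the names day_of_week and workdays_between are hypothetical stand-ins for the server-side j2day() call.

```c
#include <assert.h>

/*
 * Day of week for a day offset from 2000-01-01:
 * 0 = Sunday ... 6 = Saturday. Offset 0 is a Saturday, hence the +6.
 */
static int
day_of_week(int days_since_2000)
{
    int dow = (days_since_2000 + 6) % 7;

    if (dow < 0)                /* C's % keeps the sign of the dividend */
        dow += 7;
    return dow;
}

/* Count Monday-to-Friday days in the inclusive range [start, end]. */
static int
workdays_between(int start, int end)
{
    int d;
    int count = 0;

    assert(start <= end);
    for (d = start; d <= end; d++)
    {
        int dow = day_of_week(d);

        if (dow >= 1 && dow <= 5)
            count++;
    }
    return count;
}
```

2026-04-01 is offset 9587 and 2026-04-30 is offset 9616, so workdays_between(9587, 9616) covers April 2026. A formula such as (offset + 1) % 7 would instead label 2000-01-01 a Monday and count weekends as workdays.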

3.2 Developing PostgreSQL Set-Returning Functions

3.2.1 Functions That Return Multiple Rows

-- Set-returning function examples

-- C source: fgedu_set.c
#include "postgres.h"
#include "fmgr.h"
#include "funcapi.h"
#include "executor/spi.h"
#include "utils/builtins.h"
#include "catalog/pg_type.h"

PG_MODULE_MAGIC;

#define TABLE_STATS_NCOLS 5

/* Return an integer series from start to end in increments of step */
PG_FUNCTION_INFO_V1(fgedu_generate_series_custom);
Datum
fgedu_generate_series_custom(PG_FUNCTION_ARGS)
{
    FuncCallContext *funcctx;
    int64      *current;

    if (SRF_IS_FIRSTCALL())
    {
        MemoryContext oldcontext;
        int64       start = PG_GETARG_INT64(0);
        int64       end = PG_GETARG_INT64(1);
        int64       step = PG_GETARG_INT64(2);

        if (step == 0)
            ereport(ERROR,
                    (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                     errmsg("step size cannot be zero")));

        funcctx = SRF_FIRSTCALL_INIT();
        oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);

        current = (int64 *) palloc(sizeof(int64));
        *current = start;
        funcctx->user_fctx = current;

        /* A range that runs against the step direction yields no rows */
        if (step > 0)
            funcctx->max_calls = (end < start) ? 0 :
                ((uint64) end - (uint64) start) / (uint64) step + 1;
        else
            funcctx->max_calls = (start < end) ? 0 :
                ((uint64) start - (uint64) end) / ((uint64) 0 - (uint64) step) + 1;

        MemoryContextSwitchTo(oldcontext);
    }

    funcctx = SRF_PERCALL_SETUP();
    current = (int64 *) funcctx->user_fctx;

    if (funcctx->call_cntr < funcctx->max_calls)
    {
        int64       result = *current;

        *current += PG_GETARG_INT64(2);
        SRF_RETURN_NEXT(funcctx, Int64GetDatum(result));
    }
    else
    {
        SRF_RETURN_DONE(funcctx);
    }
}

/* Return rows of a composite type */
PG_FUNCTION_INFO_V1(fgedu_get_table_stats);
Datum
fgedu_get_table_stats(PG_FUNCTION_ARGS)
{
    FuncCallContext *funcctx;

    if (SRF_IS_FIRSTCALL())
    {
        MemoryContext oldcontext;
        MemoryContext multi_ctx;
        TupleDesc   tupdesc;
        char      **cells;
        uint64      nrows;
        uint64      r;
        int         c;
        int         ret;

        funcctx = SRF_FIRSTCALL_INIT();
        multi_ctx = funcctx->multi_call_memory_ctx;
        oldcontext = MemoryContextSwitchTo(multi_ctx);

        /* Describe the result row type (built in the multi-call context) */
        tupdesc = CreateTemplateTupleDesc(TABLE_STATS_NCOLS);
        TupleDescInitEntry(tupdesc, (AttrNumber) 1, "schema_name", TEXTOID, -1, 0);
        TupleDescInitEntry(tupdesc, (AttrNumber) 2, "table_name", TEXTOID, -1, 0);
        TupleDescInitEntry(tupdesc, (AttrNumber) 3, "size", TEXTOID, -1, 0);
        TupleDescInitEntry(tupdesc, (AttrNumber) 4, "index_count", INT4OID, -1, 0);
        TupleDescInitEntry(tupdesc, (AttrNumber) 5, "trigger_count", INT4OID, -1, 0);
        funcctx->attinmeta = TupleDescGetAttInMetadata(tupdesc);

        /*
         * Connect, run the query, copy the rows out and disconnect, all
         * within this one call. An SPI connection must not stay open
         * across SRF calls, and SPI_tuptable is freed by SPI_finish.
         */
        if (SPI_connect() != SPI_OK_CONNECT)
            elog(ERROR, "SPI_connect failed");

        ret = SPI_execute(
            "SELECT s.schemaname, s.tablename, "
            "       pg_size_pretty(pg_total_relation_size(s.rel)) AS size, "
            "       (SELECT count(*) FROM pg_indexes i "
            "         WHERE i.schemaname = s.schemaname "
            "           AND i.tablename = s.tablename) AS index_count, "
            "       (SELECT count(*) FROM pg_trigger g "
            "         WHERE g.tgrelid = s.rel) AS trigger_count "
            "FROM (SELECT schemaname, tablename, "
            "             format('%I.%I', schemaname, tablename)::regclass AS rel "
            "      FROM pg_tables "
            "      WHERE schemaname NOT IN ('pg_catalog', 'information_schema')) s "
            "ORDER BY pg_total_relation_size(s.rel) DESC",
            true, 0);

        if (ret != SPI_OK_SELECT)
            elog(ERROR, "SPI_execute failed");

        nrows = SPI_processed;
        cells = (char **) MemoryContextAllocZero(multi_ctx,
                    sizeof(char *) * TABLE_STATS_NCOLS * nrows);

        for (r = 0; r < nrows; r++)
        {
            for (c = 0; c < TABLE_STATS_NCOLS; c++)
            {
                char       *val = SPI_getvalue(SPI_tuptable->vals[r],
                                               SPI_tuptable->tupdesc, c + 1);

                /* NULL column values stay NULL; others are copied */
                if (val != NULL)
                    cells[r * TABLE_STATS_NCOLS + c] =
                        MemoryContextStrdup(multi_ctx, val);
            }
        }

        SPI_finish();

        funcctx->user_fctx = cells;
        funcctx->max_calls = nrows;

        MemoryContextSwitchTo(oldcontext);
    }

    funcctx = SRF_PERCALL_SETUP();

    if (funcctx->call_cntr < funcctx->max_calls)
    {
        char      **cells = (char **) funcctx->user_fctx;
        char      **values = cells + funcctx->call_cntr * TABLE_STATS_NCOLS;
        HeapTuple   result_tuple;

        result_tuple = BuildTupleFromCStrings(funcctx->attinmeta, values);
        SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(result_tuple));
    }
    else
    {
        SRF_RETURN_DONE(funcctx);
    }
}

-- SQL definitions
-- "end" is a reserved word in PostgreSQL, so the parameters use other names
CREATE FUNCTION fgedu_generate_series_custom(start_value bigint, end_value bigint, step bigint)
RETURNS SETOF bigint
AS 'fgedu_set', 'fgedu_generate_series_custom'
LANGUAGE C IMMUTABLE STRICT;

CREATE FUNCTION fgedu_get_table_stats()
RETURNS TABLE(schema_name text, table_name text, size text, index_count integer, trigger_count integer)
AS 'fgedu_set', 'fgedu_get_table_stats'
LANGUAGE C;

-- Tests
SELECT * FROM fgedu_generate_series_custom(1, 10, 2);
SELECT * FROM fgedu_get_table_stats() LIMIT 5;

-- Output
 fgedu_generate_series_custom
------------------------------
                            1
                            3
                            5
                            7
                            9
(5 rows)

 schema_name |   table_name    |  size  | index_count | trigger_count
-------------+-----------------+--------+-------------+---------------
 public      | fgedu_orders    | 128 MB |           3 |             2
 public      | fgedu_products  | 64 MB  |           2 |             1
 public      | fgedu_customers | 32 MB  |           2 |             0
(3 rows)
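The row count that a series function stores in max_calls deserves a closer look, because a range that runs against the step direction must produce zero rows, not a negative count converted to a huge unsigned value. A hedged standalone sketch in plain C follows; series_row_count is a hypothetical helper name.

```c
#include <assert.h>
#include <stdint.h>

/*
 * Number of values in the series start, start+step, ... that do not pass
 * end. Unsigned arithmetic avoids signed overflow on wide ranges. The only
 * case where the +1 wraps around is the full 64-bit range with a step of
 * 1 or -1.
 */
static uint64_t
series_row_count(int64_t start, int64_t end, int64_t step)
{
    assert(step != 0);

    if (step > 0)
    {
        if (end < start)
            return 0;           /* empty ascending range */
        return ((uint64_t) end - (uint64_t) start) / (uint64_t) step + 1;
    }

    if (start < end)
        return 0;               /* empty descending range */
    return ((uint64_t) start - (uint64_t) end)
        / ((uint64_t) 0 - (uint64_t) step) + 1;
}
```

For the tutorial's test call (1, 10, 2) this gives five rows: 1, 3, 5, 7, 9. With a positive step and end below start, a signed formula such as (end - start) / step + 1 would produce a negative number, which becomes an enormous loop bound once stored in an unsigned counter.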

3.3 Developing PostgreSQL Aggregate Functions

3.3.1 Custom Aggregate Functions

-- Custom aggregate examples

-- C source: fgedu_aggregate.c
#include "postgres.h"
#include "fmgr.h"
#include "utils/builtins.h"
#include "utils/array.h"
#include "catalog/pg_type.h"

PG_MODULE_MAGIC;

/* State of the string-concatenation aggregate; lives in the aggregate context */
typedef struct StringAggState
{
    char       *result;
    int         length;
    int         capacity;
} StringAggState;

/*
 * Transition function. Arguments: 0 = state, 1 = value, 2 = delimiter,
 * matching the call fgedu_string_agg(value, delimiter).
 */
PG_FUNCTION_INFO_V1(fgedu_string_agg_transfn);
Datum
fgedu_string_agg_transfn(PG_FUNCTION_ARGS)
{
    MemoryContext agg_context;
    StringAggState *state;

    if (!AggCheckCallContext(fcinfo, &agg_context))
        elog(ERROR, "fgedu_string_agg_transfn called in non-aggregate context");

    state = PG_ARGISNULL(0) ? NULL : (StringAggState *) PG_GETARG_POINTER(0);

    /* NULL values are skipped */
    if (!PG_ARGISNULL(1))
    {
        text       *value = PG_GETARG_TEXT_PP(1);
        const char *str = VARDATA_ANY(value);
        int         str_len = VARSIZE_ANY_EXHDR(value);
        const char *delim = ",";    /* default when the delimiter is NULL */
        int         delim_len = 1;
        int         needed;

        if (state == NULL)
        {
            /* First non-NULL value: create the state in the aggregate context */
            state = (StringAggState *) MemoryContextAlloc(agg_context,
                                                          sizeof(StringAggState));
            state->capacity = 1024;
            state->result = (char *) MemoryContextAlloc(agg_context, state->capacity);
            state->result[0] = '\0';
            state->length = 0;
            delim_len = 0;          /* no delimiter before the first value */
        }
        else if (!PG_ARGISNULL(2))
        {
            text       *delimiter = PG_GETARG_TEXT_PP(2);

            delim = VARDATA_ANY(delimiter);
            delim_len = VARSIZE_ANY_EXHDR(delimiter);
        }

        /* repalloc keeps the buffer in the aggregate context */
        needed = state->length + delim_len + str_len + 1;
        if (needed > state->capacity)
        {
            state->capacity = needed * 2;
            state->result = (char *) repalloc(state->result, state->capacity);
        }

        memcpy(state->result + state->length, delim, delim_len);
        state->length += delim_len;
        memcpy(state->result + state->length, str, str_len);
        state->length += str_len;
        state->result[state->length] = '\0';
    }

    if (state == NULL)
        PG_RETURN_NULL();
    PG_RETURN_POINTER(state);
}

PG_FUNCTION_INFO_V1(fgedu_string_agg_finalfn);
Datum
fgedu_string_agg_finalfn(PG_FUNCTION_ARGS)
{
    StringAggState *state;

    if (PG_ARGISNULL(0))
        PG_RETURN_NULL();

    state = (StringAggState *) PG_GETARG_POINTER(0);
    PG_RETURN_TEXT_P(cstring_to_text_with_len(state->result, state->length));
}

/* Median aggregate: the state is a float8[] holding every non-NULL input */
PG_FUNCTION_INFO_V1(fgedu_median_transfn);
Datum
fgedu_median_transfn(PG_FUNCTION_ARGS)
{
    ArrayType  *array;
    Datum      *elems;
    float8     *old_values = NULL;
    int         nelems = 0;
    int         i;

    if (!AggCheckCallContext(fcinfo, NULL))
        elog(ERROR, "fgedu_median_transfn called in non-aggregate context");

    /* NULL inputs leave the state unchanged */
    if (PG_ARGISNULL(1))
    {
        if (PG_ARGISNULL(0))
            PG_RETURN_NULL();
        PG_RETURN_ARRAYTYPE_P(PG_GETARG_ARRAYTYPE_P(0));
    }

    if (!PG_ARGISNULL(0))
    {
        array = PG_GETARG_ARRAYTYPE_P(0);
        nelems = ArrayGetNItems(ARR_NDIM(array), ARR_DIMS(array));
        old_values = (float8 *) ARR_DATA_PTR(array);
    }

    /*
     * Build a new array from the old elements plus the new value. The
     * array's data area must never be passed to repalloc: it is not a
     * separately allocated chunk. Copying on every row costs O(n^2)
     * overall, which is acceptable for a teaching example.
     */
    elems = (Datum *) palloc((nelems + 1) * sizeof(Datum));
    for (i = 0; i < nelems; i++)
        elems[i] = Float8GetDatum(old_values[i]);
    elems[nelems] = PG_GETARG_DATUM(1);

    array = construct_array(elems, nelems + 1, FLOAT8OID,
                            sizeof(float8), FLOAT8PASSBYVAL, 'd');
    PG_RETURN_ARRAYTYPE_P(array);
}

/* qsort comparator for float8 values */
static int
compare_float8(const void *a, const void *b)
{
    float8      x = *(const float8 *) a;
    float8      y = *(const float8 *) b;

    return (x > y) - (x < y);
}

PG_FUNCTION_INFO_V1(fgedu_median_finalfn);
Datum
fgedu_median_finalfn(PG_FUNCTION_ARGS)
{
    ArrayType  *array;
    float8     *values;
    int         nelems;
    float8      median;

    if (PG_ARGISNULL(0))
        PG_RETURN_NULL();

    array = PG_GETARG_ARRAYTYPE_P(0);
    nelems = ArrayGetNItems(ARR_NDIM(array), ARR_DIMS(array));
    if (nelems == 0)
        PG_RETURN_NULL();

    /* Sort a private copy: a final function should not modify its input state */
    values = (float8 *) palloc(nelems * sizeof(float8));
    memcpy(values, ARR_DATA_PTR(array), nelems * sizeof(float8));
    qsort(values, nelems, sizeof(float8), compare_float8);

    /* Compute the median */
    if (nelems % 2 == 0)
        median = (values[nelems / 2 - 1] + values[nelems / 2]) / 2.0;
    else
        median = values[nelems / 2];

    PG_RETURN_FLOAT8(median);
}

-- SQL definitions
CREATE FUNCTION fgedu_string_agg_transfn(internal, text, text)
RETURNS internal
AS 'fgedu_aggregate', 'fgedu_string_agg_transfn'
LANGUAGE C;

CREATE FUNCTION fgedu_string_agg_finalfn(internal)
RETURNS text
AS 'fgedu_aggregate', 'fgedu_string_agg_finalfn'
LANGUAGE C;

CREATE AGGREGATE fgedu_string_agg(text, text) (
    SFUNC = fgedu_string_agg_transfn,
    STYPE = internal,
    FINALFUNC = fgedu_string_agg_finalfn
);

CREATE FUNCTION fgedu_median_transfn(float8[], float8)
RETURNS float8[]
AS 'fgedu_aggregate', 'fgedu_median_transfn'
LANGUAGE C;

CREATE FUNCTION fgedu_median_finalfn(float8[])
RETURNS float8
AS 'fgedu_aggregate', 'fgedu_median_finalfn'
LANGUAGE C;

CREATE AGGREGATE fgedu_median(float8) (
    SFUNC = fgedu_median_transfn,
    STYPE = float8[],
    FINALFUNC = fgedu_median_finalfn
);

-- Tests
SELECT fgedu_string_agg(name, ', ') FROM fgedu_users;
SELECT fgedu_median(value) FROM fgedu_metrics;

-- Output
         fgedu_string_agg
-----------------------------------
 user1, user2, user3, user4, user5
(1 row)

 fgedu_median
--------------
        125.5
(1 row)
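The final step of a median aggregate, sort a copy and pick the middle, can be exercised outside the server. The following is a hedged standalone sketch in plain C using qsort from the standard library; median_of and cmp_double are hypothetical names, and malloc stands in for palloc.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* qsort comparator for doubles. */
static int
cmp_double(const void *a, const void *b)
{
    double x = *(const double *) a;
    double y = *(const double *) b;

    return (x > y) - (x < y);
}

/*
 * Compute the median of n values without modifying the input.
 * Returns 1 and stores the median, or 0 if n is 0 or allocation fails.
 */
static int
median_of(const double *values, size_t n, double *result)
{
    double *copy;

    assert(result != NULL);
    if (n == 0)
        return 0;

    copy = malloc(n * sizeof(double));
    if (copy == NULL)
        return 0;
    memcpy(copy, values, n * sizeof(double));
    qsort(copy, n, sizeof(double), cmp_double);

    /* Even count: average the two middle values; odd count: take the middle. */
    if (n % 2 == 0)
        *result = (copy[n / 2 - 1] + copy[n / 2]) / 2.0;
    else
        *result = copy[n / 2];

    free(copy);
    return 1;
}
```

Sorting a copy matters for the aggregate case: the transition state may be reused (for example by a window frame), so a final function that reorders it in place can change later results.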

Part04-Production Cases and Walkthroughs

4.1 Case Study: A Database Monitoring Function

This case shows how to develop a database monitoring function.

-- Monitoring function

-- C source: fgedu_monitor.c
#include "postgres.h"
#include "fmgr.h"
#include "funcapi.h"
#include "executor/spi.h"
#include "utils/builtins.h"
#include "catalog/pg_type.h"

PG_MODULE_MAGIC;

#define SESSION_STATS_NCOLS 7

/* Return one row per session from pg_stat_activity */
PG_FUNCTION_INFO_V1(fgedu_get_session_stats);
Datum
fgedu_get_session_stats(PG_FUNCTION_ARGS)
{
    FuncCallContext *funcctx;

    if (SRF_IS_FIRSTCALL())
    {
        MemoryContext oldcontext;
        MemoryContext multi_ctx;
        TupleDesc   tupdesc;
        char      **cells;
        uint64      nrows;
        uint64      r;
        int         c;
        int         ret;

        funcctx = SRF_FIRSTCALL_INIT();
        multi_ctx = funcctx->multi_call_memory_ctx;
        oldcontext = MemoryContextSwitchTo(multi_ctx);

        tupdesc = CreateTemplateTupleDesc(SESSION_STATS_NCOLS);
        TupleDescInitEntry(tupdesc, 1, "pid", INT4OID, -1, 0);
        TupleDescInitEntry(tupdesc, 2, "username", TEXTOID, -1, 0);
        TupleDescInitEntry(tupdesc, 3, "application_name", TEXTOID, -1, 0);
        TupleDescInitEntry(tupdesc, 4, "client_addr", TEXTOID, -1, 0);
        TupleDescInitEntry(tupdesc, 5, "state", TEXTOID, -1, 0);
        TupleDescInitEntry(tupdesc, 6, "duration_sec", INT4OID, -1, 0);
        TupleDescInitEntry(tupdesc, 7, "query", TEXTOID, -1, 0);
        funcctx->attinmeta = TupleDescGetAttInMetadata(tupdesc);

        /* Open and close the SPI connection within this single call */
        if (SPI_connect() != SPI_OK_CONNECT)
            elog(ERROR, "SPI_connect failed");

        ret = SPI_execute(
            "SELECT "
            "  pid, "
            "  usename, "
            "  application_name, "
            "  client_addr::text, "
            "  state, "
            "  EXTRACT(EPOCH FROM (now() - query_start))::int AS duration_sec, "
            "  query "
            "FROM pg_stat_activity "
            "WHERE state IS NOT NULL "
            "ORDER BY query_start",
            true, 0);

        if (ret != SPI_OK_SELECT)
            elog(ERROR, "SPI_execute failed");

        /* Copy the rows into the multi-call context before SPI_finish frees them */
        nrows = SPI_processed;
        cells = (char **) MemoryContextAllocZero(multi_ctx,
                    sizeof(char *) * SESSION_STATS_NCOLS * nrows);

        for (r = 0; r < nrows; r++)
        {
            for (c = 0; c < SESSION_STATS_NCOLS; c++)
            {
                char       *val = SPI_getvalue(SPI_tuptable->vals[r],
                                               SPI_tuptable->tupdesc, c + 1);

                if (val != NULL)
                    cells[r * SESSION_STATS_NCOLS + c] =
                        MemoryContextStrdup(multi_ctx, val);
            }
        }

        SPI_finish();

        funcctx->user_fctx = cells;
        funcctx->max_calls = nrows;

        MemoryContextSwitchTo(oldcontext);
    }

    funcctx = SRF_PERCALL_SETUP();

    if (funcctx->call_cntr < funcctx->max_calls)
    {
        char      **cells = (char **) funcctx->user_fctx;
        char      **values = cells + funcctx->call_cntr * SESSION_STATS_NCOLS;
        HeapTuple   result_tuple;

        result_tuple = BuildTupleFromCStrings(funcctx->attinmeta, values);
        SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(result_tuple));
    }
    else
    {
        SRF_RETURN_DONE(funcctx);
    }
}

-- SQL definition
CREATE FUNCTION fgedu_get_session_stats()
RETURNS TABLE(
    pid integer,
    username text,
    application_name text,
    client_addr text,
    state text,
    duration_sec integer,
    query text
)
AS 'fgedu_monitor', 'fgedu_get_session_stats'
LANGUAGE C;

-- Test
SELECT * FROM fgedu_get_session_stats();

-- Output
  pid  | username | application_name |  client_addr  | state  | duration_sec |           query
-------+----------+------------------+---------------+--------+--------------+----------------------------
 12345 | fgedu    | psql             | 192.168.1.100 | active |            0 | SELECT * FROM fgedu_get...
 12346 | fgedu    | pgAdmin          | 192.168.1.101 | idle   |           30 |
(2 rows)

4.2 Case Study: Database Administration Functions

This case shows how to develop database administration functions.

-- Administration functions

-- C source: fgedu_admin.c
#include "postgres.h"
#include "fmgr.h"
#include "executor/spi.h"
#include "utils/builtins.h"

PG_MODULE_MAGIC;

/*
 * Run "<command> schema.table" for every table in the given schema and
 * return the number of tables processed.
 */
static int32
run_on_schema_tables(const char *schema_str, const char *command)
{
    SPITupleTable *tuptable;
    uint64      ntables;
    uint64      i;
    int32       processed = 0;
    char       *query;
    int         ret;

    if (SPI_connect() != SPI_OK_CONNECT)
        elog(ERROR, "SPI_connect failed");

    /* quote_literal_cstr() escapes the schema name, which prevents SQL injection */
    query = psprintf("SELECT tablename FROM pg_tables WHERE schemaname = %s",
                     quote_literal_cstr(schema_str));
    ret = SPI_execute(query, false, 0);
    pfree(query);

    if (ret != SPI_OK_SELECT)
        elog(ERROR, "SPI_execute failed");

    /*
     * Save the result first: every nested SPI_execute call overwrites the
     * globals SPI_tuptable and SPI_processed.
     */
    tuptable = SPI_tuptable;
    ntables = SPI_processed;

    for (i = 0; i < ntables; i++)
    {
        char       *table_name = SPI_getvalue(tuptable->vals[i],
                                              tuptable->tupdesc, 1);

        query = psprintf("%s %s.%s", command,
                         quote_identifier(schema_str),
                         quote_identifier(table_name));
        ret = SPI_execute(query, false, 0);
        pfree(query);
        pfree(table_name);

        if (ret == SPI_OK_UTILITY)
            processed++;
    }

    SPI_finish();
    return processed;
}

/*
 * Batch ANALYZE. VACUUM cannot be executed from inside a function because
 * it cannot run in a transaction block, so this function only collects
 * statistics; run VACUUM from a client session or leave it to autovacuum.
 */
PG_FUNCTION_INFO_V1(fgedu_analyze_tables);
Datum
fgedu_analyze_tables(PG_FUNCTION_ARGS)
{
    const char *schema_str;

    if (PG_ARGISNULL(0))
        schema_str = "public";
    else
        schema_str = text_to_cstring(PG_GETARG_TEXT_PP(0));

    PG_RETURN_INT32(run_on_schema_tables(schema_str, "ANALYZE"));
}

/* Rebuild the indexes of every table in a schema */
PG_FUNCTION_INFO_V1(fgedu_reindex_tables);
Datum
fgedu_reindex_tables(PG_FUNCTION_ARGS)
{
    const char *schema_str;

    if (PG_ARGISNULL(0))
        schema_str = "public";
    else
        schema_str = text_to_cstring(PG_GETARG_TEXT_PP(0));

    PG_RETURN_INT32(run_on_schema_tables(schema_str, "REINDEX TABLE"));
}

-- SQL definitions
CREATE FUNCTION fgedu_analyze_tables(schema_name text DEFAULT 'public')
RETURNS integer
AS 'fgedu_admin', 'fgedu_analyze_tables'
LANGUAGE C;

CREATE FUNCTION fgedu_reindex_tables(schema_name text DEFAULT 'public')
RETURNS integer
AS 'fgedu_admin', 'fgedu_reindex_tables'
LANGUAGE C;

-- Tests
SELECT fgedu_analyze_tables('public');
SELECT fgedu_reindex_tables('public');

-- Output
 fgedu_analyze_tables
----------------------
                   15
(1 row)

 fgedu_reindex_tables
----------------------
                   15
(1 row)

4.3 Case Study: Utility Functions

This case shows how to develop database utility functions.

-- Utility functions

-- C source: fgedu_util.c
#include "postgres.h"

#include <ctype.h>

#include "fmgr.h"
#include "utils/builtins.h"

PG_MODULE_MAGIC;

/* Format a byte count with a binary unit suffix */
PG_FUNCTION_INFO_V1(fgedu_format_bytes);
Datum
fgedu_format_bytes(PG_FUNCTION_ARGS)
{
    int64       bytes;
    char       *result;
    const char *units[] = {"B", "KB", "MB", "GB", "TB", "PB"};
    int         unit_index = 0;
    float8      value;

    if (PG_ARGISNULL(0))
        PG_RETURN_NULL();

    bytes = PG_GETARG_INT64(0);
    value = (float8) bytes;

    while (value >= 1024.0 && unit_index < 5)
    {
        value /= 1024.0;
        unit_index++;
    }

    /* INT64_FORMAT is the portable printf format for int64 */
    if (unit_index == 0)
        result = psprintf(INT64_FORMAT " %s", bytes, units[unit_index]);
    else
        result = psprintf("%.2f %s", value, units[unit_index]);

    PG_RETURN_TEXT_P(cstring_to_text(result));
}

/* Parse a duration such as '90s' or '2h30m' into seconds */
PG_FUNCTION_INFO_V1(fgedu_parse_interval);
Datum
fgedu_parse_interval(PG_FUNCTION_ARGS)
{
    char       *interval_str;
    const char *p;
    int64       seconds = 0;

    if (PG_ARGISNULL(0))
        PG_RETURN_NULL();

    interval_str = text_to_cstring(PG_GETARG_TEXT_PP(0));
    p = interval_str;

    /* Accept one or more <number><unit> pairs with no separators */
    do
    {
        char       *endptr;
        char        unit[10];
        int         ulen = 0;
        int64       value;
        int64       factor;

        if (!isdigit((unsigned char) *p))
            ereport(ERROR,
                    (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                     errmsg("invalid interval format: %s", interval_str)));

        value = strtol(p, &endptr, 10);
        p = endptr;

        while (isalpha((unsigned char) *p) && ulen < (int) sizeof(unit) - 1)
            unit[ulen++] = *p++;
        unit[ulen] = '\0';

        if (strcmp(unit, "s") == 0 || strcmp(unit, "sec") == 0 || strcmp(unit, "second") == 0)
            factor = 1;
        else if (strcmp(unit, "m") == 0 || strcmp(unit, "min") == 0 || strcmp(unit, "minute") == 0)
            factor = 60;
        else if (strcmp(unit, "h") == 0 || strcmp(unit, "hour") == 0)
            factor = 3600;
        else if (strcmp(unit, "d") == 0 || strcmp(unit, "day") == 0)
            factor = 86400;
        else if (strcmp(unit, "w") == 0 || strcmp(unit, "week") == 0)
            factor = 604800;
        else
            ereport(ERROR,
                    (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                     errmsg("invalid interval unit: %s", unit)));

        seconds += value * factor;
    } while (*p != '\0');

    pfree(interval_str);
    PG_RETURN_INT64(seconds);
}

-- SQL definitions
CREATE FUNCTION fgedu_format_bytes(bigint)
RETURNS text
AS 'fgedu_util', 'fgedu_format_bytes'
LANGUAGE C IMMUTABLE STRICT;

CREATE FUNCTION fgedu_parse_interval(text)
RETURNS bigint
AS 'fgedu_util', 'fgedu_parse_interval'
LANGUAGE C IMMUTABLE STRICT;

-- Tests
SELECT fgedu_format_bytes(1234567890);
SELECT fgedu_parse_interval('2h30m');

-- Output
 fgedu_format_bytes
--------------------
 1.15 GB
(1 row)

 fgedu_parse_interval
----------------------
                 9000
(1 row)
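The test input '2h30m' is a compound duration, so a parser has to loop over number and unit pairs; a single sscanf("%d%9s") call would read the unit as "h30m" and reject it. The following is a hedged standalone sketch of that loop in plain C, testable without a server. The names unit_seconds and parse_interval_seconds are hypothetical, and overflow checking is left out for brevity.

```c
#include <assert.h>
#include <ctype.h>
#include <stdlib.h>
#include <string.h>

/* Seconds per unit, or 0 if the unit is not recognized. */
static long long
unit_seconds(const char *unit)
{
    if (!strcmp(unit, "s") || !strcmp(unit, "sec") || !strcmp(unit, "second"))
        return 1;
    if (!strcmp(unit, "m") || !strcmp(unit, "min") || !strcmp(unit, "minute"))
        return 60;
    if (!strcmp(unit, "h") || !strcmp(unit, "hour"))
        return 3600;
    if (!strcmp(unit, "d") || !strcmp(unit, "day"))
        return 86400;
    if (!strcmp(unit, "w") || !strcmp(unit, "week"))
        return 604800;
    return 0;
}

/*
 * Parse one or more <number><unit> pairs, e.g. "90s" or "2h30m".
 * Returns 1 and stores the total seconds, or 0 on malformed input.
 */
static int
parse_interval_seconds(const char *s, long long *total)
{
    long long sum = 0;
    int       parts = 0;

    assert(s != NULL && total != NULL);

    while (*s != '\0')
    {
        char      *end;
        char       unit[10];
        size_t     ulen = 0;
        long long  value;
        long long  factor;

        if (!isdigit((unsigned char) *s))
            return 0;           /* each pair must start with a digit */
        value = strtoll(s, &end, 10);
        s = end;

        /* Collect the letters that follow the number as the unit. */
        while (isalpha((unsigned char) *s) && ulen < sizeof(unit) - 1)
            unit[ulen++] = *s++;
        unit[ulen] = '\0';

        factor = unit_seconds(unit);
        if (factor == 0)
            return 0;           /* missing or unknown unit */

        sum += value * factor;
        parts++;
    }

    if (parts == 0)
        return 0;               /* empty string */
    *total = sum;
    return 1;
}
```

The sketch is deliberately strict: it rejects whitespace, signs, and unknown units instead of guessing, which mirrors the tutorial's choice of raising an error on invalid input.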

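Part05-Fengge's Lessons Learned

5.1 Best Practices for PostgreSQL System Function Development

Best practices: follow the PostgreSQL coding conventions; manage memory and SPI connections correctly; handle errors with PG_TRY/PG_CATCH where cleanup is needed; write complete test cases; provide detailed documentation.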

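C function development checklist:

  • Include the required header files
  • Declare PG_MODULE_MAGIC
  • Declare each function with PG_FUNCTION_INFO_V1
  • Check for NULL arguments
  • Use the PG_RETURN_* macros correctly
  • Manage memory allocation and release
  • Handle errors and exceptions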

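5.2 Debugging Techniques

Debugging techniques: print diagnostic messages with elog; debug the C code with gdb; check the PostgreSQL server log; verify assumptions with Assert(); write unit tests.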

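5.3 Common Problems

Common problems: memory leaks, segmentation faults, SPI connection errors, incorrect NULL handling, and type conversion errors.

Fengge's tip: in production, pay particular attention to error handling and resource management so that a C function cannot leak memory or crash the server.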

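This article was compiled and published by 风哥教程 (Fengge Tutorials) for study and testing purposes only. Please cite the source when reposting: http://www.fgedu.net.cn/10327.html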
