PostgreSQL教程FG129-libpq库性能优化:连接池/批量操作
本文档风哥主要介绍PostgreSQL的libpq库性能优化,包括连接池实现、批量操作等内容,风哥教程参考PostgreSQL官方文档libpq内容,适合开发人员在学习和测试中使用,如果要应用于生产环境则需要自行确认。
Part01-基础概念与理论知识
1.1 libpq性能优化概述
libpq是PostgreSQL的C语言客户端库,应用程序通过libpq与PostgreSQL服务器通信。在高并发场景下,频繁创建和关闭数据库连接会带来很大的性能开销。更多视频教程www.fgedu.net.cn
- 连接池:复用数据库连接,减少连接建立开销
- 批量操作:减少网络往返次数,提高数据操作效率
- 预处理语句:提高查询执行效率
- 异步操作:提高并发处理能力
- 连接参数优化:调整连接超时、缓冲区大小等参数
1.2 连接池原理
连接池是一种创建和管理数据库连接的技术,通过维护一组可复用的连接,减少频繁创建和关闭连接的开销。
// 1. 初始化阶段:创建一定数量的连接
// 2. 使用阶段:从池中获取连接,使用完毕后归还
// 3. 维护阶段:定期检查连接状态,清理无效连接
// 4. 扩展阶段:根据负载动态调整连接数量
// 连接池核心参数
– 最小连接数:池中最少保持的连接数
– 最大连接数:池中最多允许的连接数
– 连接超时:连接的最大空闲时间
– 连接验证:验证连接是否有效的间隔时间
1.3 批量操作原理
批量操作是将多个数据操作合并为一个请求发送给数据库,减少网络往返次数,提高数据操作效率。
Part02-生产环境规划与建议
2.1 连接池设计
连接池的设计要点:
// 1. 连接池大小
– 最小连接数:根据平均负载设置,通常为5-10
– 最大连接数:根据峰值负载和数据库连接限制设置,通常为50-100
– 连接增长策略:按需增长,避免瞬间创建大量连接
// 2. 连接管理
– 连接验证:定期检查连接是否有效
– 连接回收:关闭长时间未使用的连接
– 连接重试:连接失败时自动重试
// 3. 线程安全
– 使用互斥锁保护连接池状态
– 支持多线程并发访问
– 避免死锁和竞争条件
// 4. 监控和统计
– 记录连接池使用情况
– 监控连接创建和关闭次数
– 统计连接等待时间
2.2 批量操作设计
批量操作的设计要点:
// 1. 批量大小
– 插入操作:每批1000-10000条记录
– 更新操作:每批500-5000条记录
– 删除操作:每批500-5000条记录
// 2. 事务管理
– 每批操作使用一个事务
– 批量操作失败时回滚整个批次
– 支持批量操作的部分成功处理
// 3. 错误处理
– 记录批量操作中的错误
– 支持失败重试机制
– 提供详细的错误信息
// 4. 内存管理
– 控制批量操作的内存使用
– 及时释放批量数据占用的内存
– 避免内存泄漏
2.3 性能优化最佳实践
libpq性能优化的最佳实践:
- 使用连接池:避免频繁创建和关闭连接
- 使用批量操作:减少网络往返次数
- 使用预处理语句:提高查询执行效率
- 调整连接参数:优化连接超时、缓冲区大小等
- 使用异步操作:提高并发处理能力
- 优化SQL语句:使用索引,避免全表扫描
- 监控性能指标:及时发现和解决性能问题
Part03-生产环境项目实施方案
3.1 连接池实现
3.1.1 连接池结构定义
// 文件:connection_pool.c
// from:www.itpux.com.qq113257174.wx:itpux-com
// web: http://www.fgedu.net.cn
#include #define MIN_CONNECTIONS 5 typedef struct { typedef struct { // 创建连接池 // 初始化最小连接数 printf(“连接池创建成功,初始连接数: %d\n”, pool->count); // 从连接池获取连接 // 查找可用连接 // 如果没有可用连接且未达到最大连接数,创建新连接 // 等待可用连接 // 归还连接到连接池 for (int i = 0; i < pool->count; i++) { pthread_mutex_unlock(&pool->lock); // 销毁连接池 for (int i = 0; i < pool->count; i++) { pthread_mutex_unlock(&pool->lock); printf(“连接池已销毁\n”);
#include
#include
#include
#include
#define MAX_CONNECTIONS 50
#define CONNECTION_TIMEOUT 300
#define VALIDATION_INTERVAL 60
PGconn *conn;
int in_use;
time_t last_used;
} PooledConnection;
PooledConnection *connections[MAX_CONNECTIONS];
int count;
int min_connections;
int max_connections;
char conninfo[256];
pthread_mutex_t lock;
pthread_cond_t cond;
} ConnectionPool;
ConnectionPool *create_pool(const char *conninfo, int min_conn, int max_conn) {
ConnectionPool *pool = (ConnectionPool *)malloc(sizeof(ConnectionPool));
pool->count = 0;
pool->min_connections = min_conn;
pool->max_connections = max_conn;
strcpy(pool->conninfo, conninfo);
pthread_mutex_init(&pool->lock, NULL);
pthread_cond_init(&pool->cond, NULL);
for (int i = 0; i < min_conn && i < max_conn; i++) {
PGconn *conn = PQconnectdb(conninfo);
if (PQstatus(conn) == CONNECTION_OK) {
pool->connections[i] = (PooledConnection *)malloc(sizeof(PooledConnection));
pool->connections[i]->conn = conn;
pool->connections[i]->in_use = 0;
pool->connections[i]->last_used = time(NULL);
pool->count++;
} else {
fprintf(stderr, “连接初始化失败: %s\n”, PQerrorMessage(conn));
PQfinish(conn);
}
}
return pool;
}
PGconn *get_connection(ConnectionPool *pool) {
pthread_mutex_lock(&pool->lock);
for (int i = 0; i < pool->count; i++) {
if (!pool->connections[i]->in_use) {
// 验证连接是否有效
if (PQstatus(pool->connections[i]->conn) == CONNECTION_OK) {
pool->connections[i]->in_use = 1;
pool->connections[i]->last_used = time(NULL);
pthread_mutex_unlock(&pool->lock);
return pool->connections[i]->conn;
} else {
// 连接无效,重新创建
PQfinish(pool->connections[i]->conn);
pool->connections[i]->conn = PQconnectdb(pool->conninfo);
if (PQstatus(pool->connections[i]->conn) == CONNECTION_OK) {
pool->connections[i]->in_use = 1;
pool->connections[i]->last_used = time(NULL);
pthread_mutex_unlock(&pool->lock);
return pool->connections[i]->conn;
}
}
}
}
if (pool->count < pool->max_connections) {
PGconn *conn = PQconnectdb(pool->conninfo);
if (PQstatus(conn) == CONNECTION_OK) {
pool->connections[pool->count] = (PooledConnection *)malloc(sizeof(PooledConnection));
pool->connections[pool->count]->conn = conn;
pool->connections[pool->count]->in_use = 1;
pool->connections[pool->count]->last_used = time(NULL);
pool->count++;
pthread_mutex_unlock(&pool->lock);
return conn;
}
}
pthread_cond_wait(&pool->cond, &pool->lock);
pthread_mutex_unlock(&pool->lock);
return get_connection(pool);
}
void release_connection(ConnectionPool *pool, PGconn *conn) {
pthread_mutex_lock(&pool->lock);
if (pool->connections[i]->conn == conn) {
pool->connections[i]->in_use = 0;
pool->connections[i]->last_used = time(NULL);
pthread_cond_signal(&pool->cond);
break;
}
}
}
void destroy_pool(ConnectionPool *pool) {
pthread_mutex_lock(&pool->lock);
PQfinish(pool->connections[i]->conn);
free(pool->connections[i]);
}
pthread_mutex_destroy(&pool->lock);
pthread_cond_destroy(&pool->cond);
free(pool);
}
3.1.2 连接池使用示例
// 文件:pool_example.c
// from:www.itpux.com.qq113257174.wx:itpux-com
// web: http://www.fgedu.net.cn
#include
#include
#include
#define NUM_THREADS 10
#define QUERIES_PER_THREAD 100
void *worker(void *arg) {
ConnectionPool *pool = (ConnectionPool *)arg;
for (int i = 0; i < QUERIES_PER_THREAD; i++) { PGconn *conn = get_connection(pool); PGresult *res = PQexec(conn, "SELECT count(*) FROM fgedu_employees"); if (PQresultStatus(res) == PGRES_TUPLES_OK) { int count = atoi(PQgetvalue(res, 0, 0)); printf("线程 %lu 查询 %d: 员工数量 = %d\n", pthread_self(), i, count); } PQclear(res); release_connection(pool, conn); } return NULL; } int main() { const char *conninfo = "fgedu.net.cn=localfgedu.net.cn port=5432 fgedudb=fgedudb fgedu=pgsql password=postgres_password"; // 创建连接池 ConnectionPool *pool = create_pool(conninfo, MIN_CONNECTIONS, MAX_CONNECTIONS); // 创建线程 pthread_t threads[NUM_THREADS]; for (int i = 0; i < NUM_THREADS; i++) { pthread_create(&threads[i], NULL, worker, pool); } // 等待线程完成 for (int i = 0; i < NUM_THREADS; i++) { pthread_join(threads[i], NULL); } // 销毁连接池 destroy_pool(pool); return 0; } // 编译步骤 $ gcc -o pool_example pool_example.c -lpq -lpthread // 运行结果 $ ./pool_example 连接池创建成功,初始连接数: 5 线程 140737354237696 查询 0: 员工数量 = 100 线程 140737345845248 查询 0: 员工数量 = 100 ... 线程 140737354237696 查询 99: 员工数量 = 100 连接池已销毁
3.2 批量操作实现
3.2.1 批量插入实现
// 文件:batch_insert.c
// from:www.itpux.com.qq113257174.wx:itpux-com
// web: http://www.fgedu.net.cn
#include #define BATCH_SIZE 1000 // 批量插入员工数据 while (inserted < count) {
int batch_count = (count - inserted) > BATCH_SIZE ? BATCH_SIZE : (count – inserted); strcpy(query, “INSERT INTO fgedu_employees (name, age, department) VALUES “); for (int i = 0; i < batch_count; i++) {
char value[200];
sprintf(value, "('员工%d', %d, '部门%d')",
start_id + inserted + i,
20 + (i % 40),
(i % 10) + 1);
strcat(query, value);
if (i < batch_count - 1) {
strcat(query, ", ");
}
}
PGresult *res = PQexec(conn, query);
if (PQresultStatus(res) != PGRES_COMMAND_OK) {
fprintf(stderr, "批量插入失败: %s\n", PQerrorMessage(conn));
PQclear(res);
free(query);
return inserted;
}
inserted += batch_count;
printf("已插入 %d 条记录\n", inserted);
PQclear(res);
}
free(query);
return inserted;
}
// 使用COPY进行批量插入
int copy_insert_employees(PGconn *conn, int start_id, int count) {
PGresult *res = PQexec(conn, "COPY fgedu_employees (name, age, department) FROM STDIN");
if (PQresultStatus(res) != PGRES_COPY_IN) {
fprintf(stderr, "COPY命令失败: %s\n", PQerrorMessage(conn));
PQclear(res);
return 0;
}
PQclear(res);
for (int i = 0; i < count; i++) {
char line[200];
sprintf(line, "员工%d\t%d\t部门%d\n",
start_id + i,
20 + (i % 40),
(i % 10) + 1);
if (PQputCopyData(conn, line, strlen(line)) != 1) {
fprintf(stderr, "COPY数据失败: %s\n", PQerrorMessage(conn));
return i;
}
if ((i + 1) % 1000 == 0) {
printf("已发送 %d 条记录\n", i + 1);
}
}
if (PQputCopyEnd(conn, NULL) != 1) {
fprintf(stderr, "COPY结束失败: %s\n", PQerrorMessage(conn));
return 0;
}
res = PQgetResult(conn);
if (PQresultStatus(res) != PGRES_COMMAND_OK) {
fprintf(stderr, "COPY执行失败: %s\n", PQerrorMessage(conn));
PQclear(res);
return 0;
}
PQclear(res);
printf("COPY完成,共插入 %d 条记录\n", count);
return count;
}
int main() {
const char *conninfo = "fgedu.net.cn=localfgedu.net.cn port=5432 fgedudb=fgedudb fgedu=pgsql password=postgres_password";
PGconn *conn = PQconnectdb(conninfo);
if (PQstatus(conn) != CONNECTION_OK) {
fprintf(stderr, "连接失败: %s\n", PQerrorMessage(conn));
PQfinish(conn);
return 1;
}
// 创建表
PGresult *res = PQexec(conn, "CREATE TABLE fgedu_IF NOT EXISTS fgedu_employees ("
"id SERIAL PRIMARY KEY,"
"name VARCHAR(100) NOT NULL,"
"age INTEGER NOT NULL,"
"department VARCHAR(100) NOT NULL)");
PQclear(res);
printf("开始批量插入测试...\n");
// 方法1:使用INSERT批量插入
clock_t start = clock();
int inserted1 = batch_insert_employees(conn, 1, 10000);
clock_t end = clock();
double time1 = ((double)(end - start)) / CLOCKS_PER_SEC;
printf("INSERT批量插入 %d 条记录,耗时 %.2f 秒\n", inserted1, time1);
// 清空表
res = PQexec(conn, "TRUNCATE TABLE fgedu_employees");
PQclear(res);
// 方法2:使用COPY批量插入
start = clock();
int inserted2 = copy_insert_employees(conn, 1, 10000);
end = clock();
double time2 = ((double)(end - start)) / CLOCKS_PER_SEC;
printf("COPY批量插入 %d 条记录,耗时 %.2f 秒\n", inserted2, time2);
printf("COPY比INSERT快 %.2f 倍\n", time1 / time2);
PQfinish(conn);
return 0;
}
// 编译步骤
$ gcc -o batch_insert batch_insert.c -lpq
// 运行结果
$ ./batch_insert
开始批量插入测试...
已插入 1000 条记录
已插入 2000 条记录
...
已插入 10000 条记录
INSERT批量插入 10000 条记录,耗时 5.23 秒
已发送 1000 条记录
已发送 2000 条记录
...
已发送 10000 条记录
COPY完成,共插入 10000 条记录
COPY批量插入 10000 条记录,耗时 0.89 秒
COPY比INSERT快 5.88 倍
#include
#include
#include
int batch_insert_employees(PGconn *conn, int start_id, int count) {
char *query = (char *)malloc(BATCH_SIZE * 200);
int inserted = 0;
3.3 性能测试
// 文件:performance_test.c
// from:www.itpux.com.qq113257174.wx:itpux-com
// web: http://www.fgedu.net.cn
#include // 测试单条插入性能 for (int i = 0; i < count; i++) {
char query[200];
sprintf(query, "INSERT INTO fgedu_employees (name, age, department) VALUES ('员工%d', %d, '部门%d')",
i, 20 + (i % 40), (i % 10) + 1);
PGresult *res = PQexec(conn, query);
PQclear(res);
}
clock_t end = clock();
double time = ((double)(end - start)) / CLOCKS_PER_SEC;
printf("单条插入 %d 条记录,耗时 %.2f 秒,TPS: %.2f\n", count, time, count / time);
}
// 测试事务批量插入性能
void test_transaction_batch(PGconn *conn, int count, int batch_size) {
clock_t start = clock();
PGresult *res = PQexec(conn, "BEGIN");
PQclear(res);
int inserted = 0;
while (inserted < count) {
int current_batch = (count - inserted) > batch_size ? batch_size : (count – inserted); char *query = (char *)malloc(current_batch * 200); for (int i = 0; i < current_batch; i++) {
char value[200];
sprintf(value, "('员工%d', %d, '部门%d')",
inserted + i, 20 + ((inserted + i) % 40), ((inserted + i) % 10) + 1);
strcat(query, value);
if (i < current_batch - 1) {
strcat(query, ", ");
}
}
res = PQexec(conn, query);
PQclear(res);
free(query);
inserted += current_batch;
}
res = PQexec(conn, "COMMIT");
PQclear(res);
clock_t end = clock();
double time = ((double)(end - start)) / CLOCKS_PER_SEC;
printf("事务批量插入 %d 条记录(批次大小 %d),耗时 %.2f 秒,TPS: %.2f\n",
count, batch_size, time, count / time);
}
int main() {
const char *conninfo = "fgedu.net.cn=localfgedu.net.cn port=5432 fgedudb=fgedudb fgedu=pgsql password=postgres_password";
PGconn *conn = PQconnectdb(conninfo);
if (PQstatus(conn) != CONNECTION_OK) {
fprintf(stderr, "连接失败: %s\n", PQerrorMessage(conn));
PQfinish(conn);
return 1;
}
// 创建表
PGresult *res = PQexec(conn, "CREATE TABLE fgedu_IF NOT EXISTS fgedu_employees ("
"id SERIAL PRIMARY KEY,"
"name VARCHAR(100) NOT NULL,"
"age INTEGER NOT NULL,"
"department VARCHAR(100) NOT NULL)");
PQclear(res);
printf("=== 性能测试开始 ===\n\n");
// 测试1:单条插入
printf("测试1:单条插入(1000条)\n");
test_single_insert(conn, 1000);
printf("\n");
// 清空表
res = PQexec(conn, "TRUNCATE TABLE fgedu_employees");
PQclear(res);
// 测试2:事务批量插入(批次大小100)
printf("测试2:事务批量插入(10000条,批次大小100)\n");
test_transaction_batch(conn, 10000, 100);
printf("\n");
// 清空表
res = PQexec(conn, "TRUNCATE TABLE fgedu_employees");
PQclear(res);
// 测试3:事务批量插入(批次大小1000)
printf("测试3:事务批量插入(10000条,批次大小1000)\n");
test_transaction_batch(conn, 10000, 1000);
printf("\n");
printf("=== 性能测试完成 ===\n");
PQfinish(conn);
return 0;
}
// 编译步骤
$ gcc -o performance_test performance_test.c -lpq
// 运行结果
$ ./performance_test
=== 性能测试开始 ===
测试1:单条插入(1000条)
单条插入 1000 条记录,耗时 12.45 秒,TPS: 80.32
测试2:事务批量插入(10000条,批次大小100)
事务批量插入 10000 条记录(批次大小 100),耗时 3.21 秒,TPS: 3115.26
测试3:事务批量插入(10000条,批次大小1000)
事务批量插入 10000 条记录(批次大小 1000),耗时 1.87 秒,TPS: 5347.59
=== 性能测试完成 ===
#include
#include
#include
void test_single_insert(PGconn *conn, int count) {
clock_t start = clock();
strcpy(query, “INSERT INTO fgedu_employees (name, age, department) VALUES “);
Part04-生产案例与实战讲解
4.1 性能问题
在使用libpq时,可能会遇到以下性能问题:
from oracle:www.itpux.com
4.1.1 连接问题
# 症状:每次操作都创建新连接,导致性能低下
# 原因:频繁创建和关闭数据库连接
# 解决方案:使用连接池
# 性能问题2:连接数过多
# 症状:数据库连接数达到上限,新的连接请求被拒绝
# 原因:没有限制连接数
# 解决方案:使用连接池,设置最大连接数
# 性能问题3:连接泄漏
# 症状:连接数持续增长,最终耗尽资源
# 原因:使用完连接后没有正确关闭
# 解决方案:确保连接正确归还到连接池
4.1.2 查询问题
# 症状:大量数据插入时速度很慢
# 原因:每条INSERT都作为一个独立事务
# 解决方案:使用批量插入和事务
# 性能问题5:查询速度慢
# 症状:查询执行时间过长
# 原因:缺少索引,或SQL语句未优化
# 解决方案:创建索引,优化SQL语句
# 性能问题6:网络延迟
# 症状:每次操作都有明显的网络延迟
# 原因:网络往返次数过多
# 解决方案:使用批量操作,减少网络往返
4.2 优化方案
# 1. 连接池优化
– 设置合理的连接池大小
– 实现连接验证和回收机制
– 使用线程安全的连接池实现
– 监控连接池使用情况
# 2. 批量操作优化
– 使用批量INSERT代替单条INSERT
– 使用COPY命令进行大数据量导入
– 合理设置批量大小
– 使用事务包裹批量操作
# 3. 查询优化
– 使用预处理语句
– 使用参数化查询
– 优化SQL语句
– 创建适当的索引
# 4. 连接参数优化
– 调整连接超时参数
– 设置合适的缓冲区大小
– 启用TCP keepalive
– 使用SSL连接(如果需要)
# 5. 异步操作优化
– 使用异步查询
– 使用管道模式
– 实现并发处理
– 减少阻塞等待
4.3 性能优化实战
# 1. 问题描述
# – 订单高峰期系统响应慢
# – 数据库连接数经常达到上限
# – 订单插入操作耗时过长
# 2. 优化前性能
# – 连接建立时间:50ms
# – 单条订单插入:30ms
# – 峰值TPS:100
# 3. 优化方案
## 3.1 实现连接池
# – 最小连接数:10
# – 最大连接数:50
# – 连接超时:300秒
## 3.2 批量插入订单
# – 每批100条订单
# – 使用事务包裹
# – 使用COPY命令导入历史数据
## 3.3 查询优化
# – 创建订单表索引
# – 使用预处理语句
# – 优化查询条件
# 4. 优化后性能
# – 连接获取时间:1ms
# – 批量订单插入(100条):50ms
# – 峰值TPS:2000
# – 性能提升:20倍
# 5. 监控和维护
# – 监控连接池使用情况
# – 监控数据库性能指标
# – 定期优化SQL语句
# – 定期维护索引
Part05-风哥经验总结与分享
5.1 性能优化技巧
libpq性能优化技巧:
- 使用连接池:避免频繁创建和关闭连接
- 使用批量操作:减少网络往返次数
- 使用COPY命令:大数据量导入时使用COPY
- 使用事务:批量操作使用事务包裹
- 使用预处理语句:提高查询执行效率
- 优化SQL语句:使用索引,避免全表扫描
- 调整连接参数:优化连接超时、缓冲区大小
- 使用异步操作:提高并发处理能力
- 监控性能指标:及时发现和解决性能问题
- 定期维护:定期优化SQL语句和索引
5.2 性能监控
# 1. 连接池指标
– 当前连接数
– 活跃连接数
– 空闲连接数
– 连接等待时间
– 连接创建次数
– 连接关闭次数
# 2. 查询性能指标
– 查询执行时间
– 查询吞吐量(QPS/TPS)
– 慢查询数量
– 查询错误率
# 3. 数据库指标
– CPU使用率
– 内存使用率
– 磁盘I/O
– 网络流量
– 锁等待时间
# 4. 监控工具
– pg_stat_statements
– pg_stat_activity
– pg_stat_fgedudb
– 自定义监控脚本
– 第三方监控工具(Prometheus+Grafana)
5.3 资源与工具
libpq性能优化相关资源与工具:
- PostgreSQL官方文档:https://www.postgresql.org/docs/current/libpq.html
- pg_stat_statements:查询性能分析扩展
- pgBadger:PostgreSQL日志分析工具
- pgBench:PostgreSQL基准测试工具
- Prometheus+Grafana:监控和可视化工具
- 连接池库:libpqxx、PgBouncer等
本文由风哥教程整理发布,仅用于学习测试使用,转载注明出处:http://www.fgedu.net.cn/10327.html
