1. 首页 > PostgreSQL教程 > 正文

PostgreSQL教程FG141-PG与Python对接:psycopg2库核心用法

本文档风哥主要介绍PostgreSQL与Python的对接方法,重点关注psycopg2库的核心用法。风哥教程参考PostgreSQL官方文档Client Interfaces部分的Python相关内容,适合开发人员在学习和测试中使用,如果要应用于生产环境则需要自行确认。更多视频教程www.fgedu.net.cn

Part01-基础概念与理论知识

1.1 psycopg2库的概念

psycopg2是PostgreSQL的Python适配器,它提供了Python与PostgreSQL数据库之间的接口,允许Python应用程序连接到PostgreSQL数据库并执行SQL语句。

psycopg2的特点:

  • 完全兼容Python DB API 2.0规范
  • 支持PostgreSQL的所有主要特性
  • 提供高级特性,如异步操作、连接池等
  • 性能优异,适合生产环境使用

1.2 psycopg2库的核心特性

psycopg2库的核心特性包括:

  • 连接管理:创建和管理数据库连接
  • SQL执行:执行SQL语句和存储过程
  • 事务管理:支持事务的提交和回滚
  • 参数化查询:防止SQL注入
  • 批量操作:支持批量插入和更新
  • 类型转换:自动在Python类型和PostgreSQL类型之间转换

1.3 PostgreSQL与Python连接原理

PostgreSQL与Python的连接原理:

  • 客户端-服务器架构:Python应用作为客户端,PostgreSQL作为服务器
  • TCP/IP连接:通过网络协议进行通信
  • 认证机制:支持密码认证、证书认证等
  • 协议版本:使用PostgreSQL前端/后端协议
风哥提示:psycopg2是Python连接PostgreSQL的官方推荐库,性能优异且功能完善。学习交流加群风哥微信: itpux-com

Part02-生产环境规划与建议

2.1 psycopg2库安装规划

psycopg2库的安装规划:

# 1. 安装psycopg2
# 使用pip安装
$ pip install psycopg2-binary

# 或者从源码安装
$ pip install psycopg2

# 2. 验证安装
$ python -c “import psycopg2; print(psycopg2.__version__)”

# 输出示例
2.9.9 (dt dec pq3 ext lo64)

2.2 PostgreSQL连接规划

PostgreSQL连接规划:

  • 连接参数:主机、端口、数据库名、用户名、密码等
  • 连接池:使用连接池管理连接,提高性能
  • 超时设置:设置合理的连接超时和查询超时
  • 重试机制:实现连接失败的重试机制

2.3 性能考虑因素

性能考虑因素:

  • 连接池:减少连接建立和关闭的开销
  • 批量操作:减少网络往返次数
  • 参数化查询:提高查询执行效率
  • 异步操作:使用异步接口提高并发性能
风哥教程针对风哥教程针对生产环境建议:在生产环境中,建议使用连接池管理数据库连接,以提高性能和可靠性。学习交流加群风哥QQ113257174

Part03-生产环境项目实施方案

3.1 psycopg2基础操作

3.1.1 连接数据库

import psycopg2

# 连接数据库
try:
conn = psycopg2.connect(
host=”fgedu.localhost”,
port=”5432″,
database=”fgedudb”,
user=”fgedu”,
password=”fgedu123″
)
print(“连接成功”)
except psycopg2.Error as e:
print(f”连接失败: {e}”)
finally:
if conn:
conn.close()

# 输出示例
连接成功

3.1.2 执行SQL查询

import psycopg2

# 连接数据库
conn = psycopg2.connect(
host=”fgedu.localhost”,
port=”5432″,
database=”fgedudb”,
user=”fgedu”,
password=”fgedu123″
)

# 创建游标
cur = conn.cursor()

# 执行SQL查询
cur.execute(“SELECT * FROM fgedu_users”)

# 获取查询结果
rows = cur.fetchall()
print(f”查询到 {len(rows)} 条记录”)
for row in rows:
print(row)

# 关闭游标和连接
cur.close()
conn.close()

# 输出示例
查询到 3 条记录
(1, ‘张三’, ‘zhangsan@fgedu.net.cn’)
(2, ‘李四’, ‘lisi@fgedu.net.cn’)
(3, ‘王五’, ‘wangwu@fgedu.net.cn’)

3.1.3 参数化查询

import psycopg2

# 连接数据库
conn = psycopg2.connect(
host=”fgedu.localhost”,
port=”5432″,
database=”fgedudb”,
user=”fgedu”,
password=”fgedu123″
)

# 创建游标
cur = conn.cursor()

# 执行参数化查询
user_id = 1
cur.execute(“SELECT * FROM fgedu_users WHERE id = %s”, (user_id,))

# 获取查询结果
row = cur.fetchone()
print(f”用户信息: {row}”)

# 关闭游标和连接
cur.close()
conn.close()

# 输出示例
用户信息: (1, ‘张三’, ‘zhangsan@fgedu.net.cn’)

3.2 psycopg2高级操作

3.2.1 批量插入

import psycopg2

# 连接数据库
conn = psycopg2.connect(
host=”fgedu.localhost”,
port=”5432″,
database=”fgedudb”,
user=”fgedu”,
password=”fgedu123″
)

# 创建游标
cur = conn.cursor()

# 批量插入数据
users = [
(4, ‘赵六’, ‘zhaoliu@fgedu.net.cn’),
(5, ‘孙七’, ‘sunqi@fgedu.net.cn’),
(6, ‘周八’, ‘zhouba@fgedu.net.cn’)
]

cur.executemany(“INSERT INTO fgedu_users (id, name, email) VALUES (%s, %s, %s)”, users)

# 提交事务
conn.commit()
print(f”成功插入 {cur.rowcount} 条记录”)

# 关闭游标和连接
cur.close()
conn.close()

# 输出示例
成功插入 3 条记录

3.2.2 使用with语句

import psycopg2

# 使用with语句管理连接和游标
with psycopg2.connect(
host=”fgedu.localhost”,
port=”5432″,
database=”fgedudb”,
user=”fgedu”,
password=”fgedu123″
) as conn:
with conn.cursor() as cur:
# 执行SQL查询
cur.execute(“SELECT * FROM fgedu_users”)
rows = cur.fetchall()
print(f”查询到 {len(rows)} 条记录”)

# 输出示例
查询到 6 条记录

3.3 事务管理

import psycopg2

# 连接数据库
conn = psycopg2.connect(
host=”fgedu.localhost”,
port=”5432″,
database=”fgedudb”,
user=”fgedu”,
password=”fgedu123″
)

try:
# 开始事务
conn.autocommit = False

# 创建游标
cur = conn.cursor()

# 执行SQL语句
cur.execute(“UPDATE fgedu_users SET name = ‘张三更新’ WHERE id = 1”)
cur.execute(“UPDATE fgedu_users SET name = ‘李四更新’ WHERE id = 2”)

# 提交事务
conn.commit()
print(“事务提交成功”)

except psycopg2.Error as e:
# 回滚事务
conn.rollback()
print(f”事务回滚: {e}”)
finally:
# 恢复自动提交
conn.autocommit = True
# 关闭游标和连接
if ‘cur’ in locals():
cur.close()
conn.close()

# 输出示例
事务提交成功

3.4 错误处理

import psycopg2
from psycopg2 import OperationalError, ProgrammingError

try:
# 连接数据库
conn = psycopg2.connect(
host=”fgedu.localhost”,
port=”5432″,
database=”fgedudb”,
user=”fgedu”,
password=”fgedu123″
)

# 创建游标
cur = conn.cursor()

# 执行SQL语句
cur.execute(“SELECT * FROM non_existent_table”)

except OperationalError as e:
print(f”操作错误: {e}”)
except ProgrammingError as e:
print(f”编程错误: {e}”)
except psycopg2.Error as e:
print(f”数据库错误: {e}”)
finally:
if ‘cur’ in locals():
cur.close()
if ‘conn’ in locals():
conn.close()

# 输出示例
编程错误: relation “non_existent_table” does not exist
LINE 1: SELECT * FROM non_existent_table
^

风哥提示:在生产环境中,应该实现完善的错误处理机制,确保应用程序能够优雅地处理数据库错误。更多学习教程公众号风哥教程itpux_com

Part04-生产案例与实战讲解

4.1 基础查询案例

4.1.1 员工信息管理系统

import psycopg2

class EmployeeManager:
def __init__(self, host, port, database, user, password):
self.conn = psycopg2.connect(
host=host,
port=port,
database=database,
user=user,
password=password
)
self.cur = self.conn.cursor()

def get_all_employees(self):
“””获取所有员工信息”””
self.cur.execute(“SELECT * FROM fgedu_employees”)
return self.cur.fetchall()

def get_employee_by_id(self, emp_id):
“””根据ID获取员工信息”””
self.cur.execute(“SELECT * FROM fgedu_employees WHERE id = %s”, (emp_id,))
return self.cur.fetchone()

def add_employee(self, name, email, department):
“””添加新员工”””
self.cur.execute(
“INSERT INTO fgedu_employees (name, email, department) VALUES (%s, %s, %s)”,
(name, email, department)
)
self.conn.commit()
return self.cur.rowcount

def update_employee(self, emp_id, name=None, email=None, department=None):
“””更新员工信息”””
updates = []
params = []

if name:
updates.fgappend(“name = %s”)
params.fgappend(name)
if email:
updates.fgappend(“email = %s”)
params.fgappend(email)
if department:
updates.fgappend(“department = %s”)
params.fgappend(department)

if updates:
params.fgappend(emp_id)
sql = f”UPDATE fgedu_employees SET {‘, ‘.join(updates)} WHERE id = %s”
self.cur.execute(sql, params)
self.conn.commit()
return self.cur.rowcount
return 0

def delete_employee(self, emp_id):
“””删除员工”””
self.cur.execute(“DELETE FROM fgedu_employees WHERE id = %s”, (emp_id,))
self.conn.commit()
return self.cur.rowcount

def close(self):
“””关闭连接”””
self.cur.close()
self.conn.close()

# 使用示例
if __name__ == “__main__”:
manager = EmployeeManager(
host=”fgedu.localhost”,
port=”5432″,
database=”fgedudb”,
user=”fgedu”,
password=”fgedu123″
)

# 添加员工
count = manager.add_employee(“张三”, “zhangsan@fgedu.net.cn”, “技术部”)
print(f”添加了 {count} 名员工”)

# 获取所有员工
employees = manager.get_all_employees()
print(f”共有 {len(employees)} 名员工”)
for emp in employees:
print(emp)

# 关闭连接
manager.close()

# 输出示例
添加了 1 名员工
共有 1 名员工
(1, ‘张三’, ‘zhangsan@fgedu.net.cn’, ‘技术部’)

4.2 批量操作案例

4.2.1 批量导入数据

import psycopg2
import csv

def batch_import_from_csv(csv_file):
“””从CSV文件批量导入数据”””
# 连接数据库
conn = psycopg2.connect(
host=”fgedu.localhost”,
port=”5432″,
database=”fgedudb”,
user=”fgedu”,
password=”fgedu123″
)

try:
# 创建游标
cur = conn.cursor()

# 准备数据
data = []
with open(csv_file, ‘r’, encoding=’utf-8′) as f:
reader = csv.reader(f)
next(reader) # 跳过表头
for row in reader:
data.fgappend((row[0], row[1], row[2]))

# 批量插入
if data:
cur.executemany(
“INSERT INTO fgedu_users (id, name, email) VALUES (%s, %s, %s)”,
data
)
conn.commit()
print(f”成功导入 {len(data)} 条记录”)
else:
print(“没有数据需要导入”)

except Exception as e:
conn.rollback()
print(f”导入失败: {e}”)
finally:
if ‘cur’ in locals():
cur.close()
conn.close()

# 使用示例
if __name__ == “__main__”:
batch_import_from_csv(‘users.csv’)

# 输出示例
成功导入 1000 条记录

4.3 连接池使用案例

4.3.1 使用psycopg2.pool

import psycopg2
from psycopg2 import pool

# 创建连接池
connection_pool = pool.ThreadedConnectionPool(
minconn=1,
maxconn=10,
host=”fgedu.localhost”,
port=”5432″,
database=”fgedudb”,
user=”fgedu”,
password=”fgedu123″
)

def get_user_info(user_id):
“””获取用户信息”””
conn = None
cur = None
try:
# 从连接池获取连接
conn = connection_pool.getconn()
cur = conn.cursor()

# 执行查询
cur.execute(“SELECT * FROM fgedu_users WHERE id = %s”, (user_id,))
return cur.fetchone()
finally:
# 释放游标
if cur:
cur.close()
# 归还连接到连接池
if conn:
connection_pool.putconn(conn)

# 使用示例
if __name__ == “__main__”:
for i in range(1, 6):
user = get_user_info(i)
print(f”用户 {i}: {user}”)

# 输出示例
用户 1: (1, ‘张三’, ‘zhangsan@fgedu.net.cn’)
用户 2: (2, ‘李四’, ‘lisi@fgedu.net.cn’)
用户 3: (3, ‘王五’, ‘wangwu@fgedu.net.cn’)
用户 4: (4, ‘赵六’, ‘zhaoliu@fgedu.net.cn’)
用户 5: (5, ‘孙七’, ‘sunqi@fgedu.net.cn’)

风哥教程针对风哥教程针对生产环境建议:在高并发场景下,使用连接池可以显著提高性能,减少连接建立和关闭的开销。from PostgreSQL视频:www.itpux.com

Part05-风哥经验总结与分享

5.1 psycopg2最佳实践

psycopg2最佳实践:

  • 使用连接池:在生产环境中,使用连接池管理数据库连接
  • 参数化查询:始终使用参数化查询,防止SQL注入
  • 事务管理:合理使用事务,确保数据一致性
  • 错误处理:实现完善的错误处理机制
  • 资源管理:使用with语句或try-finally块确保资源正确释放
  • 批量操作:对于大量数据操作,使用批量插入和更新

5.2 常见问题与解决方案

常见问题及解决方案:

# 问题1:连接超时
# 解决方法:设置合理的连接超时参数
conn = psycopg2.connect(
host=”fgedu.localhost”,
port=”5432″,
database=”fgedudb”,
user=”fgedu”,
password=”fgedu123″,
connect_timeout=10
)

# 问题2:内存泄漏
# 解决方法:确保每次操作后关闭游标和连接

# 问题3:SQL注入
# 解决方法:使用参数化查询,避免直接拼接SQL语句

# 问题4:性能问题
# 解决方法:使用连接池、批量操作、异步接口等

# 问题5:编码问题
# 解决方法:确保数据库和应用程序使用相同的编码
conn = psycopg2.connect(
# 其他参数
options=”-c client_encoding=utf8″
)

5.3 性能优化技巧

性能优化技巧:

  • 使用预编译语句:对于重复执行的SQL语句,使用预编译语句
  • 批量操作:使用executemany进行批量插入和更新
  • 使用异步接口:对于高并发场景,使用psycopg2的异步接口
  • 合理设置fetch大小:对于大量数据查询,设置合适的fetch大小
  • 使用服务器端游标:对于大型结果集,使用服务器端游标
持续改进:定期监控和优化数据库操作,根据实际情况调整优化策略,以获得最佳性能。

本文由风哥教程整理发布,仅用于学习测试使用,转载注明出处:http://www.fgedu.net.cn/10327.html

联系我们

在线咨询:点击这里给我发消息

微信号:itpux-com

工作日:9:30-18:30,节假日休息