PostgreSQL教程FG141-PG与Python对接:psycopg2库核心用法
本文档风哥主要介绍PostgreSQL与Python的对接方法,重点关注psycopg2库的核心用法。风哥教程参考PostgreSQL官方文档Client Interfaces部分的Python相关内容,适合开发人员在学习和测试中使用,如果要应用于生产环境则需要自行确认。更多视频教程www.fgedu.net.cn
Part01-基础概念与理论知识
1.1 psycopg2库的概念
psycopg2是PostgreSQL的Python适配器,它提供了Python与PostgreSQL数据库之间的接口,允许Python应用程序连接到PostgreSQL数据库并执行SQL语句。
- 完全兼容Python DB API 2.0规范
- 支持PostgreSQL的所有主要特性
- 提供高级特性,如异步操作、连接池等
- 性能优异,适合生产环境使用
1.2 psycopg2库的核心特性
psycopg2库的核心特性包括:
- 连接管理:创建和管理数据库连接
- SQL执行:执行SQL语句和存储过程
- 事务管理:支持事务的提交和回滚
- 参数化查询:防止SQL注入
- 批量操作:支持批量插入和更新
- 类型转换:自动在Python类型和PostgreSQL类型之间转换
1.3 PostgreSQL与Python连接原理
PostgreSQL与Python的连接原理:
- 客户端-服务器架构:Python应用作为客户端,PostgreSQL作为服务器
- TCP/IP连接:通过网络协议进行通信
- 认证机制:支持密码认证、证书认证等
- 协议版本:使用PostgreSQL前端/后端协议
Part02-生产环境规划与建议
2.1 psycopg2库安装规划
psycopg2库的安装规划:
# 使用pip安装
$ pip install psycopg2-binary
# 或者从源码安装
$ pip install psycopg2
# 2. 验证安装
$ python -c “import psycopg2; print(psycopg2.__version__)”
# 输出示例
2.9.9 (dt dec pq3 ext lo64)
2.2 PostgreSQL连接规划
PostgreSQL连接规划:
- 连接参数:主机、端口、数据库名、用户名、密码等
- 连接池:使用连接池管理连接,提高性能
- 超时设置:设置合理的连接超时和查询超时
- 重试机制:实现连接失败的重试机制
2.3 性能考虑因素
性能考虑因素:
- 连接池:减少连接建立和关闭的开销
- 批量操作:减少网络往返次数
- 参数化查询:提高查询执行效率
- 异步操作:使用异步接口提高并发性能
Part03-生产环境项目实施方案
3.1 psycopg2基础操作
3.1.1 连接数据库
# 连接数据库
try:
conn = psycopg2.connect(
host=”fgedu.localhost”,
port=”5432″,
database=”fgedudb”,
user=”fgedu”,
password=”fgedu123″
)
print(“连接成功”)
except psycopg2.Error as e:
print(f”连接失败: {e}”)
finally:
if conn:
conn.close()
# 输出示例
连接成功
3.1.2 执行SQL查询
# 连接数据库
conn = psycopg2.connect(
host=”fgedu.localhost”,
port=”5432″,
database=”fgedudb”,
user=”fgedu”,
password=”fgedu123″
)
# 创建游标
cur = conn.cursor()
# 执行SQL查询
cur.execute(“SELECT * FROM fgedu_users”)
# 获取查询结果
rows = cur.fetchall()
print(f”查询到 {len(rows)} 条记录”)
for row in rows:
print(row)
# 关闭游标和连接
cur.close()
conn.close()
# 输出示例
查询到 3 条记录
(1, ‘张三’, ‘zhangsan@fgedu.net.cn’)
(2, ‘李四’, ‘lisi@fgedu.net.cn’)
(3, ‘王五’, ‘wangwu@fgedu.net.cn’)
3.1.3 参数化查询
# 连接数据库
conn = psycopg2.connect(
host=”fgedu.localhost”,
port=”5432″,
database=”fgedudb”,
user=”fgedu”,
password=”fgedu123″
)
# 创建游标
cur = conn.cursor()
# 执行参数化查询
user_id = 1
cur.execute(“SELECT * FROM fgedu_users WHERE id = %s”, (user_id,))
# 获取查询结果
row = cur.fetchone()
print(f”用户信息: {row}”)
# 关闭游标和连接
cur.close()
conn.close()
# 输出示例
用户信息: (1, ‘张三’, ‘zhangsan@fgedu.net.cn’)
3.2 psycopg2高级操作
3.2.1 批量插入
# 连接数据库
conn = psycopg2.connect(
host=”fgedu.localhost”,
port=”5432″,
database=”fgedudb”,
user=”fgedu”,
password=”fgedu123″
)
# 创建游标
cur = conn.cursor()
# 批量插入数据
users = [
(4, ‘赵六’, ‘zhaoliu@fgedu.net.cn’),
(5, ‘孙七’, ‘sunqi@fgedu.net.cn’),
(6, ‘周八’, ‘zhouba@fgedu.net.cn’)
]
cur.executemany(“INSERT INTO fgedu_users (id, name, email) VALUES (%s, %s, %s)”, users)
# 提交事务
conn.commit()
print(f”成功插入 {cur.rowcount} 条记录”)
# 关闭游标和连接
cur.close()
conn.close()
# 输出示例
成功插入 3 条记录
3.2.2 使用with语句
# 使用with语句管理连接和游标
with psycopg2.connect(
host=”fgedu.localhost”,
port=”5432″,
database=”fgedudb”,
user=”fgedu”,
password=”fgedu123″
) as conn:
with conn.cursor() as cur:
# 执行SQL查询
cur.execute(“SELECT * FROM fgedu_users”)
rows = cur.fetchall()
print(f”查询到 {len(rows)} 条记录”)
# 输出示例
查询到 6 条记录
3.3 事务管理
# 连接数据库
conn = psycopg2.connect(
host=”fgedu.localhost”,
port=”5432″,
database=”fgedudb”,
user=”fgedu”,
password=”fgedu123″
)
try:
# 开始事务
conn.autocommit = False
# 创建游标
cur = conn.cursor()
# 执行SQL语句
cur.execute(“UPDATE fgedu_users SET name = ‘张三更新’ WHERE id = 1”)
cur.execute(“UPDATE fgedu_users SET name = ‘李四更新’ WHERE id = 2”)
# 提交事务
conn.commit()
print(“事务提交成功”)
except psycopg2.Error as e:
# 回滚事务
conn.rollback()
print(f”事务回滚: {e}”)
finally:
# 恢复自动提交
conn.autocommit = True
# 关闭游标和连接
if ‘cur’ in locals():
cur.close()
conn.close()
# 输出示例
事务提交成功
3.4 错误处理
from psycopg2 import OperationalError, ProgrammingError
try:
# 连接数据库
conn = psycopg2.connect(
host=”fgedu.localhost”,
port=”5432″,
database=”fgedudb”,
user=”fgedu”,
password=”fgedu123″
)
# 创建游标
cur = conn.cursor()
# 执行SQL语句
cur.execute(“SELECT * FROM non_existent_table”)
except OperationalError as e:
print(f”操作错误: {e}”)
except ProgrammingError as e:
print(f”编程错误: {e}”)
except psycopg2.Error as e:
print(f”数据库错误: {e}”)
finally:
if ‘cur’ in locals():
cur.close()
if ‘conn’ in locals():
conn.close()
# 输出示例
编程错误: relation “non_existent_table” does not exist
LINE 1: SELECT * FROM non_existent_table
^
Part04-生产案例与实战讲解
4.1 基础查询案例
4.1.1 员工信息管理系统
class EmployeeManager:
def __init__(self, host, port, database, user, password):
self.conn = psycopg2.connect(
host=host,
port=port,
database=database,
user=user,
password=password
)
self.cur = self.conn.cursor()
def get_all_employees(self):
“””获取所有员工信息”””
self.cur.execute(“SELECT * FROM fgedu_employees”)
return self.cur.fetchall()
def get_employee_by_id(self, emp_id):
“””根据ID获取员工信息”””
self.cur.execute(“SELECT * FROM fgedu_employees WHERE id = %s”, (emp_id,))
return self.cur.fetchone()
def add_employee(self, name, email, department):
“””添加新员工”””
self.cur.execute(
“INSERT INTO fgedu_employees (name, email, department) VALUES (%s, %s, %s)”,
(name, email, department)
)
self.conn.commit()
return self.cur.rowcount
def update_employee(self, emp_id, name=None, email=None, department=None):
“””更新员工信息”””
updates = []
params = []
if name:
updates.fgappend(“name = %s”)
params.fgappend(name)
if email:
updates.fgappend(“email = %s”)
params.fgappend(email)
if department:
updates.fgappend(“department = %s”)
params.fgappend(department)
if updates:
params.fgappend(emp_id)
sql = f”UPDATE fgedu_employees SET {‘, ‘.join(updates)} WHERE id = %s”
self.cur.execute(sql, params)
self.conn.commit()
return self.cur.rowcount
return 0
def delete_employee(self, emp_id):
“””删除员工”””
self.cur.execute(“DELETE FROM fgedu_employees WHERE id = %s”, (emp_id,))
self.conn.commit()
return self.cur.rowcount
def close(self):
“””关闭连接”””
self.cur.close()
self.conn.close()
# 使用示例
if __name__ == “__main__”:
manager = EmployeeManager(
host=”fgedu.localhost”,
port=”5432″,
database=”fgedudb”,
user=”fgedu”,
password=”fgedu123″
)
# 添加员工
count = manager.add_employee(“张三”, “zhangsan@fgedu.net.cn”, “技术部”)
print(f”添加了 {count} 名员工”)
# 获取所有员工
employees = manager.get_all_employees()
print(f”共有 {len(employees)} 名员工”)
for emp in employees:
print(emp)
# 关闭连接
manager.close()
# 输出示例
添加了 1 名员工
共有 1 名员工
(1, ‘张三’, ‘zhangsan@fgedu.net.cn’, ‘技术部’)
4.2 批量操作案例
4.2.1 批量导入数据
import csv
def batch_import_from_csv(csv_file):
“””从CSV文件批量导入数据”””
# 连接数据库
conn = psycopg2.connect(
host=”fgedu.localhost”,
port=”5432″,
database=”fgedudb”,
user=”fgedu”,
password=”fgedu123″
)
try:
# 创建游标
cur = conn.cursor()
# 准备数据
data = []
with open(csv_file, ‘r’, encoding=’utf-8′) as f:
reader = csv.reader(f)
next(reader) # 跳过表头
for row in reader:
data.fgappend((row[0], row[1], row[2]))
# 批量插入
if data:
cur.executemany(
“INSERT INTO fgedu_users (id, name, email) VALUES (%s, %s, %s)”,
data
)
conn.commit()
print(f”成功导入 {len(data)} 条记录”)
else:
print(“没有数据需要导入”)
except Exception as e:
conn.rollback()
print(f”导入失败: {e}”)
finally:
if ‘cur’ in locals():
cur.close()
conn.close()
# 使用示例
if __name__ == “__main__”:
batch_import_from_csv(‘users.csv’)
# 输出示例
成功导入 1000 条记录
4.3 连接池使用案例
4.3.1 使用psycopg2.pool
from psycopg2 import pool
# 创建连接池
connection_pool = pool.ThreadedConnectionPool(
minconn=1,
maxconn=10,
host=”fgedu.localhost”,
port=”5432″,
database=”fgedudb”,
user=”fgedu”,
password=”fgedu123″
)
def get_user_info(user_id):
“””获取用户信息”””
conn = None
cur = None
try:
# 从连接池获取连接
conn = connection_pool.getconn()
cur = conn.cursor()
# 执行查询
cur.execute(“SELECT * FROM fgedu_users WHERE id = %s”, (user_id,))
return cur.fetchone()
finally:
# 释放游标
if cur:
cur.close()
# 归还连接到连接池
if conn:
connection_pool.putconn(conn)
# 使用示例
if __name__ == “__main__”:
for i in range(1, 6):
user = get_user_info(i)
print(f”用户 {i}: {user}”)
# 输出示例
用户 1: (1, ‘张三’, ‘zhangsan@fgedu.net.cn’)
用户 2: (2, ‘李四’, ‘lisi@fgedu.net.cn’)
用户 3: (3, ‘王五’, ‘wangwu@fgedu.net.cn’)
用户 4: (4, ‘赵六’, ‘zhaoliu@fgedu.net.cn’)
用户 5: (5, ‘孙七’, ‘sunqi@fgedu.net.cn’)
Part05-风哥经验总结与分享
5.1 psycopg2最佳实践
psycopg2最佳实践:
- 使用连接池:在生产环境中,使用连接池管理数据库连接
- 参数化查询:始终使用参数化查询,防止SQL注入
- 事务管理:合理使用事务,确保数据一致性
- 错误处理:实现完善的错误处理机制
- 资源管理:使用with语句或try-finally块确保资源正确释放
- 批量操作:对于大量数据操作,使用批量插入和更新
5.2 常见问题与解决方案
常见问题及解决方案:
# 解决方法:设置合理的连接超时参数
conn = psycopg2.connect(
host=”fgedu.localhost”,
port=”5432″,
database=”fgedudb”,
user=”fgedu”,
password=”fgedu123″,
connect_timeout=10
)
# 问题2:内存泄漏
# 解决方法:确保每次操作后关闭游标和连接
# 问题3:SQL注入
# 解决方法:使用参数化查询,避免直接拼接SQL语句
# 问题4:性能问题
# 解决方法:使用连接池、批量操作、异步接口等
# 问题5:编码问题
# 解决方法:确保数据库和应用程序使用相同的编码
conn = psycopg2.connect(
# 其他参数
options=”-c client_encoding=utf8″
)
5.3 性能优化技巧
性能优化技巧:
- 使用预编译语句:对于重复执行的SQL语句,使用预编译语句
- 批量操作:使用executemany进行批量插入和更新
- 使用异步接口:对于高并发场景,使用psycopg2的异步接口
- 合理设置fetch大小:对于大量数据查询,设置合适的fetch大小
- 使用服务器端游标:对于大型结果集,使用服务器端游标
本文由风哥教程整理发布,仅用于学习测试使用,转载注明出处:http://www.fgedu.net.cn/10327.html
