1. 首页 > DB2教程 > 正文

DB2教程FG046-DB2与Python集成实战

风哥教程参考DB2官方文档IBM Data Server Driver for Python、Application Development Guide等内容,详细介绍DB2与Python的集成方法、常见库的使用、最佳实践。更多视频教程www.fgedu.net.cn

目录大纲

Part01-环境配置

1.1 安装IBM Data Server Driver

# 使用pip安装ibm_db
pip install ibm_db

# 或者安装ibm-db-sa (SQLAlchemy支持)
pip install ibm-db-sa

# 验证安装
python -c “import ibm_db; print(ibm_db.__version__)”

$ pip install ibm_db
Collecting ibm_db
Downloading ibm_db-3.2.0-cp310-cp310-linux_x86_64.whl (10.1 MB)
|████████████████████████████████| 10.1 MB 10.1 MB/s
Installing collected packages: ibm_db
Successfully installed ibm_db-3.2.0

$ python -c “import ibm_db; print(ibm_db.__version__)”
3.2.0

1.2 连接数据库

import ibm_db
import ibm_db_dbi

# 连接参数
database = “fgedb”
hostname = “db2server.fgedu.net.cn”
port = 50000
protocol = “TCPIP”
uid = “db2inst1”
pwd = “password”

# 构建连接字符串
conn_str = (
f”DATABASE={database};”
f”HOSTNAME={hostname};”
f”PORT={port};”
f”PROTOCOL={protocol};”
f”UID={uid};”
f”PWD={pwd};”
“CURRENTSCHEMA=FGEDU;”
)

try:
# 使用ibm_db连接
conn = ibm_db.connect(conn_str, “”, “”)
print(“连接成功!”)

# 使用ibm_db_dbi连接 (DB API 2.0)
conn_dbi = ibm_db_dbi.connect(conn_str, “”, “”)
print(“DB API 2.0连接成功!”)

# 关闭连接
ibm_db.close(conn)
conn_dbi.close()
print(“连接已关闭”)

except Exception as e:
print(f”连接失败: {e}”)

$ python db2_connect.py
连接成功!
DB API 2.0连接成功!
连接已关闭

Part02-基础CRUD操作

2.1 查询操作

import ibm_db_dbi

def query_orders(min_amount):
conn_str = (
“DATABASE=fgedb;”
“HOSTNAME=db2server.fgedu.net.cn;”
“PORT=50000;”
“PROTOCOL=TCPIP;”
“UID=db2inst1;”
“PWD=password;”
)

sql = “””
SELECT id, name, order_date, order_amount
FROM fgedu_order
WHERE order_amount > ?
ORDER BY order_date DESC
“””

try:
conn = ibm_db_dbi.connect(conn_str, “”, “”)
cursor = conn.cursor()

cursor.execute(sql, (min_amount,))

results = cursor.fetchall()
print(f”找到 {len(results)} 条记录:”)
print(“-” * 60)
print(f”{‘ID’:<5} {'订单名称':<20} {'日期':<15} {'金额':<10}") print("-" * 60) for row in results: order_id, name, order_date, amount = row print(f"{order_id:<5} {name:<20} {order_date:<15} {amount:<10}") cursor.close() conn.close() return results except Exception as e: print(f"查询失败: {e}") return None if __name__ == "__main__": query_orders(1000)

2.2 插入操作

import ibm_db_dbi
from datetime import datetime

def insert_order(order_id, name, amount):
conn_str = (
“DATABASE=fgedb;”
“HOSTNAME=db2server.fgedu.net.cn;”
“PORT=50000;”
“PROTOCOL=TCPIP;”
“UID=db2inst1;”
“PWD=password;”
)

sql = “””
INSERT INTO fgedu_order (id, name, order_date, order_amount)
VALUES (?, ?, ?, ?)
“””

try:
conn = ibm_db_dbi.connect(conn_str, “”, “”)
cursor = conn.cursor()

conn.autocommit = False

order_date = datetime.now().date()

cursor.execute(sql, (order_id, name, order_date, amount))
conn.commit()

print(f”成功插入订单: {name}”)

cursor.close()
conn.close()

except Exception as e:
print(f”插入失败: {e}”)
if conn:
conn.rollback()
raise

if __name__ == “__main__”:
insert_order(3001, “Python测试订单”, 5000.00)

2.3 更新操作

import ibm_db_dbi

def update_order_amount(order_id, new_amount):
conn_str = (
“DATABASE=fgedb;”
“HOSTNAME=db2server.fgedu.net.cn;”
“PORT=50000;”
“PROTOCOL=TCPIP;”
“UID=db2inst1;”
“PWD=password;”
)

sql = “UPDATE fgedu_order SET order_amount = ? WHERE id = ?”

try:
conn = ibm_db_dbi.connect(conn_str, “”, “”)
cursor = conn.cursor()

cursor.execute(sql, (new_amount, order_id))
conn.commit()

print(f”更新了 {cursor.rowcount} 条记录”)

cursor.close()
conn.close()

except Exception as e:
print(f”更新失败: {e}”)
raise

if __name__ == “__main__”:
update_order_amount(3001, 5500.00)

2.4 删除操作

import ibm_db_dbi

def delete_order(order_id):
conn_str = (
“DATABASE=fgedb;”
“HOSTNAME=db2server.fgedu.net.cn;”
“PORT=50000;”
“PROTOCOL=TCPIP;”
“UID=db2inst1;”
“PWD=password;”
)

sql = “DELETE FROM fgedu_order WHERE id = ?”

try:
conn = ibm_db_dbi.connect(conn_str, “”, “”)
cursor = conn.cursor()

cursor.execute(sql, (order_id,))
conn.commit()

print(f”删除了 {cursor.rowcount} 条记录”)

cursor.close()
conn.close()

except Exception as e:
print(f”删除失败: {e}”)
raise

if __name__ == “__main__”:
delete_order(3001)

Part03-连接池与ORM

3.1 使用SQLAlchemy

from sqlalchemy import create_engine, Column, Integer, String, Date, Numeric
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

# 创建引擎
DATABASE_URL = “db2+ibm_db://db2inst1:password@db2server.fgedu.net.cn:50000/fgedb”
engine = create_engine(DATABASE_URL, echo=True)

# 创建基类
Base = declarative_base()

# 定义模型
class Order(Base):
__tablename__ = ‘fgedu_order’

id = Column(Integer, primary_key=True)
name = Column(String(100))
order_date = Column(Date)
order_amount = Column(Numeric(10, 2))

# 创建会话
Session = sessionmaker(bind=engine)
session = Session()

# 查询
orders = session.query(Order).filter(Order.order_amount > 1000).all()
for order in orders:
print(f”{order.id}: {order.name} – {order.order_amount}”)

# 插入
new_order = Order(id=4001, name=”SQLAlchemy订单”, order_date=”2026-04-08″, order_amount=3000.00)
session.add(new_order)
session.commit()

# 更新
order = session.query(Order).get(4001)
order.order_amount = 3500.00
session.commit()

# 删除
session.delete(order)
session.commit()

session.close()

3.2 使用pandas处理数据

import pandas as pd
import ibm_db_dbi

# 连接数据库
conn_str = (
“DATABASE=fgedb;”
“HOSTNAME=db2server.fgedu.net.cn;”
“PORT=50000;”
“PROTOCOL=TCPIP;”
“UID=db2inst1;”
“PWD=password;”
)

conn = ibm_db_dbi.connect(conn_str, “”, “”)

# 读取数据到DataFrame
sql = “SELECT * FROM fgedu_order”
df = pd.read_sql(sql, conn)

print(“前5条记录:”)
print(df.head())

print(“\n数据统计:”)
print(df.describe())

# 数据分析
print(“\n按日期统计订单金额:”)
daily_stats = df.groupby(‘order_date’)[‘order_amount’].sum()
print(daily_stats)

# 写入数据
new_data = pd.DataFrame({
‘id’: [5001, 5002],
‘name’: [‘Pandas订单1’, ‘Pandas订单2’],
‘order_date’: [‘2026-04-08’, ‘2026-04-08’],
‘order_amount’: [2000.00, 3000.00]
})

new_data.to_sql(‘fgedu_order’, conn, if_exists=’append’, index=False)

conn.close()

Part04-高级特性

4.1 批量操作

import ibm_db_dbi
from datetime import datetime

def batch_insert_orders(orders):
conn_str = (
“DATABASE=fgedb;”
“HOSTNAME=db2server.fgedu.net.cn;”
“PORT=50000;”
“PROTOCOL=TCPIP;”
“UID=db2inst1;”
“PWD=password;”
)

sql = “””
INSERT INTO fgedu_order (id, name, order_date, order_amount)
VALUES (?, ?, ?, ?)
“””

try:
conn = ibm_db_dbi.connect(conn_str, “”, “”)
cursor = conn.cursor()

conn.autocommit = False

# 批量执行
cursor.executemany(sql, orders)
conn.commit()

print(f”成功批量插入 {len(orders)} 条记录”)

cursor.close()
conn.close()

except Exception as e:
print(f”批量插入失败: {e}”)
if conn:
conn.rollback()
raise

if __name__ == “__main__”:
orders_data = [
(6001, “批量订单1”, datetime.now().date(), 1000.00),
(6002, “批量订单2”, datetime.now().date(), 2000.00),
(6003, “批量订单3”, datetime.now().date(), 3000.00),
(6004, “批量订单4”, datetime.now().date(), 4000.00),
(6005, “批量订单5”, datetime.now().date(), 5000.00),
]
batch_insert_orders(orders_data)

4.2 调用存储过程

import ibm_db_dbi

def call_stored_procedure(order_id):
conn_str = (
“DATABASE=fgedb;”
“HOSTNAME=db2server.fgedu.net.cn;”
“PORT=50000;”
“PROTOCOL=TCPIP;”
“UID=db2inst1;”
“PWD=password;”
)

try:
conn = ibm_db_dbi.connect(conn_str, “”, “”)
cursor = conn.cursor()

# 调用存储过程
cursor.callproc(‘fgedu.sp_get_order’, (order_id, None, None))

# 获取输出参数
result = cursor.fetchall()
print(f”存储过程结果: {result}”)

cursor.close()
conn.close()

except Exception as e:
print(f”调用存储过程失败: {e}”)
raise

if __name__ == “__main__”:
call_stored_procedure(1)

4.3 事务管理

import ibm_db_dbi

def transfer_amount(from_id, to_id, amount):
conn_str = (
“DATABASE=fgedb;”
“HOSTNAME=db2server.fgedu.net.cn;”
“PORT=50000;”
“PROTOCOL=TCPIP;”
“UID=db2inst1;”
“PWD=password;”
)

sql1 = “UPDATE fgedu_order SET order_amount = order_amount – ? WHERE id = ?”
sql2 = “UPDATE fgedu_order SET order_amount = order_amount + ? WHERE id = ?”

try:
conn = ibm_db_dbi.connect(conn_str, “”, “”)
cursor = conn.cursor()

conn.autocommit = False

cursor.execute(sql1, (amount, from_id))
cursor.execute(sql2, (amount, to_id))

conn.commit()
print(“转账成功”)

cursor.close()
conn.close()

except Exception as e:
print(f”转账失败: {e}”)
if conn:
conn.rollback()
print(“事务已回滚”)
raise

if __name__ == “__main__”:
transfer_amount(1, 2, 100.00)

Part05-风哥经验总结与分享

5.1 Python集成最佳实践

  • 使用上下文管理器(with语句)管理连接
  • 使用参数化查询防止SQL注入
  • 合理设置autocommit
  • 及时关闭连接和游标
  • 使用批量操作提高性能
  • 使用连接池管理连接
  • 异常处理和日志记录

5.2 常见问题与解决方案

问题 原因 解决方案
连接失败 网络或认证问题 检查连接字符串、网络、用户名密码
编码问题 字符集不匹配 设置正确的字符集
性能慢 逐条操作 使用批量操作executemany
内存泄漏 未关闭连接 使用with语句或try-finally

5.3 性能优化建议

  • 使用批量操作代替单条操作
  • 合理使用索引
  • 避免在循环中执行SQL
  • 使用fetchmany分批获取数据
  • 优化SQL查询
  • 使用连接池
  • 使用pandas处理大量数据
更多视频教程www.fgedu.net.cn
学习交流加群风哥微信: itpux-com
风哥Oracle/MySQL/PostgreSQL/Greenplum/DB2/Redis等数据库培训课程,10年一线实战经验,企业级培训,真正掌握数据库核心技术!

本文由风哥教程整理发布,仅用于学习测试使用,转载注明出处:http://www.fgedu.net.cn/10327.html

联系我们

在线咨询:点击这里给我发消息

微信号:itpux-com

工作日:9:30-18:30,节假日休息