1. 首页 > MySQL教程 > 正文

MySQL教程FG258-MySQL Python连接器

本文档风哥主要介绍MySQL Python连接器相关知识,包括MySQL Connector/Python的安装、配置、使用和优化等内容,风哥教程参考MySQL官方文档Connector/Python内容,适合Python开发人员和DBA人员在学习和测试中使用,如果要应用于生产环境则需要自行确认。更多视频教程www.fgedu.net.cn

Part01-基础概念与理论知识

1.1 MySQL Connector/Python概述

MySQL Connector/Python是MySQL官方提供的Python数据库连接器,实现了Python DB API 2.0规范,允许Python应用程序连接到MySQL数据库。MySQL Connector/Python完全用Python实现,不需要任何外部依赖。学习交流加群风哥微信: itpux-com

MySQL Connector/Python的主要特点:

  • 完全支持Python DB API 2.0规范
  • 支持Python 3.7及以上版本
  • 支持MySQL 5.7、8.0、8.4及以上版本
  • 支持X DevAPI(通过X Plugin)
  • 支持SSL/TLS加密连接
  • 支持连接池
  • 支持负载均衡和故障转移
  • 支持批量操作

1.2 MySQL Connector/Python特性

MySQL Connector/Python提供了丰富的特性,以满足不同的应用场景:

# MySQL Connector/Python特性

1. DB API 2.0规范支持
– 完全实现Python DB API 2.0规范
– 支持所有标准数据类型
– 支持Connection、Cursor、异常处理
– 支持事务管理
– 支持上下文管理器

2. 连接特性
– 支持TCP/IP连接
– 支持Unix域套接字连接
– 支持SSL/TLS加密连接
– 支持连接池
– 支持负载均衡
– 支持故障转移

3. 性能特性
– 支持预编译语句
– 支持批量操作
– 支持流式结果集
– 支持压缩传输

4. 安全特性
– 支持SSL/TLS加密
– 支持证书验证
– 支持密码加密
– 支持安全认证插件

5. X DevAPI支持
– 支持NoSQL操作
– 支持文档存储
– 支持异步操作
– 支持CRUD操作

6. 数据类型支持
– 支持所有MySQL数据类型
– 支持日期时间类型
– 支持JSON类型
– 支持几何类型

1.3 MySQL Connector/Python架构

MySQL Connector/Python的架构设计遵循Python DB API 2.0规范,提供了清晰的层次结构:

# MySQL Connector/Python架构

1. 应用层
– Python应用程序
– 使用DB API 2.0进行数据库操作

2. DB API 2.0层
– Connection:数据库连接
– Cursor:游标操作
– 异常处理:错误处理

3. 连接器层
– mysql.connector模块
– 连接工厂
– 协议处理器

4. 网络层
– TCP/IP连接
– SSL/TLS加密
– 协议编解码

5. MySQL服务器层
– MySQL服务器
– 数据存储
– 查询处理

# MySQL Connector/Python核心模块

1. mysql.connector
– 主模块
– 提供connect()函数

2. mysql.connector.connection
– Connection类
– 管理数据库连接

3. mysql.connector.cursor
– Cursor类
– 执行SQL语句

4. mysql.connector.pooling
– 连接池模块
– 管理连接池

5. mysql.connector.errors
– 异常类
– 错误处理

6. mysql.connector.opentelemetry
– OpenTelemetry集成
– 分布式追踪

Part02-生产环境规划与建议

2.1 MySQL Connector/Python安装

MySQL Connector/Python的安装方式有多种,可以根据项目需求选择合适的安装方式:

# MySQL Connector/Python安装

# 方式1:使用pip安装(推荐)
pip install mysql-connector-python

# 输出示例:
# Collecting mysql-connector-python
# Downloading mysql_connector_python-8.0.33-cp39-cp39-manylinux1_x86_64.whl (12.3 MB)
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 12.3/12.3 MB 5.6 MB/s eta 0:00:00
# Installing collected packages: mysql-connector-python
# Successfully installed mysql-connector-python-8.0.33

# 方式2:指定版本安装
pip install mysql-connector-python==8.0.33

# 输出示例:
# Collecting mysql-connector-python==8.0.33
# Downloading mysql_connector_python-8.0.33-cp39-cp39-manylinux1_x86_64.whl (12.3 MB)
# Installing collected packages: mysql-connector-python
# Successfully installed mysql-connector-python-8.0.33

# 方式3:从MySQL官网下载安装
wget https://dev.mysql.com/get/Downloads/Connector-Python/mysql-connector-python-8.0.33.tar.gz
tar -xzf mysql-connector-python-8.0.33.tar.gz
cd mysql-connector-python-8.0.33
python setup.py install

# 输出示例:
# running install
# running bdist_egg
# running egg_info
# creating mysql_connector_python.egg-info
# …
# Installed /usr/local/lib/python3.9/site-packages/mysql_connector_python-8.0.33-py3.9.egg
# Processing dependencies for mysql-connector-python==8.0.33
# Finished processing dependencies for mysql-connector-python==8.0.33

# 方式4:使用conda安装
conda install -c anaconda mysql-connector-python

# 输出示例:
# Collecting package metadata (current_repodata.json): done
# Solving environment: done
#
# ## Package Plan ##
#
# environment location: /opt/anaconda3/envs/python39
#
# added / updated specs:
# – mysql-connector-python
#
# The following packages will be downloaded:
#
# package | build
# —————————|—————–
# mysql-connector-python-8.0.33| py39h06a4308_0 1.2 MB
#
# Preparing transaction: done
# Verifying transaction: done
# Executing transaction: done

# 验证安装
python -c “import mysql.connector; print(mysql.connector.__version__)”

# 输出示例:
# 8.0.33

# 验证安装路径
python -c “import mysql.connector; print(mysql.connector.__file__)”

# 输出示例:
# /usr/local/lib/python3.9/site-packages/mysql/connector/__init__.py

# 查看已安装版本
pip show mysql-connector-python

# 输出示例:
# Name: mysql-connector-python
# Version: 8.0.33
# Summary: MySQL driver written in Python
# Home-page: https://dev.mysql.com/doc/connector-python/en/
# Author: Oracle and/or its affiliates
# Author-email:
# License: GNU GPLv2 (with FOSS License Exception)
# Location: /usr/local/lib/python3.9/site-packages
# Requires:
# Required-by:

2.2 MySQL Connector/Python配置

MySQL Connector/Python的配置主要通过连接参数进行,以下是常用的配置方式:

# MySQL Connector/Python配置

# 基本连接配置
import mysql.connector

# 方式1:使用关键字参数
conn = mysql.connector.connect(
host=’localhost’,
port=3306,
user=’root’,
password=’password’,
database=’testdb’
)

# 方式2:使用配置字典
config = {
‘host’: ‘localhost’,
‘port’: 3306,
‘user’: ‘root’,
‘password’: ‘password’,
‘database’: ‘testdb’
}
conn = mysql.connector.connect(**config)

# 方式3:使用配置文件
# 创建配置文件
vim mysql_config.ini

# 添加以下内容:
[mysql]
host = localhost
port = 3306
user = root
password = password
database = testdb

# 读取配置文件
import configparser
config = configparser.ConfigParser()
config.read(‘mysql_config.ini’)
conn = mysql.connector.connect(**config[‘mysql’])

# 常用配置参数

# 1. 连接参数
host=’localhost’ # 主机名
port=3306 # 端口号
user=’root’ # 用户名
password=’password’ # 密码
database=’testdb’ # 数据库名

# 2. SSL/TLS参数
ssl_ca=’/path/to/ca.pem’ # CA证书
ssl_cert=’/path/to/client-cert.pem’ # 客户端证书
ssl_key=’/path/to/client-key.pem’ # 客户端密钥
ssl_verify_cert=True # 验证服务器证书

# 3. 字符集参数
charset=’utf8mb4′ # 字符集
collation=’utf8mb4_unicode_ci’ # 排序规则

# 4. 连接池参数
pool_name=’mysql_pool’ # 连接池名称
pool_size=10 # 连接池大小
pool_reset_session=True # 重置会话

# 5. 超时参数
connection_timeout=30 # 连接超时(秒)
connect_timeout=10 # 连接超时(秒)

# 6. 性能参数
autocommit=False # 自动提交
time_zone=’+08:00′ # 时区
sql_mode=’STRICT_TRANS_TABLES’ # SQL模式

# 完整配置示例
import mysql.connector
from mysql.connector import Error

def create_connection():
“””创建MySQL数据库连接”””
connection = None
try:
connection = mysql.connector.connect(
host=’localhost’,
port=3306,
database=’testdb’,
user=’root’,
password=’password’,
charset=’utf8mb4′,
collation=’utf8mb4_unicode_ci’,
autocommit=False,
connection_timeout=30,
ssl_ca=’/path/to/ca.pem’,
ssl_cert=’/path/to/client-cert.pem’,
ssl_key=’/path/to/client-key.pem’,
ssl_verify_cert=True
)
print(“Connection to MySQL DB successful”)
except Error as e:
print(f”The error ‘{e}’ occurred”)
return connection

# 测试连接
conn = create_connection()
if conn and conn.is_connected():
print(“MySQL connection is active”)
conn.close()

# 输出示例:
# Connection to MySQL DB successful
# MySQL connection is active

2.3 Python连接池配置

在生产环境中,使用连接池可以显著提高应用程序的性能和稳定性。以下是MySQL Connector/Python的连接池配置:

# Python连接池配置

import mysql.connector
from mysql.connector import pooling
from contextlib import contextmanager

# 方式1:基本连接池配置
connection_pool = pooling.MySQLConnectionPool(
pool_name=”mysql_pool”,
pool_size=10,
pool_reset_session=True,
host=’localhost’,
port=3306,
database=’testdb’,
user=’root’,
password=’password’,
charset=’utf8mb4′
)

# 从连接池获取连接
conn = connection_pool.get_connection()
print(f”Connection from pool: {conn.is_connected()}”)
conn.close()

# 输出示例:
# Connection from pool: True

# 方式2:封装连接池管理类
class MySQLConnectionPool:
_pool = None

@classmethod
def initialize(cls, host=’localhost’, port=3306, database=’testdb’,
user=’root’, password=’password’, pool_size=10):
“””初始化连接池”””
cls._pool = pooling.MySQLConnectionPool(
pool_name=”mysql_pool”,
pool_size=pool_size,
pool_reset_session=True,
host=host,
port=port,
database=database,
user=user,
password=password,
charset=’utf8mb4′,
collation=’utf8mb4_unicode_ci’,
autocommit=False,
connection_timeout=30
)
print(f”Connection pool initialized with size {pool_size}”)

@classmethod
@contextmanager
def get_connection(cls):
“””获取数据库连接(上下文管理器)”””
if cls._pool is None:
raise RuntimeError(“Connection pool not initialized”)

conn = None
try:
conn = cls._pool.get_connection()
yield conn
conn.commit()
except Exception as e:
if conn:
conn.rollback()
raise
finally:
if conn:
conn.close()

@classmethod
def get_pool_status(cls):
“””获取连接池状态”””
if cls._pool is None:
return None
return {
‘pool_name’: cls._pool.pool_name,
‘pool_size’: cls._pool.pool_size,
}

# 初始化连接池
MySQLConnectionPool.initialize(
host=’localhost’,
port=3306,
database=’testdb’,
user=’root’,
password=’password’,
pool_size=10
)

# 输出示例:
# Connection pool initialized with size 10

# 使用连接池
with MySQLConnectionPool.get_connection() as conn:
cursor = conn.cursor()
cursor.execute(“SELECT VERSION()”)
version = cursor.fetchone()
print(f”MySQL version: {version[0]}”)

# 输出示例:
# MySQL version: 8.0.33

# 方式3:使用第三方连接池(如SQLAlchemy)
pip install sqlalchemy

# 配置SQLAlchemy连接池
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

# 创建引擎
engine = create_engine(
‘mysql+mysqlconnector://root:password@localhost:3306/testdb’,
pool_size=10,
max_overflow=20,
pool_timeout=30,
pool_recycle=3600,
echo=True
)

# 创建会话工厂
Session = sessionmaker(bind=engine)

# 使用会话
session = Session()
result = session.execute(“SELECT VERSION()”)
print(f”MySQL version: {result.fetchone()[0]}”)
session.close()

# 输出示例:
# MySQL version: 8.0.33

# 方式4:使用DBUtils连接池
pip install DBUtils

# 配置DBUtils连接池
from dbutils.pooled_db import PooledDB
import mysql.connector

# 创建连接池
pool = PooledDB(
creator=mysql.connector,
maxconnections=20,
mincached=5,
maxcached=10,
maxshared=5,
blocking=True,
host=’localhost’,
port=3306,
user=’root’,
password=’password’,
database=’testdb’,
charset=’utf8mb4′
)

# 从连接池获取连接
conn = pool.connection()
cursor = conn.cursor()
cursor.execute(“SELECT VERSION()”)
version = cursor.fetchone()
print(f”MySQL version: {version[0]}”)
conn.close()

# 输出示例:
# MySQL version: 8.0.33

Part03-生产环境项目实施方案

3.1 Python连接MySQL基本操作

以下是MySQL Connector/Python的基本使用方法,包括连接数据库、执行SQL语句和处理结果集:

# Python连接MySQL基本操作

import mysql.connector
from mysql.connector import Error

# 1. 创建数据库连接
def create_connection():
“””创建MySQL数据库连接”””
connection = None
try:
connection = mysql.connector.connect(
host=’localhost’,
port=3306,
database=’testdb’,
user=’root’,
password=’password’,
charset=’utf8mb4′
)
print(“Connection to MySQL DB successful”)
except Error as e:
print(f”The error ‘{e}’ occurred”)
return connection

# 2. 创建数据库
def create_database(connection):
“””创建数据库”””
cursor = connection.cursor()
try:
cursor.execute(“CREATE DATABASE IF NOT EXISTS testdb”)
print(“Database created successfully”)
except Error as e:
print(f”The error ‘{e}’ occurred”)

# 输出示例:
# Connection to MySQL DB successful
# Database created successfully

# 3. 创建表
def create_table(connection):
“””创建表”””
cursor = connection.cursor()
try:
cursor.execute(“””
CREATE TABLE IF NOT EXISTS users (
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL,
age INT,
email VARCHAR(255),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
“””)
print(“Table created successfully”)
except Error as e:
print(f”The error ‘{e}’ occurred”)

# 输出示例:
# Table created successfully

# 4. 插入数据
def insert_user(connection, name, age, email):
“””插入用户数据”””
cursor = connection.cursor()
try:
cursor.execute(
“INSERT INTO users (name, age, email) VALUES (%s, %s, %s)”,
(name, age, email)
)
connection.commit()
print(f”User {name} inserted successfully, ID: {cursor.lastrowid}”)
return cursor.lastrowid
except Error as e:
connection.rollback()
print(f”The error ‘{e}’ occurred”)
return None

# 测试插入
conn = create_connection()
create_table(conn)
insert_user(conn, “张三”, 25, “zhangsan@example.com”)
insert_user(conn, “李四”, 30, “lisi@example.com”)

# 输出示例:
# Connection to MySQL DB successful
# Table created successfully
# User 张三 inserted successfully, ID: 1
# User 李四 inserted successfully, ID: 2

# 5. 批量插入数据
def batch_insert_users(connection, users):
“””批量插入用户数据”””
cursor = connection.cursor()
try:
cursor.executemany(
“INSERT INTO users (name, age, email) VALUES (%s, %s, %s)”,
users
)
connection.commit()
print(f”{cursor.rowcount} users inserted successfully”)
except Error as e:
connection.rollback()
print(f”The error ‘{e}’ occurred”)

# 测试批量插入
users = [
(“王五”, 28, “wangwu@example.com”),
(“赵六”, 35, “zhaoliu@example.com”),
(“孙七”, 40, “sunqi@example.com”)
]
batch_insert_users(conn, users)

# 输出示例:
# 3 users inserted successfully

# 6. 查询数据
def query_users(connection):
“””查询所有用户”””
cursor = connection.cursor(dictionary=True)
try:
cursor.execute(“SELECT * FROM users”)
users = cursor.fetchall()
for user in users:
print(f”ID: {user[‘id’]}, Name: {user[‘name’]}, Age: {user[‘age’]}, Email: {user[’email’]}”)
return users
except Error as e:
print(f”The error ‘{e}’ occurred”)
return []

# 测试查询
query_users(conn)

# 输出示例:
# ID: 1, Name: 张三, Age: 25, Email: zhangsan@example.com
# ID: 2, Name: 李四, Age: 30, Email: lisi@example.com
# ID: 3, Name: 王五, Age: 28, Email: wangwu@example.com
# ID: 4, Name: 赵六, Age: 35, Email: zhaoliu@example.com
# ID: 5, Name: 孙七, Age: 40, Email: sunqi@example.com

# 7. 条件查询
def query_user_by_id(connection, user_id):
“””根据ID查询用户”””
cursor = connection.cursor(dictionary=True)
try:
cursor.execute(“SELECT * FROM users WHERE id = %s”, (user_id,))
user = cursor.fetchone()
if user:
print(f”Found user: {user}”)
else:
print(f”User with ID {user_id} not found”)
return user
except Error as e:
print(f”The error ‘{e}’ occurred”)
return None

# 测试条件查询
query_user_by_id(conn, 1)

# 输出示例:
# Found user: {‘id’: 1, ‘name’: ‘张三’, ‘age’: 25, ’email’: ‘zhangsan@example.com’, ‘created_at’: datetime.datetime(2026, 4, 1, 12, 0, 0)}

# 8. 更新数据
def update_user(connection, user_id, name, age, email):
“””更新用户数据”””
cursor = connection.cursor()
try:
cursor.execute(
“UPDATE users SET name = %s, age = %s, email = %s WHERE id = %s”,
(name, age, email, user_id)
)
connection.commit()
print(f”{cursor.rowcount} user(s) updated successfully”)
except Error as e:
connection.rollback()
print(f”The error ‘{e}’ occurred”)

# 测试更新
update_user(conn, 1, “张三丰”, 100, “zhangsanfeng@example.com”)

# 输出示例:
# 1 user(s) updated successfully

# 9. 删除数据
def delete_user(connection, user_id):
“””删除用户数据”””
cursor = connection.cursor()
try:
cursor.execute(“DELETE FROM users WHERE id = %s”, (user_id,))
connection.commit()
print(f”{cursor.rowcount} user(s) deleted successfully”)
except Error as e:
connection.rollback()
print(f”The error ‘{e}’ occurred”)

# 测试删除
delete_user(conn, 5)

# 输出示例:
# 1 user(s) deleted successfully

# 关闭连接
conn.close()
print(“Connection closed”)

# 输出示例:
# Connection closed

3.2 Python连接MySQL高级操作

以下是MySQL Connector/Python的高级使用方法,包括存储过程调用、大对象处理和批量操作:

# Python连接MySQL高级操作

import mysql.connector
from mysql.connector import Error

# 1. 调用存储过程
def call_stored_procedure(connection):
“””调用存储过程”””
cursor = connection.cursor()

# 创建存储过程
try:
cursor.execute(“””
CREATE PROCEDURE IF NOT EXISTS get_user_count(OUT user_count INT)
BEGIN
SELECT COUNT(*) INTO user_count FROM users;
END
“””)
print(“Stored procedure created successfully”)
except Error as e:
print(f”Error creating procedure: {e}”)

# 调用存储过程
try:
cursor.callproc(‘get_user_count’)
result = cursor.stored_results()
for r in result:
print(f”User count: {r.fetchone()[0]}”)
except Error as e:
print(f”Error calling procedure: {e}”)

# 输出示例:
# Stored procedure created successfully
# User count: 4

# 2. 使用事务
def transaction_example(connection):
“””事务示例”””
cursor = connection.cursor()

try:
# 开始事务
connection.start_transaction()

# 执行多个SQL语句
cursor.execute(
“INSERT INTO users (name, age, email) VALUES (%s, %s, %s)”,
(“测试用户1”, 25, “test1@example.com”)
)

cursor.execute(
“UPDATE users SET age = age + 1 WHERE name = %s”,
(“张三丰”,)
)

# 提交事务
connection.commit()
print(“Transaction committed successfully”)

except Error as e:
# 回滚事务
connection.rollback()
print(f”Transaction rolled back: {e}”)

# 输出示例:
# Transaction committed successfully

# 3. 使用游标类型
def cursor_types_example(connection):
“””游标类型示例”””

# 普通游标(返回元组)
cursor = connection.cursor()
cursor.execute(“SELECT * FROM users LIMIT 3”)
print(“普通游标结果:”)
for row in cursor:
print(f” {row}”)
cursor.close()

# 字典游标(返回字典)
cursor = connection.cursor(dictionary=True)
cursor.execute(“SELECT * FROM users LIMIT 3”)
print(“\n字典游标结果:”)
for row in cursor:
print(f” {row}”)
cursor.close()

# 命名元组游标
cursor = connection.cursor(named_tuple=True)
cursor.execute(“SELECT * FROM users LIMIT 3”)
print(“\n命名元组游标结果:”)
for row in cursor:
print(f” ID: {row.id}, Name: {row.name}, Age: {row.age}”)
cursor.close()

# 输出示例:
# 普通游标结果:
# (1, ‘张三丰’, 101, ‘zhangsanfeng@example.com’, datetime.datetime(2026, 4, 1, 12, 0, 0))
# (2, ‘李四’, 30, ‘lisi@example.com’, datetime.datetime(2026, 4, 1, 12, 0, 0))
# (3, ‘王五’, 28, ‘wangwu@example.com’, datetime.datetime(2026, 4, 1, 12, 0, 0))
#
# 字典游标结果:
# {‘id’: 1, ‘name’: ‘张三丰’, ‘age’: 101, ’email’: ‘zhangsanfeng@example.com’, ‘created_at’: datetime.datetime(2026, 4, 1, 12, 0, 0)}
# {‘id’: 2, ‘name’: ‘李四’, ‘age’: 30, ’email’: ‘lisi@example.com’, ‘created_at’: datetime.datetime(2026, 4, 1, 12, 0, 0)}
# {‘id’: 3, ‘name’: ‘王五’, ‘age’: 28, ’email’: ‘wangwu@example.com’, ‘created_at’: datetime.datetime(2026, 4, 1, 12, 0, 0)}
#
# 命名元组游标结果:
# ID: 1, Name: 张三丰, Age: 101
# ID: 2, Name: 李四, Age: 30
# ID: 3, Name: 王五, Age: 28

# 4. 批量操作优化
def batch_operations(connection):
“””批量操作优化”””
cursor = connection.cursor()

# 批量插入
users = [(f”用户{i}”, 20 + i % 30, f”user{i}@example.com”) for i in range(1, 101)]

try:
cursor.executemany(
“INSERT INTO users (name, age, email) VALUES (%s, %s, %s)”,
users
)
connection.commit()
print(f”Batch inserted {cursor.rowcount} users”)
except Error as e:
connection.rollback()
print(f”Error: {e}”)

# 批量更新
updates = [(25, “用户1”), (26, “用户2”), (27, “用户3”)]

try:
cursor.executemany(
“UPDATE users SET age = %s WHERE name = %s”,
updates
)
connection.commit()
print(f”Batch updated {cursor.rowcount} users”)
except Error as e:
connection.rollback()
print(f”Error: {e}”)

# 输出示例:
# Batch inserted 100 users
# Batch updated 3 users

# 5. 处理大结果集
def fetch_large_result_set(connection):
“””处理大结果集”””
cursor = connection.cursor()

# 使用fetchmany分批获取
cursor.execute(“SELECT * FROM users”)
batch_size = 10
total = 0

while True:
rows = cursor.fetchmany(batch_size)
if not rows:
break
total += len(rows)
print(f”Fetched {len(rows)} rows, total: {total}”)

print(f”Total rows fetched: {total}”)

# 输出示例:
# Fetched 10 rows, total: 10
# Fetched 10 rows, total: 20
# Fetched 10 rows, total: 30
# …
# Total rows fetched: 104

# 6. 获取元数据
def get_metadata(connection):
“””获取元数据”””
cursor = connection.cursor()

# 获取数据库列表
cursor.execute(“SHOW DATABASES”)
print(“Databases:”)
for db in cursor:
print(f” {db[0]}”)

# 获取表列表
cursor.execute(“SHOW TABLES”)
print(“\nTables:”)
for table in cursor:
print(f” {table[0]}”)

# 获取表结构
cursor.execute(“DESCRIBE users”)
print(“\nUsers table structure:”)
for column in cursor:
print(f” {column}”)

# 输出示例:
# Databases:
# information_schema
# mysql
# performance_schema
# sys
# testdb
#
# Tables:
# users
#
# Users table structure:
# (‘id’, ‘int’, ‘NO’, ‘PRI’, None, ‘auto_increment’)
# (‘name’, ‘varchar(255)’, ‘NO’, ”, None, ”)
# (‘age’, ‘int’, ‘YES’, ”, None, ”)
# (’email’, ‘varchar(255)’, ‘YES’, ”, None, ”)
# (‘created_at’, ‘timestamp’, ‘YES’, ”, ‘CURRENT_TIMESTAMP’, ”)

3.3 Python事务管理

以下是MySQL Connector/Python的事务管理方法,包括事务控制、隔离级别设置和保存点使用:

# Python事务管理

import mysql.connector
from mysql.connector import Error

# 1. 基本事务控制
def basic_transaction():
“””基本事务控制”””
conn = mysql.connector.connect(
host=’localhost’,
port=3306,
database=’testdb’,
user=’root’,
password=’password’,
charset=’utf8mb4′,
autocommit=False # 禁用自动提交
)

cursor = conn.cursor()

try:
# 执行多个SQL语句
cursor.execute(
“INSERT INTO users (name, age, email) VALUES (%s, %s, %s)”,
(“事务用户1”, 25, “tx1@example.com”)
)

cursor.execute(
“INSERT INTO users (name, age, email) VALUES (%s, %s, %s)”,
(“事务用户2”, 30, “tx2@example.com”)
)

# 提交事务
conn.commit()
print(“Transaction committed successfully”)

except Error as e:
# 回滚事务
conn.rollback()
print(f”Transaction rolled back: {e}”)

finally:
cursor.close()
conn.close()

# 输出示例:
# Transaction committed successfully

# 2. 使用上下文管理器
def transaction_with_context_manager():
“””使用上下文管理器管理事务”””
conn = mysql.connector.connect(
host=’localhost’,
port=3306,
database=’testdb’,
user=’root’,
password=’password’,
charset=’utf8mb4′,
autocommit=False
)

try:
# 使用上下文管理器
with conn.cursor() as cursor:
cursor.execute(
“INSERT INTO users (name, age, email) VALUES (%s, %s, %s)”,
(“上下文用户”, 28, “context@example.com”)
)
conn.commit()
print(“Transaction with context manager committed successfully”)
except Error as e:
conn.rollback()
print(f”Transaction rolled back: {e}”)
finally:
conn.close()

# 输出示例:
# Transaction with context manager committed successfully

# 3. 设置隔离级别
def set_isolation_level():
“””设置隔离级别”””
conn = mysql.connector.connect(
host=’localhost’,
port=3306,
database=’testdb’,
user=’root’,
password=’password’,
charset=’utf8mb4′,
autocommit=False
)

cursor = conn.cursor()

# 查看当前隔离级别
cursor.execute(“SELECT @@transaction_isolation”)
print(f”Current isolation level: {cursor.fetchone()[0]}”)

# 设置隔离级别
cursor.execute(“SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED”)
print(“Isolation level set to READ COMMITTED”)

# 验证隔离级别
cursor.execute(“SELECT @@transaction_isolation”)
print(f”New isolation level: {cursor.fetchone()[0]}”)

cursor.close()
conn.close()

# 输出示例:
# Current isolation level: REPEATABLE-READ
# Isolation level set to READ COMMITTED
# New isolation level: READ-COMMITTED

# 4. 使用保存点
def savepoint_example():
“””保存点示例”””
conn = mysql.connector.connect(
host=’localhost’,
port=3306,
database=’testdb’,
user=’root’,
password=’password’,
charset=’utf8mb4′,
autocommit=False
)

cursor = conn.cursor()

try:
# 插入第一条记录
cursor.execute(
“INSERT INTO users (name, age, email) VALUES (%s, %s, %s)”,
(“保存点用户1”, 25, “sp1@example.com”)
)
print(“Inserted user 1”)

# 创建保存点
cursor.execute(“SAVEPOINT savepoint1”)
print(“Created savepoint1”)

# 插入第二条记录
cursor.execute(
“INSERT INTO users (name, age, email) VALUES (%s, %s, %s)”,
(“保存点用户2”, 30, “sp2@example.com”)
)
print(“Inserted user 2”)

# 创建第二个保存点
cursor.execute(“SAVEPOINT savepoint2”)
print(“Created savepoint2”)

# 插入第三条记录
cursor.execute(
“INSERT INTO users (name, age, email) VALUES (%s, %s, %s)”,
(“保存点用户3”, 35, “sp3@example.com”)
)
print(“Inserted user 3”)

# 回滚到第二个保存点
cursor.execute(“ROLLBACK TO SAVEPOINT savepoint2”)
print(“Rolled back to savepoint2”)

# 提交事务
conn.commit()
print(“Transaction committed”)

except Error as e:
conn.rollback()
print(f”Transaction rolled back: {e}”)

finally:
cursor.close()
conn.close()

# 输出示例:
# Inserted user 1
# Created savepoint1
# Inserted user 2
# Created savepoint2
# Inserted user 3
# Rolled back to savepoint2
# Transaction committed

# 5. 事务装饰器
def transaction_decorator(func):
“””事务装饰器”””
def wrapper(*args, **kwargs):
conn = mysql.connector.connect(
host=’localhost’,
port=3306,
database=’testdb’,
user=’root’,
password=’password’,
charset=’utf8mb4′,
autocommit=False
)

try:
result = func(conn, *args, **kwargs)
conn.commit()
print(“Transaction committed”)
return result
except Error as e:
conn.rollback()
print(f”Transaction rolled back: {e}”)
raise
finally:
conn.close()

return wrapper

@transaction_decorator
def transfer_money(conn, from_user, to_user, amount):
“””转账操作”””
cursor = conn.cursor()

# 扣款
cursor.execute(
“UPDATE accounts SET balance = balance – %s WHERE user_id = %s”,
(amount, from_user)
)

# 存款
cursor.execute(
“UPDATE accounts SET balance = balance + %s WHERE user_id = %s”,
(amount, to_user)
)

print(f”Transferred {amount} from user {from_user} to user {to_user}”)

# 输出示例:
# Transferred 100 from user 1 to user 2
# Transaction committed

Part04-生产案例与实战讲解

4.1 Django集成MySQL实战

以下是Django框架集成MySQL的实战案例,包括配置和使用方法:

# Django集成MySQL实战

# 1. 安装依赖
pip install django mysql-connector-python

# 输出示例:
# Collecting django
# Downloading Django-4.2.5-py3-none-any.whl (8.0 MB)
# Collecting mysql-connector-python
# Downloading mysql_connector_python-8.0.33-cp39-cp39-manylinux1_x86_64.whl (12.3 MB)
# Installing collected packages: django, mysql-connector-python
# Successfully installed django-4.2.5 mysql-connector-python-8.0.33

# 2. 创建Django项目
django-admin startproject myproject
cd myproject

# 输出示例:
# (项目目录创建成功)

# 3. 配置数据库
# 编辑settings.py
vim myproject/settings.py

# 修改DATABASES配置:
DATABASES = {
‘default’: {
‘ENGINE’: ‘django.db.backends.mysql’,
‘NAME’: ‘testdb’,
‘USER’: ‘root’,
‘PASSWORD’: ‘password’,
‘HOST’: ‘localhost’,
‘PORT’: ‘3306’,
‘OPTIONS’: {
‘charset’: ‘utf8mb4’,
‘init_command’: “SET sql_mode=’STRICT_TRANS_TABLES'”,
},
}
}

# 4. 创建应用
python manage.py startapp myapp

# 输出示例:
# (应用目录创建成功)

# 5. 定义模型
# 编辑myapp/models.py
vim myapp/models.py

# 添加以下内容:
from django.db import models

class User(models.Model):
name = models.CharField(max_length=255)
age = models.IntegerField(null=True, blank=True)
email = models.EmailField(max_length=255, null=True, blank=True)
created_at = models.DateTimeField(auto_now_add=True)

class Meta:
db_table = ‘users’

def __str__(self):
return self.name

# 6. 注册应用
# 编辑settings.py
vim myproject/settings.py

# 添加到INSTALLED_APPS:
INSTALLED_APPS = [
‘django.contrib.admin’,
‘django.contrib.auth’,
‘django.contrib.contenttypes’,
‘django.contrib.sessions’,
‘django.contrib.messages’,
‘django.contrib.staticfiles’,
‘myapp’,
]

# 7. 执行数据库迁移
python manage.py makemigrations

# 输出示例:
# Migrations for ‘myapp’:
# myapp/migrations/0001_initial.py
# – Create model User

python manage.py migrate

# 输出示例:
# Operations to perform:
# Apply all migrations: admin, auth, contenttypes, myapp, sessions
# Running migrations:
# Applying contenttypes.0001_initial… OK
# Applying auth.0001_initial… OK
# Applying admin.0001_initial… OK
# Applying admin.0002_logentry_remove_auto_add… OK
# Applying admin.0003_logentry_add_action_flag_choices… OK
# Applying contenttypes.0002_remove_content_type_name… OK
# Applying auth.0002_alter_permission_name_max_length… OK
# Applying auth.0003_alter_user_email_max_length… OK
# Applying auth.0004_alter_user_username_opts… OK
# Applying auth.0005_alter_user_last_login_null… OK
# Applying auth.0006_require_contenttypes_0002… OK
# Applying auth.0007_alter_validators_add_error_messages… OK
# Applying auth.0008_alter_user_username_max_length… OK
# Applying auth.0009_alter_user_last_name_max_length… OK
# Applying auth.0010_alter_group_name_max_length… OK
# Applying auth.0011_update_proxy_permissions… OK
# Applying auth.0012_alter_user_first_name_max_length… OK
# Applying myapp.0001_initial… OK
# Applying sessions.0001_initial… OK

# 8. 创建视图
# 编辑myapp/views.py
vim myapp/views.py

# 添加以下内容:
from django.http import JsonResponse
from django.views.decorators.http import require_http_methods
from .models import User
import json

@require_http_methods([“GET”])
def get_users(request):
“””获取所有用户”””
users = User.objects.all().values(‘id’, ‘name’, ‘age’, ’email’, ‘created_at’)
return JsonResponse({‘users’: list(users)})

@require_http_methods([“POST”])
def create_user(request):
“””创建用户”””
data = json.loads(request.body)
user = User.objects.create(
name=data[‘name’],
age=data.get(‘age’),
email=data.get(’email’)
)
return JsonResponse({‘id’: user.id, ‘name’: user.name}, status=201)

@require_http_methods([“GET”])
def get_user(request, user_id):
“””获取单个用户”””
try:
user = User.objects.get(id=user_id)
return JsonResponse({
‘id’: user.id,
‘name’: user.name,
‘age’: user.age,
’email’: user.email,
‘created_at’: user.created_at
})
except User.DoesNotExist:
return JsonResponse({‘error’: ‘User not found’}, status=404)

@require_http_methods([“PUT”])
def update_user(request, user_id):
“””更新用户”””
try:
user = User.objects.get(id=user_id)
data = json.loads(request.body)
user.name = data.get(‘name’, user.name)
user.age = data.get(‘age’, user.age)
user.email = data.get(’email’, user.email)
user.save()
return JsonResponse({‘id’: user.id, ‘name’: user.name})
except User.DoesNotExist:
return JsonResponse({‘error’: ‘User not found’}, status=404)

@require_http_methods([“DELETE”])
def delete_user(request, user_id):
“””删除用户”””
try:
user = User.objects.get(id=user_id)
user.delete()
return JsonResponse({‘message’: ‘User deleted’})
except User.DoesNotExist:
return JsonResponse({‘error’: ‘User not found’}, status=404)

# 9. 配置URL
# 编辑myproject/urls.py
vim myproject/urls.py

# 添加以下内容:
from django.urls import path
from myapp.views import get_users, create_user, get_user, update_user, delete_user

urlpatterns = [
path(‘api/users/’, get_users),
path(‘api/users/’, create_user),
path(‘api/users//’, get_user),
path(‘api/users//’, update_user),
path(‘api/users//’, delete_user),
]

# 10. 运行服务器
python manage.py runserver

# 输出示例:
# Watching for file changes with StatReloader
# Performing system checks…
#
# System check identified no issues (0 silenced).
# April 01, 2026 – 12:00:00
# Django version 4.2.5, using settings ‘myproject.settings’
# Starting development server at http://127.0.0.1:8000/
# Quit the server with CONTROL-C.

# 11. 测试API
# 创建用户
curl -X POST http://127.0.0.1:8000/api/users/ \
-H “Content-Type: application/json” \
-d ‘{“name”: “张三”, “age”: 25, “email”: “zhangsan@example.com”}’

# 输出示例:
# {“id”: 1, “name”: “张三”}

# 获取所有用户
curl http://127.0.0.1:8000/api/users/

# 输出示例:
# {“users”: [{“id”: 1, “name”: “张三”, “age”: 25, “email”: “zhangsan@example.com”, “created_at”: “2026-04-01T12:00:00Z”}]}

# 获取单个用户
curl http://127.0.0.1:8000/api/users/1/

# 输出示例:
# {“id”: 1, “name”: “张三”, “age”: 25, “email”: “zhangsan@example.com”, “created_at”: “2026-04-01T12:00:00Z”}

# 更新用户
curl -X PUT http://127.0.0.1:8000/api/users/1/ \
-H “Content-Type: application/json” \
-d ‘{“name”: “张三丰”, “age”: 100}’

# 输出示例:
# {“id”: 1, “name”: “张三丰”}

# 删除用户
curl -X DELETE http://127.0.0.1:8000/api/users/1/

# 输出示例:
# {“message”: “User deleted”}

4.2 Flask集成MySQL实战

以下是Flask框架集成MySQL的实战案例,包括配置和使用方法:

# Flask集成MySQL实战

# 1. 安装依赖
pip install flask mysql-connector-python

# 输出示例:
# Collecting flask
# Downloading flask-2.3.3-py3-none-any.whl (96 kB)
# Collecting mysql-connector-python
# Downloading mysql_connector_python-8.0.33-cp39-cp39-manylinux1_x86_64.whl (12.3 MB)
# Installing collected packages: flask, mysql-connector-python
# Successfully installed flask-2.3.3 mysql-connector-python-8.0.33

# 2. 创建Flask应用
vim app.py

# 添加以下内容:
from flask import Flask, request, jsonify
import mysql.connector
from mysql.connector import pooling
from contextlib import contextmanager

app = Flask(__name__)

# 配置连接池
connection_pool = pooling.MySQLConnectionPool(
pool_name=”mysql_pool”,
pool_size=10,
pool_reset_session=True,
host=’localhost’,
port=3306,
database=’testdb’,
user=’root’,
password=’password’,
charset=’utf8mb4′
)

@contextmanager
def get_db_connection():
“””获取数据库连接”””
conn = connection_pool.get_connection()
try:
yield conn
conn.commit()
except Exception as e:
conn.rollback()
raise
finally:
conn.close()

@app.route(‘/api/users’, methods=[‘GET’])
def get_users():
“””获取所有用户”””
with get_db_connection() as conn:
cursor = conn.cursor(dictionary=True)
cursor.execute(“SELECT * FROM users”)
users = cursor.fetchall()
return jsonify({‘users’: users})

@app.route(‘/api/users’, methods=[‘POST’])
def create_user():
“””创建用户”””
data = request.get_json()
with get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute(
“INSERT INTO users (name, age, email) VALUES (%s, %s, %s)”,
(data[‘name’], data.get(‘age’), data.get(’email’))
)
return jsonify({‘id’: cursor.lastrowid, ‘name’: data[‘name’]}), 201

@app.route(‘/api/users/‘, methods=[‘GET’])
def get_user(user_id):
“””获取单个用户”””
with get_db_connection() as conn:
cursor = conn.cursor(dictionary=True)
cursor.execute(“SELECT * FROM users WHERE id = %s”, (user_id,))
user = cursor.fetchone()
if user:
return jsonify(user)
return jsonify({‘error’: ‘User not found’}), 404

@app.route(‘/api/users/‘, methods=[‘PUT’])
def update_user(user_id):
“””更新用户”””
data = request.get_json()
with get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute(
“UPDATE users SET name = %s, age = %s, email = %s WHERE id = %s”,
(data.get(‘name’), data.get(‘age’), data.get(’email’), user_id)
)
if cursor.rowcount > 0:
return jsonify({‘id’: user_id, ‘name’: data.get(‘name’)})
return jsonify({‘error’: ‘User not found’}), 404

@app.route(‘/api/users/‘, methods=[‘DELETE’])
def delete_user(user_id):
“””删除用户”””
with get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute(“DELETE FROM users WHERE id = %s”, (user_id,))
if cursor.rowcount > 0:
return jsonify({‘message’: ‘User deleted’})
return jsonify({‘error’: ‘User not found’}), 404

if __name__ == ‘__main__’:
app.run(debug=True, host=’0.0.0.0′, port=5000)

# 3. 运行Flask应用
python app.py

# 输出示例:
# * Serving Flask app ‘app’
# * Debug mode: on
# * Running on all addresses (0.0.0.0)
# * Running on http://127.0.0.1:5000
# * Running on http://192.168.1.100:5000
# * Restarting with stat
# * Debugger is active!
# * Debugger PIN: 123-456-789

# 4. 测试API
# 创建用户
curl -X POST http://127.0.0.1:5000/api/users \
-H “Content-Type: application/json” \
-d ‘{“name”: “李四”, “age”: 30, “email”: “lisi@example.com”}’

# 输出示例:
# {“id”: 1, “name”: “李四”}

# 获取所有用户
curl http://127.0.0.1:5000/api/users

# 输出示例:
# {“users”: [{“age”: 30, “created_at”: “2026-04-01 12:00:00”, “email”: “lisi@example.com”, “id”: 1, “name”: “李四”}]}

# 获取单个用户
curl http://127.0.0.1:5000/api/users/1

# 输出示例:
# {“age”: 30, “created_at”: “2026-04-01 12:00:00”, “email”: “lisi@example.com”, “id”: 1, “name”: “李四”}

# 更新用户
curl -X PUT http://127.0.0.1:5000/api/users/1 \
-H “Content-Type: application/json” \
-d ‘{“name”: “李四光”, “age”: 35}’

# 输出示例:
# {“id”: 1, “name”: “李四光”}

# 删除用户
curl -X DELETE http://127.0.0.1:5000/api/users/1

# 输出示例:
# {“message”: “User deleted”}

4.3 Python数据分析实战

以下是使用Python进行MySQL数据分析的实战案例,包括使用pandas和SQLAlchemy:

# Python数据分析实战

# 1. 安装依赖
pip install pandas sqlalchemy matplotlib

# 输出示例:
# Collecting pandas
# Downloading pandas-2.0.3-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (12.3 MB)
# Collecting sqlalchemy
# Downloading SQLAlchemy-2.0.20-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (2.7 MB)
# Collecting matplotlib
# Downloading matplotlib-3.7.2-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (11.6 MB)
# Installing collected packages: pandas, sqlalchemy, matplotlib
# Successfully installed pandas-2.0.3 sqlalchemy-2.0.20 matplotlib-3.7.2

# 2. 使用pandas读取MySQL数据
import pandas as pd
import mysql.connector
from sqlalchemy import create_engine
import matplotlib.pyplot as plt

# 方式1:使用mysql.connector
conn = mysql.connector.connect(
host=’localhost’,
port=3306,
database=’testdb’,
user=’root’,
password=’password’,
charset=’utf8mb4′
)

# 读取数据到DataFrame
df = pd.read_sql(“SELECT * FROM users”, conn)
print(df.head())

# 输出示例:
# id name age email created_at
# 0 1 张三 25 zhangsan@example.com 2026-04-01 12:00:00
# 1 2 李四 30 lisi@example.com 2026-04-01 12:00:00
# 2 3 王五 28 wangwu@example.com 2026-04-01 12:00:00
# 3 4 赵六 35 zhaoliu@example.com 2026-04-01 12:00:00
# 4 5 孙七 40 sunqi@example.com 2026-04-01 12:00:00

conn.close()

# 方式2:使用SQLAlchemy
engine = create_engine(‘mysql+mysqlconnector://root:password@localhost:3306/testdb’)

# 读取数据
df = pd.read_sql(“SELECT * FROM users”, engine)
print(df.info())

# 输出示例:
#
# RangeIndex: 100 entries, 0 to 99
# Data columns (total 5 columns):
# # Column Non-Null Count Dtype
# — —— ————– —–
# 0 id 100 non-null int64
# 1 name 100 non-null object
# 2 age 100 non-null int64
# 3 email 100 non-null object
# 4 created_at 100 non-null datetime64[ns]
# dtypes: datetime64[ns](1), int64(2), object(2)
# memory usage: 4.0+ KB

# 3. 数据分析
# 基本统计
print(df.describe())

# 输出示例:
# id age
# count 100.000000 100.000000
# mean 50.500000 34.500000
# std 29.011492 8.660254
# min 1.000000 20.000000
# 25% 25.750000 27.750000
# 50% 50.500000 34.500000
# 75% 75.250000 41.250000
# max 100.000000 49.000000

# 年龄分布
age_distribution = df[‘age’].value_counts().sort_index()
print(age_distribution)

# 输出示例:
# 20 5
# 21 5
# 22 5
# …
# 48 5
# 49 5
# Name: age, dtype: int64

# 4. 数据可视化
plt.figure(figsize=(12, 6))

# 年龄分布直方图
plt.subplot(1, 2, 1)
plt.hist(df[‘age’], bins=20, edgecolor=’black’)
plt.title(‘Age Distribution’)
plt.xlabel(‘Age’)
plt.ylabel(‘Count’)

# 年龄分布箱线图
plt.subplot(1, 2, 2)
plt.boxplot(df[‘age’])
plt.title(‘Age Boxplot’)
plt.ylabel(‘Age’)

plt.tight_layout()
plt.savefig(‘age_analysis.png’)
print(“Chart saved to age_analysis.png”)

# 输出示例:
# Chart saved to age_analysis.png

# 5. 数据写入MySQL
# 创建新DataFrame
new_users = pd.DataFrame({
‘name’: [‘新用户1’, ‘新用户2’, ‘新用户3’],
‘age’: [22, 28, 35],
’email’: [‘new1@example.com’, ‘new2@example.com’, ‘new3@example.com’]
})

# 写入MySQL
new_users.to_sql(‘users’, engine, if_exists=’append’, index=False)
print(f”Inserted {len(new_users)} new users”)

# 输出示例:
# Inserted 3 new users

# 6. 复杂查询分析
# 使用pandas进行复杂分析
query = “””
SELECT
DATE(created_at) as date,
COUNT(*) as user_count,
AVG(age) as avg_age
FROM users
GROUP BY DATE(created_at)
ORDER BY date
“””

daily_stats = pd.read_sql(query, engine)
print(daily_stats)

# 输出示例:
# date user_count avg_age
# 0 2026-04-01 103 34.504854

# 7. 数据导出
# 导出到CSV
df.to_csv(‘users_export.csv’, index=False, encoding=’utf-8′)
print(“Data exported to users_export.csv”)

# 输出示例:
# Data exported to users_export.csv

# 导出到Excel
df.to_excel(‘users_export.xlsx’, index=False, engine=’openpyxl’)
print(“Data exported to users_export.xlsx”)

# 输出示例:
# Data exported to users_export.xlsx

Part05-风哥经验总结与分享

5.1 MySQL Python最佳实践

通过多年的MySQL数据库开发和运维经验,我总结了以下MySQL Python最佳实践:

风哥提示:MySQL Python连接器的正确使用对于Python应用程序的性能和稳定性至关重要,需要根据实际场景选择合适的配置。

本文由风哥教程整理发布,仅用于学习测试使用,转载注明出处:http://www.fgedu.net.cn/10327.html

联系我们

在线咨询:点击这里给我发消息

微信号:itpux-com

工作日:9:30-18:30,节假日休息