1. 首页 > MySQL教程 > 正文

MySQL教程FG256-MySQL连接器与APIs概述

本文档风哥主要介绍MySQL连接器与APIs相关知识,包括MySQL连接器的概念、MySQL连接器类型、MySQL
APIs概述、MySQL连接器安装与配置、MySQL连接器使用实战等内容,风哥教程参考MySQL官方文档Connectors and
APIs内容编写,适合DBA人员和开发人员在学习和测试中使用,如果要应用于生产环境则需要自行确认。更多视频教程www.fgedu.net.cn

Part01-基础概念与理论知识

1.1 MySQL连接器概述

MySQL连接器(MySQL
Connectors)是MySQL官方提供的用于连接MySQL数据库的驱动程序和API库,允许各种编程语言和应用程序与MySQL数据库进行交互。MySQL连接器提供了标准的数据库访问接口,使开发人员能够在不同的编程环境中使用MySQL数据库。学习交流加群风哥微信:
itpux-com

MySQL连接器的主要功能:

  • 提供数据库连接功能
  • 支持SQL语句执行
  • 支持事务处理
  • 支持结果集处理
  • 支持连接池管理
  • 支持SSL/TLS加密连接
  • 支持负载均衡和高可用

1.2 MySQL连接器类型

MySQL提供了多种类型的连接器,以支持不同的编程语言和应用场景:

# MySQL连接器类型

1. MySQL Connector/J
– 语言:Java
– 类型:JDBC驱动
– 特点:完全支持JDBC 4.2规范,支持X DevAPI
– 适用场景:Java应用程序、Java EE应用服务器

2. MySQL Connector/Python
– 语言:Python
– 类型:Python驱动
– 特点:支持Python DB API 2.0规范,支持X DevAPI
– 适用场景:Python应用程序、数据分析、机器学习

3. MySQL Connector/C++
– 语言:C++
– 类型:C++驱动
– 特点:支持JDBC 4.0规范,支持X DevAPI
– 适用场景:C++应用程序、高性能应用

4. MySQL Connector/C
– 语言:C
– 类型:C驱动
– 特点:基于MySQL C API,支持X DevAPI
– 适用场景:C应用程序、嵌入式系统

5. MySQL Connector/NET
– 语言:.NET (C#, VB.NET等)
– 类型:ADO.NET驱动
– 特点:支持ADO.NET 2.0规范,支持Entity Framework
– 适用场景:.NET应用程序、ASP.NET应用

6. MySQL Connector/ODBC
– 语言:多种语言
– 类型:ODBC驱动
– 特点:支持ODBC 3.5规范
– 适用场景:需要ODBC接口的应用程序

7. MySQL Connector/Node.js
– 语言:Node.js
– 类型:Node.js驱动
– 特点:支持X DevAPI,支持异步操作
– 适用场景:Node.js应用程序、Web应用

8. MySQL Connector/Go
– 语言:Go
– 类型:Go驱动
– 特点:支持database/sql接口
– 适用场景:Go应用程序、云原生应用

1.3 MySQL APIs概述

MySQL提供了多种API接口,以满足不同的开发需求:

# MySQL APIs概述

1. MySQL C API
– 描述:MySQL原生C语言API
– 特点:性能最高,功能最全
– 适用场景:C/C++应用程序、需要高性能的场景

2. MySQL X DevAPI
– 描述:MySQL 8.0引入的新API
– 特点:支持NoSQL和SQL操作,支持异步操作
– 适用场景:需要NoSQL功能的场景、现代应用开发

3. MySQL X Plugin
– 描述:MySQL服务器插件
– 特点:支持X Protocol,支持X DevAPI
– 适用场景:需要X DevAPI的场景

4. MySQL Protocol
– 描述:MySQL客户端/服务器协议
– 特点:底层通信协议
– 适用场景:需要实现MySQL协议的场景

5. MySQL Replication API
– 描述:复制API
– 特点:支持复制功能
– 适用场景:需要实现复制功能的场景

6. MySQL Admin API
– 描述:管理API
– 特点:支持管理功能
– 适用场景:需要管理MySQL的场景

Part02-生产环境规划与建议

2.1 MySQL连接器选择策略

选择合适的MySQL连接器对于应用程序的性能和稳定性至关重要。以下是选择MySQL连接器的策略:

MySQL连接器选择策略:

  • 编程语言匹配:选择与应用程序编程语言匹配的连接器
  • 功能需求:根据功能需求选择支持所需功能的连接器
  • 性能要求:对于高性能要求的应用,选择性能优化的连接器
  • 稳定性:选择经过充分测试和验证的稳定版本
  • 社区支持:选择有良好社区支持和文档的连接器
  • 兼容性:确保连接器与MySQL服务器版本兼容

2.2 MySQL连接器配置优化

MySQL连接器的配置优化对于应用程序的性能和稳定性至关重要。以下是MySQL连接器配置优化的建议:

# MySQL连接器配置优化

1. 连接池配置
– 最大连接数:根据应用并发需求设置
– 最小连接数:根据应用负载设置
– 连接超时:设置合理的连接超时时间
– 空闲连接超时:设置合理的空闲连接超时时间

2. 连接参数优化
– 字符集:设置合适的字符集
– 时区:设置正确的时区
– SSL/TLS:启用SSL/TLS加密连接
– 自动重连:启用自动重连功能

3. 性能参数优化
– 批量操作:使用批量操作提高性能
– 预编译语句:使用预编译语句提高性能
– 结果集缓存:启用结果集缓存提高性能
– 网络缓冲区:设置合适的网络缓冲区大小

4. 错误处理配置
– 重试策略:设置合理的重试策略
– 错误日志:启用错误日志记录
– 异常处理:实现完善的异常处理机制

2.3 MySQL连接器安全配置

MySQL连接器的安全配置对于保护数据库安全至关重要。以下是MySQL连接器安全配置的建议:

# MySQL连接器安全配置

1. 连接安全
– 启用SSL/TLS加密连接
– 使用证书验证服务器身份
– 禁用不安全的连接方式

2. 认证安全
– 使用强密码
– 使用多因素认证
– 定期更换密码

3. 权限控制
– 使用最小权限原则
– 限制用户访问范围
– 定期审计用户权限

4. 数据安全
– 加密敏感数据
– 使用数据脱敏
– 实施数据备份策略

5. 审计日志
– 启用审计日志
– 记录关键操作
– 定期审查审计日志

Part03-生产环境项目实施方案

3.1 MySQL Connector/J安装与配置

MySQL Connector/J是MySQL官方提供的Java连接器,支持JDBC规范。以下是MySQL Connector/J的安装与配置步骤:

# MySQL Connector/J安装与配置

# 步骤1:下载MySQL Connector/J
# 访问MySQL官网下载页面
wget https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-j-8.0.33.tar.gz

# 步骤2:解压安装包
tar -xzf mysql-connector-j-8.0.33.tar.gz
cd mysql-connector-j-8.0.33

# 步骤3:将JAR文件复制到应用程序的类路径
cp mysql-connector-j-8.0.33.jar /path/to/your/application/lib/

# 步骤4:配置JDBC连接
# 在Java代码中配置JDBC连接
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

public class MySQLConnection {
private static final String URL = “jdbc:mysql://localhost:3306/testdb?useSSL=true&serverTimezone=UTC”;
private static final String USER = “root”;
private static final String PASSWORD = “password”;

public static Connection getConnection() throws SQLException {
return DriverManager.getConnection(URL, USER, PASSWORD);
}
}

# 步骤5:测试连接
# 编译并运行测试程序
javac MySQLConnection.java
java MySQLConnection

# 输出示例:
# Connection successful!
# Connection closed.

# 步骤6:配置连接池(使用HikariCP)
# 添加HikariCP依赖
# 在pom.xml中添加:

com.zaxxer
HikariCP
5.0.1

# 配置HikariCP连接池
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;

public class MySQLConnectionPool {
private static HikariDataSource dataSource;

static {
HikariConfig config = new HikariConfig();
config.setJdbcUrl(“jdbc:mysql://localhost:3306/testdb?useSSL=true&serverTimezone=UTC”);
config.setUsername(“root”);
config.setPassword(“password”);
config.setMaximumPoolSize(20);
config.setMinimumIdle(5);
config.setConnectionTimeout(30000);
config.setIdleTimeout(600000);
config.setMaxLifetime(1800000);
dataSource = new HikariDataSource(config);
}

public static Connection getConnection() throws SQLException {
return dataSource.getConnection();
}
}

# 步骤7:验证连接池配置
# 测试连接池
public class TestConnectionPool {
public static void main(String[] args) {
try (Connection conn = MySQLConnectionPool.getConnection()) {
System.out.println(“Connection from pool successful!”);
System.out.println(“Connection valid: ” + conn.isValid(5));
} catch (SQLException e) {
e.printStackTrace();
}
}
}

# 输出示例:
# Connection from pool successful!
# Connection valid: true

3.2 MySQL Connector/Python安装与配置

MySQL Connector/Python是MySQL官方提供的Python连接器,支持Python DB API 2.0规范。以下是MySQL Connector/Python的安装与配置步骤:

# MySQL Connector/Python安装与配置

# 步骤1:安装MySQL Connector/Python
# 使用pip安装
pip install mysql-connector-python

# 输出示例:
# Collecting mysql-connector-python
# Downloading mysql_connector_python-8.0.33-cp39-cp39-manylinux1_x86_64.whl (12.3 MB)
# Installing collected packages: mysql-connector-python
# Successfully installed mysql-connector-python-8.0.33

# 步骤2:验证安装
python -c “import mysql.connector; print(mysql.connector.__version__)”

# 输出示例:
# 8.0.33

# 步骤3:配置MySQL连接
# 创建Python脚本
vim mysql_connection.py

# 添加以下内容:
import mysql.connector
from mysql.connector import Error

def create_connection():
“””创建MySQL数据库连接”””
connection = None
try:
connection = mysql.connector.connect(
host=’localhost’,
port=3306,
database=’testdb’,
user=’root’,
password=’password’,
charset=’utf8mb4′,
collation=’utf8mb4_unicode_ci’,
ssl_ca=’/path/to/ca.pem’,
ssl_cert=’/path/to/client-cert.pem’,
ssl_key=’/path/to/client-key.pem’
)
print(“Connection to MySQL DB successful”)
except Error as e:
print(f”The error ‘{e}’ occurred”)
return connection

def execute_query(connection, query):
“””执行SQL查询”””
cursor = connection.cursor()
try:
cursor.execute(query)
connection.commit()
print(“Query executed successfully”)
except Error as e:
print(f”The error ‘{e}’ occurred”)

def execute_read_query(connection, query):
“””执行SQL查询并返回结果”””
cursor = connection.cursor()
result = None
try:
cursor.execute(query)
result = cursor.fetchall()
return result
except Error as e:
print(f”The error ‘{e}’ occurred”)

# 步骤4:测试连接
# 创建测试脚本
vim test_mysql_connection.py

# 添加以下内容:
from mysql_connection import create_connection, execute_query, execute_read_query

# 创建连接
connection = create_connection()

# 创建表
create_users_table = “””
CREATE TABLE IF NOT EXISTS users (
id INT AUTO_INCREMENT,
name TEXT NOT NULL,
age INT,
PRIMARY KEY (id)
) ENGINE = InnoDB
“””
execute_query(connection, create_users_table)

# 插入数据
insert_user = “INSERT INTO users (name, age) VALUES (‘张三’, 25)”
execute_query(connection, insert_user)

# 查询数据
select_users = “SELECT * FROM users”
users = execute_read_query(connection, select_users)
for user in users:
print(user)

# 输出示例:
# Connection to MySQL DB successful
# Query executed successfully
# Query executed successfully
# (1, ‘张三’, 25)

# 步骤5:配置连接池
# 创建连接池脚本
vim mysql_connection_pool.py

# 添加以下内容:
import mysql.connector
from mysql.connector import pooling

# 创建连接池
connection_pool = pooling.MySQLConnectionPool(
pool_name=”mysql_pool”,
pool_size=5,
pool_reset_session=True,
host=’localhost’,
port=3306,
database=’testdb’,
user=’root’,
password=’password’,
charset=’utf8mb4′
)

# 从连接池获取连接
def get_connection():
return connection_pool.get_connection()

# 测试连接池
for i in range(5):
conn = get_connection()
print(f”Connection {i+1} from pool: {conn.is_connected()}”)
conn.close()

# 输出示例:
# Connection 1 from pool: True
# Connection 2 from pool: True
# Connection 3 from pool: True
# Connection 4 from pool: True
# Connection 5 from pool: True

3.3 MySQL Connector/C安装与配置

MySQL Connector/C是MySQL官方提供的C语言连接器,基于MySQL C API。以下是MySQL Connector/C的安装与配置步骤:

# MySQL Connector/C安装与配置

# 步骤1:安装MySQL Connector/C
# 在CentOS/RHEL上安装
yum install -y mysql-connector-c-devel

# 在Ubuntu/Debian上安装
apt-get install -y libmysqlclient-dev

# 输出示例:
# Installed:
# mysql-connector-c-devel.x86_64 0:8.0.33-1.el7

# 步骤2:验证安装
mysql_config –version

# 输出示例:
# 8.0.33

# 步骤3:编写C程序连接MySQL
# 创建C源文件
vim mysql_connect.c

# 添加以下内容:
#include
#include
#include

int main() {
MYSQL *conn;
MYSQL_RES *res;
MYSQL_ROW row;

// 初始化MySQL连接
conn = mysql_init(NULL);
if (conn == NULL) {
fprintf(stderr, “mysql_init() failed\n”);
return 1;
}

// 连接到MySQL服务器
if (mysql_real_connect(conn, “localhost”, “root”, “password”, “testdb”, 3306, NULL, 0) == NULL)
{
fprintf(stderr, “mysql_real_connect() failed: %s\n”, mysql_error(conn));
mysql_close(conn);
return 1;
}

printf(“Connected to MySQL successfully!\n”);

// 执行SQL查询
if (mysql_query(conn, “SELECT VERSION()”)) {
fprintf(stderr, “SELECT VERSION() failed: %s\n”, mysql_error(conn));
mysql_close(conn);
return 1;
}

// 获取查询结果
res = mysql_store_result(conn);
if (res == NULL) {
fprintf(stderr, “mysql_store_result() failed\n”);
mysql_close(conn);
return 1;
}

// 输出查询结果
while ((row = mysql_fetch_row(res)) != NULL) {
printf(“MySQL version: %s\n”, row[0]);
}

// 释放结果集
mysql_free_result(res);

// 关闭连接
mysql_close(conn);

printf(“Connection closed.\n”);
return 0;
}

# 步骤4:编译C程序
gcc -o mysql_connect mysql_connect.c $(mysql_config –cflags –libs)

# 输出示例:
# (无输出表示编译成功)

# 步骤5:运行程序
./mysql_connect

# 输出示例:
# Connected to MySQL successfully!
# MySQL version: 8.0.33
# Connection closed.

# 步骤6:配置SSL连接
# 修改C程序以支持SSL连接
vim mysql_connect_ssl.c

# 添加以下内容:
#include
#include
#include

int main() {
MYSQL *conn;
MYSQL_RES *res;
MYSQL_ROW row;

// 初始化MySQL连接
conn = mysql_init(NULL);
if (conn == NULL) {
fprintf(stderr, “mysql_init() failed\n”);
return 1;
}

// 配置SSL
mysql_options(conn, MYSQL_OPT_SSL_CA, “/path/to/ca.pem”);
mysql_options(conn, MYSQL_OPT_SSL_CERT, “/path/to/client-cert.pem”);
mysql_options(conn, MYSQL_OPT_SSL_KEY, “/path/to/client-key.pem”);
mysql_options(conn, MYSQL_OPT_SSL_VERIFY_SERVER_CERT, “1”);

// 连接到MySQL服务器
if (mysql_real_connect(conn, “localhost”, “root”, “password”, “testdb”, 3306, NULL,
CLIENT_SSL) == NULL) {
fprintf(stderr, “mysql_real_connect() failed: %s\n”, mysql_error(conn));
mysql_close(conn);
return 1;
}

printf(“Connected to MySQL with SSL successfully!\n”);

// 检查SSL连接状态
const char *cipher = mysql_get_ssl_cipher(conn);
if (cipher != NULL) {
printf(“SSL cipher: %s\n”, cipher);
} else {
printf(“SSL is not enabled.\n”);
}

// 关闭连接
mysql_close(conn);

printf(“Connection closed.\n”);
return 0;
}

# 编译并运行
gcc -o mysql_connect_ssl mysql_connect_ssl.c $(mysql_config –cflags –libs)
./mysql_connect_ssl

# 输出示例:
# Connected to MySQL with SSL successfully!
# SSL cipher: TLS_AES_256_GCM_SHA384
# Connection closed.

Part04-生产案例与实战讲解

4.1 Java应用连接MySQL实战案例

以下是Java应用程序连接MySQL数据库的实战案例,包括连接池配置、事务管理和批量操作:

# Java应用连接MySQL实战案例

# 步骤1:创建Maven项目
mvn archetype:generate -DgroupId=com.example -DartifactId=mysql-demo
-DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false

# 输出示例:
# [INFO] —————————————————————————-
# [INFO] Using following parameters for creating project from Old (1.x) Archetype:
maven-archetype-quickstart:1.0
# [INFO] —————————————————————————-
# [INFO] Parameter: groupId, Value: com.example
# [INFO] Parameter: artifactId, Value: mysql-demo
# [INFO] Parameter: packageName, Value: com.example
# [INFO] Parameter: package, Value: com.example
# [INFO] Parameter: version, Value: 1.0-SNAPSHOT
# [INFO] project created from Old (1.x) Archetype in dir: /path/to/mysql-demo
# [INFO] ————————————————————————
# [INFO] BUILD SUCCESS
# [INFO] ————————————————————————

# 步骤2:添加依赖
cd mysql-demo
vim pom.xml

# 添加以下依赖:


mysql
mysql-connector-java
8.0.33


com.zaxxer
HikariCP
5.0.1


org.slf4j
slf4j-simple
2.0.7

# 步骤3:创建数据库连接池类
vim src/main/java/com/example/DatabaseConnection.java

# 添加以下内容:
package com.example;

import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import java.sql.Connection;
import java.sql.SQLException;

public class DatabaseConnection {
private static HikariDataSource dataSource;

static {
HikariConfig config = new HikariConfig();
config.setJdbcUrl(“jdbc:mysql://localhost:3306/testdb?useSSL=true&serverTimezone=UTC&useUnicode=true&characterEncoding=utf8mb4”);
config.setUsername(“root”);
config.setPassword(“password”);
config.setDriverClassName(“com.mysql.cj.jdbc.Driver”);
config.setMaximumPoolSize(20);
config.setMinimumIdle(5);
config.setConnectionTimeout(30000);
config.setIdleTimeout(600000);
config.setMaxLifetime(1800000);
config.setAutoCommit(false);
config.setConnectionTestQuery(“SELECT 1”);
config.addDataSourceProperty(“cachePrepStmts”, “true”);
config.addDataSourceProperty(“prepStmtCacheSize”, “250”);
config.addDataSourceProperty(“prepStmtCacheSqlLimit”, “2048”);
dataSource = new HikariDataSource(config);
}

public static Connection getConnection() throws SQLException {
return dataSource.getConnection();
}

public static void closeDataSource() {
if (dataSource != null) {
dataSource.close();
}
}
}

# 步骤4:创建用户DAO类
vim src/main/java/com/example/UserDAO.java

# 添加以下内容:
package com.example;

import java.sql.*;
import java.util.ArrayList;
import java.util.List;

public class UserDAO {

public void createUser(String name, int age) throws SQLException {
String sql = “INSERT INTO users (name, age) VALUES (?, ?)”;
try (Connection conn = DatabaseConnection.getConnection();
PreparedStatement pstmt = conn.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) {
pstmt.setString(1, name);
pstmt.setInt(2, age);
int affectedRows = pstmt.executeUpdate();
if (affectedRows > 0) {
try (ResultSet rs = pstmt.getGeneratedKeys()) {
if (rs.next()) {
System.out.println(“Created user with ID: ” + rs.getInt(1));
}
}
}
conn.commit();
}
}

public List getAllUsers() throws SQLException {
List users = new ArrayList<>();
String sql = “SELECT id, name, age FROM users”;
try (Connection conn = DatabaseConnection.getConnection();
Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery(sql)) {
while (rs.next()) {
User user = new User();
user.setId(rs.getInt(“id”));
user.setName(rs.getString(“name”));
user.setAge(rs.getInt(“age”));
users.add(user);
}
}
return users;
}

public void batchInsertUsers(List users) throws SQLException {
String sql = “INSERT INTO users (name, age) VALUES (?, ?)”;
try (Connection conn = DatabaseConnection.getConnection();
PreparedStatement pstmt = conn.prepareStatement(sql)) {
for (User user : users) {
pstmt.setString(1, user.getName());
pstmt.setInt(2, user.getAge());
pstmt.addBatch();
}
int[] results = pstmt.executeBatch();
conn.commit();
System.out.println(“Batch insert completed. Rows affected: ” + results.length);
}
}
}

# 步骤5:创建用户实体类
vim src/main/java/com/example/User.java

# 添加以下内容:
package com.example;

public class User {
private int id;
private String name;
private int age;

// Getters and Setters
public int getId() { return id; }
public void setId(int id) { this.id = id; }
public String getName() { return name; }
public void setName(String name) { this.name = name; }
public int getAge() { return age; }
public void setAge(int age) { this.age = age; }

@Override
public String toString() {
return “User{id=” + id + “, name='” + name + “‘, age=” + age + “}”;
}
}

# 步骤6:创建主程序
vim src/main/java/com/example/App.java

# 添加以下内容:
package com.example;

import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

public class App {
public static void main(String[] args) {
try {
// 创建用户
UserDAO userDAO = new UserDAO();
userDAO.createUser(“张三”, 25);
userDAO.createUser(“李四”, 30);

// 查询所有用户
List users = userDAO.getAllUsers();
System.out.println(“All users:”);
for (User user : users) {
System.out.println(user);
}

// 批量插入用户
List batchUsers = new ArrayList<>();
for (int i = 1; i <= 10; i++) { User user=new User(); user.setName("用户" + i); user.setAge(20 + i); batchUsers.add(user); } userDAO.batchInsertUsers(batchUsers); // 再次查询所有用户 users=userDAO.getAllUsers(); System.out.println("All users after batch insert:"); for (User user : users) { System.out.println(user); } } catch (SQLException e) { e.printStackTrace(); } finally { DatabaseConnection.closeDataSource(); } } } # 步骤7:编译并运行 mvn clean package java -cp target/mysql-demo-1.0-SNAPSHOT.jar:target/dependency/* com.example.App # 输出示例: # Created user with ID: 1 # Created user with ID: 2 # All users: # User{id=1, name='张三' , age=25} # User{id=2, name='李四' , age=30} # Batch insert completed. Rows affected: 10 # All users after batch insert: # User{id=1, name='张三' , age=25} # User{id=2, name='李四' , age=30} # User{id=3, name='用户1' , age=21} # User{id=4, name='用户2' , age=22} # User{id=5, name='用户3' , age=23} # User{id=6, name='用户4' , age=24} # User{id=7, name='用户5' , age=25} # User{id=8, name='用户6' , age=26} # User{id=9, name='用户7' , age=27} # User{id=10, name='用户8' , age=28} # User{id=11, name='用户9' , age=29} # User{id=12, name='用户10' , age=30}

4.2 Python应用连接MySQL实战案例

以下是Python应用程序连接MySQL数据库的实战案例,包括连接池配置、事务管理和批量操作:

# Python应用连接MySQL实战案例

# 步骤1:安装依赖
pip install mysql-connector-python pandas sqlalchemy

# 输出示例:
# Collecting mysql-connector-python
# Downloading
mysql_connector_python-8.0.33-cp39-cp39-manylinux1_x86_64.whl (12.3 MB)
# Collecting pandas
# Downloading
pandas-2.0.3-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl
(12.3 MB)
# Collecting sqlalchemy
# Downloading
SQLAlchemy-2.0.20-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl
(2.7 MB)
# Installing collected packages: mysql-connector-python, pandas,
sqlalchemy
# Successfully installed mysql-connector-python-8.0.33 pandas-2.0.3
sqlalchemy-2.0.20

# 步骤2:创建数据库连接模块
vim db_connection.py

# 添加以下内容:
import mysql.connector
from mysql.connector import pooling
from contextlib import contextmanager
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class MySQLConnectionPool:
_pool = None

@classmethod
def initialize_pool(cls, host=’localhost’, port=3306, database=’testdb’,
user=’root’, password=’password’, pool_size=10):
“””初始化连接池”””
cls._pool = pooling.MySQLConnectionPool(
pool_name=”mysql_pool”,
pool_size=pool_size,
pool_reset_session=True,
host=host,
port=port,
database=database,
user=user,
password=password,
charset=’utf8mb4′,
collation=’utf8mb4_unicode_ci’,
autocommit=False
)
logger.info(f”Connection pool initialized with size {pool_size}”)

@classmethod
@contextmanager
def get_connection(cls):
“””获取数据库连接”””
if cls._pool is None:
raise RuntimeError(“Connection pool not initialized”)

conn = None
try:
conn = cls._pool.get_connection()
yield conn
conn.commit()
except Exception as e:
if conn:
conn.rollback()
logger.error(f”Database error: {e}”)
raise
finally:
if conn:
conn.close()

# 步骤3:创建用户DAO类
vim user_dao.py

# 添加以下内容:
from db_connection import MySQLConnectionPool
from typing import List, Dict, Any
import logging

logger = logging.getLogger(__name__)

class UserDAO:

def create_user(self, name: str, age: int) -> int:
“””创建用户”””
sql = “INSERT INTO users (name, age) VALUES (%s, %s)”
with MySQLConnectionPool.get_connection() as conn:
cursor = conn.cursor()
cursor.execute(sql, (name, age))
user_id = cursor.lastrowid
logger.info(f”Created user with ID: {user_id}”)
return user_id

def get_all_users(self) -> List[Dict[str, Any]]:
“””获取所有用户”””
sql = “SELECT id, name, age FROM users”
with MySQLConnectionPool.get_connection() as conn:
cursor = conn.cursor(dictionary=True)
cursor.execute(sql)
users = cursor.fetchall()
return users

def get_user_by_id(self, user_id: int) -> Dict[str, Any]:
“””根据ID获取用户”””
sql = “SELECT id, name, age FROM users WHERE id = %s”
with MySQLConnectionPool.get_connection() as conn:
cursor = conn.cursor(dictionary=True)
cursor.execute(sql, (user_id,))
user = cursor.fetchone()
return user

def update_user(self, user_id: int, name: str, age: int) -> bool:
“””更新用户”””
sql = “UPDATE users SET name = %s, age = %s WHERE id = %s”
with MySQLConnectionPool.get_connection() as conn:
cursor = conn.cursor()
cursor.execute(sql, (name, age, user_id))
affected_rows = cursor.rowcount
logger.info(f”Updated {affected_rows} user(s)”)
return affected_rows > 0

def delete_user(self, user_id: int) -> bool:
“””删除用户”””
sql = “DELETE FROM users WHERE id = %s”
with MySQLConnectionPool.get_connection() as conn:
cursor = conn.cursor()
cursor.execute(sql, (user_id,))
affected_rows = cursor.rowcount
logger.info(f”Deleted {affected_rows} user(s)”)
return affected_rows > 0

def batch_insert_users(self, users: List[Dict[str, Any]]) -> int:
“””批量插入用户”””
sql = “INSERT INTO users (name, age) VALUES (%s, %s)”
with MySQLConnectionPool.get_connection() as conn:
cursor = conn.cursor()
values = [(user[‘name’], user[‘age’]) for user in users]
cursor.executemany(sql, values)
affected_rows = cursor.rowcount
logger.info(f”Batch inserted {affected_rows} user(s)”)
return affected_rows

# 步骤4:创建主程序
vim main.py

# 添加以下内容:
from db_connection import MySQLConnectionPool
from user_dao import UserDAO
import logging

logging.basicConfig(level=logging.INFO)

def main():
# 初始化连接池
MySQLConnectionPool.initialize_pool(
host=’localhost’,
port=3306,
database=’testdb’,
user=’root’,
password=’password’,
pool_size=10
)

# 创建用户DAO实例
user_dao = UserDAO()

# 创建用户
user_id1 = user_dao.create_user(“张三”, 25)
user_id2 = user_dao.create_user(“李四”, 30)

# 查询所有用户
users = user_dao.get_all_users()
print(“All users:”)
for user in users:
print(f” {user}”)

# 根据ID查询用户
user = user_dao.get_user_by_id(user_id1)
print(f”User with ID {user_id1}: {user}”)

# 更新用户
user_dao.update_user(user_id1, “张三丰”, 100)
user = user_dao.get_user_by_id(user_id1)
print(f”Updated user: {user}”)

# 批量插入用户
batch_users = [
{‘name’: f’用户{i}’, ‘age’: 20 + i}
for i in range(1, 11)
]
user_dao.batch_insert_users(batch_users)

# 再次查询所有用户
users = user_dao.get_all_users()
print(“All users after batch insert:”)
for user in users:
print(f” {user}”)

# 删除用户
user_dao.delete_user(user_id2)

# 最终查询所有用户
users = user_dao.get_all_users()
print(“All users after deletion:”)
for user in users:
print(f” {user}”)

if __name__ == “__main__”:
main()

# 步骤5:运行程序
python main.py

# 输出示例:
# INFO:db_connection:Connection pool initialized with size 10
# INFO:user_dao:Created user with ID: 1
# INFO:user_dao:Created user with ID: 2
# All users:
# {‘id’: 1, ‘name’: ‘张三’, ‘age’: 25}
# {‘id’: 2, ‘name’: ‘李四’, ‘age’: 30}
# User with ID 1: {‘id’: 1, ‘name’: ‘张三’, ‘age’: 25}

本文由风哥教程整理发布,仅用于学习测试使用,转载注明出处:http://www.fgedu.net.cn/10327.html

联系我们

在线咨询:点击这里给我发消息

微信号:itpux-com

工作日:9:30-18:30,节假日休息