Python操作mysql

Python操作mysql

一、python对MySQL的基本操作

1.连接数据
import pymysql

conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', password='root123', charset='utf8')
cursor = conn.cursor()
2.创建数据库
cursor.execute('create database db01 default charset utf8 collate utf8_general_ci')
conn.commit()
3.进入数据库创建表
cursor.execute('use db01')
sql_create_table_l01= """
create table l01(
    id int not null primary key auto_increment,
    name varchar(32) not null 
)default charset=utf8;
"""
cursor.execute(sql_create_table_l01)
conn.commit()
4. 查看数据库中的表
cursor.execute('show tables')
result = cursor.fetchall()
print(result)
5.新增数据
cursor.execute("insert into l01(id,name) values('1','流水')")
conn.commit()
6.删除
cursor.execute("delete from l01 where id=1")
conn.commit()
7.修改数据
cursor.execute("update tb1 set name='xx' where id=1")
conn.commit()
8.查询数据
cursor.execute("select * from tb where id>10")
data = cursor.fetchone() # cursor.fetchall()
print(data)
9.关闭数据库
cursor.close()
conn.close()
10.执行存储过程
import pymysql

conn = pymysql.connect(host='', port=3306, user='', password='', charset=utf8)
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)

# 执行存储过程
cursor.callproc('存储过程名称')
result = cursor.fetchall()  # 得到执行存储过中的结果集 

# 执行带参数存储过程
cursor.callproc('存储过程名称', args=('参数1','参数2', '参数3'))
# 获取执行完存储的参数
cursor.execute('select @_名称_0')
result = cursor.fetchall()  # {@_名称_0: 参数} 


cursor.close()
conn.close()

print(result)

其他略

二、python操作MySQL的应用

1.登录注册
import pymysql

def register():
    print("用户注册")

    user = input("请输入用户名:")
    password = input("请输入密码:")

    # 连接指定数据
    conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='root123', charset="utf8", db="usersdb")
    cursor = conn.cursor()
    sql = 'insert into users(name,password) values("{}","{}")'.format(user, password)
    
    cursor.execute(sql)
    conn.commit()

    # 关闭数据库连接
    cursor.close()
    conn.close()

    print("注册成功,用户名:{},密码:{}".format(user, password))


def login():
    print("用户登录")

    user = input("请输入用户名:")
    password = input("请输入密码:")

    # 连接指定数据
    conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='root123', charset="utf8", db="usersdb")
    cursor = conn.cursor()
    sql = "select * from users where name='{}' and password='{}'".format(user, password)
    cursor.execute(sql)
    result = cursor.fetchone() # 去向mysql获取结果
    # 关闭数据库连接
    cursor.close()
    conn.close()

    if result:
        print("登录成功", result)
    else:
        print("登录失败")


def run():
    choice = input("1.注册;2.登录")
    if choice == '1':
        register()
    elif choice == '2':
        login()
    else:
        print("输入错误")


if __name__ == '__main__':
    run()
2.防止sql注入
1.注入示例
import pymysql

# 输入用户名和密码
user = input("请输入用户名:") # ' or 1=1 -- 
pwd = input("请输入密码:") # 123


conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='root123', charset="utf8",db='usersdb')
cursor = conn.cursor()

# 基于字符串格式化来 拼接SQL语句
# sql = "select * from users where name='liu' and password='123'"
# sql = "select * from users where name='' or 1=1 -- ' and password='123'"
sql = "select * from users where name='{}' and password='{}'".format(user, pwd)
cursor.execute(sql)

result = cursor.fetchone()
print(result) # None,不是None

cursor.close()
conn.close()

如果用户在输入user时,输入了: ’ or 1=1 – ,这样即使用户输入的密码不存在,也会可以通过验证。
原因:
在SQL拼接时,拼接后的结果是:

select * from users where name='' or 1=1 -- ' and password='123'

注意: 在mysql中 – 表示注释
切记, sql语句不要在使用Python的字符串格式化, 而是用pymysql的execute方法

2.防止sql注入示例
import pymysql

# 输入用户名和密码
user_name = input('请输入用户名: ')
user_pwd = input('请输入密码: ')

conn = pymysql.connect(host='', password=3306, user='', passwd='', charset='utf8',db='userdb')
cursor = conn.cursor()

cursor.execute('select * from users name=s% and password=s%',[user_name, user_pwd])
# 或者
cursor.execute('select * from users where name=%(n1)s and password=%(n2)s',{'n1':user_name, 'n2':user_pwd})

result = cursor.fetchone()
print(result)

cursor.close()
conn.close()
3.数据库连接池

在操作数控是需要使用数据库连接池

import pymysql
import threading

from dbutils.pooled_db import PooledDB

MYSQL_DB_POOL = PooledDB(
    creator=pymysql,  # 使用链接数据库的模块
    maxconnections=5,  # 连接池允许的最大连接数,0和None表示不限制连接数
    mincached=2,  # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
    maxcached=3,  # 链接池中最多闲置的链接,0和None不限制
    blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
    setsession=[],  # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
    ping=0,
    # ping MySQL服务端,检查是否服务可用。
    # 如:0 = None = never, 1 = default = whenever it is requested, 
    # 2 = when a cursor is created, 4 = when a query is executed, 7 = always
    host='',
    port=3306,
    user='',
    password='',
    database='',
    charset=''
)

def task():
    # 去连接池获取一个连接
    conn = MYSQL_DB_POOL()
    cursor = conn.cursor(pymysql.cursors.DictCursor)
    
    cursor.execut('select sleep(2)')
    result = cursor.fetchall()
    print(result)
    cursor.close()
    conn.close()

def run():
    for i in range(10):
        t = threading.Thread(target=task)
        t.start()


if __name__ == '__main__':
    run()
4.sql工具类

基于数据库连接池开发一个功能sql操作类,方便以后操作数据库

  • 单列和方法
import pymysql
from dbutils.pooled_db import PooledDB


class DBHelper(object):

    def __init__(self):
        # TODO 此处配置,可以去配置文件中读取。
        self.pool = PooledDB(
            creator=pymysql,  # 使用链接数据库的模块
            maxconnections=5,  # 连接池允许的最大连接数,0和None表示不限制连接数
            mincached=2,  # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
            maxcached=3,  # 链接池中最多闲置的链接,0和None不限制
            blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
            setsession=[],  # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
            ping=0,
            # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
            host='127.0.0.1',
            port=3306,
            user='root',
            password='root123',
            database='userdb',
            charset='utf8'
        )

    def get_conn_cursor(self):
        conn = self.pool.connection()
        cursor = conn.cursor(pymysql.cursors.DictCursor)
        return conn, cursor

    def close_conn_cursor(self, *args):
        for item in args:
            item.close()

    def exec(self, sql, **kwargs):
        conn, cursor = self.get_conn_cursor()

        cursor.execute(sql, kwargs)
        conn.commit()

        self.close_conn_cursor(conn, cursor)

    def fetch_one(self, sql, **kwargs):
        conn, cursor = self.get_conn_cursor()

        cursor.execute(sql, kwargs)
        result = cursor.fetchone()

        self.close_conn_cursor(conn, cursor)
        return result

    def fetch_all(self, sql, **kwargs):
        conn, cursor = self.get_conn_cursor()

        cursor.execute(sql, kwargs)
        result = cursor.fetchall()

        self.close_conn_cursor(conn, cursor)

        return result

db = DBHelper()
from db import db

db.exec("insert into d1(name) values(%(name)s)", name="Kevin")

ret = db.fetch_one("select * from d1")
print(ret)

ret = db.fetch_one("select * from d1 where id=%(nid)s", nid=3)
print(ret)

ret = db.fetch_all("select * from d1")
print(ret)

ret = db.fetch_all("select * from d1 where id>%(nid)s", nid=2)
print(ret)
  • 上下文管理
import threading
import pymysql
from dbutils.pooled_db import PooledDB


POOL = PooledDB(
    creator=pymysql,  # 使用链接数据库的模块
    maxconnections=5,  # 连接池允许的最大连接数,0和None表示不限制连接数
    mincached=2,  # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
    maxcached=3,  # 链接池中最多闲置的链接,0和None不限制
    blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
    setsession=[],  # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
    ping=0,
    host='127.0.0.1',
    port=3306,
    user='root',
    password='root123',
    database='userdb',
    charset='utf8'
)


class Connect(object):
    def __init__(self):
        self.conn = conn = POOL.connection()
        self.cursor = conn.cursor(pymysql.cursors.DictCursor)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.cursor.close()
        self.conn.close()

    def exec(self, sql, **kwargs):
        self.cursor.execute(sql, kwargs)
        self.conn.commit()

    def fetch_one(self, sql, **kwargs):
        self.cursor.execute(sql, kwargs)
        result = self.cursor.fetchone()
        return result

    def fetch_all(self, sql, **kwargs):
        self.cursor.execute(sql, kwargs)
        result = self.cursor.fetchall()
        return result
from db_context import Connect

with Connect() as obj:
    # print(obj.conn)
    # print(obj.cursor)
    ret = obj.fetch_one("select * from d1")
    print(ret)

    ret = obj.fetch_one("select * from d1 where id=%(id)s", id=3)
    print(ret)

文章出处登录后可见!

已经登录?立即刷新

共计人评分,平均

到目前为止还没有投票!成为第一位评论此文章。

(0)
xiaoxingxing的头像xiaoxingxing管理团队
上一篇 2023年11月30日
下一篇 2023年11月30日

相关推荐