Python优雅地操作Oracle数据库

方案一:
直接套用脚本,需可以看懂一些脚本逻辑

改代码中 类名为 OracleConnector,它可以同时连接多个 Oracle 数据库,并提供执行增删改查操作的方法。

import cx_Oracle

class OracleConnector:
    def __init__(self, databases):
        self.connections = {}
        for db in databases:
            conn = cx_Oracle.connect(
                user=db['username'],
                password=db['password'],
                dsn=db['dsn'],
                encoding=db.get('encoding', 'UTF-8')
            )
            self.connections[db['name']] = conn

    def execute_query(self, database_name, query, params=None):
        with self.connections[database_name].cursor() as cursor:
            cursor.execute(query, params)
            result = cursor.fetchall()
        return result
    
    def execute_non_query(self, database_name, query, params=None):
        with self.connections[database_name].cursor() as cursor:
            cursor.execute(query, params)
            self.connections[database_name].commit()
            return cursor.rowcount
        
    def execute_query_all(self, databases, query, params=None):
        results = {}
        for db in databases:
            database_name = db['name']
            results[database_name] = self.execute_query(database_name, query, params)
        return results
    
    def execute_non_query_all(self, databases, query, params=None):
        results = {}
        for db in databases:
            database_name = db['name']
            try:
                res = self.execute_non_query(database_name, query, params)
                results[database_name] = res
            except cx_Oracle.DatabaseError as e:
                print(f"Error occurred when running query on {database_name}: {e}")
                self.connections[database_name].rollback()
        return results

以下是代码解释:

import cx_Oracle:导入 cx_Oracle 库,用于与 Oracle 数据库进行连接和交互。

class OracleConnector::定义一个名为 OracleConnector 的类。

def __init__(self, databases)::类的初始化方法,接受一个包含多个数据库信息的列表作为参数。

self.connections = {}:初始化一个字典,用于存储数据库连接。

for db in databases::遍历数据库列表中的每个数据库信息。

conn = cx_Oracle.connect(...):根据数据库信息创建数据库连接对象。user=db['username']:连接数据库的用户名。password=db['password']:连接数据库的密码。dsn=db['dsn']:Oracle 数据库的 DSN(数据源名称)。encoding=db.get('encoding', 'UTF-8'):可选的编码方式,默认为 UTF-8。

self.connections[db['name']] = conn:将数据库连接对象添加到字典中,以数据库名称作为键。

def execute_query(self, database_name, query, params=None)::执行查询语句的方法。database_name:数据库名称。query:要执行的查询语句。params=None:可选的查询参数。

with self.connections[database_name].cursor() as cursor::获取游标对象,用于执行 SQL 语句。

cursor.execute(query, params):执行查询语句。

result = cursor.fetchall():获取查询结果集。

return result:返回查询结果。

def execute_non_query(self, database_name, query, params=None)::执行非查询语句的方法,比如插入、更新和删除操作。

database_name:数据库名称。

query:要执行的非查询语句。

params=None:可选的参数。

with self.connections[database_name].cursor() as cursor::获取游标对象,用于执行 SQL 语句。

cursor.execute(query, params):执行非查询语句,比如插入、更新和删除操作。

self.connections[database_name].commit():提交事务,将修改操作永久保存到数据库中。

return cursor.rowcount:返回受影响的行数,即执行非查询操作后所影响的行数。

def execute_query_all(self, databases, query, params=None)::在多个数据库上执行相同的查询语句,并返回结果。

databases:要执行查询的数据库列表。

query:要执行的查询语句。

params=None:可选的查询参数。

results = {}:初始化一个字典,用于存储查询结果。

for db in databases::遍历数据库列表中的每个数据库。

database_name = db['name']:获取数据库名称。

results[database_name] = self.execute_query(database_name, query, params):执行查询,并将结果存储到字典中。

return results:返回包含查询结果的字典。

def execute_non_query_all(self, databases, query, params=None)::在多个数据库上执行相同的非查询语句,并返回结果。

databases:要执行非查询操作的数据库列表。

query:要执行的非查询语句。

params=None:可选的参数。

results = {}:初始化一个字典,用于存储操作结果。

for db in databases::遍历数据库列表中的每个数据库。

database_name = db['name']:获取数据库名称。

try::捕获可能发生的异常。

res = self.execute_non_query(database_name, query, params):执行非查询操作。

results[database_name] = res:将操作结果存储到字典中。

except cx_Oracle.DatabaseError as e::捕获 Oracle 数据库相关的异常。

print(f"Error occurred when running query on {database_name}: {e}"):打印出错信息。

self.connections[database_name].rollback():执行回滚操作,撤销之前的操作。

return results:返回包含操作结果的字典。

这个封装类使得可以同时连接多个 Oracle 数据库,并在它们上执行增删改查操作。而在执行非查询操作时,如果遇到错误,将会进行回滚操作,保证数据的一致性。

方案二:直接调用封装脚本(写用例,执行脚本即可)
脚本实现封装后,只需要在Oracle.yaml文件中写用例即可,此后执行Oracle.py脚本即实现数据库的增删改查操作

目录介绍:

Oracle.yaml 编写基本数据库信息

Orcale:
  username: user
  password:  pwd
  ip: 127.0.0.1
  port: 1521
  sid: orcl

PublicConfig.py脚本: 配置读取信息,方便调用

import os
from Public_Utils.util_yaml import YamlReader

class YamlPath:
    def __init__(self):
        current = os.path.abspath(__file__)
        self.base_dir = os.path.dirname(os.path.dirname(current))
        self._config_path = self.base_dir + os.sep + "Public_Config\Public_yaml"

    def get_oracle_file(self):
        self._config_file = self._config_path + os.sep + "Oracle.yaml"
        return self._config_file

class ConfigYaml:
    def __init__(self):   #初始yaml读取配置文件
        self.oracle_config = YamlReader(YamlPath().get_oracle_file()).yaml_data()

    def get_oracle_yaml(self):
        return self.oracle_config['Orcale']

if __name__ == '__main__':
    pass

Oracle.py执行脚本

#  coding=utf-8
import cx_Oracle
import os
import json
from Public_Config.PublicConfig import ConfigYaml
os.environ['NLS_LANG'] = 'SIMPLIFIED CHINESE_CHINA.UTF8'

class Oracle:

    def __init__(self):
        user = ConfigYaml().get_oracle_yaml()['username']
        pwd = ConfigYaml().get_oracle_yaml()['password']
        ip = ConfigYaml().get_oracle_yaml()['ip']
        port = ConfigYaml().get_oracle_yaml()['port']
        sid = ConfigYaml().get_oracle_yaml()['sid']
        self.connect = cx_Oracle.connect(str(user) + "/" + str(pwd) + "@" + str(ip) + ":" + str(port) + "/" + str(sid))
        # self.connect = cx_Oracle.connect(f'{user}/{pwd}@{ip}:{port}/{sid}')   # 这里的顺序是 用户名/密码@oracleserver的ip地址/数据库名字
        self.cursor = self.connect.cursor()   # 使用cursor()方法获取操作游标

    def GetData(self,sql):
        self.cursor.execute(sql)
        # 使用Rowfactory更改查询结果,更直观查看数据
        columns = [col[0] for col in self.cursor.description]
        self.cursor.rowfactory = lambda *args: dict(zip(columns, args))
        # fetchall()一次取完所有结果
        # fetchone()一次取一行结果
        data = self.cursor.fetchall()
        return data

    def select(self, sql):   #查询
        list = []
        self.cursor.execute(sql)            # 使用execute方法执行SQL语句
        result = self.cursor.fetchall()     # fetchall()一次取完所有结果,fetchone()一次取一行结果
        col_name = self.cursor.description
        for row in result:
            dict = {}
            for col in range(len(col_name)):
                key = col_name[col][0]
                value = row[col]
                dict[key] = value
            list.append(dict)
        js = json.dumps(list, ensure_ascii=False, indent=2, separators=(',', ':'))
        '''json.dumps() 是把python对象转换成json对象的一个过程,生成的是字符串
           json.dump() 是把python对象转换成json对象生成一个fp的文件流,和文件相关'''
        return js #将结果返回为一个字符串

    def selectlist(self,sql):
        list = []
        self.cursor.execute(sql)
        result = self.cursor.fetchall()
        col_name = self.cursor.description
        for row in result:
            dict = {}
            for col in range(len(col_name)):
                key = col_name[col][0]
                value = row[col]
                dict[key] = value
            list.append(dict)
        return list   #将结果以列表返回

    def disconnect(self):   #未连接
        self.cursor.close()
        self.connect.close()

    def insert(self, sql, list_param):   #插入
        try:
            self.cursor.executemany(sql, list_param)
            self.connect.commit()
            print("插入ok")
        except Exception as e:
            print(e)
        finally:
            self.disconnect()

    def update(self, sql):   #更新
        try:
            self.cursor.execute(sql)
            self.connect.commit()
        except Exception as e:
            print(e)
        finally:
            self.disconnect()

    def delete(self, sql):   #删除
        try:
            self.cursor.execute(sql)
            self.connect.commit()
            print("delete ok")
        except Exception as e:
            print(e)
        finally:
            self.disconnect()

if __name__ == '__main__':
    print(Oracle().select(sql="select * from cifaccount a where a.cif_account ='310400009590' "))
    pass

util_yaml.py

import os
import yaml

class YamlReader:
    #初始化,判断文件是否存在
    def __init__(self,yaml_file):
        if os.path.exists(yaml_file):
            self.yaml_file = yaml_file
        else:
            raise FileNotFoundError("yaml文件不存在")
        self._data = None
        self._data_all = None

    def yaml_data(self):  #yaml文件读取 --单个文档读取
        #第一次调用data,读取yaml文档,如果不是,直接返回之前保存的数据
        if not self._data:
            with open(self.yaml_file,'rb') as f:
                self._data = yaml.safe_load(f)
        return self._data

    def yaml_data_all(self):  #多个文档的读取
        if not self._data_all:
            with open(self.yaml_file,'rb') as f:
                self._data_all = yaml.safe_load_all(f)
        return self._data_all

文章出处登录后可见!

已经登录?立即刷新

共计人评分,平均

到目前为止还没有投票!成为第一位评论此文章。

(0)
心中带点小风骚的头像心中带点小风骚普通用户
上一篇 2023年11月14日
下一篇 2023年11月14日

相关推荐