# -*- coding: utf-8 -*- # @Author: Gree # @Date: 2021-05-31 18:26:52 # @Last Modified by: Gree # @Last Modified time: 2021-06-01 14:19:52 import os import re import configparser from datetime import datetime import pymysql class ConnSql(object): """ 1. 构建一个连接数据库的父类:读取配置文件,连接数据库。 """ def __init__(self): """ __init__ 函数: input: output: features: 定义初始变量 step1: 定义 self.dirName、self.fileName、self.cf 等对象 """ # 获取 self.dirName self.dirName = os.path.split(os.path.realpath(__file__))[0] # 获取 self.fileName self.fileName = os.path.join(self.dirName, "sql.conf") # 生成 cf 对象 self.cf = configparser.ConfigParser() def conn_sql(self): """ conn_sql 函数: input: output: conn features: 读取 sql.conf 配置文件,连接数据库 step1: 读取 sql.conf 配置文件,并利用相关信息,连接数据库 step2: 返回 conn 对象 """ # 读取 sql.conf 配置文件 self.cf.read(self.fileName) # 异常捕获 try: # 连接数据库 conn = pymysql.connect( host = str(self.cf.get("log_on", "host")), port = int(self.cf.get("log_on", "port")), user = str(self.cf.get("log_on", "user")), passwd = str(self.cf.get("log_on", 'passwd')), db = str(self.cf.get("log_on", "db")) ) # 输出 log 信息 print("Database connection is successful!") # 返回 conn 对象 return conn except Exception as e: print("The error of conn_sql():", e) class DbRun(ConnSql): """ 1. 对于父类 ConnSql 的继承,继承了父类的属性和方法,并拥有自己的方法。 2. DbRun 类主要进行数据库的操作。 """ def table_exists(self, table_name, sql): """ table_exists 函数: input: table_name, sql output: True/False features: 该方法用来判断我们所要的表是否存在当前数据库中 step1: 判断我们所要的表是否存在当前数据库中 step2: return True/False """ # 输出进入模块的 log 信息 print("Loading the module of table_exists ...") # 调用父类方法获取 conn 对象 conn = self.conn_sql() # 捕获异常 try: # 构建 cursor 对象 with conn.cursor() as cursor: # 执行 sql 语句 cursor.execute(sql) # 获取 tables tables = [cursor.fetchall()] # 获取 table_list table_list = re.findall('(\'.*?\')', str(tables)) # 数据清洗 table_list = [re.sub("'",'',each) for each in table_list] # 条件判断 if table_name in table_list: # 输出 log 信息 print("The table of %s is exists!" % table_name) # 存在返回 True return True else: # 输出 log 信息 print("The table of %s is not exists!" % table_name) # 不存在返回 False return False except Exception as e: print("The error of table_exists:", e) finally: # 关闭 conn conn.close() # 输出 log 信息 print("The dealing of table_exists finished!") def new_table(self, table_name, sql): """ new_table 函数: input: table_name, sql output: features: 在 mysql 新建一张数据表 step1: 在 mysql 新建一张数据表 """ # 输出进入模块的 log 信息 print("Loading the module of new_table ...") # 调用父类方法获取 conn 对象 conn = self.conn_sql() # 捕获异常 try: """ 1. sql 语句新建一个所需要字段的 data sheet。 """ # 构建 cursor 对象 with conn.cursor() as cursor: # 执行 sql 语句 cursor.execute(sql) # 事务的手动提交 conn.commit() # 输出执行 sql 语句的 log 信息 print("Data sheet of %s is established!" % table_name) except Exception as e: # 输出 log 信息 print("The error of new_table:", e) # 事务回滚 conn.rollback() finally: # 关闭 conn conn.close() # 输出 log 信息 print("The dealing of new_table finished!") def initial_data(self, sql): """ initial_data 函数: input: sql output: result features: 获取需要进行自动化标注的最原始数据 step1: 从相关数据表中获取相应数据 """ # 输出 log 信息 print("Loading the module of initial_data ...") # 调用父类方法获取 conn 对象 conn = self.conn_sql() # 进行异常捕获 try: # 构建 cursor 对象 with conn.cursor() as cursor: # 执行 sql cursor.execute(sql) # 获取数据 result = cursor.fetchall() except Exception as e: print("The error of initial_data(): ", e) else: # 输出 log 日志 print("Get initial data successfully!") # 返回 result return result finally: # 关闭 conn conn.close() # 输出 log 信息 print("Exiting the module of initial_data ...") def insert_data(self, sql, df): """ insert_data 函数: input: sql, df output: 数据插入 mysql features: 将自动化分类的最终数据插入相应的数据表中 step1: 数据插入 """ # 输出 log 信息 print("Loading the module of insert_data ...") # 获取插入 sql 语句的各个字段,用于批量插入数据 sql_lst = [] # 进行异常捕获 try: # 遍历获取待插入数据 for i in range(0, len(df)): # 将时间类型转为字符串 time = datetime.strftime(df.iloc[i, 0], "%Y-%m-%d %H:%M:%S") # truple sql_truple = (time, df.iloc[i, 1], df.iloc[i, 2], df.iloc[i, 3], df.iloc[i, 4], df.iloc[i, 5], df.iloc[i, 6], df.iloc[i, 7], df.iloc[i, 8], df.iloc[i, 9], df.iloc[i, 10]) # 列表添加元素 sql_lst.append(sql_truple) except Exception as e: print("The error of getting sql_lst: ", e) # 进行异常捕获 try: # sql 语句 # sql = "INSERT INTO " + table_name + " (date_time, request_id, mac_wifi, user_id, query, domain, intent, response_text, domain_is_right, intent_is_right, response_is_right) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)" # 调用父类方法获取 conn 对象 conn = self.conn_sql() # 条件判断 if len(sql_lst) > 100000: # 获取 num num = len(sql_lst) // 100000 # 定义 start start = 0 # 定义 end end = 0 # for 循环 for i in range(0, num): # 累加 end += 100000 # 切片操作 insert_data = sql_lst[start:end] # 输出 log 信息 print("One hundred thousand data is inserting ...") # ping()方法,该方法默认的有个 reconnect 参数,默认是 True,如果失去连接了会重连 conn.ping() # 获取 cursor cursor = conn.cursor() # 批量插入数据 cursor.executemany(sql, insert_data) # 事务的手动提交 conn.commit() # 累加 start += 100000 # 条件判断 if end < len(sql_lst): # 切片操作 insert_data = sql_lst[end:len(sql_lst)] # 输出 log 信息 print("The last part of data is inserting ...") # ping()方法,该方法默认的有个 reconnect 参数,默认是 True,如果失去连接了会重连 conn.ping() # 获取 cursor cursor = conn.cursor() # 批量插入数据 cursor.executemany(sql, insert_data) # 事务的手动提交 conn.commit() else: pass else: # 输出 log 信息 print("Data is inserting ...") # ping()方法,该方法默认的有个 reconnect 参数,默认是 True,如果失去连接了会重连 conn.ping() # 获取 cursor cursor = conn.cursor() # 批量插入数据 cursor.executemany(sql, sql_lst) # 事务的手动提交 conn.commit() # 输出 log 日志 print('The inserting of data is finished!') except Exception as e: print("The error of insert_data(): ", e) finally: # 关闭 conn conn.close() # 输出 log 信息 print("Exiting the module of insert_data ...")