# -*- coding: utf-8 -*- # @Author: Gree # @Date: 2020-12-17 15:42:46 # @Last Modified by: StudentCWZ # @Last Modified time: 2020-12-31 08:44:57 import pandas as pd import configparser import conn_sql as cs import get_time as gt import duplicate_check as dc import data_deal as dd import airconditioner_check as ac import play_control_check as pcc import universal_control_check as ucc import weather_check as wc import fm_check as fc import holiday_check as hc import music_check as mc import news_check as nc import ancient_poem_check as apc import science_check as sc import sports_check as spc import stocks_check as stc import translate_check as tc import chat_check as cc import global_control_check as gcc def IntegrateData(df): """ 1. IntergrateData函数用来整合每个自动化分类检查的函数返回的item。 2. 获取整合数据后的DataFrame。 """ result_lst = [] # 新建一个空列表用来接收自动化分类检查各个类别返回的生成器中的item。 for item in ac.AirconditionerCheck(df): result_lst.append(item) for item in apc.AncientPoemCheck(df): result_lst.append(item) for item in fc.FmCheck(df): result_lst.append(item) for item in hc.HolidayCheck(df): result_lst.append(item) for item in mc.MusicCheck(df): result_lst.append(item) for item in nc.NewsCheck(df): result_lst.append(item) for item in pcc.PlayControlCheck(df): result_lst.append(item) for item in sc.ScienceCheck(df): result_lst.append(item) for item in spc.SportsCheck(df): result_lst.append(item) for item in stc.StockCheck(df): result_lst.append(item) for item in tc.TranslateCheck(df): result_lst.append(item) for item in ucc.UniversalControlCheck(df): result_lst.append(item) for item in wc.WeatherCheck(df): result_lst.append(item) for item in gcc.GlobalControlCheck(df): result_lst.append(item) df = pd.DataFrame(result_lst, columns=['date_time', 'request_id', 'mac_wifi', 'user_id', 'query', 'domain', 'intent', 'response_text', 'domain_is_right', 'intent_is_right', 'response_is_right']) df = df.astype(object).where(pd.notnull(df), '') df = df.sort_values(["date_time"], ascending=True) print('The dimension of final dataframe: ', end='') print(df.shape) # 输出当前数据框的维度 return df def main(): GetTime = gt.GetTime() # 实例化时间操作类 time_before, time_now = GetTime.get_time(days=1) DbRun = cs.DbRun() # 实例化数据库操作类 """ cf = configparser.ConfigParser() cf.read("sql.conf") # 读取 sql.conf 配置文件 initial_table = str(cf.get("table_names", "initial_table")) # 获取 sql.conf 中的 table_names 中的信息 insert_table = str(cf.get("table_names", "insert_table")) # 获取 sql.conf 中的 table_names 中的信息 """ initial_table = "ctoc_tb" # 原始数据表 insert_table = "final_info" # 要插入数据表 # print("Get table_names of sql.conf") # 测试 table_exists_value = DbRun.table_exists(initial_table) # 判断 initial_table 是否存在,如果 initial_table 存在,则返回1,否则返回0。 if table_exists_value == 1: initial_data = DbRun.initial_data(initial_table, time_before, time_now) # 如果 initial_table 存在,获取 initial_table 中相关字段的信息。 else: print("Error: Get initial_data!") # 如果 initial_table 不存在,获取 initial_table 中相关字段的信息失败。 DataDeal = dd.DataDeal() # 实例化一个数据框操作类 initial_df = DataDeal.initial_df(initial_data) # 将获取到的 initial_data 数据进行数据框操作,得到原始的 initial_df 数据框。 table_exists_value = DbRun.table_exists(insert_table) # 判断 insert_table 是否存在,如果 insert_table 存在,则返回1,否则返回0。 insert_table 表是用来插入最终数据。 if table_exists_value == 1: contrast_data = DbRun.contrast_data(insert_table) # 如果 insert_table 存在,获取 insert_table 中相关字段的信息。 output_df = IntegrateData(initial_df) # 将原始的 initial_df 数据框中的数据进行自动化分类检查,返回一个 output_df 数据框。 DbRun.insert_data(insert_table, output_df) # 将 output_df 数据框中数据批量插入 insert_table 。 else: DbRun.new_table(insert_table) # 如果 insert_table 不存在,则新建 insert_table 。 output_df = IntegrateData(initial_df) DbRun.insert_data(insert_table, output_df) # DuplicateCheck = dc.DuplicateCheck() # input_df = DuplicateCheck.duplicate_check(contrast_data, initial_df) # output_df = IntegrateData(initial_df) # DbRun.insert_data(output_df) def job(): """ 1. 设置定时任务 """ main() # 运行主函数 if __name__ == '__main__': main() """ schedule.every(1).days.do(job) # 设置定时区间 while True: schedule.run_pending() sec = schedule.idle_seconds() time.sleep(sec) """