#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ # @Version : Python 3.11.4 # @Software : Sublime Text 4 # @Author : StudentCWZ # @Email : StudentCWZ@outlook.com # @Date : 2023/11/20 09:57 # @File : log.py # @Description : """ from loguru import logger from application.libs.helper import MySQLHelper from application.models import Log from application.utils import ElasticsearchUtil, ParseUtil class LogDao: """ Data Access Object for logs. This class includes methods to get data from Elasticsearch, process it and save it to MySQL. """ @classmethod def get_data_from_es(cls, index=None, dsl=None, sid=None) -> dict: """ Get data from Elasticsearch by either scroll searching or direct searching. Args: index (str): The name of the Elasticsearch index. dsl (dict): The DSL query for Elasticsearch. sid (str): The scroll id for Elasticsearch scroll search. Returns: dict: The data returned from Elasticsearch. Raises: SystemError: If none of the required parameters are provided. """ if sid is not None: return ElasticsearchUtil.scroll_search(sid) elif index is not None and dsl is not None: return ElasticsearchUtil.search(index, dsl) else: raise SystemError('Could not get data from Elasticsearch') @classmethod def get_mdata(cls, data: dict) -> list: """ Get metadata from the data returned by Elasticsearch. Args: data (dict): The data returned from Elasticsearch. Returns: list: The metadata extracted from the data. Raises: SystemError: If the metadata is empty. """ mdata = data.get('hits').get('hits') if not mdata: logger.error('the mdata is an empty list ...') raise SystemError('the mdata is an empty list ...') return mdata @classmethod def get_intent_from_mysql(cls, sql: str, cfg: dict) -> list: """ Get the intent mapping from MySQL using the provided SQL. Args: sql (str): The SQL query to execute. cfg (dict): The configuration for MySQL. Returns: list: The intent mapping list. """ with MySQLHelper(**cfg) as helper: result = helper.execute(sql) mapping_list = [item[0] for item in result] return mapping_list @classmethod def process_and_save_data(cls, lst: list, mapping_list: list): """ Process the given list using the mapping list and save the result to the database. Args: lst (list): The list to process. mapping_list (list): The mapping list to use for processing. """ if not lst: return result_generator = ParseUtil(mapping_list=mapping_list).filter(lst) _ = Log.batch_save(result_generator) @classmethod def parse(cls, start: str, end: str, index: str, sql: str, cfg: dict) -> int: """ Parse logs from Elasticsearch and save them to MySQL. Args: start (str): The start date for the logs. end (str): The end date for the logs. index (str): The Elasticsearch index to get logs from. sql (str): The SQL query to get the intent mapping from MySQL. cfg (dict): The configuration for MySQL. Returns: int: The total number of logs parsed. Raises: SystemError: If there is an error during the process. """ # Get the DSL for the given start and end dates. dsl = ElasticsearchUtil.dsl(start, end) # Get data from Elasticsearch. data = cls.get_data_from_es(index=index, dsl=dsl) # Extract metadata from the data. mdata = cls.get_mdata(data) # Get the total number of logs. total = data.get('hits').get('total').get('value') logger.debug(f'The numbers of data by searching data from ES: {total}') # Log the start of the searching and saving process. logger.debug('The data is inserting ...') # Get the intent mapping from MySQL. mapping_list = cls.get_intent_from_mysql(sql, cfg) # Process and save the metadata. cls.process_and_save_data(mdata, mapping_list) # Get the scroll id for scroll searching in Elasticsearch. scroll_id = data.get('_scroll_id') try: for _ in range(0, int(total / dsl.get('size', 10000) + 1)): # Get more data from Elasticsearch using scroll searching. res = cls.get_data_from_es(sid=scroll_id) lst = res.get('hits').get('hits') if not lst: continue # Process and save the data. cls.process_and_save_data(lst, mapping_list) except Exception as e: # Log the error and raise a SystemError. logger.error(f'The error: {e}') raise SystemError() else: # Log the success of the process. logger.debug('The process of inserting data succeed!') finally: # Log the end of the process. logger.debug('The inserting of the data finished!') return total