diff --git a/application/dao/__init__.py b/application/dao/__init__.py
index 7aa926b858d05a405635493e8ce47bb4bd7f8ac2..2e9edd7553462cf0600d98e21effc9899e117a26 100644
--- a/application/dao/__init__.py
+++ b/application/dao/__init__.py
@@ -11,3 +11,4 @@
 """
 
 from .user import UserDao
+from .log import LogDao
diff --git a/application/dao/log.py b/application/dao/log.py
new file mode 100644
index 0000000000000000000000000000000000000000..66827f726bf39c98dcbac6fd2a7633a5cbde0b07
--- /dev/null
+++ b/application/dao/log.py
@@ -0,0 +1,168 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+# @Version : Python 3.11.4
+# @Software : Sublime Text 4
+# @Author : StudentCWZ
+# @Email : StudentCWZ@outlook.com
+# @Date : 2023/11/20 09:57
+# @File : log.py
+# @Description :
+"""
+
+
+from loguru import logger
+
+from application.utils import ElasticsearchUtil
+from application.libs.helper import MySQLHelper
+from application.utils import ParseUtil
+from application.models import Log
+
+
+class LogDao:
+    """
+    Data Access Object for logs.
+    This class includes methods to get data from Elasticsearch, process it, and save it to MySQL.
+    """
+
+    @classmethod
+    def get_data_from_es(cls, index=None, dsl=None, sid=None) -> dict:
+        """
+        Get data from Elasticsearch by either scroll searching or direct searching.
+
+        Args:
+            index (str): The name of the Elasticsearch index.
+            dsl (dict): The DSL query for Elasticsearch.
+            sid (str): The scroll id for Elasticsearch scroll search.
+
+        Returns:
+            dict: The data returned from Elasticsearch.
+
+        Raises:
+            SystemError: If none of the required parameters are provided.
+        """
+        if sid is not None:
+            return ElasticsearchUtil.scroll_search(sid)
+        elif index is not None and dsl is not None:
+            return ElasticsearchUtil.search(index, dsl)
+        else:
+            raise SystemError('Could not get data from Elasticsearch')
+
+    @classmethod
+    def get_mdata(cls, data: dict) -> list:
+        """
+        Get metadata from the data returned by Elasticsearch.
+
+        Args:
+            data (dict): The data returned from Elasticsearch.
+
+        Returns:
+            list: The metadata extracted from the data.
+
+        Raises:
+            SystemError: If the metadata is empty.
+        """
+        mdata = data.get('hits').get('hits')
+        if not mdata:
+            logger.error('the mdata is an empty list ...')
+            raise SystemError('the mdata is an empty list ...')
+        return mdata
+
+    @classmethod
+    def get_intent_from_mysql(cls, sql: str, cfg: dict) -> list:
+        """
+        Get the intent mapping from MySQL using the provided SQL.
+
+        Args:
+            sql (str): The SQL query to execute.
+            cfg (dict): The configuration for MySQL.
+
+        Returns:
+            list: The intent mapping list.
+        """
+        with MySQLHelper(**cfg) as helper:
+            result = helper.execute(sql)
+            mapping_list = [item[0] for item in result]
+        return mapping_list
+
+    @classmethod
+    def process_and_save_data(cls, lst: list, mapping_list: list):
+        """
+        Process the given list using the mapping list and save the result to the database.
+
+        Args:
+            lst (list): The list to process.
+            mapping_list (list): The mapping list to use for processing.
+        """
+        if not lst:
+            return
+        result_generator = ParseUtil(mapping_list=mapping_list).filter(lst)
+        _ = Log.batch_save(result_generator)
+
+    @classmethod
+    def parse(cls, start: str, end: str, index: str, sql: str, cfg: dict) -> int:
+        """
+        Parse logs from Elasticsearch and save them to MySQL.
+
+        Args:
+            start (str): The start date for the logs.
+            end (str): The end date for the logs.
+            index (str): The Elasticsearch index to get logs from.
+            sql (str): The SQL query to get the intent mapping from MySQL.
+            cfg (dict): The configuration for MySQL.
+
+        Returns:
+            int: The total number of logs parsed.
+
+        Raises:
+            SystemError: If there is an error during the process.
+        """
+
+        # Get the DSL for the given start and end dates.
+        dsl = ElasticsearchUtil.dsl(start, end)
+
+        # Get data from Elasticsearch.
+        data = cls.get_data_from_es(index=index, dsl=dsl)
+
+        # Extract metadata from the data.
+        mdata = cls.get_mdata(data)
+
+        # Get the total number of logs.
+        total = data.get('hits').get('total').get('value')
+        logger.debug(f'The number of records returned from ES: {total}')
+
+        # Log the start of the searching and saving process.
+        logger.debug('The data is being inserted ...')
+
+        # Get the intent mapping from MySQL.
+        mapping_list = cls.get_intent_from_mysql(sql, cfg)
+
+        # Process and save the metadata.
+        cls.process_and_save_data(mdata, mapping_list)
+
+        # Get the scroll id for scroll searching in Elasticsearch.
+        scroll_id = data.get('_scroll_id')
+
+        try:
+            for _ in range(0, int(total / dsl.get('size', 10000) + 1)):
+                # Get more data from Elasticsearch using scroll searching.
+                res = cls.get_data_from_es(sid=scroll_id)
+                lst = res.get('hits').get('hits')
+
+                if not lst:
+                    continue
+
+                # Process and save the data.
+                cls.process_and_save_data(lst, mapping_list)
+        except Exception as e:
+            # Log the error and raise a SystemError.
+            logger.error(f'The error: {e}')
+            raise SystemError()
+        else:
+            # Log the success of the process.
+            logger.debug('The process of inserting data succeeded!')
+        finally:
+            # Log the end of the process.
+            logger.debug('The insertion of data finished!')
+
+        return total
diff --git a/application/services/log.py b/application/services/log.py
index d11e29bc47a627fce560963e708ac41e7e7c8772..ebf17bc3c4a0d6b67aedb833590e879afb7d6a85 100644
--- a/application/services/log.py
+++ b/application/services/log.py
@@ -10,24 +10,48 @@
 # @Description :
 """
 
+
 from flask import current_app
 from loguru import logger
 
-from application.utils import ElasticsearchUtil
-from application.libs.helper import MySQLHelper
-from application.utils import ParseUtil
-from application.models import Log
+from application.dao import LogDao
+from application.schemas import ParseLogRequestItem
 
 
 class LogService:
 
     @classmethod
-    def parse(cls, item) -> int:
+    def parse(cls, item: ParseLogRequestItem) -> int:
+        """
+        Parse logs from Elasticsearch and process them according to the configured SQL commands.
+
+        Args:
+            item: The item containing the start and end dates for which logs are to be parsed.
+
+        Returns:
+            The result of the log parsing as an integer.
+
+        Raises:
+            SystemError: If there is an issue with the process.
+        """
+
+        # Retrieve the start and end dates from the given item.
         start_date = item.start
         end_date = item.end
-        es_index = current_app.config.Elasticsearch.Index
+
+        # Log the time interval for debugging purposes.
+        logger.debug(f'The interval of time is between {start_date} and {end_date}...')
+
+        # Retrieve the Elasticsearch index from the current application's configuration.
+        index = current_app.config.Elasticsearch.Index
+
+        # Retrieve the MySQL configuration from the current application's configuration.
         mysql_config = current_app.config.ExtraDB
+
+        # Retrieve the SQL to be executed from the MySQL configuration.
         sql = mysql_config.Sql
+
+        # Construct a configuration dictionary for the MySQL database.
         cfg = {
             'host': mysql_config.Host,
             'user': mysql_config.User,
@@ -35,44 +59,6 @@ class LogService:
             'db': mysql_config.DB,
             'port': mysql_config.Port,
         }
-        logger.debug(
-            f'The interval of time is between {start_date} and {end_date} ...')
-        dsl = ElasticsearchUtil.dsl(start_date, end_date, size=2500)
-        data = ElasticsearchUtil.search(es_index, dsl)
-        # Get mdata
-        mdata = data.get('hits').get('hits')
-        if not mdata:
-            logger.error('the mdata is an empty list ...')
-            raise SystemError('the mdata is an empty list ...')
-        # Get total
-        total = data.get('hits').get('total').get('value')
-        logger.debug(f'The numbers of data by searching data from ES: {total}')
-        with MySQLHelper(**cfg) as helper:
-            result = helper.execute(sql)
-            mapping_list = [item[0] for item in result]
-        result = ParseUtil(mapping_list=mapping_list).filter(mdata)
-        logger.debug('The first part of data is inserting ...')
-        _ = Log.batch_save(result)
-        logger.debug('The inserting of the first part data finished!')
-        logger.debug('The scrolling part of data is inserting ...')
-        # Get scroll_id
-        scroll_id = data.get('_scroll_id')
-        try:
-            for _ in range(0, int(total / dsl.get('size', 10000) + 1)):
-                res = ElasticsearchUtil.scroll_search(scroll_id)
-                lst = res.get('hits').get('hits')
-                if not lst:
-                    continue
-                result_generator = ParseUtil(
-                    mapping_list=mapping_list).filter(lst)
-                _ = Log.batch_save(result_generator)
-        except Exception as e:
-            # Log the error message
-            logger.error(f'The error: {e}')
-            raise SystemError()
-        else:
-            logger.debug('The process of inserting scrolling part data succeed!')
-        finally:
-            logger.debug('The inserting of the scrolling part data finished!')
-
-        return total
+
+        # Parse the logs using the LogDao class and return the result.
+        return LogDao.parse(start_date, end_date, index, sql, cfg)
diff --git a/application/views/log/log.py b/application/views/log/log.py
index d4ac8d6b7c167c5a7259bcb7408cf267b366ad89..4e05da8e568dc40d5299b2068c9f47d048bd0aea 100644
--- a/application/views/log/log.py
+++ b/application/views/log/log.py
@@ -10,7 +10,7 @@
 # @Description :
 """
 
-from flask import Blueprint, request, jsonify
+from flask import Blueprint, current_app, request, jsonify
 from flask_restful import Api
 from loguru import logger
 from pydantic import ValidationError
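
For reference, a minimal usage sketch of the refactored call path (not part of the diff). It assumes `ParseLogRequestItem` exposes `start` and `end` fields, as the service code suggests; the `create_app` factory name and the date-string format are assumptions, not confirmed by this diff.

    # Hypothetical usage sketch; `create_app` and the date strings are assumptions.
    from application import create_app  # assumed application factory
    from application.schemas import ParseLogRequestItem
    from application.services.log import LogService

    app = create_app()

    # LogService.parse reads current_app.config, so an app context is required.
    with app.app_context():
        item = ParseLogRequestItem(start='2023-11-01', end='2023-11-20')
        total = LogService.parse(item)  # delegates to LogDao.parse internally
        print(f'Parsed {total} log records')

With this split, the service layer only assembles configuration and delegates to `LogDao`, so the Elasticsearch-to-MySQL pipeline can be exercised or mocked independently of Flask request handling.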