#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
# @Version     : Python 3.11.4
# @Software    : Sublime Text 4
# @Author      : StudentCWZ
# @Email       : StudentCWZ@outlook.com
# @Date        : 2023/11/19 16:02
# @File        : log.py
# @Description :
"""

from flask import current_app
from loguru import logger

from application.utils import ElasticsearchUtil
from application.libs.helper import MySQLHelper
from application.utils import ParseUtil
from application.models import Log


class LogService:
    """Service that copies Elasticsearch search hits into the ``Log`` model."""

    @classmethod
    def parse(cls, item) -> int:
        """Fetch hits for a time interval from Elasticsearch and persist them.

        The first page comes from ``ElasticsearchUtil.search``; the remaining
        pages are pulled with ``ElasticsearchUtil.scroll_search`` until a page
        with no hits is returned. Every page is filtered through ``ParseUtil``
        (configured with a mapping list read from the extra MySQL database)
        and stored with ``Log.batch_save``.

        Args:
            item: Object exposing ``start`` and ``end`` attributes that bound
                the interval passed to ``ElasticsearchUtil.dsl``.

        Returns:
            The ``hits.total.value`` field of the initial search response.
            NOTE(review): Elasticsearch may report this as a lower bound
            depending on the query's total-hit tracking -- confirm what
            ``ElasticsearchUtil.dsl`` requests if an exact count is required.

        Raises:
            SystemError: If the initial search returns no hits, or if any
                scroll request / save of a scrolled page fails (the original
                exception is chained as ``__cause__``).
        """
        start_date = item.start
        end_date = item.end
        es_index = current_app.config.Elasticsearch.Index
        mysql_config = current_app.config.ExtraDB
        sql = mysql_config.Sql
        cfg = {
            'host': mysql_config.Host,
            'user': mysql_config.User,
            'password': mysql_config.Password,
            'db': mysql_config.DB,
            'port': mysql_config.Port,
        }
        logger.debug(
            f'The interval of time is between {start_date} and {end_date} ...')

        dsl = ElasticsearchUtil.dsl(start_date, end_date, size=2500)
        data = ElasticsearchUtil.search(es_index, dsl)

        # First page of hits.
        mdata = data.get('hits').get('hits')
        if not mdata:
            logger.error('the mdata is an empty list ...')
            raise SystemError('the mdata is an empty list ...')

        # Total reported by Elasticsearch for the query.
        total = data.get('hits').get('total').get('value')
        logger.debug(f'The numbers of data by searching data from ES: {total}')

        # The mapping list is the first column of every row returned by the
        # configured SQL statement.
        with MySQLHelper(**cfg) as helper:
            rows = helper.execute(sql)
        mapping_list = [row[0] for row in rows]

        logger.debug('The first part of data is inserting ...')
        cls._save_hits(mdata, mapping_list)
        logger.debug('The inserting of the first part data finished!')

        logger.debug('The scrolling part of data is inserting ...')
        # Scroll cursor returned by the initial search.
        scroll_id = data.get('_scroll_id')
        try:
            # Drain the scroll until an empty page comes back instead of
            # deriving a page count from ``total``: the reported total is not
            # guaranteed to be exact, and an empty page is the definitive
            # end-of-scroll signal.
            while True:
                res = ElasticsearchUtil.scroll_search(scroll_id)
                hits = res.get('hits').get('hits')
                if not hits:
                    break
                cls._save_hits(hits, mapping_list)
                # Always continue with the most recently returned scroll id;
                # fall back to the previous one if the response omits it.
                scroll_id = res.get('_scroll_id') or scroll_id
        except Exception as e:
            # Log the failure, then surface it with the original cause kept.
            logger.error(f'The error: {e}')
            raise SystemError(f'The error: {e}') from e
        else:
            logger.debug('The process of inserting scrolling part data succeed!')
        finally:
            logger.debug('The inserting of the scrolling part data finished!')
        return total

    @classmethod
    def _save_hits(cls, hits, mapping_list) -> None:
        """Filter one page of hits with ``ParseUtil`` and batch-save it."""
        parsed = ParseUtil(mapping_list=mapping_list).filter(hits)
        Log.batch_save(parsed)