log.py 2.8 KB
Newer Older
崔为之's avatar
崔为之 committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
# @Version     : Python 3.11.4
# @Software    : Sublime Text 4
# @Author      : StudentCWZ
# @Email       : StudentCWZ@outlook.com
# @Date        : 2023/11/19 16:02
# @File        : log.py
# @Description :
"""

from flask import current_app
from loguru import logger

from application.utils import ElasticsearchUtil
from application.libs.helper import MySQLHelper
from application.utils import ParseUtil
from application.models import Log


class LogService:

    @classmethod
    def parse(cls, item) -> int:
        start_date = item.start
        end_date = item.end
        es_index = current_app.config.Elasticsearch.Index
        mysql_config = current_app.config.ExtraDB
        sql = mysql_config.Sql
        cfg = {
            'host': mysql_config.Host,
            'user': mysql_config.User,
            'password': mysql_config.Password,
            'db': mysql_config.DB,
            'port': mysql_config.Port,
        }
        logger.debug(
            f'The interval of time is between {start_date} and {end_date} ...')
        dsl = ElasticsearchUtil.dsl(start_date, end_date, size=2500)
        data = ElasticsearchUtil.search(es_index, dsl)
        # 获取 mdata
        mdata = data.get('hits').get('hits')
        if not mdata:
            logger.error('the mdata is an empty list ...')
            raise SystemError('the mdata is an empty list ...')
        # 获取 total
        total = data.get('hits').get('total').get('value')
        logger.debug(f'The numbers of data by searching data from ES: {total}')
        with MySQLHelper(**cfg) as helper:
            result = helper.execute(sql)
        mapping_list = [item[0] for item in result]
        result = ParseUtil(mapping_list=mapping_list).filter(mdata)
        logger.debug('The first part of data is inserting ...')
        _ = Log.batch_save(result)
        logger.debug('The inserting of the first part data finished!')
        logger.debug('The scrolling part of data is inserting ...')
        # 获取 scroll_id
        scroll_id = data.get('_scroll_id')
        try:
            for _ in range(0, int(total / dsl.get('size', 10000) + 1)):
                res = ElasticsearchUtil.scroll_search(scroll_id)
                lst = res.get('hits').get('hits')
                if not lst:
                    continue
                result_generator = ParseUtil(
                    mapping_list=mapping_list).filter(lst)
                _ = Log.batch_save(result_generator)
        except Exception as e:
            # 输出 log 信息
            logger.error(f'The error: {e}')
            raise SystemError()
        else:
            logger.debug('The process of inserting scrolling part data succeed!')
        finally:
            logger.debug('The inserting of the scrolling part data finished!')

        return total