#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
# @Version     : Python 3.11.4
# @Software    : Sublime Text 4
# @Author      : StudentCWZ
# @Email       : StudentCWZ@outlook.com
# @Date        : 2023/11/19 17:35
# @File        : parse.py
# @Description : Flatten raw log documents into one-level result records.
"""

import datetime
import json
import re
from typing import Generator, Optional

import dateutil.parser

# '@timestamp' values are shifted by eight hours before being formatted.
_TIMESTAMP_SHIFT = datetime.timedelta(hours=8)
_DATE_TIME_FORMAT = '%Y-%m-%d %H:%M:%S'


class ParseUtil:
    """Turn raw log hits (dicts carrying a ``_source`` entry) into flat records."""

    def __init__(self, mapping_list: list):
        """
        :param mapping_list: skill ids that are always split into
            ``domain.intent`` when the terminal domain is ``chat``
        """
        self.mapping_list = mapping_list

    def filter(self, data: list) -> Generator[dict, None, None]:
        """
        Filter logs

        Yield one parsed record per hit; hits without a ``_source`` entry and
        hits that cannot be parsed are reported on stdout and skipped.

        :param data: logs data
        :return: Generator
        """
        # NOTE: a filter on ``_source['tag'] == 'global'`` used to live here
        # and is currently disabled.
        for hit in data:
            source = hit.get('_source', {})
            if not source:
                print('the field of _source is not in data ...')
                continue

            record = self.parse(source)
            if not record:
                uuid = source.get('uuid', '') or source.get('requestId', '')
                print(f'missing uuid of data: {uuid}')
                continue

            yield record

    def parse(self, dic: dict) -> Optional[dict]:
        """
        Parse logs

        Parsing is best-effort per record: any exception raised while reading
        the record is printed and ``None`` is returned, so that one malformed
        log never aborts a whole batch.

        :param dic: logs dict before parsing logs
        :return: the flattened record, or ``None`` when parsing failed
        """
        try:
            date_time = self._resolve_date_time(dic)
            uuid = dic.get('uuid', '') or dic.get('requestId', '')

            data = self._extract_data(dic)
            req = data.get('request', {})
            resp = data.get('response', {})

            # Request-level fields.
            mac_voice = req.get('macVoice', '')
            mac_wifi = req.get('macWifi', '')
            query = req.get('query', '')
            mid = req.get('mid', '')
            mid_type = req.get('midType', '')

            req_param = req.get('requestBody', {}).get(
                'reqParam', {}) or req.get('reqParam', {})
            common = req_param.get('common', {}) or req_param.get('Common', {})

            # The same field shows up under several spellings.
            request_id = self._first_truthy(common, 'requestId', 'RequestId')
            remote_ip = self._first_truthy(
                common, 'remoteIP', 'RemoteIP', 'remoteIp')
            app_key = self._first_truthy(common, 'appKey', 'AppKey')
            ud_id = self._first_truthy(common, 'udid', 'Udid')
            user_id = self._first_truthy(common, 'userId', 'UserId')

            # Coerce to str before stripping the 'asr' marker so that a
            # numeric serviceType no longer makes the whole record fail.
            service_type = str(
                common.get('serviceType', '') or '').replace('asr', '') or 0

            voice_portal = 1
            emotion_class = ''

            nlu_ret = req_param.get('nluRet', {})
            yzs_nlu_time = nlu_ret.get('nluProcessTime', '')
            yzs_general = self._dump_unless_empty(
                nlu_ret.get('general', {}), '{}')
            yzs_intent = self._dump_unless_empty(
                nlu_ret.get('semantic', {}).get('intent', []), '[]')

            if resp:
                header = resp.get('header', {})
                semantic = header.get('semantic', {}) or resp.get('semantic', {})
                code = semantic.get('code', 0)
                terminal_domain = semantic.get(
                    'domain', '') or semantic.get('service', '')
                terminal_intent = semantic.get(
                    'intent', '') or semantic.get('action', '')
                skill_id = semantic.get('skill_id', '')
                response_text = resp.get('response_text', '')

                # 'params' wins over 'slots'; an absent or empty (or null)
                # 'slots' leaves the field as an empty string.
                slots = semantic.get('params', '')
                if slots != '':
                    slots = json.dumps(slots, ensure_ascii=False)
                else:
                    slot_items = semantic.get('slots', [])
                    if slot_items:
                        slots = json.dumps(slot_items, ensure_ascii=False)

                distribution_gree_domain, distribution_gree_intent = (
                    self._resolve_distribution(
                        terminal_domain, terminal_intent, skill_id))
            else:
                # No response payload at all.
                code = -3
                terminal_domain = ''
                terminal_intent = ''
                skill_id = ''
                response_text = ''
                slots = ''
                distribution_gree_domain, distribution_gree_intent = '', ''

            service_nlu = data.get(
                'serviceNLU', '') or data.get('serverNLU', '')

            cost = data.get('cost_time', {})
            cost_time = cost.get('return', '') or cost.get('save_records', '')
            (get_body_time, gree_nlu_time,
             tencent_nlu_time, get_homeid_time) = self._adjust_cost_times(cost)
        except Exception as e:
            # Boundary handler: report the failure and signal it to the caller.
            print(f'The error: {e}')
            return None

        return {
            'date_time': date_time,
            'uuid': uuid,
            'mid': mid,
            'mid_type': mid_type,
            'mac_wifi': mac_wifi,
            'mac_voice': mac_voice,
            'code': code,
            'query': query,
            'terminal_domain': terminal_domain,
            'terminal_intent': terminal_intent,
            'distribution_gree_domain': distribution_gree_domain,
            'distribution_gree_intent': distribution_gree_intent,
            'response_text': response_text,
            'emotion_class': emotion_class,
            'skill_id': skill_id,
            'voice_portal': voice_portal,
            'service_nlu': service_nlu,
            'service_type': service_type,
            'slots': slots,
            'yzs_request_id': request_id,
            'yzs_remote_ip': remote_ip,
            'yzs_app_key': app_key,
            'yzs_ud_id': ud_id,
            'yzs_user_id': user_id,
            'yzs_intent': yzs_intent,
            'yzs_general': yzs_general,
            'yzs_nlu_time': yzs_nlu_time,
            'get_body_time': get_body_time,
            'gree_nlu_time': gree_nlu_time,
            'tencent_nlu_time': tencent_nlu_time,
            'get_homeid_time': get_homeid_time,
            'cost_time': cost_time,
        }

    @staticmethod
    def _resolve_date_time(dic: dict):
        """Return ``dic['time']`` if set, else the shifted, formatted '@timestamp'."""
        date_time = dic.get('time', '')
        if date_time:
            return date_time

        time_stamp = dic.get('@timestamp', '')
        if isinstance(time_stamp, str):
            # Drop everything from the first '.' (fractional seconds and any
            # suffix after them) before parsing; only strings can be split.
            moment = dateutil.parser.isoparse(time_stamp.split('.')[0])
        elif isinstance(time_stamp, datetime.datetime):
            moment = time_stamp
        else:
            print('The error: parse time_stamp failed ...')
            return date_time

        return (moment + _TIMESTAMP_SHIFT).strftime(_DATE_TIME_FORMAT)

    @staticmethod
    def _extract_data(dic: dict) -> dict:
        """Return the ``field.data`` payload, read from the JSON 'message' when present."""
        msg = dic.get('message', '')
        # Decode the message once instead of once per extracted field.
        envelope = json.loads(msg) if msg else dic
        return envelope.get('field', {}).get('data', {})

    @staticmethod
    def _first_truthy(mapping: dict, *keys: str):
        """Return the first truthy value stored under ``keys``, or ``''``."""
        for key in keys:
            value = mapping.get(key, '')
            if value:
                return value
        return ''

    @staticmethod
    def _dump_unless_empty(value, empty_repr: str) -> str:
        """JSON-encode ``value``; return ``''`` when the encoding equals ``empty_repr``."""
        dumped = json.dumps(value, ensure_ascii=False)
        # Compare the whole encoding: a substring replace would also strip
        # nested empty containers and produce invalid JSON.
        return '' if dumped == empty_repr else dumped

    @staticmethod
    def _split_skill_id(skill_id: str) -> tuple:
        """Split ``skill_id`` at its first '.'; without a dot both parts are the id."""
        head, dot, tail = skill_id.partition('.')
        return (head, tail) if dot else (skill_id, skill_id)

    def _resolve_distribution(self, domain, intent, skill_id) -> tuple:
        """Return the (distribution domain, distribution intent) pair for a response."""
        if domain != 'chat':
            return domain, intent
        if not skill_id:
            return '', ''
        # Mapped skills are always split; unmapped ones only when the id
        # contains no digits.
        if skill_id in self.mapping_list or not re.search(r'\d', skill_id):
            return self._split_skill_id(skill_id)
        return '', ''

    @staticmethod
    def _adjust_cost_times(cost: dict) -> tuple:
        """Return (get_body, gree_nlu, tencent_nlu, get_homeid) after subtraction.

        Both NLU values have ``get_body`` subtracted when both operands are
        set; ``get_homeid`` then has the (adjusted) NLU value subtracted, or
        the larger of the two when both are present.
        """
        get_body = cost.get('get_body', '')
        gree_nlu = cost.get('gree_nlu', '')
        tencent_nlu = cost.get('tencent_nlu', '')
        get_homeid = cost.get('get_homeid', '')

        if get_body:
            if gree_nlu:
                gree_nlu = str(float(gree_nlu) - float(get_body))
            if tencent_nlu:
                tencent_nlu = str(float(tencent_nlu) - float(get_body))

        if get_homeid and (gree_nlu != '' or tencent_nlu != ''):
            if gree_nlu == '':
                nlu_part = float(tencent_nlu)
            elif tencent_nlu == '':
                nlu_part = float(gree_nlu)
            else:
                nlu_part = max(float(gree_nlu), float(tencent_nlu))
            get_homeid = str(float(get_homeid) - nlu_part)

        return get_body, gree_nlu, tencent_nlu, get_homeid