#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
# @Version     : Python 3.11.4
# @Software    : Sublime Text 4
# @Author      : StudentCWZ
# @Email       : StudentCWZ@outlook.com
# @Date        : 2023/11/19 17:35
# @File        : parse.py
# @Description :
"""

import datetime
import json
import re
from typing import Generator, Optional

import dateutil.parser
from loguru import logger

# Shift applied to every "@timestamp" value before it is formatted.
# NOTE(review): presumably converts UTC to UTC+8 -- confirm with the log producer.
_TIMESTAMP_SHIFT = datetime.timedelta(hours=8)
_DATE_TIME_FORMAT = "%Y-%m-%d %H:%M:%S"

# Keys of the dict returned by ``ParseUtil.parse``, in output order.
_RESULT_KEYS = (
    "date_time",
    "uuid",
    "mid",
    "mid_type",
    "mac_wifi",
    "mac_voice",
    "code",
    "query",
    "terminal_domain",
    "terminal_intent",
    "distribution_gree_domain",
    "distribution_gree_intent",
    "response_text",
    "emotion_class",
    "skill_id",
    "voice_portal",
    "service_nlu",
    "service_type",
    "slots",
    "yzs_request_id",
    "yzs_remote_ip",
    "yzs_app_key",
    "yzs_ud_id",
    "yzs_user_id",
    "yzs_intent",
    "yzs_general",
    "yzs_nlu_time",
    "get_body_time",
    "gree_nlu_time",
    "tencent_nlu_time",
    "get_homeid_time",
    "cost_time",
)


def _first_non_empty(mapping: dict, *keys: str):
    """Return the first truthy ``mapping[key]`` among *keys*, or ``""``."""
    for key in keys:
        value = mapping.get(key, "")
        if value:
            return value
    return ""


def _dump_unless_empty(value, empty_repr: str) -> str:
    """
    Serialize *value* to JSON, mapping a wholly empty container to ``""``.

    Only an exact match with *empty_repr* ("{}" or "[]") is blanked; a
    substring replace would also strip nested empty containers and produce
    invalid JSON such as ``{"a": }``.
    """
    text = json.dumps(value, ensure_ascii=False)
    return "" if text == empty_repr else text


class ParseUtil:
    """Flatten raw log documents into one-level record dicts."""

    def __init__(self, mapping_list: list):
        """
        :param mapping_list: skill ids that are always split on "." into a
            (domain, intent) pair when the terminal domain is "chat"
        """
        self.mapping_list = mapping_list

    def filter(self, data: list) -> Generator:
        """
        Filter logs

        Yields the parsed record of every item whose ``_source`` is present
        and parses successfully; other items are reported on stdout and
        skipped.

        :param data: logs data, each item a dict with a ``_source`` dict
        :return: Generator of parsed record dicts
        """
        # NOTE: a former restriction to ``_source["tag"] == "global"`` is disabled.
        for item in data:
            source = item.get('_source', {})
            if not source:
                print('the field of _source is not in data ...')
                continue
            record = self.parse(source)
            if not record:
                # Report which document was dropped.
                uuid = source.get("uuid", "") or source.get("requestId", "")
                print(f'missing uuid of data: {uuid}')
                continue
            yield record

    def parse(self, dic: dict) -> Optional[dict]:
        """
        Parse logs

        :param dic: logs dict before parsing logs (the ``_source`` document)
        :return: flat record dict with the keys listed in ``_RESULT_KEYS``,
            or ``None`` when any step of the extraction raises
        """
        # Best-effort by design: one malformed document must not abort the
        # whole batch, so every extraction error is reported and swallowed.
        try:
            date_time = self._extract_date_time(dic)
            uuid = dic.get("uuid", "") or dic.get("requestId", "")

            # The payload lives either in a JSON-encoded "message" string or
            # directly on the document; decode the message only once.
            msg = dic.get("message", "")
            payload = json.loads(msg) if msg else dic
            data = payload.get("field", {}).get("data", {})
            req = data.get("request", {})
            resp = data.get("response", {})

            request_fields = self._extract_request_fields(req)
            response_fields = self._extract_response_fields(
                resp, request_fields["query"])
            service_nlu = data.get(
                "serviceNLU", "") or data.get("serverNLU", "")
            timing_fields = self._extract_timings(data)
        except Exception as e:
            print(f"The error: {e}")
            return None

        fields = {
            "date_time": date_time,
            "uuid": uuid,
            "service_nlu": service_nlu,
            # Constant columns.
            "voice_portal": 1,
            "emotion_class": "",
            **request_fields,
            **response_fields,
            **timing_fields,
        }
        return {key: fields[key] for key in _RESULT_KEYS}

    @staticmethod
    def _extract_date_time(dic: dict):
        """Return ``dic["time"]`` if set, else the shifted, formatted "@timestamp"."""
        date_time = dic.get("time", "")
        if date_time:
            return date_time

        time_stamp = dic.get("@timestamp", "")
        # Check the type before touching the value: calling ``.split`` first
        # would raise on a datetime and make the datetime branch unreachable.
        if isinstance(time_stamp, str):
            # Drop everything from the first "." on (fractional seconds).
            moment = dateutil.parser.isoparse(time_stamp.split(".")[0])
        elif isinstance(time_stamp, datetime.datetime):
            moment = time_stamp
        else:
            print("The error: parse time_stamp failed ...")
            return date_time
        return (moment + _TIMESTAMP_SHIFT).strftime(_DATE_TIME_FORMAT)

    @staticmethod
    def _extract_request_fields(req: dict) -> dict:
        """Collect the record fields that come from the request part."""
        req_param = req.get("requestBody", {}).get(
            "reqParam", {}) or req.get("reqParam", {})
        # Keys appear in both camelCase and PascalCase, so try each spelling.
        common = req_param.get("common", {}) or req_param.get("Common", {})
        nlu_ret = req_param.get("nluRet", {})
        return {
            "mac_voice": req.get("macVoice", ""),
            "mac_wifi": req.get("macWifi", ""),
            "query": req.get("query", ""),
            "mid": req.get("mid", ""),
            "mid_type": req.get("midType", ""),
            "yzs_request_id": _first_non_empty(
                common, "requestId", "RequestId"),
            "yzs_remote_ip": _first_non_empty(
                common, "remoteIP", "RemoteIP", "remoteIp"),
            "yzs_app_key": _first_non_empty(common, "appKey", "AppKey"),
            "yzs_ud_id": _first_non_empty(common, "udid", "Udid"),
            "yzs_user_id": _first_non_empty(common, "userId", "UserId"),
            # "asr" is removed from the value; an empty result becomes 0.
            "service_type": str(common.get(
                "serviceType", "").replace("asr", "")) or 0,
            "yzs_nlu_time": nlu_ret.get("nluProcessTime", ""),
            "yzs_general": _dump_unless_empty(
                nlu_ret.get("general", {}), "{}"),
            "yzs_intent": _dump_unless_empty(
                nlu_ret.get("semantic", {}).get("intent", []), "[]"),
        }

    def _extract_response_fields(self, resp: dict, query) -> dict:
        """Collect the record fields that come from the response part."""
        if not resp:
            # No response at all is recorded with code -3 and blank fields.
            return {
                "code": -3,
                "terminal_domain": "",
                "terminal_intent": "",
                "skill_id": "",
                "response_text": "",
                "slots": "",
                "distribution_gree_domain": "",
                "distribution_gree_intent": "",
            }

        semantic = resp.get("header", {}).get("semantic", {})
        terminal_domain = semantic.get("domain", "") or resp.get(
            "semantic", {}).get("service", "")
        terminal_intent = semantic.get("intent", "") or resp.get(
            "semantic", {}).get("action", "")
        skill_id = semantic.get("skill_id", "")

        # NOTE(review): looks like a leftover debugging aid for one specific
        # query; kept so that the log output is unchanged.
        if query == "空调调到26度":
            logger.info(semantic.get("params", ""))

        # Prefer "params"; fall back to a JSON dump of a non-empty "slots".
        slots = semantic.get("params", "")
        if slots == "":
            raw_slots = semantic.get("slots", [])
            if raw_slots:
                slots = json.dumps(raw_slots, ensure_ascii=False)

        domain, intent = self._resolve_distribution(
            terminal_domain, terminal_intent, skill_id)
        return {
            "code": semantic.get("code", 0),
            "terminal_domain": terminal_domain,
            "terminal_intent": terminal_intent,
            "skill_id": skill_id,
            "response_text": resp.get("response_text", ""),
            "slots": slots,
            "distribution_gree_domain": domain,
            "distribution_gree_intent": intent,
        }

    def _resolve_distribution(self, terminal_domain, terminal_intent,
                              skill_id) -> tuple:
        """Return the (distribution domain, distribution intent) pair."""
        if terminal_domain != "chat":
            return terminal_domain, terminal_intent
        if not skill_id:
            return "", ""
        # NOTE(review): the two-way unpack raises ValueError when a skill id
        # does not contain exactly one ".", which makes ``parse`` drop the
        # record -- confirm that this is intended.
        if skill_id in self.mapping_list:
            domain, intent = skill_id.split(".")
            return domain, intent
        # Unmapped skill ids that contain any digit get no distribution.
        if re.search(r"\d", skill_id):
            return "", ""
        if "." in skill_id:
            domain, intent = skill_id.split(".")
            return domain, intent
        return skill_id, skill_id

    @staticmethod
    def _extract_timings(data: dict) -> dict:
        """Collect the ``cost_time`` fields and turn them into per-stage values."""
        cost = data.get("cost_time", {})
        cost_time = cost.get("return", "") or cost.get("save_records", "")
        get_body_time = cost.get("get_body", "")
        gree_nlu_time = cost.get("gree_nlu", "")
        tencent_nlu_time = cost.get("tencent_nlu", "")
        get_homeid_time = cost.get("get_homeid", "")

        # Both NLU timings are reduced by the get_body timing when it is set.
        if gree_nlu_time and get_body_time:
            gree_nlu_time = str(
                float(gree_nlu_time) - float(get_body_time))
        if tencent_nlu_time and get_body_time:
            tencent_nlu_time = str(
                float(tencent_nlu_time) - float(get_body_time))

        # get_homeid is reduced by whichever NLU timing exists, or by the
        # larger of the two when both are present.
        if get_homeid_time and (gree_nlu_time != "" or tencent_nlu_time != ""):
            if gree_nlu_time == "":
                baseline = float(tencent_nlu_time)
            elif tencent_nlu_time == "":
                baseline = float(gree_nlu_time)
            else:
                baseline = max(float(gree_nlu_time), float(tencent_nlu_time))
            get_homeid_time = str(float(get_homeid_time) - baseline)

        return {
            "cost_time": cost_time,
            "get_body_time": get_body_time,
            "gree_nlu_time": gree_nlu_time,
            "tencent_nlu_time": tencent_nlu_time,
            "get_homeid_time": get_homeid_time,
        }