parse.py 12.6 KB
Newer Older
崔为之's avatar
崔为之 committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
# @Version     : Python 3.11.4
# @Software    : Sublime Text 4
# @Author      : StudentCWZ
# @Email       : StudentCWZ@outlook.com
# @Date        : 2023/11/19 17:35
# @File        : parse.py
# @Description :
"""

import datetime
import json
import re
from typing import Generator

import dateutil.parser
from loguru import logger


class ParseUtil:
    """
    Flatten raw log documents into flat result records.

    ``filter`` walks a list of documents, hands each ``_source`` payload to
    ``parse`` and yields every record that could be parsed.  ``parse`` pulls
    request, response and timing fields out of the nested payload.
    """

    # Offset added to ``@timestamp`` values before they are formatted.
    # NOTE(review): presumably converts UTC to UTC+8 -- confirm with the
    # producer of the logs.
    _TIME_SHIFT = datetime.timedelta(hours=8)
    _TIME_FORMAT = "%Y-%m-%d %H:%M:%S"

    def __init__(self, mapping_list: list):
        """
        :param mapping_list: skill ids that are split on "." into
            ``domain.intent`` without the digit check, when the terminal
            domain is "chat"
        """
        self.mapping_list = mapping_list

    def filter(self, data: list) -> Generator:
        """
        Filter logs

        Yields one parsed record per document; documents without a usable
        ``_source`` or whose payload cannot be parsed are reported on stdout
        and skipped.

        :param data: logs data, a list of dicts carrying a ``_source`` dict
        :return: Generator of parsed result dicts
        """
        for _data in data:
            _source = _data.get('_source', {})
            if not _source:
                print('the field of _source is not in data ...')
                continue
            # NOTE: a filter keeping only `_source["tag"] == "global"` used to
            # sit here in the original code and is currently disabled.
            res = self.parse(_source)
            if not res:
                # Report which record was dropped (parse returned nothing).
                uuid = _source.get("uuid", "") or _source.get("requestId", "")
                print(f'missing uuid of data: {uuid}')
                continue
            yield res

    def parse(self, dic: dict) -> dict:
        """
        Parse logs

        Any exception raised while extracting fields is printed and the
        record is abandoned: in that case ``None`` is returned instead of a
        dict, which ``filter`` treats as "skip this record".

        :param dic: logs dict before parsing logs
        :return: flat result dict, or ``None`` when parsing failed
        """
        try:
            date_time = self._resolve_date_time(dic)
            uuid = dic.get("uuid", "") or dic.get("requestId", "")

            # The payload lives either inside a JSON string under "message"
            # or directly on the document; decode the JSON only once.
            msg = dic.get("message", "")
            payload = json.loads(msg) if msg else dic
            data = payload.get("field", {}).get("data", {})
            req = data.get("request", {})
            resp = data.get("response", {})

            mac_voice = req.get("macVoice", "")
            mac_wifi = req.get("macWifi", "")
            query = req.get("query", "")
            mid = req.get("mid", "")
            mid_type = req.get("midType", "")

            # "reqParam" may be nested under "requestBody" or sit directly on
            # the request; "common" appears with either capitalisation.
            req_param = req.get("requestBody", {}).get(
                "reqParam", {}) or req.get("reqParam", {})
            common = req_param.get("common", {}) or req_param.get("Common", {})
            request_id = self._first_truthy(common, "requestId", "RequestId")
            remote_ip = self._first_truthy(
                common, "remoteIP", "RemoteIP", "remoteIp")
            app_key = self._first_truthy(common, "appKey", "AppKey")
            ud_id = self._first_truthy(common, "udid", "Udid")
            user_id = self._first_truthy(common, "userId", "UserId")
            # Strip the "asr" marker; an empty result falls back to integer 0.
            service_type = str(common.get(
                "serviceType", "").replace("asr", "")) or 0

            nlu_ret = req_param.get("nluRet", {})
            yzs_nlu_time = nlu_ret.get("nluProcessTime", "")
            # Empty containers are emitted as "" rather than "{}" / "[]".
            yzs_general = json.dumps(nlu_ret.get(
                "general", {}), ensure_ascii=False).replace("{}", "")
            yzs_intent = json.dumps(nlu_ret.get("semantic", {}).get(
                "intent", []), ensure_ascii=False).replace("[]", "")

            if resp:
                semantic = resp.get("header", {}).get("semantic", {})
                code = semantic.get("code", 0)
                # Fall back to the top-level "semantic" section of the
                # response when the header carries no domain / intent.
                terminal_domain = semantic.get("domain", "") or resp.get(
                    "semantic", {}).get("service", "")
                terminal_intent = semantic.get("intent", "") or resp.get(
                    "semantic", {}).get("action", "")
                skill_id = semantic.get("skill_id", "")
                response_text = resp.get("response_text", "")
                # NOTE(review): looks like a leftover debugging hook for one
                # specific query; kept so logging output is unchanged.
                if query == "空调调到26度":
                    logger.info(semantic.get("params", ""))
                # Prefer "params"; otherwise serialise a non-empty "slots".
                slots = semantic.get("params", "")
                if slots == "":
                    if len(semantic.get("slots", [])) > 0:
                        slots = json.dumps(
                            semantic.get("slots"), ensure_ascii=False)
                if terminal_domain == "chat":
                    distribution_gree_domain, distribution_gree_intent = \
                        self._split_skill_id(skill_id)
                else:
                    distribution_gree_domain = terminal_domain
                    distribution_gree_intent = terminal_intent
            else:
                # No response section at all: code -3 and empty fields.
                code = -3
                terminal_domain = ""
                terminal_intent = ""
                skill_id = ""
                response_text = ""
                slots = ""
                distribution_gree_domain, distribution_gree_intent = "", ""

            service_nlu = data.get(
                "serviceNLU", "") or data.get("serverNLU", "")
            cost = data.get("cost_time", {})
            cost_time = cost.get("return", "") or cost.get("save_records", "")
            (get_body_time, gree_nlu_time,
             tencent_nlu_time, get_homeid_time) = self._adjust_cost_times(cost)
        except Exception as e:
            # Best-effort parsing: report the problem and drop the record.
            print(f"The error: {e}")
            return None
        else:
            return {
                "date_time": date_time,
                "uuid": uuid,
                "mid": mid,
                "mid_type": mid_type,
                "mac_wifi": mac_wifi,
                "mac_voice": mac_voice,
                "code": code,
                "query": query,
                "terminal_domain": terminal_domain,
                "terminal_intent": terminal_intent,
                "distribution_gree_domain": distribution_gree_domain,
                "distribution_gree_intent": distribution_gree_intent,
                "response_text": response_text,
                "emotion_class": "",
                "skill_id": skill_id,
                "voice_portal": 1,
                "service_nlu": service_nlu,
                "service_type": service_type,
                "slots": slots,
                "yzs_request_id": request_id,
                "yzs_remote_ip": remote_ip,
                "yzs_app_key": app_key,
                "yzs_ud_id": ud_id,
                "yzs_user_id": user_id,
                "yzs_intent": yzs_intent,
                "yzs_general": yzs_general,
                "yzs_nlu_time": yzs_nlu_time,
                "get_body_time": get_body_time,
                "gree_nlu_time": gree_nlu_time,
                "tencent_nlu_time": tencent_nlu_time,
                "get_homeid_time": get_homeid_time,
                "cost_time": cost_time
            }

    @classmethod
    def _resolve_date_time(cls, dic: dict):
        """
        Return the record time, deriving it from ``@timestamp`` if needed.

        A truthy ``time`` field is returned untouched.  Otherwise
        ``@timestamp`` is used: a string is cut at the first "." (dropping
        the fractional part), parsed as ISO-8601, shifted and formatted; a
        ``datetime`` is shifted and formatted directly.  Any other type is
        reported and the (falsy) ``time`` value is returned as-is.

        :param dic: logs dict
        :return: formatted time string, or the falsy ``time`` value
        """
        date_time = dic.get("time", "")
        if date_time:
            return date_time
        time_stamp = dic.get("@timestamp", "")
        # Check the type *before* calling str methods, so that datetime
        # values are handled instead of raising AttributeError on .split().
        if isinstance(time_stamp, str):
            moment = dateutil.parser.isoparse(time_stamp.split(".")[0])
        elif isinstance(time_stamp, datetime.datetime):
            moment = time_stamp
        else:
            print("The error: parse time_stamp failed ...")
            return date_time
        return (moment + cls._TIME_SHIFT).strftime(cls._TIME_FORMAT)

    @staticmethod
    def _first_truthy(mapping: dict, *keys: str):
        """
        Return the first truthy value found under ``keys``, else "".

        :param mapping: dict to look the keys up in
        :param keys: candidate key spellings, tried in order
        :return: first truthy value, or "" when none is present
        """
        for key in keys:
            value = mapping.get(key, "")
            if value:
                return value
        return ""

    def _split_skill_id(self, skill_id) -> tuple:
        """
        Derive (domain, intent) from a skill id for the "chat" domain.

        * empty skill id                -> ("", "")
        * id listed in ``mapping_list`` -> split on "."
        * id containing any digit       -> ("", "")
        * id containing "."             -> split on "."
        * anything else                 -> (skill_id, skill_id)

        Splitting expects exactly two parts; any other count raises
        ``ValueError``, which ``parse`` reports and turns into a dropped
        record.

        :param skill_id: skill id taken from the response header
        :return: tuple ``(domain, intent)``
        """
        if not skill_id:
            return "", ""
        if skill_id in self.mapping_list:
            domain, intent = skill_id.split(".")
            return domain, intent
        if re.findall(r"(\d+)", skill_id, re.S):
            return "", ""
        if "." in skill_id:
            domain, intent = skill_id.split(".")
            return domain, intent
        return skill_id, skill_id

    @staticmethod
    def _adjust_cost_times(cost: dict) -> tuple:
        """
        Read the timing fields and subtract the preceding measurement.

        The NLU timings have ``get_body`` subtracted when both values are
        present.  ``get_homeid`` then has the (already adjusted) NLU timing
        subtracted; when both NLU timings exist the larger one is used.
        Adjusted values are returned as strings, untouched values as read.

        :param cost: the ``cost_time`` dict of the payload
        :return: tuple ``(get_body, gree_nlu, tencent_nlu, get_homeid)``
        """
        get_body_time = cost.get("get_body", "")
        gree_nlu_time = cost.get("gree_nlu", "")
        tencent_nlu_time = cost.get("tencent_nlu", "")
        get_homeid_time = cost.get("get_homeid", "")

        if gree_nlu_time and get_body_time:
            gree_nlu_time = str(float(gree_nlu_time) - float(get_body_time))
        if tencent_nlu_time and get_body_time:
            tencent_nlu_time = str(
                float(tencent_nlu_time) - float(get_body_time))

        if get_homeid_time and (gree_nlu_time != "" or tencent_nlu_time != ""):
            if gree_nlu_time == "":
                baseline = float(tencent_nlu_time)
            elif tencent_nlu_time == "":
                baseline = float(gree_nlu_time)
            else:
                baseline = max(float(gree_nlu_time), float(tencent_nlu_time))
            get_homeid_time = str(float(get_homeid_time) - baseline)

        return get_body_time, gree_nlu_time, tencent_nlu_time, get_homeid_time