diff --git a/app.py b/app.py index 2d71d37f4961e097bd62745c02e4e0d3c239ba85..87cae6c708305010c46f1680c376a04ed44f441a 100644 --- a/app.py +++ b/app.py @@ -16,18 +16,41 @@ from application import create_app def runserver(): """ This function creates a new instance of the Flask application and runs it. + + It retrieves the host and port from the application configuration + and uses these values when running the app. + + Please note that the Flask development server is not meant to be used in a + production environment. It's intended to be used in development, where you + can easily restart the server whenever you make changes to your code. """ - # Create a new instance of the Flask application + # Create a new instance of the Flask application. + # The create_app() function should be defined elsewhere in your project, + # and should return an instance of the Flask class. app = create_app() - # Run the Flask development server - # The run() method starts the application's development server. - # It's not meant to be used on production environment. - app.run(debug=True, use_reloader=False) + # Retrieve the server configuration from the Flask application config. + # The 'Server' key is assumed to be a dictionary containing the keys 'IP' and 'Port'. + cfg = app.config['Server'] + + # Retrieve the host and port from the config. + # If they're not present, default to '0.0.0.0' for the host and '5000' for the port. + host = cfg.get('IP', '0.0.0.0') + port = cfg.get('Port', 5000) + + # Run the Flask development server. + app.run(host=host, port=port, use_reloader=False) if __name__ == '__main__': + """ + This condition is true if the script is run directly. + If the script is imported from another script, the condition is false. + + This allows you to use this script's functions without running the server + if you import this script from another script. + """ + # Only run the development server if the script is executed directly. - # This allows other scripts to import this file without starting the server. runserver() diff --git a/application/__init__.py b/application/__init__.py index 4b51ca50798999b4319e2b2f52ac49dcabfeafc6..5086922a4fe769d5112235512c3e69a6e3cc3877 100644 --- a/application/__init__.py +++ b/application/__init__.py @@ -44,5 +44,7 @@ def create_app() -> Flask: # This could include data migration scripts, administrative tasks, etc. init_script(app) + print(app.config) + # Return the fully initialized Flask application return app diff --git a/application/config/config.yaml b/application/config/config.yaml index c4ca7721a05a3bf98a04c6e27919a31d32a3e56f..045ab3a592bf02ce5727871ce395c22a85887625 100644 --- a/application/config/config.yaml +++ b/application/config/config.yaml @@ -1,5 +1,11 @@ -System: - Env: public +Server: + Name: elasticsearch-log-parse + Tag: + IP: 172.28.12.236 + Port: 5000 + Interval: 30s + Timeout: 30s + Deregister: 60s Database: Type: postgresql diff --git a/application/extensions/__init__.py b/application/extensions/__init__.py index 202b955842c3ab0294d754a53566a16b89d7b3b0..29e667b8002c650b6a359a65c773df75fbacee0e 100644 --- a/application/extensions/__init__.py +++ b/application/extensions/__init__.py @@ -16,17 +16,21 @@ from flask import Flask from .init_apispec import init_apispec from .init_apscheduler import init_tasks from .init_config import init_config +from .init_consul import init_consul from .init_cors import init_cors +from .init_dotenv import init_dotenv from .init_elasticsearch import init_elasticsearch from .init_logger import init_logger from .init_sqlalchemy import init_database def init_plugs(app: Flask) -> None: + init_dotenv() init_config(app) init_logger(app) init_database(app) init_apispec(app) init_elasticsearch(app) init_cors(app) + init_consul(app) init_tasks(app) diff --git a/application/extensions/init_consul.py b/application/extensions/init_consul.py new file mode 100644 index 0000000000000000000000000000000000000000..0c0dde6e0fd8800a25c2a7cd3c5c504520205984 --- /dev/null +++ b/application/extensions/init_consul.py @@ -0,0 +1,19 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +# @Version : Python 3.11.4 +# @Software : Sublime Text 4 +# @Author : StudentCWZ +# @Email : StudentCWZ@outlook.com +# @Date : 2023/11/21 10:01 +# @File : init_consul.py +# @Description : +""" + +from flask import Flask + +from application.libs import FlaskConsulService + + +def init_consul(app: Flask) -> None: + FlaskConsulService(app) diff --git a/application/extensions/init_dotenv.py b/application/extensions/init_dotenv.py new file mode 100644 index 0000000000000000000000000000000000000000..9a7b19fc4d919bfcdb4df1bfef052bfb1c8291fc --- /dev/null +++ b/application/extensions/init_dotenv.py @@ -0,0 +1,27 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +# @Version : Python 3.11.4 +# @Software : Sublime Text 4 +# @Author : StudentCWZ +# @Email : StudentCWZ@outlook.com +# @Date : 2023/11/21 10:51 +# @File : init_dotenv.py +# @Description : +""" + +import os + +from dotenv import load_dotenv + + +def init_dotenv() -> None: + """ + Initialize the dotenv extension + + :return: None + """ + root_path = os.path.abspath(os.path.dirname(__file__)).split('extensions')[0] + flask_env_path = os.path.join(root_path, '.flaskenv') + if os.path.exists(flask_env_path): + load_dotenv(flask_env_path) diff --git a/application/libs/__init__.py b/application/libs/__init__.py index 20aa673ab0a5619a9ff27f34cd652fe52a1fa95a..27fc3de22ca1a14b31cfbb4b3fe904714f1f87d0 100644 --- a/application/libs/__init__.py +++ b/application/libs/__init__.py @@ -14,3 +14,4 @@ from .config import ConsulConfig, LocalConfig from .error import ConfigKeyError from .flask_elasticsearch import FlaskElasticsearch from .flask_loguru import FlaskLoguru +from .flask_consul import FlaskConsulService diff --git a/application/libs/flask_consul/__init__.py b/application/libs/flask_consul/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..6be05355826b6870d53c025321cd15cfcb07de32 --- /dev/null +++ b/application/libs/flask_consul/__init__.py @@ -0,0 +1,13 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +# @Version : Python 3.11.4 +# @Software : Sublime Text 4 +# @Author : StudentCWZ +# @Email : StudentCWZ@outlook.com +# @Date : 2023/11/21 09:10 +# @File : __init__.py.py +# @Description : +""" + +from .consul import FlaskConsulService diff --git a/application/libs/flask_consul/consul.py b/application/libs/flask_consul/consul.py new file mode 100644 index 0000000000000000000000000000000000000000..f644c7b3e3c938804c12dfb3bd915a108f21ff7e --- /dev/null +++ b/application/libs/flask_consul/consul.py @@ -0,0 +1,77 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +# @Version : Python 3.11.4 +# @Software : Sublime Text 4 +# @Author : StudentCWZ +# @Email : StudentCWZ@outlook.com +# @Date : 2023/11/21 09:10 +# @File : consul.py +# @Description : +""" + + +import atexit +import consul +from flask import current_app, Flask + +from application.libs.helper import ConfigHelper +from application.libs.helper import EnvVarHelper + + +class FlaskConsulService: + + def __init__(self, app=None): + self.app = app + self.client = None + self.service_id = None + if app is not None: + self.init_app(app) + + def init_app(self, app: Flask): + env_vars, server_cfg = self.set_server_cfg(app) + self.register_service(env_vars, server_cfg) + atexit.register(self.deregister_service) + + @staticmethod + def set_server_cfg(app: Flask) -> tuple: + # 实例化对象 + eh = EnvVarHelper() + # 从环境变量获取 consul 参数 + env_vars = eh.consul_vars + # 获取 consul key + _ = env_vars.pop('key', None) + + # Initialize ConfigHelper + ch = ConfigHelper(app) + # Fetch the logging configuration from the app's config + cfg = ch.Server + + return env_vars, cfg + + def register_service(self, env_vars: dict, cfg: ConfigHelper): + service_name = cfg.get('Name', 'elasticsearch-log-parse') + service_address = cfg.get('IP', 'localhost') + service_port = cfg.get('Port', '5000') + check_interval = cfg.get('Interval', '30s') + timeout = cfg.get('Timeout', '30s') + deregister_after = cfg.get('Deregister', '60s') + if self.service_id is None: + self.service_id = f'{service_name}-{service_address}-{service_port}' + try: + check = consul.Check().tcp(host=service_address, port=service_port, interval=check_interval, timeout=timeout, + deregister=deregister_after) + if self.client is None: + self.client = consul.Consul(**env_vars) + self.client.agent.service.register( + name=service_name, + service_id=self.service_id, + address=service_address, + port=service_port, + check=check + ) + except Exception as e: + print(e) + + def deregister_service(self): + self.client.agent.service.deregister(self.service_id) diff --git a/application/libs/flask_loguru/logger.py b/application/libs/flask_loguru/logger.py index 95a369b978265865acf830e3a341e2f09bbde543..591d83a528099752de6ab42b89aa10bd75ef8a84 100644 --- a/application/libs/flask_loguru/logger.py +++ b/application/libs/flask_loguru/logger.py @@ -74,7 +74,12 @@ class FlaskLoguru: """ Log the start of the request. """ - data = dict(url=request.url, method=request.method, ip=request.remote_addr, request_body=request.get_json()) + if request.method == 'POST': + data = dict(url=request.url, method=request.method, ip=request.remote_addr, + request_body=request.get_json()) + else: + data = dict(url=request.url, method=request.method, ip=request.remote_addr, + request_body="") g.logger = logger.bind(data=data) g.logger.info('Request started') diff --git a/application/utils/elasticsearch/elasticsearch.py b/application/utils/elasticsearch/elasticsearch.py index 12ee7d857f235e1849ce226732a86883470015f6..43fa4ebfcd7809ef1cab24be953041684f406d58 100644 --- a/application/utils/elasticsearch/elasticsearch.py +++ b/application/utils/elasticsearch/elasticsearch.py @@ -80,3 +80,8 @@ class ElasticsearchUtil: :return: data after search by scroll """ return es.scroll(scroll_id=_id, scroll=_scroll, request_timeout=30) + + @classmethod + def insert_data(cls, index: str, doc_type: str, data: dict) -> None: + # 插入数据 + es.index(index=index, doc_type=doc_type, body=data) diff --git a/application/utils/parse/parse.py b/application/utils/parse/parse.py index ec77c640d7901320cd8d69da0eda4ea08c1517d3..d6fedba252ab562ed31d0856dfa5944a4a4d33a2 100644 --- a/application/utils/parse/parse.py +++ b/application/utils/parse/parse.py @@ -16,7 +16,6 @@ import re from typing import Generator import dateutil.parser -from loguru import logger class ParseUtil: @@ -43,7 +42,7 @@ class ParseUtil: res = self.parse(_source) if not res: # 获取 uuid - uuid = _source.get("uuid", "") or _source.get("requestId", "") + uuid = _source.get('uuid', '') or _source.get('requestId', '') print(f'missing uuid of data: {uuid}') continue yield res @@ -58,145 +57,145 @@ class ParseUtil: # 捕获异常 try: # 获取 date_time - date_time = dic.get("time", "") + date_time = dic.get('time', '') # 条件判断 if not date_time: # 获取 time_stamp - time_stamp = dic.get("@timestamp", "").split(".")[0] + time_stamp = dic.get('@timestamp', '').split('.')[0] # 条件判断 if isinstance(time_stamp, str): # 获取 date_time date_time = (dateutil.parser.isoparse(time_stamp) - + datetime.timedelta(hours=8)).strftime("%Y-%m-%d %H:%M:%S") + + datetime.timedelta(hours=8)).strftime('%Y-%m-%d %H:%M:%S') elif isinstance(time_stamp, datetime.datetime): # 获取 date_time date_time = (time_stamp + datetime.timedelta(hours=8) - ).strftime("%Y-%m-%d %H:%M:%S") + ).strftime('%Y-%m-%d %H:%M:%S') else: # 输出 log 信息 - print("The error: parse time_stamp failed ...") + print('The error: parse time_stamp failed ...') # 获取 uuid - uuid = dic.get("uuid", "") or dic.get("requestId", "") + uuid = dic.get('uuid', '') or dic.get('requestId', '') # 获取 msg - msg = dic.get("message", "") + msg = dic.get('message', '') # 条件判断 if msg: # 获取 req - req = json.loads(msg).get("field", {}).get( - "data", {}).get("request", {}) + req = json.loads(msg).get('field', {}).get( + 'data', {}).get('request', {}) # 获取 resp - resp = json.loads(msg).get("field", {}).get( - "data", {}).get("response", {}) + resp = json.loads(msg).get('field', {}).get( + 'data', {}).get('response', {}) # 获取 data - data = json.loads(msg).get("field", {}).get("data", {}) + data = json.loads(msg).get('field', {}).get('data', {}) else: # 获取 req - req = dic.get("field", {}).get( - "data", {}).get("request", {}) + req = dic.get('field', {}).get( + 'data', {}).get('request', {}) # 获取 resp - resp = dic.get("field", {}).get( - "data", {}).get("response", {}) + resp = dic.get('field', {}).get( + 'data', {}).get('response', {}) # 获取 data - data = dic.get("field", {}).get("data", {}) + data = dic.get('field', {}).get('data', {}) # 获取 mac_voice - mac_voice = req.get("macVoice", "") + mac_voice = req.get('macVoice', '') # 获取 mac_wifi - mac_wifi = req.get("macWifi", "") + mac_wifi = req.get('macWifi', '') # 获取 query - query = req.get("query", "") + query = req.get('query', '') # 获取 mid - mid = req.get("mid", "") + mid = req.get('mid', '') # 获取 mid_type - mid_type = req.get("midType", "") + mid_type = req.get('midType', '') # 获取 req_param - req_param = req.get("requestBody", {}).get( - "reqParam", {}) or req.get("reqParam", {}) + req_param = req.get('requestBody', {}).get( + 'reqParam', {}) or req.get('reqParam', {}) # 获取 common - common = req_param.get("common", {}) or req_param.get("Common", {}) + common = req_param.get('common', {}) or req_param.get('Common', {}) # 获取 request_id - request_id = common.get("requestId", "") or common.get( - "RequestId", "") or "" + request_id = common.get('requestId', '') or common.get( + 'RequestId', '') or '' # 获取 remote_ip - remote_ip = common.get("remoteIP", "") or common.get( - "RemoteIP", "") or common.get("remoteIp", "") or "" + remote_ip = common.get('remoteIP', '') or common.get( + 'RemoteIP', '') or common.get('remoteIp', '') or '' # 获取 app_key - app_key = common.get("appKey", "") or common.get( - "AppKey", "") or "" + app_key = common.get('appKey', '') or common.get( + 'AppKey', '') or '' # 获取 ud_id - ud_id = common.get("udid", "") or common.get("Udid", "") or "" + ud_id = common.get('udid', '') or common.get('Udid', '') or '' # 获取 user_id - user_id = common.get("userId", "") or common.get( - "UserId", "") or "" + user_id = common.get('userId', '') or common.get( + 'UserId', '') or '' # 获取 service_type service_type = str(common.get( - "serviceType", "").replace("asr", "")) or 0 + 'serviceType', '').replace('asr', '')) or 0 # 声明 voice_portal voice_portal = 1 # 声明 emotion_class - emotion_class = "" + emotion_class = '' # 获取 nlu_ret - nlu_ret = req_param.get("nluRet", {}) + nlu_ret = req_param.get('nluRet', {}) # 获取 yzs_nlu_time - yzs_nlu_time = nlu_ret.get("nluProcessTime", "") + yzs_nlu_time = nlu_ret.get('nluProcessTime', '') # 获取 yzs_general yzs_general = json.dumps(nlu_ret.get( - "general", {}), ensure_ascii=False).replace("{}", "") + 'general', {}), ensure_ascii=False).replace('{}', '') # 获取 yzs_intent - yzs_intent = json.dumps(nlu_ret.get("semantic", {}).get( - "intent", []), ensure_ascii=False).replace("[]", "") + yzs_intent = json.dumps(nlu_ret.get('semantic', {}).get( + 'intent', []), ensure_ascii=False).replace('[]', '') # 条件判断 if resp: # 获取 header - header = resp.get("header", {}) + header = resp.get('header', {}) # 获取 semantic - semantic = header.get("semantic", {}) + semantic = header.get('semantic', {}) or resp.get('semantic', {}) # 获取 code - code = semantic.get("code", 0) + code = semantic.get('code', 0) # 获取 terminal_domain - terminal_domain = semantic.get("domain", "") or resp.get( - "semantic", {}).get("service", "") + terminal_domain = semantic.get('domain', '') or semantic.get('service', '') # 获取 terminal_intent - terminal_intent = semantic.get("intent", "") or resp.get( - "semantic", {}).get("action", "") + terminal_intent = semantic.get('intent', '') or semantic.get('action', '') # 获取 skill_id - skill_id = semantic.get("skill_id", "") + skill_id = semantic.get('skill_id', '') # 获取 response_text - response_text = resp.get("response_text", "") + response_text = resp.get('response_text', '') # 获取 slots - if query == "空调调到26度": - logger.info(semantic.get("params", "")) - slots = semantic.get("params", "") - if slots == "": - if len(semantic.get("slots", [])) > 0: - slots = json.dumps(semantic.get("slots"), ensure_ascii=False) + slots = semantic.get('params', '') # 条件判断 - if terminal_domain == "chat": + if slots != '': + slots = json.dumps(slots, ensure_ascii=False) + else: + if len(semantic.get('slots', [])) > 0: + slots = json.dumps(semantic.get('slots'), ensure_ascii=False) + + # 条件判断 + if terminal_domain == 'chat': if skill_id: # 条件判断 if skill_id in self.mapping_list: # 获取 distribution_gree_domain, distribution_gree_intent distribution_gree_domain, distribution_gree_intent = skill_id.split( - ".") + '.') else: # 获取 reg_num_list - reg_num_list = re.findall(r"(\d+)", skill_id, re.S) + reg_num_list = re.findall(r'(\d+)', skill_id, re.S) # 条件判断 if not reg_num_list: # 条件判断 - if "." in skill_id: + if '.' in skill_id: # 获取 distribution_gree_domain, distribution_gree_intent distribution_gree_domain, distribution_gree_intent = skill_id.split( - ".") + '.') else: # 获取 distribution_gree_domain, distribution_gree_intent distribution_gree_domain, distribution_gree_intent = skill_id, skill_id else: # 获取 distribution_gree_domain, distribution_gree_intent - distribution_gree_domain, distribution_gree_intent = "", "" + distribution_gree_domain, distribution_gree_intent = '', '' else: # 获取 distribution_gree_domain, distribution_gree_intent - distribution_gree_domain, distribution_gree_intent = "", "" + distribution_gree_domain, distribution_gree_intent = '', '' else: # 获取 distribution_gree_domain, distribution_gree_intent distribution_gree_domain, distribution_gree_intent = terminal_domain, terminal_intent @@ -204,32 +203,32 @@ class ParseUtil: # 声明 code code = -3 # 声明 terminal_domain - terminal_domain = "" + terminal_domain = '' # 声明 intent - terminal_intent = "" + terminal_intent = '' # 声明 skill_id - skill_id = "" + skill_id = '' # 声明 response_text - response_text = "" + response_text = '' # 声明 slots - slots = "" + slots = '' # 获取 distribution_gree_domain, distribution_gree_intent - distribution_gree_domain, distribution_gree_intent = "", "" + distribution_gree_domain, distribution_gree_intent = '', '' # 获取 service_nlu service_nlu = data.get( - "serviceNLU", "") or data.get("serverNLU", "") + 'serviceNLU', '') or data.get('serverNLU', '') # 获取 cost_time - cost_time = data.get("cost_time", {}).get("return", "") or data.get( - "cost_time", {}).get("save_records", "") + cost_time = data.get('cost_time', {}).get('return', '') or data.get( + 'cost_time', {}).get('save_records', '') # 获取 get_body_time - get_body_time = data.get("cost_time", {}).get("get_body", "") + get_body_time = data.get('cost_time', {}).get('get_body', '') # 获取 gree_nlu_time - gree_nlu_time = data.get("cost_time", {}).get("gree_nlu", "") + gree_nlu_time = data.get('cost_time', {}).get('gree_nlu', '') # 获取 tencent_nlu_time - tencent_nlu_time = data.get("cost_time", {}).get("tencent_nlu", "") + tencent_nlu_time = data.get('cost_time', {}).get('tencent_nlu', '') # 获取 get_homeid_time - get_homeid_time = data.get("cost_time", {}).get("get_homeid", "") + get_homeid_time = data.get('cost_time', {}).get('get_homeid', '') # 条件判断 if gree_nlu_time: # 条件判断 @@ -247,14 +246,14 @@ class ParseUtil: # 条件判断 if get_homeid_time: # 条件判断 - if gree_nlu_time != "" or tencent_nlu_time != "": + if gree_nlu_time != '' or tencent_nlu_time != '': # 条件判断 - if gree_nlu_time == "": + if gree_nlu_time == '': # 获取 get_homeid_time get_homeid_time = str( float(get_homeid_time) - float(tencent_nlu_time)) # 条件判断 - elif tencent_nlu_time == "": + elif tencent_nlu_time == '': # 获取 get_homeid_time get_homeid_time = str( float(get_homeid_time) - float(gree_nlu_time)) @@ -264,42 +263,42 @@ class ParseUtil: float(tencent_nlu_time))) except Exception as e: # 输出 log 信息 - print(f"The error: {e}") + print(f'The error: {e}') else: # 获取 result result = { - "date_time": date_time, - "uuid": uuid, - "mid": mid, - "mid_type": mid_type, - "mac_wifi": mac_wifi, - "mac_voice": mac_voice, - "code": code, - "query": query, - "terminal_domain": terminal_domain, - "terminal_intent": terminal_intent, - "distribution_gree_domain": distribution_gree_domain, - "distribution_gree_intent": distribution_gree_intent, - "response_text": response_text, - "emotion_class": emotion_class, - "skill_id": skill_id, - "voice_portal": voice_portal, - "service_nlu": service_nlu, - "service_type": service_type, - "slots": slots, - "yzs_request_id": request_id, - "yzs_remote_ip": remote_ip, - "yzs_app_key": app_key, - "yzs_ud_id": ud_id, - "yzs_user_id": user_id, - "yzs_intent": yzs_intent, - "yzs_general": yzs_general, - "yzs_nlu_time": yzs_nlu_time, - "get_body_time": get_body_time, - "gree_nlu_time": gree_nlu_time, - "tencent_nlu_time": tencent_nlu_time, - "get_homeid_time": get_homeid_time, - "cost_time": cost_time + 'date_time': date_time, + 'uuid': uuid, + 'mid': mid, + 'mid_type': mid_type, + 'mac_wifi': mac_wifi, + 'mac_voice': mac_voice, + 'code': code, + 'query': query, + 'terminal_domain': terminal_domain, + 'terminal_intent': terminal_intent, + 'distribution_gree_domain': distribution_gree_domain, + 'distribution_gree_intent': distribution_gree_intent, + 'response_text': response_text, + 'emotion_class': emotion_class, + 'skill_id': skill_id, + 'voice_portal': voice_portal, + 'service_nlu': service_nlu, + 'service_type': service_type, + 'slots': slots, + 'yzs_request_id': request_id, + 'yzs_remote_ip': remote_ip, + 'yzs_app_key': app_key, + 'yzs_ud_id': ud_id, + 'yzs_user_id': user_id, + 'yzs_intent': yzs_intent, + 'yzs_general': yzs_general, + 'yzs_nlu_time': yzs_nlu_time, + 'get_body_time': get_body_time, + 'gree_nlu_time': gree_nlu_time, + 'tencent_nlu_time': tencent_nlu_time, + 'get_homeid_time': get_homeid_time, + 'cost_time': cost_time } # 返回 result return result diff --git a/application/views/log/log.py b/application/views/log/log.py index 4e05da8e568dc40d5299b2068c9f47d048bd0aea..6fe5c2947ac0cfa8aef318da89121f5c45a87e4d 100644 --- a/application/views/log/log.py +++ b/application/views/log/log.py @@ -18,6 +18,7 @@ from pydantic import ValidationError from application.schemas import ParseLogRequestItem from application.services import LogService + log_api = Blueprint('log_api', __name__) api_logs = Api(log_api) @@ -38,3 +39,4 @@ def parse(): return jsonify( search_total=total ), 200 +