Commit ff248f24 authored by 崔为之 💪🏽

Update project

parent 0c960667
......@@ -16,18 +16,41 @@ from application import create_app
def runserver():
    """
    Create the Flask application and run it on the development server.

    The host and port are read from the ``Server`` section of the
    application config (keys ``IP`` and ``Port``). When the section or a
    key is missing, ``0.0.0.0`` and ``5000`` are used instead.

    Please note that the Flask development server is not meant to be used
    in a production environment.
    """
    # create_app() is imported from the ``application`` package and returns
    # the Flask instance.
    app = create_app()

    # A missing or empty 'Server' section must not abort start-up with a
    # KeyError; fall back to an empty mapping so the defaults below apply.
    cfg = app.config.get('Server') or {}
    host = cfg.get('IP', '0.0.0.0')
    port = cfg.get('Port', 5000)

    # Start the server exactly once, on the configured address. app.run()
    # blocks until the server stops, so nothing may be started before it.
    app.run(host=host, port=port, use_reloader=False)
# Entry point: the development server is started only when this file is
# executed directly. Importing it from another script leaves the server
# stopped, so its functions can be reused without side effects.
if __name__ == '__main__':
    runserver()
......@@ -44,5 +44,7 @@ def create_app() -> Flask:
# This could include data migration scripts, administrative tasks, etc.
init_script(app)
print(app.config)
# Return the fully initialized Flask application
return app
System:
Env: public
Server:
Name: elasticsearch-log-parse
Tag:
IP: 172.28.12.236
Port: 5000
Interval: 30s
Timeout: 30s
Deregister: 60s
Database:
Type: postgresql
......
......@@ -16,17 +16,21 @@ from flask import Flask
from .init_apispec import init_apispec
from .init_apscheduler import init_tasks
from .init_config import init_config
from .init_consul import init_consul
from .init_cors import init_cors
from .init_dotenv import init_dotenv
from .init_elasticsearch import init_elasticsearch
from .init_logger import init_logger
from .init_sqlalchemy import init_database
def init_plugs(app: Flask) -> None:
    """
    Initialize every extension of the application in a fixed order

    :param app: Flask application handed to each initializer that takes one
    :return: None
    """
    # The dotenv initializer takes no application argument and runs first
    init_dotenv()
    # The remaining initializers all receive the app; tuple order is call order
    app_initializers = (
        init_config,
        init_logger,
        init_database,
        init_apispec,
        init_elasticsearch,
        init_cors,
        init_consul,
        init_tasks,
    )
    for initializer in app_initializers:
        initializer(app)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
# @Version : Python 3.11.4
# @Software : Sublime Text 4
# @Author : StudentCWZ
# @Email : StudentCWZ@outlook.com
# @Date : 2023/11/21 10:01
# @File : init_consul.py
# @Description :
"""
from flask import Flask
from application.libs import FlaskConsulService
def init_consul(app: Flask) -> None:
    """
    Initialize the Consul extension

    :param app: Flask application passed to FlaskConsulService
    :return: None
    """
    FlaskConsulService(app=app)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
# @Version : Python 3.11.4
# @Software : Sublime Text 4
# @Author : StudentCWZ
# @Email : StudentCWZ@outlook.com
# @Date : 2023/11/21 10:51
# @File : init_dotenv.py
# @Description :
"""
import os
from dotenv import load_dotenv
def init_dotenv() -> None:
    """
    Load the project's ``.flaskenv`` file when it exists

    The project root is taken to be the part of this file's absolute
    directory path that precedes the first occurrence of 'extensions'.

    :return: None
    """
    here = os.path.abspath(os.path.dirname(__file__))
    # Text before the first 'extensions'; the whole path when it is absent
    project_root, _, _ = here.partition('extensions')
    env_file = os.path.join(project_root, '.flaskenv')
    # Nothing to load when the file is missing
    if not os.path.exists(env_file):
        return
    load_dotenv(env_file)
......@@ -14,3 +14,4 @@ from .config import ConsulConfig, LocalConfig
from .error import ConfigKeyError
from .flask_elasticsearch import FlaskElasticsearch
from .flask_loguru import FlaskLoguru
from .flask_consul import FlaskConsulService
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
# @Version : Python 3.11.4
# @Software : Sublime Text 4
# @Author : StudentCWZ
# @Email : StudentCWZ@outlook.com
# @Date : 2023/11/21 09:10
# @File : __init__.py
# @Description :
"""
from .consul import FlaskConsulService
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
# @Version : Python 3.11.4
# @Software : Sublime Text 4
# @Author : StudentCWZ
# @Email : StudentCWZ@outlook.com
# @Date : 2023/11/21 09:10
# @File : consul.py
# @Description :
"""
import atexit
import consul
from flask import current_app, Flask
from application.libs.helper import ConfigHelper
from application.libs.helper import EnvVarHelper
class FlaskConsulService:
    """
    Flask extension that registers this service with a Consul agent.

    ``init_app`` registers the service together with a TCP health check and
    installs an exit hook that deregisters it again.
    """

    def __init__(self, app=None):
        """
        :param app: optional Flask application; when given, ``init_app`` is called at once
        """
        self.app = app
        # Consul client; created on the first registration attempt
        self.client = None
        # ID the service is registered under (name-address-port)
        self.service_id = None
        if app is not None:
            self.init_app(app)

    def init_app(self, app: 'Flask') -> None:
        """
        Register the service in Consul and schedule its deregistration at exit

        :param app: Flask application whose config holds the ``Server`` section
        :return: None
        """
        # Remember the app also when it is supplied after construction
        self.app = app
        env_vars, server_cfg = self.set_server_cfg(app)
        self.register_service(env_vars, server_cfg)
        atexit.register(self.deregister_service)

    @staticmethod
    def set_server_cfg(app: 'Flask') -> tuple:
        """
        Collect the Consul connection parameters and the server configuration

        :param app: Flask application handed to ConfigHelper
        :return: tuple of (Consul parameters without 'key', ``Server`` configuration)
        """
        # Consul parameters provided by EnvVarHelper
        env_vars = EnvVarHelper().consul_vars
        # Drop 'key' so it is not forwarded to consul.Consul()
        env_vars.pop('key', None)
        # Fetch the ``Server`` configuration from the app's config
        cfg = ConfigHelper(app).Server
        return env_vars, cfg

    def register_service(self, env_vars: dict, cfg: 'ConfigHelper') -> None:
        """
        Register the service and a TCP health check with the Consul agent

        Failures are reported on stdout and do not propagate, so a missing
        Consul agent does not stop the application from starting.

        :param env_vars: keyword arguments for ``consul.Consul``
        :param cfg: server configuration; read through ``.get(key, default)``
        :return: None
        """
        service_name = cfg.get('Name', 'elasticsearch-log-parse')
        service_address = cfg.get('IP', 'localhost')
        service_port = cfg.get('Port', 5000)
        check_interval = cfg.get('Interval', '30s')
        timeout = cfg.get('Timeout', '30s')
        deregister_after = cfg.get('Deregister', '60s')
        if self.service_id is None:
            self.service_id = f'{service_name}-{service_address}-{service_port}'
        try:
            # The port is sent to Consul as a number; the config may hold it as text
            service_port = int(service_port)
            check = consul.Check().tcp(
                host=service_address,
                port=service_port,
                interval=check_interval,
                timeout=timeout,
                deregister=deregister_after,
            )
            if self.client is None:
                self.client = consul.Consul(**env_vars)
            self.client.agent.service.register(
                name=service_name,
                service_id=self.service_id,
                address=service_address,
                port=service_port,
                check=check,
            )
        except Exception as e:
            # Best effort: report the failure and keep the application running
            print(f'Failed to register service {self.service_id} in Consul: {e}')

    def deregister_service(self) -> None:
        """
        Remove the service from Consul; does nothing when no client was created

        :return: None
        """
        # Registration may have failed before a client existed; nothing to undo then
        if self.client is None or self.service_id is None:
            return
        try:
            self.client.agent.service.deregister(self.service_id)
        except Exception as e:
            # Runs as an exit hook: report instead of raising during shutdown
            print(f'Failed to deregister service {self.service_id} from Consul: {e}')
......@@ -74,7 +74,12 @@ class FlaskLoguru:
"""
Log the start of the request.
"""
data = dict(url=request.url, method=request.method, ip=request.remote_addr, request_body=request.get_json())
if request.method == 'POST':
data = dict(url=request.url, method=request.method, ip=request.remote_addr,
request_body=request.get_json())
else:
data = dict(url=request.url, method=request.method, ip=request.remote_addr,
request_body="")
g.logger = logger.bind(data=data)
g.logger.info('Request started')
......
......@@ -80,3 +80,8 @@ class ElasticsearchUtil:
:return: data after search by scroll
"""
return es.scroll(scroll_id=_id, scroll=_scroll, request_timeout=30)
@classmethod
def insert_data(cls, index: str, doc_type: str, data: dict) -> None:
    """
    Insert one document through the module's ``es`` client

    :param index: passed to ``es.index`` as ``index``
    :param doc_type: passed to ``es.index`` as ``doc_type``
    :param data: passed to ``es.index`` as ``body``
    :return: None
    """
    # Collect the keyword arguments first, then insert the data
    index_kwargs = dict(index=index, doc_type=doc_type, body=data)
    es.index(**index_kwargs)
......@@ -16,7 +16,6 @@ import re
from typing import Generator
import dateutil.parser
from loguru import logger
class ParseUtil:
......@@ -43,7 +42,7 @@ class ParseUtil:
res = self.parse(_source)
if not res:
# 获取 uuid
uuid = _source.get("uuid", "") or _source.get("requestId", "")
uuid = _source.get('uuid', '') or _source.get('requestId', '')
print(f'missing uuid of data: {uuid}')
continue
yield res
......@@ -58,145 +57,145 @@ class ParseUtil:
# 捕获异常
try:
# 获取 date_time
date_time = dic.get("time", "")
date_time = dic.get('time', '')
# 条件判断
if not date_time:
# 获取 time_stamp
time_stamp = dic.get("@timestamp", "").split(".")[0]
time_stamp = dic.get('@timestamp', '').split('.')[0]
# 条件判断
if isinstance(time_stamp, str):
# 获取 date_time
date_time = (dateutil.parser.isoparse(time_stamp)
+ datetime.timedelta(hours=8)).strftime("%Y-%m-%d %H:%M:%S")
+ datetime.timedelta(hours=8)).strftime('%Y-%m-%d %H:%M:%S')
elif isinstance(time_stamp, datetime.datetime):
# 获取 date_time
date_time = (time_stamp + datetime.timedelta(hours=8)
).strftime("%Y-%m-%d %H:%M:%S")
).strftime('%Y-%m-%d %H:%M:%S')
else:
# 输出 log 信息
print("The error: parse time_stamp failed ...")
print('The error: parse time_stamp failed ...')
# 获取 uuid
uuid = dic.get("uuid", "") or dic.get("requestId", "")
uuid = dic.get('uuid', '') or dic.get('requestId', '')
# 获取 msg
msg = dic.get("message", "")
msg = dic.get('message', '')
# 条件判断
if msg:
# 获取 req
req = json.loads(msg).get("field", {}).get(
"data", {}).get("request", {})
req = json.loads(msg).get('field', {}).get(
'data', {}).get('request', {})
# 获取 resp
resp = json.loads(msg).get("field", {}).get(
"data", {}).get("response", {})
resp = json.loads(msg).get('field', {}).get(
'data', {}).get('response', {})
# 获取 data
data = json.loads(msg).get("field", {}).get("data", {})
data = json.loads(msg).get('field', {}).get('data', {})
else:
# 获取 req
req = dic.get("field", {}).get(
"data", {}).get("request", {})
req = dic.get('field', {}).get(
'data', {}).get('request', {})
# 获取 resp
resp = dic.get("field", {}).get(
"data", {}).get("response", {})
resp = dic.get('field', {}).get(
'data', {}).get('response', {})
# 获取 data
data = dic.get("field", {}).get("data", {})
data = dic.get('field', {}).get('data', {})
# 获取 mac_voice
mac_voice = req.get("macVoice", "")
mac_voice = req.get('macVoice', '')
# 获取 mac_wifi
mac_wifi = req.get("macWifi", "")
mac_wifi = req.get('macWifi', '')
# 获取 query
query = req.get("query", "")
query = req.get('query', '')
# 获取 mid
mid = req.get("mid", "")
mid = req.get('mid', '')
# 获取 mid_type
mid_type = req.get("midType", "")
mid_type = req.get('midType', '')
# 获取 req_param
req_param = req.get("requestBody", {}).get(
"reqParam", {}) or req.get("reqParam", {})
req_param = req.get('requestBody', {}).get(
'reqParam', {}) or req.get('reqParam', {})
# 获取 common
common = req_param.get("common", {}) or req_param.get("Common", {})
common = req_param.get('common', {}) or req_param.get('Common', {})
# 获取 request_id
request_id = common.get("requestId", "") or common.get(
"RequestId", "") or ""
request_id = common.get('requestId', '') or common.get(
'RequestId', '') or ''
# 获取 remote_ip
remote_ip = common.get("remoteIP", "") or common.get(
"RemoteIP", "") or common.get("remoteIp", "") or ""
remote_ip = common.get('remoteIP', '') or common.get(
'RemoteIP', '') or common.get('remoteIp', '') or ''
# 获取 app_key
app_key = common.get("appKey", "") or common.get(
"AppKey", "") or ""
app_key = common.get('appKey', '') or common.get(
'AppKey', '') or ''
# 获取 ud_id
ud_id = common.get("udid", "") or common.get("Udid", "") or ""
ud_id = common.get('udid', '') or common.get('Udid', '') or ''
# 获取 user_id
user_id = common.get("userId", "") or common.get(
"UserId", "") or ""
user_id = common.get('userId', '') or common.get(
'UserId', '') or ''
# 获取 service_type
service_type = str(common.get(
"serviceType", "").replace("asr", "")) or 0
'serviceType', '').replace('asr', '')) or 0
# 声明 voice_portal
voice_portal = 1
# 声明 emotion_class
emotion_class = ""
emotion_class = ''
# 获取 nlu_ret
nlu_ret = req_param.get("nluRet", {})
nlu_ret = req_param.get('nluRet', {})
# 获取 yzs_nlu_time
yzs_nlu_time = nlu_ret.get("nluProcessTime", "")
yzs_nlu_time = nlu_ret.get('nluProcessTime', '')
# 获取 yzs_general
yzs_general = json.dumps(nlu_ret.get(
"general", {}), ensure_ascii=False).replace("{}", "")
'general', {}), ensure_ascii=False).replace('{}', '')
# 获取 yzs_intent
yzs_intent = json.dumps(nlu_ret.get("semantic", {}).get(
"intent", []), ensure_ascii=False).replace("[]", "")
yzs_intent = json.dumps(nlu_ret.get('semantic', {}).get(
'intent', []), ensure_ascii=False).replace('[]', '')
# 条件判断
if resp:
# 获取 header
header = resp.get("header", {})
header = resp.get('header', {})
# 获取 semantic
semantic = header.get("semantic", {})
semantic = header.get('semantic', {}) or resp.get('semantic', {})
# 获取 code
code = semantic.get("code", 0)
code = semantic.get('code', 0)
# 获取 terminal_domain
terminal_domain = semantic.get("domain", "") or resp.get(
"semantic", {}).get("service", "")
terminal_domain = semantic.get('domain', '') or semantic.get('service', '')
# 获取 terminal_intent
terminal_intent = semantic.get("intent", "") or resp.get(
"semantic", {}).get("action", "")
terminal_intent = semantic.get('intent', '') or semantic.get('action', '')
# 获取 skill_id
skill_id = semantic.get("skill_id", "")
skill_id = semantic.get('skill_id', '')
# 获取 response_text
response_text = resp.get("response_text", "")
response_text = resp.get('response_text', '')
# 获取 slots
if query == "空调调到26度":
logger.info(semantic.get("params", ""))
slots = semantic.get("params", "")
if slots == "":
if len(semantic.get("slots", [])) > 0:
slots = json.dumps(semantic.get("slots"), ensure_ascii=False)
slots = semantic.get('params', '')
# 条件判断
if terminal_domain == "chat":
if slots != '':
slots = json.dumps(slots, ensure_ascii=False)
else:
if len(semantic.get('slots', [])) > 0:
slots = json.dumps(semantic.get('slots'), ensure_ascii=False)
# 条件判断
if terminal_domain == 'chat':
if skill_id:
# 条件判断
if skill_id in self.mapping_list:
# 获取 distribution_gree_domain, distribution_gree_intent
distribution_gree_domain, distribution_gree_intent = skill_id.split(
".")
'.')
else:
# 获取 reg_num_list
reg_num_list = re.findall(r"(\d+)", skill_id, re.S)
reg_num_list = re.findall(r'(\d+)', skill_id, re.S)
# 条件判断
if not reg_num_list:
# 条件判断
if "." in skill_id:
if '.' in skill_id:
# 获取 distribution_gree_domain, distribution_gree_intent
distribution_gree_domain, distribution_gree_intent = skill_id.split(
".")
'.')
else:
# 获取 distribution_gree_domain, distribution_gree_intent
distribution_gree_domain, distribution_gree_intent = skill_id, skill_id
else:
# 获取 distribution_gree_domain, distribution_gree_intent
distribution_gree_domain, distribution_gree_intent = "", ""
distribution_gree_domain, distribution_gree_intent = '', ''
else:
# 获取 distribution_gree_domain, distribution_gree_intent
distribution_gree_domain, distribution_gree_intent = "", ""
distribution_gree_domain, distribution_gree_intent = '', ''
else:
# 获取 distribution_gree_domain, distribution_gree_intent
distribution_gree_domain, distribution_gree_intent = terminal_domain, terminal_intent
......@@ -204,32 +203,32 @@ class ParseUtil:
# 声明 code
code = -3
# 声明 terminal_domain
terminal_domain = ""
terminal_domain = ''
# 声明 intent
terminal_intent = ""
terminal_intent = ''
# 声明 skill_id
skill_id = ""
skill_id = ''
# 声明 response_text
response_text = ""
response_text = ''
# 声明 slots
slots = ""
slots = ''
# 获取 distribution_gree_domain, distribution_gree_intent
distribution_gree_domain, distribution_gree_intent = "", ""
distribution_gree_domain, distribution_gree_intent = '', ''
# 获取 service_nlu
service_nlu = data.get(
"serviceNLU", "") or data.get("serverNLU", "")
'serviceNLU', '') or data.get('serverNLU', '')
# 获取 cost_time
cost_time = data.get("cost_time", {}).get("return", "") or data.get(
"cost_time", {}).get("save_records", "")
cost_time = data.get('cost_time', {}).get('return', '') or data.get(
'cost_time', {}).get('save_records', '')
# 获取 get_body_time
get_body_time = data.get("cost_time", {}).get("get_body", "")
get_body_time = data.get('cost_time', {}).get('get_body', '')
# 获取 gree_nlu_time
gree_nlu_time = data.get("cost_time", {}).get("gree_nlu", "")
gree_nlu_time = data.get('cost_time', {}).get('gree_nlu', '')
# 获取 tencent_nlu_time
tencent_nlu_time = data.get("cost_time", {}).get("tencent_nlu", "")
tencent_nlu_time = data.get('cost_time', {}).get('tencent_nlu', '')
# 获取 get_homeid_time
get_homeid_time = data.get("cost_time", {}).get("get_homeid", "")
get_homeid_time = data.get('cost_time', {}).get('get_homeid', '')
# 条件判断
if gree_nlu_time:
# 条件判断
......@@ -247,14 +246,14 @@ class ParseUtil:
# 条件判断
if get_homeid_time:
# 条件判断
if gree_nlu_time != "" or tencent_nlu_time != "":
if gree_nlu_time != '' or tencent_nlu_time != '':
# 条件判断
if gree_nlu_time == "":
if gree_nlu_time == '':
# 获取 get_homeid_time
get_homeid_time = str(
float(get_homeid_time) - float(tencent_nlu_time))
# 条件判断
elif tencent_nlu_time == "":
elif tencent_nlu_time == '':
# 获取 get_homeid_time
get_homeid_time = str(
float(get_homeid_time) - float(gree_nlu_time))
......@@ -264,42 +263,42 @@ class ParseUtil:
float(tencent_nlu_time)))
except Exception as e:
# 输出 log 信息
print(f"The error: {e}")
print(f'The error: {e}')
else:
# 获取 result
result = {
"date_time": date_time,
"uuid": uuid,
"mid": mid,
"mid_type": mid_type,
"mac_wifi": mac_wifi,
"mac_voice": mac_voice,
"code": code,
"query": query,
"terminal_domain": terminal_domain,
"terminal_intent": terminal_intent,
"distribution_gree_domain": distribution_gree_domain,
"distribution_gree_intent": distribution_gree_intent,
"response_text": response_text,
"emotion_class": emotion_class,
"skill_id": skill_id,
"voice_portal": voice_portal,
"service_nlu": service_nlu,
"service_type": service_type,
"slots": slots,
"yzs_request_id": request_id,
"yzs_remote_ip": remote_ip,
"yzs_app_key": app_key,
"yzs_ud_id": ud_id,
"yzs_user_id": user_id,
"yzs_intent": yzs_intent,
"yzs_general": yzs_general,
"yzs_nlu_time": yzs_nlu_time,
"get_body_time": get_body_time,
"gree_nlu_time": gree_nlu_time,
"tencent_nlu_time": tencent_nlu_time,
"get_homeid_time": get_homeid_time,
"cost_time": cost_time
'date_time': date_time,
'uuid': uuid,
'mid': mid,
'mid_type': mid_type,
'mac_wifi': mac_wifi,
'mac_voice': mac_voice,
'code': code,
'query': query,
'terminal_domain': terminal_domain,
'terminal_intent': terminal_intent,
'distribution_gree_domain': distribution_gree_domain,
'distribution_gree_intent': distribution_gree_intent,
'response_text': response_text,
'emotion_class': emotion_class,
'skill_id': skill_id,
'voice_portal': voice_portal,
'service_nlu': service_nlu,
'service_type': service_type,
'slots': slots,
'yzs_request_id': request_id,
'yzs_remote_ip': remote_ip,
'yzs_app_key': app_key,
'yzs_ud_id': ud_id,
'yzs_user_id': user_id,
'yzs_intent': yzs_intent,
'yzs_general': yzs_general,
'yzs_nlu_time': yzs_nlu_time,
'get_body_time': get_body_time,
'gree_nlu_time': gree_nlu_time,
'tencent_nlu_time': tencent_nlu_time,
'get_homeid_time': get_homeid_time,
'cost_time': cost_time
}
# 返回 result
return result
......@@ -18,6 +18,7 @@ from pydantic import ValidationError
from application.schemas import ParseLogRequestItem
from application.services import LogService
log_api = Blueprint('log_api', __name__)
api_logs = Api(log_api)
......@@ -38,3 +39,4 @@ def parse():
return jsonify(
search_total=total
), 200
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment