Commit 1b65ae3e authored by 崔为之's avatar 崔为之 💪🏽

Update project

parent af484b68
......@@ -38,9 +38,9 @@ Scheduler:
Timezone: Asia/Shanghai
ExtraDB:
Host: localhost
Host: 172.28.5.39
Port: 3306
User: root
Password: localhost123
DB: elp
Sql: select * from users;
Password: gree123
DB: corpora
Sql: select concat(scene.scene, '.', intent) from intents as i left join scene on scene.id=i.scene_id group by i.intent;
......@@ -51,4 +51,42 @@ def init_database(app: Flask) -> None:
PRIMARY KEY (id, created_at)
) PARTITION BY RANGE (created_at);
"""))
db.session.execute(text("""
CREATE TABLE IF NOT EXISTS logs (
id SERIAL NOT NULL,
date_time TIMESTAMP WITH TIME ZONE NOT NULL,
uuid varchar(100) DEFAULT NULL,
mid varchar(50) DEFAULT NULL,
mid_type varchar(50) DEFAULT NULL,
mac_wifi varchar(50) DEFAULT NULL,
mac_voice varchar(50) DEFAULT NULL,
code smallint DEFAULT NULL,
query varchar(255) DEFAULT NULL,
terminal_domain varchar(50) DEFAULT NULL,
terminal_intent varchar(50) DEFAULT NULL,
distribution_gree_domain varchar(50) DEFAULT NULL,
distribution_gree_intent varchar(50) DEFAULT NULL,
response_text text,
emotion_class varchar(50) DEFAULT NULL,
skill_id varchar(100) DEFAULT NULL,
voice_portal smallint NOT NULL,
service_nlu varchar(50) DEFAULT NULL,
service_type smallint NOT NULL,
slots text,
yzs_request_id varchar(50) DEFAULT NULL,
yzs_remote_ip varchar(15) DEFAULT NULL,
yzs_app_key varchar(50) DEFAULT NULL,
yzs_ud_id varchar(50) DEFAULT NULL,
yzs_user_id varchar(50) DEFAULT NULL,
yzs_intent text,
yzs_general text,
yzs_nlu_time varchar(20) DEFAULT NULL,
get_body_time varchar(20) DEFAULT NULL,
gree_nlu_time varchar(20) DEFAULT NULL,
get_homeid_time varchar(20) DEFAULT NULL,
tencent_nlu_time varchar(20) DEFAULT NULL,
cost_time varchar(20) DEFAULT NULL,
PRIMARY KEY (id, date_time)
) PARTITION BY RANGE (date_time);
"""))
db.session.commit()
......@@ -11,3 +11,4 @@
"""
from .user import User
from .log import Log
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
# @Version : Python 3.11.4
# @Software : Sublime Text 4
# @Author : StudentCWZ
# @Email : StudentCWZ@outlook.com
# @Date : 2023/11/19 14:59
# @File : log.py
# @Description :
"""
import os
from datetime import datetime
from sqlalchemy import text
from application.extensions.init_sqlalchemy import db
class Log(db.Model):
"""Basic user model"""
__tablename__ = os.environ.get('TableName', 'logs')
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
date_time = db.Column(db.DateTime, nullable=False)
uuid = db.Column(db.String(80))
mid = db.Column(db.String(80))
mid_type = db.Column(db.String(80))
mac_wifi = db.Column(db.String(80))
mac_voice = db.Column(db.String(80))
code = db.Column(db.SmallInteger)
query = db.Column(db.String(80))
terminal_domain = db.Column(db.String(80))
terminal_intent = db.Column(db.String(80))
distribution_gree_domain = db.Column(db.String(80))
distribution_gree_intent = db.Column(db.String(80))
response_text = db.Column(db.Text)
emotion_class = db.Column(db.String(80))
skill_id = db.Column(db.String(80))
voice_portal = db.Column(db.SmallInteger)
service_nlu = db.Column(db.String(80))
service_type = db.Column(db.SmallInteger)
slots = db.Column(db.Text)
yzs_request_id = db.Column(db.String(80))
yzs_remote_ip = db.Column(db.String(80))
yzs_app_key = db.Column(db.String(80))
yzs_ud_id = db.Column(db.String(80))
yzs_user_id = db.Column(db.String(80))
yzs_intent = db.Column(db.Text)
yzs_general = db.Column(db.Text)
yzs_nlu_time = db.Column(db.String(80))
get_body_time = db.Column(db.String(80))
gree_nlu_time = db.Column(db.String(80))
get_homeid_time = db.Column(db.String(80))
tencent_nlu_time = db.Column(db.String(80))
cost_time = db.Column(db.String(80))
def __repr__(self) -> str:
return f'<Log {self.uuid}>'
def to_dict(self) -> dict:
"""object to dict"""
_dic = {}
for key, value in self.__dict__.items():
if str(key).startswith("_"):
# 前缀带下划线不要
continue
_dic[key] = value
return _dic
def save(self):
partition_date = self.date_time.strftime('%Y_%m')
partition_name = f'{self.__tablename__}_{partition_date}'
with db.session.begin_nested():
db.session.execute(text(f"""
CREATE TABLE IF NOT EXISTS {partition_name} PARTITION OF {self.__tablename__}
FOR VALUES FROM ('{self.date_time.strftime('%Y-%m-01')}') TO
('{self.date_time.strftime('%Y-%m-01')}'::date + interval '1 month');
"""))
db.session.add(self)
db.session.commit()
@classmethod
def batch_save(cls, objects):
with db.session.begin_nested():
for obj in objects:
log = Log(**obj)
# 将字符串转换为 datetime 对象
date_time_obj = datetime.strptime(log.date_time, '%Y-%m-%d %H:%M:%S')
partition_date = date_time_obj.strftime('%Y_%m')
partition_name = f'{log.__tablename__}_{partition_date}'
db.session.execute(text(f"""
CREATE TABLE IF NOT EXISTS {partition_name} PARTITION OF {log.__tablename__}
FOR VALUES FROM ('{date_time_obj.strftime('%Y-%m-01')}') TO
('{date_time_obj.strftime('%Y-%m-01')}'::date + interval '1 month');
"""))
db.session.add(log)
db.session.commit()
......@@ -12,3 +12,4 @@
from .user import CreateUserItem
from .user import UserSchema
from .request import ParseLogRequestItem
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
# @Version : Python 3.11.4
# @Software : Sublime Text 4
# @Author : StudentCWZ
# @Email : StudentCWZ@outlook.com
# @Date : 2023/11/19 16:12
# @File : __init__.py.py
# @Description :
"""
from .log import ParseLogRequestItem
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
# @Version : Python 3.11.4
# @Software : Sublime Text 4
# @Author : StudentCWZ
# @Email : StudentCWZ@outlook.com
# @Date : 2023/11/19 16:12
# @File : log.py
# @Description :
"""
from pydantic import BaseModel
from datetime import datetime
class ParseLogRequestItem(BaseModel):
start: str
end: str
......@@ -11,3 +11,4 @@
"""
from .user import UserService
from .log import LogService
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
# @Version : Python 3.11.4
# @Software : Sublime Text 4
# @Author : StudentCWZ
# @Email : StudentCWZ@outlook.com
# @Date : 2023/11/19 16:02
# @File : log.py
# @Description :
"""
from flask import current_app
from loguru import logger
from application.utils import ElasticsearchUtil
from application.libs.helper import MySQLHelper
from application.utils import ParseUtil
from application.models import Log
class LogService:
@classmethod
def parse(cls, item) -> int:
start_date = item.start
end_date = item.end
es_index = current_app.config.Elasticsearch.Index
mysql_config = current_app.config.ExtraDB
sql = mysql_config.Sql
cfg = {
'host': mysql_config.Host,
'user': mysql_config.User,
'password': mysql_config.Password,
'db': mysql_config.DB,
'port': mysql_config.Port,
}
logger.debug(
f'The interval of time is between {start_date} and {end_date} ...')
dsl = ElasticsearchUtil.dsl(start_date, end_date, size=2500)
data = ElasticsearchUtil.search(es_index, dsl)
# 获取 mdata
mdata = data.get('hits').get('hits')
if not mdata:
logger.error('the mdata is an empty list ...')
raise SystemError('the mdata is an empty list ...')
# 获取 total
total = data.get('hits').get('total').get('value')
logger.debug(f'The numbers of data by searching data from ES: {total}')
with MySQLHelper(**cfg) as helper:
result = helper.execute(sql)
mapping_list = [item[0] for item in result]
result = ParseUtil(mapping_list=mapping_list).filter(mdata)
logger.debug('The first part of data is inserting ...')
_ = Log.batch_save(result)
logger.debug('The inserting of the first part data finished!')
logger.debug('The scrolling part of data is inserting ...')
# 获取 scroll_id
scroll_id = data.get('_scroll_id')
try:
for _ in range(0, int(total / dsl.get('size', 10000) + 1)):
res = ElasticsearchUtil.scroll_search(scroll_id)
lst = res.get('hits').get('hits')
if not lst:
continue
result_generator = ParseUtil(
mapping_list=mapping_list).filter(lst)
_ = Log.batch_save(result_generator)
except Exception as e:
# 输出 log 信息
logger.error(f'The error: {e}')
raise SystemError()
else:
logger.debug('The process of inserting scrolling part data succeed!')
finally:
logger.debug('The inserting of the scrolling part data finished!')
return total
......@@ -12,3 +12,4 @@
from .dsn import DatabaseUri, RedisUri
from .elasticsearch import ElasticsearchUtil
from .parse import ParseUtil
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
# @Version : Python 3.11.4
# @Software : Sublime Text 4
# @Author : StudentCWZ
# @Email : StudentCWZ@outlook.com
# @Date : 2023/11/19 17:35
# @File : __init__.py.py
# @Description :
"""
from .parse import ParseUtil
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
# @Version : Python 3.11.4
# @Software : Sublime Text 4
# @Author : StudentCWZ
# @Email : StudentCWZ@outlook.com
# @Date : 2023/11/19 17:35
# @File : parse.py
# @Description :
"""
import datetime
import json
import re
from typing import Generator
import dateutil.parser
from loguru import logger
class ParseUtil:
def __init__(self, mapping_list: list):
self.mapping_list = mapping_list
def filter(self, data: list) -> Generator:
"""
Filter logs
:param data: logs data
:return: Generator
"""
for _index, _data in enumerate(data):
_source = _data.get('_source', {})
if not _source:
print('the field of _source is not in data ...')
continue
# _tag = _source.get('tag', '')
# if _tag != 'global':
# continue
# 获取 res
res = self.parse(_source)
if not res:
# 获取 uuid
uuid = _source.get("uuid", "") or _source.get("requestId", "")
print(f'missing uuid of data: {uuid}')
continue
yield res
def parse(self, dic: dict) -> dict:
"""
Parse logs
:param dic: logs dict before parsing logs
:return: dic
"""
# 捕获异常
try:
# 获取 date_time
date_time = dic.get("time", "")
# 条件判断
if not date_time:
# 获取 time_stamp
time_stamp = dic.get("@timestamp", "").split(".")[0]
# 条件判断
if isinstance(time_stamp, str):
# 获取 date_time
date_time = (dateutil.parser.isoparse(time_stamp)
+ datetime.timedelta(hours=8)).strftime("%Y-%m-%d %H:%M:%S")
elif isinstance(time_stamp, datetime.datetime):
# 获取 date_time
date_time = (time_stamp + datetime.timedelta(hours=8)
).strftime("%Y-%m-%d %H:%M:%S")
else:
# 输出 log 信息
print("The error: parse time_stamp failed ...")
# 获取 uuid
uuid = dic.get("uuid", "") or dic.get("requestId", "")
# 获取 msg
msg = dic.get("message", "")
# 条件判断
if msg:
# 获取 req
req = json.loads(msg).get("field", {}).get(
"data", {}).get("request", {})
# 获取 resp
resp = json.loads(msg).get("field", {}).get(
"data", {}).get("response", {})
# 获取 data
data = json.loads(msg).get("field", {}).get("data", {})
else:
# 获取 req
req = dic.get("field", {}).get(
"data", {}).get("request", {})
# 获取 resp
resp = dic.get("field", {}).get(
"data", {}).get("response", {})
# 获取 data
data = dic.get("field", {}).get("data", {})
# 获取 mac_voice
mac_voice = req.get("macVoice", "")
# 获取 mac_wifi
mac_wifi = req.get("macWifi", "")
# 获取 query
query = req.get("query", "")
# 获取 mid
mid = req.get("mid", "")
# 获取 mid_type
mid_type = req.get("midType", "")
# 获取 req_param
req_param = req.get("requestBody", {}).get(
"reqParam", {}) or req.get("reqParam", {})
# 获取 common
common = req_param.get("common", {}) or req_param.get("Common", {})
# 获取 request_id
request_id = common.get("requestId", "") or common.get(
"RequestId", "") or ""
# 获取 remote_ip
remote_ip = common.get("remoteIP", "") or common.get(
"RemoteIP", "") or common.get("remoteIp", "") or ""
# 获取 app_key
app_key = common.get("appKey", "") or common.get(
"AppKey", "") or ""
# 获取 ud_id
ud_id = common.get("udid", "") or common.get("Udid", "") or ""
# 获取 user_id
user_id = common.get("userId", "") or common.get(
"UserId", "") or ""
# 获取 service_type
service_type = str(common.get(
"serviceType", "").replace("asr", "")) or 0
# 声明 voice_portal
voice_portal = 1
# 声明 emotion_class
emotion_class = ""
# 获取 nlu_ret
nlu_ret = req_param.get("nluRet", {})
# 获取 yzs_nlu_time
yzs_nlu_time = nlu_ret.get("nluProcessTime", "")
# 获取 yzs_general
yzs_general = json.dumps(nlu_ret.get(
"general", {}), ensure_ascii=False).replace("{}", "")
# 获取 yzs_intent
yzs_intent = json.dumps(nlu_ret.get("semantic", {}).get(
"intent", []), ensure_ascii=False).replace("[]", "")
# 条件判断
if resp:
# 获取 header
header = resp.get("header", {})
# 获取 semantic
semantic = header.get("semantic", {})
# 获取 code
code = semantic.get("code", 0)
# 获取 terminal_domain
terminal_domain = semantic.get("domain", "") or resp.get(
"semantic", {}).get("service", "")
# 获取 terminal_intent
terminal_intent = semantic.get("intent", "") or resp.get(
"semantic", {}).get("action", "")
# 获取 skill_id
skill_id = semantic.get("skill_id", "")
# 获取 response_text
response_text = resp.get("response_text", "")
# 获取 slots
if query == "空调调到26度":
logger.info(semantic.get("params", ""))
slots = semantic.get("params", "")
if slots == "":
if len(semantic.get("slots", [])) > 0:
slots = json.dumps(semantic.get("slots"), ensure_ascii=False)
# 条件判断
if terminal_domain == "chat":
if skill_id:
# 条件判断
if skill_id in self.mapping_list:
# 获取 distribution_gree_domain, distribution_gree_intent
distribution_gree_domain, distribution_gree_intent = skill_id.split(
".")
else:
# 获取 reg_num_list
reg_num_list = re.findall(r"(\d+)", skill_id, re.S)
# 条件判断
if not reg_num_list:
# 条件判断
if "." in skill_id:
# 获取 distribution_gree_domain, distribution_gree_intent
distribution_gree_domain, distribution_gree_intent = skill_id.split(
".")
else:
# 获取 distribution_gree_domain, distribution_gree_intent
distribution_gree_domain, distribution_gree_intent = skill_id, skill_id
else:
# 获取 distribution_gree_domain, distribution_gree_intent
distribution_gree_domain, distribution_gree_intent = "", ""
else:
# 获取 distribution_gree_domain, distribution_gree_intent
distribution_gree_domain, distribution_gree_intent = "", ""
else:
# 获取 distribution_gree_domain, distribution_gree_intent
distribution_gree_domain, distribution_gree_intent = terminal_domain, terminal_intent
else:
# 声明 code
code = -3
# 声明 terminal_domain
terminal_domain = ""
# 声明 intent
terminal_intent = ""
# 声明 skill_id
skill_id = ""
# 声明 response_text
response_text = ""
# 声明 slots
slots = ""
# 获取 distribution_gree_domain, distribution_gree_intent
distribution_gree_domain, distribution_gree_intent = "", ""
# 获取 service_nlu
service_nlu = data.get(
"serviceNLU", "") or data.get("serverNLU", "")
# 获取 cost_time
cost_time = data.get("cost_time", {}).get("return", "") or data.get(
"cost_time", {}).get("save_records", "")
# 获取 get_body_time
get_body_time = data.get("cost_time", {}).get("get_body", "")
# 获取 gree_nlu_time
gree_nlu_time = data.get("cost_time", {}).get("gree_nlu", "")
# 获取 tencent_nlu_time
tencent_nlu_time = data.get("cost_time", {}).get("tencent_nlu", "")
# 获取 get_homeid_time
get_homeid_time = data.get("cost_time", {}).get("get_homeid", "")
# 条件判断
if gree_nlu_time:
# 条件判断
if get_body_time:
# 获取 gree_nlu_time
gree_nlu_time = str(
float(gree_nlu_time) - float(get_body_time))
# 条件判断
if tencent_nlu_time:
# 条件判断
if get_body_time:
# 获取 tencent_nlu_time
tencent_nlu_time = str(
float(tencent_nlu_time) - float(get_body_time))
# 条件判断
if get_homeid_time:
# 条件判断
if gree_nlu_time != "" or tencent_nlu_time != "":
# 条件判断
if gree_nlu_time == "":
# 获取 get_homeid_time
get_homeid_time = str(
float(get_homeid_time) - float(tencent_nlu_time))
# 条件判断
elif tencent_nlu_time == "":
# 获取 get_homeid_time
get_homeid_time = str(
float(get_homeid_time) - float(gree_nlu_time))
else:
# 获取 get_homeid_time
get_homeid_time = str(float(get_homeid_time) - max(float(gree_nlu_time),
float(tencent_nlu_time)))
except Exception as e:
# 输出 log 信息
print(f"The error: {e}")
else:
# 获取 result
result = {
"date_time": date_time,
"uuid": uuid,
"mid": mid,
"mid_type": mid_type,
"mac_wifi": mac_wifi,
"mac_voice": mac_voice,
"code": code,
"query": query,
"terminal_domain": terminal_domain,
"terminal_intent": terminal_intent,
"distribution_gree_domain": distribution_gree_domain,
"distribution_gree_intent": distribution_gree_intent,
"response_text": response_text,
"emotion_class": emotion_class,
"skill_id": skill_id,
"voice_portal": voice_portal,
"service_nlu": service_nlu,
"service_type": service_type,
"slots": slots,
"yzs_request_id": request_id,
"yzs_remote_ip": remote_ip,
"yzs_app_key": app_key,
"yzs_ud_id": ud_id,
"yzs_user_id": user_id,
"yzs_intent": yzs_intent,
"yzs_general": yzs_general,
"yzs_nlu_time": yzs_nlu_time,
"get_body_time": get_body_time,
"gree_nlu_time": gree_nlu_time,
"tencent_nlu_time": tencent_nlu_time,
"get_homeid_time": get_homeid_time,
"cost_time": cost_time
}
# 返回 result
return result
......@@ -13,7 +13,9 @@
from flask import Flask
from .user import register_user_views
from .log import register_log_views
def init_views(app: Flask) -> None:
register_user_views(app)
register_log_views(app)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
# @Version : Python 3.11.4
# @Software : Sublime Text 4
# @Author : StudentCWZ
# @Email : StudentCWZ@outlook.com
# @Date : 2023/11/19 16:05
# @File : __init__.py
# @Description :
"""
from flask import Flask
from .log import log_api
def register_log_views(app: Flask):
app.register_blueprint(log_api)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
# @Version : Python 3.11.4
# @Software : Sublime Text 4
# @Author : StudentCWZ
# @Email : StudentCWZ@outlook.com
# @Date : 2023/11/19 16:05
# @File : log.py
# @Description :
"""
from flask import Blueprint, request, jsonify
from flask_restful import Api
from loguru import logger
from pydantic import ValidationError
from application.schemas import ParseLogRequestItem
from application.services import LogService
log_api = Blueprint('log_api', __name__)
api_logs = Api(log_api)
@log_api.route('/logs', methods=['POST'])
def parse():
json_data = request.get_json(force=True)
if not json_data:
logger.error('No input data provided')
return jsonify({"message": "No input data provided"}), 400
try:
item = ParseLogRequestItem(**json_data)
except ValidationError as e:
return jsonify({"message": "Invalid input data", "errors": e.errors()}), 400
total = LogService.parse(item)
return jsonify(
search_total=total
), 200
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment