From 1dfe4c1d5b64a8c6434476c263e6f6ca27217b9f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B4=94=E4=B8=BA=E4=B9=8B?= <560397@gree.com.cn> Date: Mon, 20 Nov 2023 18:53:44 +0800 Subject: [PATCH] Update project --- app.py | 2 +- application/config/config.yaml | 2 +- application/dao/log.py | 34 +++++++++++++++++-- application/extensions/__init__.py | 13 +++---- application/extensions/init_bcrypt.py | 26 -------------- application/extensions/init_marshmallow.py | 27 --------------- application/extensions/init_migrate.py | 28 --------------- .../libs/flask_elasticsearch/elasticsearch.py | 1 - application/libs/tasks/task.py | 15 ++++++-- application/models/log.py | 21 ------------ .../utils/elasticsearch/elasticsearch.py | 2 +- 11 files changed, 50 insertions(+), 121 deletions(-) delete mode 100644 application/extensions/init_bcrypt.py delete mode 100644 application/extensions/init_marshmallow.py delete mode 100644 application/extensions/init_migrate.py diff --git a/app.py b/app.py index 00cfd94..2d71d37 100644 --- a/app.py +++ b/app.py @@ -24,7 +24,7 @@ def runserver(): # Run the Flask development server # The run() method starts the application's development server. # It's not meant to be used on production environment. - app.run(debug=True) + app.run(debug=True, use_reloader=False) if __name__ == '__main__': diff --git a/application/config/config.yaml b/application/config/config.yaml index ea90305..c4ca772 100644 --- a/application/config/config.yaml +++ b/application/config/config.yaml @@ -33,7 +33,7 @@ Logger: Retention: Scheduler: - Start: 2023-11-12 12:08:00 + Start: 2023-11-20 18:50:00 End: 2099-11-11 16:00:00 Timezone: Asia/Shanghai diff --git a/application/dao/log.py b/application/dao/log.py index 7a2111b..22628ef 100644 --- a/application/dao/log.py +++ b/application/dao/log.py @@ -10,12 +10,18 @@ # @Description : """ +from datetime import datetime +from typing import Generator from loguru import logger +from sqlalchemy import text from application.libs.helper import MySQLHelper from application.models import Log from application.utils import ElasticsearchUtil, ParseUtil +from application.extensions.init_sqlalchemy import db + +created_partitions = set() # Now it's a global variable class LogDao: @@ -85,7 +91,28 @@ class LogDao: return mapping_list @classmethod - def process_and_save_data(cls, lst: list, mapping_list: list): + def batch_save(cls, objects: Generator) -> None: + global created_partitions # Reference the global variable + for obj in objects: + log = Log(**obj) + # Convert string to datetime object + date_time_obj = datetime.strptime(log.date_time, '%Y-%m-%d %H:%M:%S') + partition_date = date_time_obj.strftime('%Y_%m') + partition_name = f'{log.__tablename__}_{partition_date}' + + if partition_name not in created_partitions: + db.session.execute(text(f""" + CREATE TABLE IF NOT EXISTS {partition_name} PARTITION OF {log.__tablename__} + FOR VALUES FROM ('{date_time_obj.strftime('%Y-%m-01')}') TO + ('{date_time_obj.strftime('%Y-%m-01')}'::date + interval '1 month'); + """)) + created_partitions.add(partition_name) # Remember that this partition has been created + + db.session.add(log) + db.session.commit() + + @classmethod + def process_and_save_data(cls, lst: list, mapping_list: list) -> None: """ Process the given list using the mapping list and save the result to the database. @@ -96,7 +123,8 @@ class LogDao: if not lst: return result_generator = ParseUtil(mapping_list=mapping_list).filter(lst) - _ = Log.batch_save(result_generator) + # 批量保存数据 + cls.batch_save(result_generator) @classmethod def parse(cls, start: str, end: str, index: str, sql: str, options: dict) -> int: @@ -143,7 +171,7 @@ class LogDao: scroll_id = data.get('_scroll_id') try: - for _ in range(0, int(total / dsl.get('size', 10000) + 1)): + for _ in range(0, int(total / dsl.get('size', 5000) + 1)): # Get more data from Elasticsearch using scroll searching. res = cls.get_data_from_es(sid=scroll_id) lst = res.get('hits').get('hits') diff --git a/application/extensions/__init__.py b/application/extensions/__init__.py index d838186..202b955 100644 --- a/application/extensions/__init__.py +++ b/application/extensions/__init__.py @@ -12,26 +12,21 @@ from flask import Flask + +from .init_apispec import init_apispec +from .init_apscheduler import init_tasks from .init_config import init_config from .init_cors import init_cors +from .init_elasticsearch import init_elasticsearch from .init_logger import init_logger from .init_sqlalchemy import init_database -from .init_bcrypt import init_bcrypt -from .init_migrate import init_migrate -from .init_apispec import init_apispec -from .init_marshmallow import init_marshmallow -from .init_elasticsearch import init_elasticsearch -from .init_apscheduler import init_tasks def init_plugs(app: Flask) -> None: init_config(app) init_logger(app) init_database(app) - init_bcrypt(app) - init_migrate(app) init_apispec(app) - init_marshmallow(app) init_elasticsearch(app) init_cors(app) init_tasks(app) diff --git a/application/extensions/init_bcrypt.py b/application/extensions/init_bcrypt.py deleted file mode 100644 index 34ade7c..0000000 --- a/application/extensions/init_bcrypt.py +++ /dev/null @@ -1,26 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -""" -# @Version : Python 3.11.4 -# @Software : Sublime Text 4 -# @Author : StudentCWZ -# @Email : StudentCWZ@outlook.com -# @Date : 2023/10/28 22:06 -# @File : init_bcrypt.py -# @Description : -""" - -from flask import Flask -from flask_bcrypt import Bcrypt - -bcrypt = Bcrypt() - - -def init_bcrypt(app: Flask) -> None: - """ - Initialize the bcrypt extension - - :param app: flask.Flask application instance - :return: None - """ - bcrypt.init_app(app) diff --git a/application/extensions/init_marshmallow.py b/application/extensions/init_marshmallow.py deleted file mode 100644 index 796e734..0000000 --- a/application/extensions/init_marshmallow.py +++ /dev/null @@ -1,27 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -""" -# @Version : Python 3.11.4 -# @Software : Sublime Text 4 -# @Author : StudentCWZ -# @Email : StudentCWZ@outlook.com -# @Date : 2023/10/29 12:36 -# @File : init_marshmallow.py -# @Description : -""" - -from flask import Flask - -from flask_marshmallow import Marshmallow - -ma = Marshmallow() - - -def init_marshmallow(app: Flask) -> None: - """ - Initialize the database extension - - :param app: flask.Flask application instance - :return: None - """ - ma.init_app(app) diff --git a/application/extensions/init_migrate.py b/application/extensions/init_migrate.py deleted file mode 100644 index 2f0457b..0000000 --- a/application/extensions/init_migrate.py +++ /dev/null @@ -1,28 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -""" -# @Version : Python 3.11.4 -# @Software : Sublime Text 4 -# @Author : StudentCWZ -# @Email : StudentCWZ@outlook.com -# @Date : 2023/10/29 01:19 -# @File : init_migrate.py -# @Description : -""" - -from flask import Flask -from flask_migrate import Migrate - -from application.extensions.init_sqlalchemy import db - -migrate = Migrate() - - -def init_migrate(app: Flask) -> None: - """ - Initialize the database extension - - :param app: flask.Flask application instance - :return: None - """ - migrate.init_app(app, db) diff --git a/application/libs/flask_elasticsearch/elasticsearch.py b/application/libs/flask_elasticsearch/elasticsearch.py index e0f5daf..20b497f 100644 --- a/application/libs/flask_elasticsearch/elasticsearch.py +++ b/application/libs/flask_elasticsearch/elasticsearch.py @@ -62,7 +62,6 @@ class FlaskElasticsearch(BaseFlaskElasticsearch): # Retry connection on failure for i in range(5): # Retry up to 5 times if ctx.elasticsearch.ping(): - logger.info('Connected to Elasticsearch') break else: logger.warning(f'Attempt {i + 1} to connect to Elasticsearch failed. Retrying...') diff --git a/application/libs/tasks/task.py b/application/libs/tasks/task.py index 8c3eac0..3841aa0 100644 --- a/application/libs/tasks/task.py +++ b/application/libs/tasks/task.py @@ -11,8 +11,9 @@ """ import datetime + +from application.dao import LogDao from application.extensions.init_sqlalchemy import db -from application.libs.helper import MySQLHelper def task(): @@ -21,7 +22,14 @@ def task(): :return: None """ + # end_stamp = datetime.datetime.now() + # start_stamp = end_stamp - datetime.timedelta(days=1) + # start = start_stamp.strftime('%Y-%m-%d %H:%M:%S') + # end = end_stamp.strftime('%Y-%m-%d %H:%M:%S') + start = "2021-11-08 00:00:00" + end = "2021-11-09 00:00:00" with db.app.app_context(): + index = db.app.config.Elasticsearch.Index cfg = db.app.config.ExtraDB options = { "host": cfg.Host, @@ -30,5 +38,6 @@ def task(): "database": cfg.DB, "port": cfg.Port, } - with MySQLHelper(**options) as helper: - print(helper.execute(cfg.Sql)) + sql = cfg.Sql + LogDao.parse(start, end, index, sql, options) + diff --git a/application/models/log.py b/application/models/log.py index 192b880..c5e6185 100644 --- a/application/models/log.py +++ b/application/models/log.py @@ -12,8 +12,6 @@ import os -from datetime import datetime -from sqlalchemy import text from application.extensions.init_sqlalchemy import db @@ -68,22 +66,3 @@ class Log(db.Model): continue _dic[key] = value return _dic - - @classmethod - def batch_save(cls, objects): - with db.session.begin_nested(): - for obj in objects: - log = Log(**obj) - # 将字符串转换为 datetime 对象 - date_time_obj = datetime.strptime(log.date_time, '%Y-%m-%d %H:%M:%S') - partition_date = date_time_obj.strftime('%Y_%m') - partition_name = f'{log.__tablename__}_{partition_date}' - - db.session.execute(text(f""" - CREATE TABLE IF NOT EXISTS {partition_name} PARTITION OF {log.__tablename__} - FOR VALUES FROM ('{date_time_obj.strftime('%Y-%m-01')}') TO - ('{date_time_obj.strftime('%Y-%m-01')}'::date + interval '1 month'); - """)) - db.session.add(log) - - db.session.commit() diff --git a/application/utils/elasticsearch/elasticsearch.py b/application/utils/elasticsearch/elasticsearch.py index fa0fecf..12ee7d8 100644 --- a/application/utils/elasticsearch/elasticsearch.py +++ b/application/utils/elasticsearch/elasticsearch.py @@ -17,7 +17,7 @@ from application.extensions.init_elasticsearch import es class ElasticsearchUtil: @classmethod - def dsl(cls, _start: str, _end: str, size=10000) -> dict: + def dsl(cls, _start: str, _end: str, size=5000) -> dict: """ Setting dsl -- GitLab