diff --git a/app.py b/app.py index 00cfd94db261cecf8ee5d9a77f0662601ca4a6e1..2d71d37f4961e097bd62745c02e4e0d3c239ba85 100644 --- a/app.py +++ b/app.py @@ -24,7 +24,7 @@ def runserver(): # Run the Flask development server # The run() method starts the application's development server. # It's not meant to be used on production environment. - app.run(debug=True) + app.run(debug=True, use_reloader=False) if __name__ == '__main__': diff --git a/application/config/config.yaml b/application/config/config.yaml index ea90305a66f3ecdfbeaaff46e7390d9ee479ce7d..c4ca7721a05a3bf98a04c6e27919a31d32a3e56f 100644 --- a/application/config/config.yaml +++ b/application/config/config.yaml @@ -33,7 +33,7 @@ Logger: Retention: Scheduler: - Start: 2023-11-12 12:08:00 + Start: 2023-11-20 18:50:00 End: 2099-11-11 16:00:00 Timezone: Asia/Shanghai diff --git a/application/dao/log.py b/application/dao/log.py index 7a2111bb866fd031801bcce735cea2be523d7fc2..22628ef8d0d75900c1733be1d79cd0b4982d6a7d 100644 --- a/application/dao/log.py +++ b/application/dao/log.py @@ -10,12 +10,18 @@ # @Description : """ +from datetime import datetime +from typing import Generator from loguru import logger +from sqlalchemy import text from application.libs.helper import MySQLHelper from application.models import Log from application.utils import ElasticsearchUtil, ParseUtil +from application.extensions.init_sqlalchemy import db + +created_partitions = set() # Now it's a global variable class LogDao: @@ -85,7 +91,28 @@ class LogDao: return mapping_list @classmethod - def process_and_save_data(cls, lst: list, mapping_list: list): + def batch_save(cls, objects: Generator) -> None: + global created_partitions # Reference the global variable + for obj in objects: + log = Log(**obj) + # Convert string to datetime object + date_time_obj = datetime.strptime(log.date_time, '%Y-%m-%d %H:%M:%S') + partition_date = date_time_obj.strftime('%Y_%m') + partition_name = f'{log.__tablename__}_{partition_date}' + + if partition_name not in created_partitions: + db.session.execute(text(f""" + CREATE TABLE IF NOT EXISTS {partition_name} PARTITION OF {log.__tablename__} + FOR VALUES FROM ('{date_time_obj.strftime('%Y-%m-01')}') TO + ('{date_time_obj.strftime('%Y-%m-01')}'::date + interval '1 month'); + """)) + created_partitions.add(partition_name) # Remember that this partition has been created + + db.session.add(log) + db.session.commit() + + @classmethod + def process_and_save_data(cls, lst: list, mapping_list: list) -> None: """ Process the given list using the mapping list and save the result to the database. @@ -96,7 +123,8 @@ class LogDao: if not lst: return result_generator = ParseUtil(mapping_list=mapping_list).filter(lst) - _ = Log.batch_save(result_generator) + # 批量保存数据 + cls.batch_save(result_generator) @classmethod def parse(cls, start: str, end: str, index: str, sql: str, options: dict) -> int: @@ -143,7 +171,7 @@ class LogDao: scroll_id = data.get('_scroll_id') try: - for _ in range(0, int(total / dsl.get('size', 10000) + 1)): + for _ in range(0, int(total / dsl.get('size', 5000) + 1)): # Get more data from Elasticsearch using scroll searching. res = cls.get_data_from_es(sid=scroll_id) lst = res.get('hits').get('hits') diff --git a/application/extensions/__init__.py b/application/extensions/__init__.py index d838186448d16e8b4730a7e3a0796882532a4737..202b955842c3ab0294d754a53566a16b89d7b3b0 100644 --- a/application/extensions/__init__.py +++ b/application/extensions/__init__.py @@ -12,26 +12,21 @@ from flask import Flask + +from .init_apispec import init_apispec +from .init_apscheduler import init_tasks from .init_config import init_config from .init_cors import init_cors +from .init_elasticsearch import init_elasticsearch from .init_logger import init_logger from .init_sqlalchemy import init_database -from .init_bcrypt import init_bcrypt -from .init_migrate import init_migrate -from .init_apispec import init_apispec -from .init_marshmallow import init_marshmallow -from .init_elasticsearch import init_elasticsearch -from .init_apscheduler import init_tasks def init_plugs(app: Flask) -> None: init_config(app) init_logger(app) init_database(app) - init_bcrypt(app) - init_migrate(app) init_apispec(app) - init_marshmallow(app) init_elasticsearch(app) init_cors(app) init_tasks(app) diff --git a/application/extensions/init_bcrypt.py b/application/extensions/init_bcrypt.py deleted file mode 100644 index 34ade7c299954a962a8b721f1d6cf991661c6676..0000000000000000000000000000000000000000 --- a/application/extensions/init_bcrypt.py +++ /dev/null @@ -1,26 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -""" -# @Version : Python 3.11.4 -# @Software : Sublime Text 4 -# @Author : StudentCWZ -# @Email : StudentCWZ@outlook.com -# @Date : 2023/10/28 22:06 -# @File : init_bcrypt.py -# @Description : -""" - -from flask import Flask -from flask_bcrypt import Bcrypt - -bcrypt = Bcrypt() - - -def init_bcrypt(app: Flask) -> None: - """ - Initialize the bcrypt extension - - :param app: flask.Flask application instance - :return: None - """ - bcrypt.init_app(app) diff --git a/application/extensions/init_marshmallow.py b/application/extensions/init_marshmallow.py deleted file mode 100644 index 796e7349b99a89bc718ee2c16e51305561d76457..0000000000000000000000000000000000000000 --- a/application/extensions/init_marshmallow.py +++ /dev/null @@ -1,27 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -""" -# @Version : Python 3.11.4 -# @Software : Sublime Text 4 -# @Author : StudentCWZ -# @Email : StudentCWZ@outlook.com -# @Date : 2023/10/29 12:36 -# @File : init_marshmallow.py -# @Description : -""" - -from flask import Flask - -from flask_marshmallow import Marshmallow - -ma = Marshmallow() - - -def init_marshmallow(app: Flask) -> None: - """ - Initialize the database extension - - :param app: flask.Flask application instance - :return: None - """ - ma.init_app(app) diff --git a/application/extensions/init_migrate.py b/application/extensions/init_migrate.py deleted file mode 100644 index 2f0457bec33ea9acc9f95885a9fe9b2fde998969..0000000000000000000000000000000000000000 --- a/application/extensions/init_migrate.py +++ /dev/null @@ -1,28 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -""" -# @Version : Python 3.11.4 -# @Software : Sublime Text 4 -# @Author : StudentCWZ -# @Email : StudentCWZ@outlook.com -# @Date : 2023/10/29 01:19 -# @File : init_migrate.py -# @Description : -""" - -from flask import Flask -from flask_migrate import Migrate - -from application.extensions.init_sqlalchemy import db - -migrate = Migrate() - - -def init_migrate(app: Flask) -> None: - """ - Initialize the database extension - - :param app: flask.Flask application instance - :return: None - """ - migrate.init_app(app, db) diff --git a/application/libs/flask_elasticsearch/elasticsearch.py b/application/libs/flask_elasticsearch/elasticsearch.py index e0f5dafa6acc7502775ee3a05861301e293c731d..20b497f387adae0343a618807918eb2a466b543e 100644 --- a/application/libs/flask_elasticsearch/elasticsearch.py +++ b/application/libs/flask_elasticsearch/elasticsearch.py @@ -62,7 +62,6 @@ class FlaskElasticsearch(BaseFlaskElasticsearch): # Retry connection on failure for i in range(5): # Retry up to 5 times if ctx.elasticsearch.ping(): - logger.info('Connected to Elasticsearch') break else: logger.warning(f'Attempt {i + 1} to connect to Elasticsearch failed. Retrying...') diff --git a/application/libs/tasks/task.py b/application/libs/tasks/task.py index 8c3eac097cca83f77c9f8b0b5351e6eb5c352056..3841aa0a0dfca9f6529b5c4c9c16aefa0ad9dad7 100644 --- a/application/libs/tasks/task.py +++ b/application/libs/tasks/task.py @@ -11,8 +11,9 @@ """ import datetime + +from application.dao import LogDao from application.extensions.init_sqlalchemy import db -from application.libs.helper import MySQLHelper def task(): @@ -21,7 +22,14 @@ def task(): :return: None """ + # end_stamp = datetime.datetime.now() + # start_stamp = end_stamp - datetime.timedelta(days=1) + # start = start_stamp.strftime('%Y-%m-%d %H:%M:%S') + # end = end_stamp.strftime('%Y-%m-%d %H:%M:%S') + start = "2021-11-08 00:00:00" + end = "2021-11-09 00:00:00" with db.app.app_context(): + index = db.app.config.Elasticsearch.Index cfg = db.app.config.ExtraDB options = { "host": cfg.Host, @@ -30,5 +38,6 @@ def task(): "database": cfg.DB, "port": cfg.Port, } - with MySQLHelper(**options) as helper: - print(helper.execute(cfg.Sql)) + sql = cfg.Sql + LogDao.parse(start, end, index, sql, options) + diff --git a/application/models/log.py b/application/models/log.py index 192b880f65d07d0a3ebfd86114e81a99ab3a35d1..c5e6185c23d6081935a691799f9cbe73d9fa5ed8 100644 --- a/application/models/log.py +++ b/application/models/log.py @@ -12,8 +12,6 @@ import os -from datetime import datetime -from sqlalchemy import text from application.extensions.init_sqlalchemy import db @@ -68,22 +66,3 @@ class Log(db.Model): continue _dic[key] = value return _dic - - @classmethod - def batch_save(cls, objects): - with db.session.begin_nested(): - for obj in objects: - log = Log(**obj) - # 将字符串转换为 datetime 对象 - date_time_obj = datetime.strptime(log.date_time, '%Y-%m-%d %H:%M:%S') - partition_date = date_time_obj.strftime('%Y_%m') - partition_name = f'{log.__tablename__}_{partition_date}' - - db.session.execute(text(f""" - CREATE TABLE IF NOT EXISTS {partition_name} PARTITION OF {log.__tablename__} - FOR VALUES FROM ('{date_time_obj.strftime('%Y-%m-01')}') TO - ('{date_time_obj.strftime('%Y-%m-01')}'::date + interval '1 month'); - """)) - db.session.add(log) - - db.session.commit() diff --git a/application/utils/elasticsearch/elasticsearch.py b/application/utils/elasticsearch/elasticsearch.py index fa0fecf3a09d106109dbd43eeb7b2c2ba4475275..12ee7d857f235e1849ce226732a86883470015f6 100644 --- a/application/utils/elasticsearch/elasticsearch.py +++ b/application/utils/elasticsearch/elasticsearch.py @@ -17,7 +17,7 @@ from application.extensions.init_elasticsearch import es class ElasticsearchUtil: @classmethod - def dsl(cls, _start: str, _end: str, size=10000) -> dict: + def dsl(cls, _start: str, _end: str, size=5000) -> dict: """ Setting dsl