Commit 1dfe4c1d authored by 崔为之

Update project

parent d2952806
@@ -24,7 +24,7 @@ def runserver():
     # Run the Flask development server
     # The run() method starts the application's development server.
     # It's not meant to be used on production environment.
-    app.run(debug=True)
+    app.run(debug=True, use_reloader=False)

 if __name__ == '__main__':
...
@@ -33,7 +33,7 @@ Logger:
   Retention:
 Scheduler:
-  Start: 2023-11-12 12:08:00
+  Start: 2023-11-20 18:50:00
   End: 2099-11-11 16:00:00
   Timezone: Asia/Shanghai
...
@@ -10,12 +10,18 @@
 # @Description :
 """
+from datetime import datetime
+from typing import Generator
+
 from loguru import logger
+from sqlalchemy import text
+
 from application.libs.helper import MySQLHelper
 from application.models import Log
 from application.utils import ElasticsearchUtil, ParseUtil
+from application.extensions.init_sqlalchemy import db
+
+created_partitions = set()  # Now it's a global variable


 class LogDao:
@@ -85,7 +91,28 @@ class LogDao:
         return mapping_list

     @classmethod
-    def process_and_save_data(cls, lst: list, mapping_list: list):
+    def batch_save(cls, objects: Generator) -> None:
+        global created_partitions  # Reference the global variable
+        for obj in objects:
+            log = Log(**obj)
+            # Convert string to datetime object
+            date_time_obj = datetime.strptime(log.date_time, '%Y-%m-%d %H:%M:%S')
+            partition_date = date_time_obj.strftime('%Y_%m')
+            partition_name = f'{log.__tablename__}_{partition_date}'
+            if partition_name not in created_partitions:
+                db.session.execute(text(f"""
+                    CREATE TABLE IF NOT EXISTS {partition_name} PARTITION OF {log.__tablename__}
+                    FOR VALUES FROM ('{date_time_obj.strftime('%Y-%m-01')}') TO
+                    ('{date_time_obj.strftime('%Y-%m-01')}'::date + interval '1 month');
+                """))
+                created_partitions.add(partition_name)  # Remember that this partition has been created
+            db.session.add(log)
+        db.session.commit()
+
+    @classmethod
+    def process_and_save_data(cls, lst: list, mapping_list: list) -> None:
         """
         Process the given list using the mapping list and save the result to the database.
@@ -96,7 +123,8 @@ class LogDao:
         if not lst:
             return
         result_generator = ParseUtil(mapping_list=mapping_list).filter(lst)
-        _ = Log.batch_save(result_generator)
+        # Batch save the data
+        cls.batch_save(result_generator)

     @classmethod
     def parse(cls, start: str, end: str, index: str, sql: str, options: dict) -> int:
@@ -143,7 +171,7 @@ class LogDao:
             scroll_id = data.get('_scroll_id')
             try:
-                for _ in range(0, int(total / dsl.get('size', 10000) + 1)):
+                for _ in range(0, int(total / dsl.get('size', 5000) + 1)):
                     # Get more data from Elasticsearch using scroll searching.
                     res = cls.get_data_from_es(sid=scroll_id)
                     lst = res.get('hits').get('hits')
...
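
Note on the partition DDL issued in batch_save above: CREATE TABLE ... PARTITION OF only succeeds if the parent table was already declared as a range-partitioned table. A minimal sketch of the kind of parent DDL that is assumed here; the table name "logs", the column list, and the connection string are illustrative only (the real table name and columns come from the Log model, which is not shown in this diff):

from sqlalchemy import create_engine, text

# Hypothetical DSN for illustration; the project configures its database elsewhere.
engine = create_engine('postgresql+psycopg2://user:password@localhost/appdb')

with engine.begin() as conn:
    # The parent table must be created with PARTITION BY RANGE so that the
    # monthly "CREATE TABLE ... PARTITION OF ... FOR VALUES FROM ... TO ..."
    # statements issued in batch_save can attach child partitions to it.
    conn.execute(text("""
        CREATE TABLE IF NOT EXISTS logs (
            id        bigserial,
            date_time timestamp NOT NULL,
            message   text,
            PRIMARY KEY (id, date_time)
        ) PARTITION BY RANGE (date_time);
    """))

In PostgreSQL the partition key must be part of any primary key on a partitioned table, which is why date_time appears in the key of this sketch.
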
@@ -12,26 +12,21 @@
 from flask import Flask

+from .init_apispec import init_apispec
+from .init_apscheduler import init_tasks
 from .init_config import init_config
 from .init_cors import init_cors
+from .init_elasticsearch import init_elasticsearch
 from .init_logger import init_logger
 from .init_sqlalchemy import init_database
-from .init_bcrypt import init_bcrypt
-from .init_migrate import init_migrate
-from .init_apispec import init_apispec
-from .init_marshmallow import init_marshmallow
-from .init_elasticsearch import init_elasticsearch
-from .init_apscheduler import init_tasks


 def init_plugs(app: Flask) -> None:
     init_config(app)
     init_logger(app)
     init_database(app)
-    init_bcrypt(app)
-    init_migrate(app)
     init_apispec(app)
-    init_marshmallow(app)
     init_elasticsearch(app)
     init_cors(app)
     init_tasks(app)
-#!/usr/bin/env python3
-# -*- coding: utf-8 -*-
-"""
-# @Version : Python 3.11.4
-# @Software : Sublime Text 4
-# @Author : StudentCWZ
-# @Email : StudentCWZ@outlook.com
-# @Date : 2023/10/28 22:06
-# @File : init_bcrypt.py
-# @Description :
-"""
-from flask import Flask
-from flask_bcrypt import Bcrypt
-
-bcrypt = Bcrypt()
-
-
-def init_bcrypt(app: Flask) -> None:
-    """
-    Initialize the bcrypt extension
-    :param app: flask.Flask application instance
-    :return: None
-    """
-    bcrypt.init_app(app)
-#!/usr/bin/env python3
-# -*- coding: utf-8 -*-
-"""
-# @Version : Python 3.11.4
-# @Software : Sublime Text 4
-# @Author : StudentCWZ
-# @Email : StudentCWZ@outlook.com
-# @Date : 2023/10/29 12:36
-# @File : init_marshmallow.py
-# @Description :
-"""
-from flask import Flask
-from flask_marshmallow import Marshmallow
-
-ma = Marshmallow()
-
-
-def init_marshmallow(app: Flask) -> None:
-    """
-    Initialize the database extension
-    :param app: flask.Flask application instance
-    :return: None
-    """
-    ma.init_app(app)
-#!/usr/bin/env python3
-# -*- coding: utf-8 -*-
-"""
-# @Version : Python 3.11.4
-# @Software : Sublime Text 4
-# @Author : StudentCWZ
-# @Email : StudentCWZ@outlook.com
-# @Date : 2023/10/29 01:19
-# @File : init_migrate.py
-# @Description :
-"""
-from flask import Flask
-from flask_migrate import Migrate
-
-from application.extensions.init_sqlalchemy import db
-
-migrate = Migrate()
-
-
-def init_migrate(app: Flask) -> None:
-    """
-    Initialize the database extension
-    :param app: flask.Flask application instance
-    :return: None
-    """
-    migrate.init_app(app, db)
@@ -62,7 +62,6 @@ class FlaskElasticsearch(BaseFlaskElasticsearch):
         # Retry connection on failure
         for i in range(5):  # Retry up to 5 times
             if ctx.elasticsearch.ping():
-                logger.info('Connected to Elasticsearch')
                 break
             else:
                 logger.warning(f'Attempt {i + 1} to connect to Elasticsearch failed. Retrying...')
...
@@ -11,8 +11,9 @@
 """
 import datetime

+from application.dao import LogDao
 from application.extensions.init_sqlalchemy import db
-from application.libs.helper import MySQLHelper


 def task():
@@ -21,7 +22,14 @@ def task():
     :return: None
     """
+    # end_stamp = datetime.datetime.now()
+    # start_stamp = end_stamp - datetime.timedelta(days=1)
+    # start = start_stamp.strftime('%Y-%m-%d %H:%M:%S')
+    # end = end_stamp.strftime('%Y-%m-%d %H:%M:%S')
+    start = "2021-11-08 00:00:00"
+    end = "2021-11-09 00:00:00"
     with db.app.app_context():
+        index = db.app.config.Elasticsearch.Index
         cfg = db.app.config.ExtraDB
         options = {
            "host": cfg.Host,
@@ -30,5 +38,6 @@ def task():
            "database": cfg.DB,
            "port": cfg.Port,
         }
-        with MySQLHelper(**options) as helper:
-            print(helper.execute(cfg.Sql))
+        sql = cfg.Sql
+        LogDao.parse(start, end, index, sql, options)
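
Usage note: the hard-coded start/end above reads like a temporary backfill window, while the commented-out lines suggest the task is normally meant to cover the previous day. A minimal sketch of that rolling window, mirroring the commented code; the helper name previous_day_window is hypothetical and not part of the commit:

import datetime

def previous_day_window() -> tuple[str, str]:
    # Rolling one-day window ending now, formatted the way LogDao.parse expects.
    end_stamp = datetime.datetime.now()
    start_stamp = end_stamp - datetime.timedelta(days=1)
    start = start_stamp.strftime('%Y-%m-%d %H:%M:%S')
    end = end_stamp.strftime('%Y-%m-%d %H:%M:%S')
    return start, end
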
...@@ -12,8 +12,6 @@ ...@@ -12,8 +12,6 @@
import os import os
from datetime import datetime
from sqlalchemy import text
from application.extensions.init_sqlalchemy import db from application.extensions.init_sqlalchemy import db
...@@ -68,22 +66,3 @@ class Log(db.Model): ...@@ -68,22 +66,3 @@ class Log(db.Model):
continue continue
_dic[key] = value _dic[key] = value
return _dic return _dic
@classmethod
def batch_save(cls, objects):
with db.session.begin_nested():
for obj in objects:
log = Log(**obj)
# 将字符串转换为 datetime 对象
date_time_obj = datetime.strptime(log.date_time, '%Y-%m-%d %H:%M:%S')
partition_date = date_time_obj.strftime('%Y_%m')
partition_name = f'{log.__tablename__}_{partition_date}'
db.session.execute(text(f"""
CREATE TABLE IF NOT EXISTS {partition_name} PARTITION OF {log.__tablename__}
FOR VALUES FROM ('{date_time_obj.strftime('%Y-%m-01')}') TO
('{date_time_obj.strftime('%Y-%m-01')}'::date + interval '1 month');
"""))
db.session.add(log)
db.session.commit()
@@ -17,7 +17,7 @@ from application.extensions.init_elasticsearch import es


 class ElasticsearchUtil:

     @classmethod
-    def dsl(cls, _start: str, _end: str, size=10000) -> dict:
+    def dsl(cls, _start: str, _end: str, size=5000) -> dict:
         """
         Setting dsl
...
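
For context on the page-size change (10000 -> 5000) in dsl() and in LogDao.parse: the scroll loop runs roughly total/size + 1 iterations, so halving the page size doubles the number of scroll requests while shrinking each response. A standalone sketch of that arithmetic; the helper scroll_iterations is illustrative, not project code:

def scroll_iterations(total: int, size: int = 5000) -> int:
    # Mirrors int(total / dsl.get('size', 5000) + 1) in LogDao.parse:
    # one scroll request per full page, plus one for the initial/partial page.
    return int(total / size + 1)

if __name__ == '__main__':
    for total in (4999, 5000, 123456):
        print(total, scroll_iterations(total))  # 4999 -> 1, 5000 -> 2, 123456 -> 25
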