#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
# @Version     : Python 3.11.4
# @Software    : Sublime Text 4
# @Author      : StudentCWZ
# @Email       : StudentCWZ@outlook.com
# @Date        : 2023/11/11 14:50
# @File        : init_apscheduler.py
# @Description : Initializes the APScheduler extension for a Flask application.
"""

from flask import Flask
from apscheduler.triggers.interval import IntervalTrigger

from application.libs.flask_apscheduler import APScheduler
from application.libs.tasks import task

# Initialize APScheduler
# Module-level scheduler instance; init_tasks() attaches it to a Flask app,
# registers a job on it and starts it.
scheduler = APScheduler()


def init_tasks(app: Flask) -> None:
    """
    Attach the shared scheduler to a Flask app, schedule the job, and start it.

    The scheduler's ``api_enabled`` flag is switched on before the scheduler
    is attached to ``app``. A single job running ``task`` is then registered
    under the id ``'task_one'`` with a one-day interval trigger whose start
    date, end date and timezone are read from ``app.config.Scheduler``.
    Finally the scheduler is started.

    Args:
        app (Flask): The Flask application.

    Returns:
        None

    Raises:
        TypeError: If the provided argument is not a Flask application instance.
    """
    # Guard clause: reject anything that is not a Flask application.
    if not isinstance(app, Flask):
        raise TypeError(f'Expected a Flask application instance, got {type(app).__name__}')

    # Turn the API flag on first, then bind the scheduler to the application.
    scheduler.api_enabled = True
    scheduler.init_app(app)

    # NOTE(review): attribute-style access on app.config presumably relies on
    # a project-specific config object — confirm against the app factory.
    schedule_cfg = app.config.Scheduler
    trigger_options = {
        'days': 1,
        'start_date': schedule_cfg.Start,
        'end_date': schedule_cfg.End,
        'timezone': schedule_cfg.Timezone,
    }
    daily_trigger = IntervalTrigger(**trigger_options)

    # Register the job under a fixed id, then start the scheduler.
    scheduler.add_job(func=task, trigger=daily_trigger, id='task_one')
    scheduler.start()