init_celery.py 1.21 KB
Newer Older
崔为之's avatar
崔为之 committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
# @Version     : Python 3.11.4
# @Software    : Sublime Text 4
# @Author      : StudentCWZ
# @Email       : StudentCWZ@outlook.com
# @Date        : 2023/11/7 16:10
# @File        : init_celery.py
# @Description :
"""


from flask import Flask
from celery import Celery
from celery.schedules import crontab


def init_celery(app: Flask) -> None:
    """
    Initialize Celery for the Flask application.

    :param app: The Flask application.
    :type app: Flask
    """
    celery = Celery(
        app.import_name,
        backend=app.config['CELERY_RESULT_BACKEND'],
        broker=app.config['CELERY_BROKER_URL']
    )
    celery.conf.update(app.config)

    class ContextTask(celery.Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask

    @celery.task()
    def update_database():
        # Add your database operations here
        pass

    celery.conf.beat_schedule = {
        'update-database-every-day': {
            'task': 'update_database',
            'schedule': crontab(hour=0, minute=0)  # Execute daily at midnight
        }
    }

    app.celery = celery