task.py 1.3 KB
Newer Older
崔为之's avatar
崔为之 committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
# @Version     : Python 3.11.4
# @Software    : Sublime Text 4
# @Author      : StudentCWZ
# @Email       : StudentCWZ@outlook.com
# @Date        : 2023/11/7 16:16
# @File        : task.py
# @Description : Celery factory for the Flask app; registers a periodic task run by Celery beat.
"""


from celery import Celery
from celery.schedules import crontab
from flask import Flask

from application.utils import RedisUri


def make_celery(app: Flask) -> Celery:
    """Create a Celery application tied to the given Flask application.

    The returned Celery instance copies ``app.config`` into its own
    configuration, runs every task derived from ``ContextTask`` inside the
    Flask application context, registers the ``update_database`` task and
    schedules it through Celery beat once per minute.

    :param app: Flask application. Its ``config`` must expose a ``Redis``
        section with ``Username``, ``Password``, ``Host``, ``Port`` and
        ``DB`` attributes.
    :return: The configured Celery instance.
    """
    cfg = app.config.Redis
    # NOTE(review): the Redis URI is assembled from configuration but is not
    # yet used for the broker/backend below, which still point at localhost.
    # Wire it in once RedisUri's string form is confirmed.
    # Deliberately not printed: its text form may include the Redis password.
    uri = RedisUri(  # noqa: F841
        username=cfg.Username,
        password=cfg.Password,
        host=cfg.Host,
        port=int(cfg.Port),
        db=int(cfg.DB),
    )

    celery = Celery(
        app.import_name,
        backend='redis://localhost:6379/0',
        broker='redis://localhost:6379/0'
    )
    celery.conf.update(app.config)

    class ContextTask(celery.Task):
        """Task base class that executes the task body in the Flask app context."""

        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    # Celery derives a task's registered name from its module path unless a
    # name is given. Pinning it here keeps the registration and the beat
    # schedule entry below in agreement, so the worker can resolve the
    # scheduled task.
    task_name = 'update_database'

    @celery.task(base=ContextTask, name=task_name)
    def update_database():
        # Add your database operations here
        print(1232141241421)

    celery.conf.beat_schedule = {
        'update-database-every-day': {
            'task': task_name,
            'schedule': crontab(minute='*')  # Execute every minute
        }
    }

    return celery