#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
# @Version     : Python 3.11.4
# @Software    : Sublime Text 4
# @Author      : StudentCWZ
# @Email       : StudentCWZ@outlook.com
# @Date        : 2023/11/7 16:16
# @File        : task.py
# @Description : Celery application factory bound to a Flask application.
"""

from celery import Celery
from celery.schedules import crontab
from flask import Flask

from application.utils import RedisUri


def make_celery(app: Flask) -> Celery:
    """Create a Celery application tied to the given Flask application.

    The returned Celery app has a task base class that runs every task
    inside ``app``'s application context, one registered task named
    ``update_database``, and a beat schedule that dispatches that task.

    Args:
        app: The Flask application. ``app.config.Redis`` must expose the
            attributes ``Username``, ``Password``, ``Host``, ``Port`` and
            ``DB``; ``Port`` and ``DB`` are converted with ``int()``.

    Returns:
        The configured Celery application.
    """
    cfg = app.config.Redis
    uri = RedisUri(
        username=cfg.Username,
        password=cfg.Password,
        host=cfg.Host,
        port=int(cfg.Port),
        db=int(cfg.DB),
    )
    # NOTE(review): ``uri`` is only printed; the broker/backend below are
    # hard-coded to localhost rather than derived from it. Presumably the
    # configured URI was meant to be used -- confirm RedisUri's string form
    # before wiring it in. This print may also expose the Redis password on
    # stdout if RedisUri's string form includes it -- verify.
    print(uri)

    celery = Celery(
        app.import_name,
        backend='redis://localhost:6379/0',
        broker='redis://localhost:6379/0',
    )
    celery.conf.update(app.config)

    class ContextTask(celery.Task):
        """Task base class that runs the task body in ``app``'s app context."""

        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    # The explicit name is required: without it Celery registers the task
    # under an auto-generated "<module>.update_database" name, which would
    # not match the 'update_database' name used by the beat schedule below.
    @celery.task(base=ContextTask, name='update_database')
    def update_database():
        # Add your database operations here
        print(1232141241421)

    celery.conf.beat_schedule = {
        # NOTE(review): the entry key says "every-day" but the crontab
        # below fires every minute -- confirm the intended cadence.
        'update-database-every-day': {
            'task': 'update_database',
            'schedule': crontab(minute='*'),  # Execute every minute
        }
    }
    return celery