#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
# @Version     : Python 3.11.4
# @Software    : Sublime Text 4
# @Author      : StudentCWZ
# @Email       : StudentCWZ@outlook.com
# @Date        : 2023/11/7 16:10
# @File        : init_celery.py
# @Description :
"""

from flask import Flask
from celery import Celery
from celery.schedules import crontab


def init_celery(app: Flask) -> None:
    """
    Initialize Celery for the Flask application.

    Creates a Celery instance from the application's configuration, makes
    every task body run inside the Flask application context, registers the
    ``update_database`` periodic task and stores the Celery instance on
    ``app.celery``.

    :param app: The Flask application.
    :type app: Flask
    :raises KeyError: If ``CELERY_RESULT_BACKEND`` or ``CELERY_BROKER_URL``
        is missing from ``app.config``.
    """
    celery = Celery(
        app.import_name,
        backend=app.config['CELERY_RESULT_BACKEND'],
        broker=app.config['CELERY_BROKER_URL'],
    )
    celery.conf.update(app.config)

    class ContextTask(celery.Task):
        """Task base class that runs the task body in the app context."""

        def __call__(self, *args, **kwargs):
            # Push the Flask application context so the task body can use
            # anything that depends on it.
            with app.app_context():
                return self.run(*args, **kwargs)

    # Must be assigned before any task is declared so that tasks created
    # below use ContextTask as their base class.
    celery.Task = ContextTask

    # The explicit name is required: without it Celery derives the task name
    # from the defining module ("<module>.update_database"), which would not
    # match the 'update_database' entry referenced by the beat schedule.
    @celery.task(name='update_database')
    def update_database():
        # Add your database operations here
        pass

    celery.conf.beat_schedule = {
        'update-database-every-day': {
            'task': 'update_database',
            # Execute daily at midnight (in the timezone Celery is
            # configured with).
            'schedule': crontab(hour=0, minute=0),
        }
    }

    app.celery = celery