bugfix> python > 投稿

定期的なタスクを実行するために最低限の例が必要です(5分ごとに何らかの機能を実行するか、12:00:00に何かを実行するなど)。

私の myapp/tasks.py で 、 私が持っています、

from celery.task.schedules import crontab
from celery.decorators import periodic_task
from celery import task

@periodic_task(run_every=(crontab(hour="*", minute=1)), name="run_every_1_minutes", ignore_result=True)
def return_5():
    return 5

@task
def test():
    return "test"

セロリの労働者を実行すると、タスクが表示されます(以下を参照)が、値を返さない (ターミナルまたは花のいずれか)。

[tasks]
  . mathematica.core.tasks.test
  . run_every_1_minutes

必要な結果を得るために、最小限の例またはヒントを提供してください。

バックグラウンド:

私は config/celery.py を持っています次のものが含まれます。

import os
from celery import Celery
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings.local")
app = Celery('config')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()

そして私の config/__init__.py で 、 私が持っています

from .celery import app as celery_app
__all__ = ['celery_app']

以下のような関数を myapp/tasks.py に追加しました

from celery import task
@task
def test():
    return "test"

test.delay() を実行するときシェルから、正常に実行され、タスク情報も花で表示されます