Django celery Peridoic Tasks

March 28, 2019 3 minutes

Preface

在最近自己做Django + Celery的项目的时候,有了定时任务的需求。查阅资料后发现Celery是有定时任务功能的,并且有个应用 dajngo-celery-beat 可以很好的满足Django和Celery定时任务的需求。

Celery

先简单介绍一下 Celery 。Celery 是一个基于Python开发的分布式异步消息队列,通过celery可以轻松的实现异步任务的处理。celery需要borker来进消息的发送和传递,需要backend将任务的执行结果存储起来。常用的Borker为RabbitMQ和Redis,当然也可以作为backend。

.. code:: python

from celery import Celery

app = Celery(‘hello’, broker='amqp://[email protected]//’, backend='amqp://[email protected]//')

@app.task def hello(): return ‘hello world’

这里仅仅做简单的介绍,具体的仅参考 官方文档 <http://docs.celeryproject.org/en/master/index.html#>__ 。

配置 Django + Celery

基础配置

pip install Django pip install celery pip install redis

注意:Celery仅仅支持Django1.8及以上版本,Django1.8以前的请使用Celery3.0。这里我使用的是Django1.11.2 Celery4.2.1 创建项目并建立 demo_celery 的APP后目录如下:


   demo_celery
   ├── admin.py
   ├── apps.py
   ├── __init__.py
   ├── migrations
   │   └── __init__.py
   ├── models.py
   ├── tests.py
   └── views.py
   demo
   ├── __init__.py
   ├── settings.py
   ├── urls.py
   └── wsgi.py
   manage.py
   templates
   venv

现在,我们有了一个名为 demo 的项目和名为 demo_celery_beat 的app(记得将该app添加到settings.py中)。接下来将celery配置进去。 在demo文件夹下创建 celery.py 文件,写入以下内容:


   from __future__ import absolute_import, unicode_literals
   import os
   from celery import Celery

   # set the default Django settings module for the 'celery' program.
   os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'demo.settings')

   app = Celery('demo')

   # Using a string here means the worker don't have to serialize
   # the configuration object to child processes.
   # - namespace='CELERY' means all celery-related configuration keys
   #   should have a `CELERY_` prefix.
   app.config_from_object('django.conf:settings', namespace='CELERY')

   # Load task modules from all registered Django app configs.
   app.autodiscover_tasks()


   @app.task(bind=True)
   def debug_task(self):
       print('Request: {0!r}'.format(self.request))

注意将项目名改为你自己的 然后在demo文件夹下的 __init__.py 中添加以下代码:


   from __future__ import absolute_import, unicode_literals

   # This will make sure the app is always imported when
   # Django starts so that shared_task will use this app.
   from .celery import app as celery_app

   __all__ = ['celery_app', ]

然后在 settings.py 中添加如下配置(这里我的borker和backend都用了redis,你可以根据自己的情况调整):


   CELERY_BROKER_URL = 'redis://:[email protected]:6379/0'

   CELERY_RESULT_BACKEND = 'redis://:[email protected]:6379/0'

   CELERY_RESULT_SERIALIZER = 'json'

之后在 demo_celery 文件夹下创建 tasks.py


   from celery import shared_task
   import time


   @shared_task
   def test():
       print "hello"
       time.sleep(2)
       return "done"

然后在views.py中调用该任务:


   # -*- coding: utf-8 -*-
   from __future__ import unicode_literals

   from django.shortcuts import render
   from django.http import HttpResponse
   from .tasks import test

   # Create your views here.

   def index(request):
       test.delay()
       return HttpResponse("OK")

之后就是打通路由,能使用URL访问到该view就OK了。 一切配置完成后,我们先测试一下。 先后执行: 1、 python manage.py runserver 2、 celery -A demo worker -l info

访问 http://127.0.0.1:8000/demo

配置django-celery-beat

上一步实现了Djnago+Celery异步任务的实现,现在终于到了我们想要说的定时任务了。 pip install django-celery-beatsettings.py 中添加如下代码:



   INSTALLED_APPS = [
       'django.contrib.admin',
       'django.contrib.auth',
       'django.contrib.contenttypes',
       'django.contrib.sessions',
       'django.contrib.messages',
       'django.contrib.staticfiles',
       'demo_celery',
       "django_celery_beat" # 添加django_celery_beat app
   ]
   ....
   TIME_ZONE = 'Asia/Shanghai'

   CELERY_ENABLE_UTC = False # 禁止使用UTC时间

   CELERY_TIMEZONE = "Asia/Shanghai" # 设置时区

   CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'

   CELERYD_MAX_TASKS_PER_CHILD = 3 # 每个worker最大执行数,长时间执行造成内存泄露

其实到这一步,我们已经可以通过Django自带的后台来创建定时任务了。 python manage.py migrate python manage.py createsuperuser python manage.py runserver celery -A demo worker -l info celery -A demo beat -l info -S django

启动服务,登录后台 点击 Periodic tasks 创建定时任务: http://127.0.0.1:8000/admin/django_celery_beat/periodictask/add/

可以看到,已经可以定时执行了。但是到现在还远没有结束,我们需要在自己的代码里去实现,虽然Django的后台挺好用的。通过阅读文档和查看源码,发现django-celery-beat 主要由以下五个Moudle组成

  • PeriodicTask 这个moudle定义了一个单独的任务,必须和 IntervalScheduleCrontabScheduleSolarSchedule 等计划相关联。

  • PeriodicTasks 这个moudle仅仅用作索引来跟踪计划的改变,从而告知服务器重新加载计划。

  • IntervalSchedule 定义一个任务的执行频率,每隔(every) DAYS, HOURS, MINUTES, SECONDS, MICROSECONDS 去执行一次任务。

  • CrontabSchedule 和Linux中的计划任务相同,可以指定特定的时间去执行任务。

  • SolarSchedule 如果你想任务依据日出、日落、黄昏、黎明时间来执行,你可以使用 solar 调度器类型。

Example

一个每隔10秒运行一次的任务


   from django_celery_beat.models import PeriodicTasks, PeriodicTask, IntervalSchedule
   # Create your views here.

   def index(request):

       # 创建Interval对象
       schedule, created = IntervalSchedule.objects.get_or_create(every=10, period=IntervalSchedule.SECONDS)
       # 创建任务
       PeriodicTask.objects.create(interval=schedule, name="demo2", task="demo_celery.tasks.test")

       return HttpResponse("OK")

这是一个非常基本的一个任务,还可以加很多参数,如下一个例子

需要传入参数, 创建任务后,2秒钟运行,每隔10秒执行一次,一次性任务

任务修改为:


   from celery import shared_task
   import time

   @shared_task
   def test(name):
       print "hello" + name
       time.sleep(2)
       return "done"

view中的代码:


   # -*- coding: utf-8 -*-
   from __future__ import unicode_literals

   from django.shortcuts import render
   from django.http import HttpResponse
   from .tasks import test
   from django_celery_beat.models import PeriodicTasks, PeriodicTask, IntervalSchedule
   from datetime import datetime, timedelta
   # Create your views here.


   def index(request):

       # 创建Interval对象
       schedule, created = IntervalSchedule.objects.get_or_create(every=10, period=IntervalSchedule.SECONDS)
       # 创建任务
       PeriodicTask.objects.create(interval=schedule, name="demo3", task="demo_celery.tasks.test", one_off=True, args=[],
                                   kwargs='{"name":"world"}', start_time=(datetime.now() + timedelta(seconds=2)))

       return HttpResponse("OK")

创建一个cron任务 每周每天每小时的第1分钟执行一次


   # -*- coding: utf-8 -*-
   from __future__ import unicode_literals

   from django.shortcuts import render
   from django.http import HttpResponse
   from .tasks import test
   from django_celery_beat.models import PeriodicTasks, PeriodicTask, CrontabSchedule
   from datetime import datetime, timedelta
   # Create your views here.


   def index(request):

       # 创建cron对象
       crontab, created = CrontabSchedule.objects.get_or_create(minute=1, timezone="Asia/Shanghai")
       # 创建任务
       PeriodicTask.objects.create(crontab=crontab, name="demo5", task="demo_celery.tasks.test", one_off=False, args=[],
                                   kwargs='{"name":"world"}', start_time=datetime.now())

       return HttpResponse("OK")

Note

官方文档里给出的警告,大概意思就是,如果你Djnago项目更改了时区,你的定时任务还会使用的是以前的时区,你可以使用如下方法改变。

但是我,按照官方给出的代码老是报错,看了源码才知道,该方法需要传入一个对象,并且还要 no_change 这个选项, PeriodicTask 里刚好有这个,于是就都明白了。改为如下代码:


   PeriodicTask.objects.all().update(last_run_at=None)
   periodic = PeriodicTask.objects.all()
   for p in periodic:
       PeriodicTasks.changed(p)

到此为止,django-celery-beat 的简单介绍就这样了,如果你想看更多的内容建议阅读以下官方文档。

Reference

[1] Periodic Tasks — Celery 4.3.0rc2 documentation

[2] [First steps with Django — Celery 4.3.0rc2 documentation](<http://docs.celeryproject.org/en/master/django/first-steps-with-django.html)

[3] django_celery_beat 1.4.0 documentation