Celery(redis) + Django
参照リンク
https://www.codingforentrepreneurs.com/projects/time-tasks/ https://www.codingforentrepreneurs.com/blog/celery-redis-django/ https://redis.io/ http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html#crontab-schedules
Download Redis
- 以下のコマンドで、brew installして、startする。
$ brew install redis $ brew services start redis
- redisを開く
$ redis-cli ping PONG $ redis-server //これでredis-serverがスタートする?
Install Celery & Redis in Virtualenv
- install Celery redis
pip install celery pip install redis pip install django-celery-beat pip install django-celery-results pip freeze > requirements.txt
- djangoのsettings.pyを修正する。
INSTALLED_APPS = [ 'django.contrib.admin', 'django.contrib.auth', 'django.contrib.contenttypes', 'django.contrib.sessions', 'django.contrib.messages', 'django.contrib.staticfiles', 'django_celery_beat', 'django_celery_results', ] CELERY_BROKER_URL = 'redis://localhost:6379' CELERY_RESULT_BACKEND = 'redis://localhost:6379' CELERY_ACCEPT_CONTENT = ['application/json'] CELERY_TASK_SERIALIZER = 'json' CELERY_RESULT_SERIALIZER = 'json' CELERY_TIMEZONE = TIME_ZONE
Celery Module
- celery.py を、settings.pyと同じフォルダ内に作成する。
from __future__ import absolute_import, unicode_literals import os from celery import Celery # set the default Django settings module for the 'celery' program. os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'red.settings') app = Celery('proj') # Using a string here means the worker don't have to serialize # the configuration object to child processes. # - namespace='CELERY' means all celery-related configuration keys # should have a `CELERY_` prefix. app.config_from_object('django.conf:settings', namespace='CELERY') # Load task modules from all registered Django app configs. app.autodiscover_tasks() @app.task(bind=True) def debug_task(self): print('Request: {0!r}'.format(self.request))
- 以下のように自分のアプリに合わせて修正する。
from __future__ import absolute_import, unicode_literals import os from celery import Celery # set the default Django settings module for the 'celery' program. """ここの部分だけ修正した""" os.environ.setdefault("DJANGO_SETTINGS_MODULE", "kirr.settings") app = Celery('proj') # Using a string here means the worker don't have to serialize # the configuration object to child processes. # - namespace='CELERY' means all celery-related configuration keys # should have a `CELERY_` prefix. app.config_from_object('django.conf:settings', namespace='CELERY') # Load task modules from all registered Django app configs. app.autodiscover_tasks() @app.task(bind=True) def debug_task(self): print('Request: {0!r}'.format(self.request))
- また、celery.pyがあるフォルダの、
__init__.py
に以下のコードを追記する。
# cfehome/src/cfehome/__init__.py from __future__ import absolute_import # This will make sure the app is always imported when # Django starts so that shared_task will use this app. from .celery import app as celery_app # noqa
Create a Django app
適当に、
python manage.py startapp billing
などで、アプリを作成する。modelsは以下のように設定しておく。
from django.db import models # Create your models here. class BillingItem(models.Model): item_name = models.CharField(max_length=120) number_1 = models.IntegerField() number_2 = models.IntegerField() total = models.IntegerField() timestamp = models.DateTimeField(auto_now_add=True) def __unicode__(self): return "{total}".format(total=self.total) def __str__(self): return "{total}".format(total=self.total)
- admin.pyを作成しておく。
from django.contrib import admin # Register your models here. from .models import BillingItem admin.site.register(BillingItem)
- billing/tasks.py ファイルを作成する。
from __future__ import absolute_import, unicode_literals import random from celery.decorators import task @task(name="sum_two_numbers") def add(x, y): return x + y @task(name="multiply_two_numbers") def mul(x, y): total = x * (y * random.randint(3, 100)) return total @task(name="sum_list_numbers") def xsum(numbers): return sum(numbers)
Defer Tasks with Celery
2つのターミナルを開いておく。2つとも、Virtualenvを起動させた状態にしておく。
以下のコマンドで、celery を起動させる。
$ celery -A kirr<ここはsettings.pyがあるフォルダ名> worker -l info
// tasksとなっている箇所が、こちらで定義したtask [tasks] . kirr.celery.debug_task . multiply_two_numbers . sum_list_numbers . sum_two_numbers
- 別のターミナルでは、django shellを起動させて、挙動を確認してみる。
$ python manage.py shell >>> from billing.tasks import add, mul, xsum >>> add(123,30) 153 >>> mul(323,23) 66861 >>> xsum([3,2]) 5 >>> add.delay(12,30) <AsyncResult: cf403f0f-9167-48d4-9ca9-6777cdfb3122> >>> mul.delay(323,23) <AsyncResult: b87efba2-2787-44d0-96bc-dc9266bedcfc> >>> xsum.delay([332,333]) <AsyncResult: f2b729c1-a0e2-48ff-b0bf-757388da52c1>
メソッド.delay(メソッドの引数)で設定すると、celeryの方に、答えが表示される!
さらに、この答えを、modelsに代入することが可能。tasks.pyを以下のように修正してみる。
from __future__ import absolute_import, unicode_literals import random from celery.decorators import task from .models import BillingItem @task(name="sum_two_numbers") def add(x, y): return x + y @task(name="multiply_two_numbers") def mul(x, y): number_1 = x number_2 = (y * random.randint(3, 100)) total = number_1 * number_2 new_obj = BillingItem.objects.create( item_name='some item', number_1=number_1, number_2=number_2, total=total) return total @task(name="sum_list_numbers") def xsum(numbers): return sum(numbers)
- 無事保存された!(^^)
Scheduled tasks
- scheduleを設定する。celery.pyにコードを追記する。
from __future__ import absolute_import, unicode_literals import os from celery import Celery # set the default Django settings module for the 'celery' program. os.environ.setdefault("DJANGO_SETTINGS_MODULE", "kirr.settings") app = Celery('proj') # Using a string here means the worker don't have to serialize # the configuration object to child processes. # - namespace='CELERY' means all celery-related configuration keys # should have a `CELERY_` prefix. app.config_from_object('django.conf:settings', namespace='CELERY') # Load task modules from all registered Django app configs. app.autodiscover_tasks() from celery.schedules import crontab app.conf.beat_schedule = { 'add-every-minute-contrab': { 'task': 'multiply_two_numbers', # taskの名前 'schedule': crontab(), # crontabで、設定も可能。crontabは以下のサイト参照 """ http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html#crontab-schedules """ 'args': (16, 16), }, 'add-every-5-seconds': { 'task': 'multiply_two_numbers', 'schedule': 5.0, 'args': (16, 16) }, 'add-every-30-seconds': { 'task': 'tasks.add',# 直接taskのmoduleをimportして指定することも可能 'schedule': 30.0, 'args': (16, 16) }, } @app.task(bind=True) def debug_task(self): print('Request: {0!r}'.format(self.request))
- scheduleされたtaskは以下のコマンドで、自動で実行される。beatを使う。
$ celery -A kirr beat -l info
お〜〜〜、自動で実行されてる。。。
さらに、このスケジュールは、admin画面から簡単に登録できる!すばらしい!