Djangoroidの奮闘記

python,django,angularJS1~三十路過ぎたプログラマーの奮闘記

Celery(redis) + Django

参照リンク

https://www.codingforentrepreneurs.com/projects/time-tasks/ https://www.codingforentrepreneurs.com/blog/celery-redis-django/ https://redis.io/ http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html#crontab-schedules

Download Redis

  • 以下のコマンドで、brew installして、startする。
$ brew install redis
$ brew services start redis
  • redisを開く
$ redis-cli ping
PONG
$ redis-server //これでredis-serverがスタートする?

Install Celery & Redis in Virtualenv

pip install celery
pip install redis
pip install django-celery-beat
pip install django-celery-results
pip freeze > requirements.txt
  • djangoのsettings.pyを修正する。
INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'django_celery_beat',
    'django_celery_results',
]

CELERY_BROKER_URL = 'redis://localhost:6379'
CELERY_RESULT_BACKEND = 'redis://localhost:6379'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = TIME_ZONE

Celery Module

  • celery.py を、settings.pyと同じフォルダ内に作成する。
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'red.settings')

app = Celery('proj')

# Using a string here means the worker don't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
  print('Request: {0!r}'.format(self.request))
  • 以下のように自分のアプリに合わせて修正する。
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# set the default Django settings module for the 'celery' program.
"""ここの部分だけ修正した"""
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "kirr.settings")

app = Celery('proj')

# Using a string here means the worker don't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
  print('Request: {0!r}'.format(self.request))
  • また、celery.pyがあるフォルダの、__init__.pyに以下のコードを追記する。
# cfehome/src/cfehome/__init__.py
from __future__ import absolute_import

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app  # noqa

Create a Django app

  • 適当に、python manage.py startapp billingなどで、アプリを作成する。

  • modelsは以下のように設定しておく。

from django.db import models

# Create your models here.

class BillingItem(models.Model):
    item_name = models.CharField(max_length=120)
    number_1 = models.IntegerField()
    number_2 = models.IntegerField()
    total = models.IntegerField()
    timestamp = models.DateTimeField(auto_now_add=True)

    def __unicode__(self):
        return "{total}".format(total=self.total)

    def __str__(self):
        return "{total}".format(total=self.total)
  • admin.pyを作成しておく。
from django.contrib import admin

# Register your models here.

from .models import BillingItem

admin.site.register(BillingItem)
  • billing/tasks.py ファイルを作成する。
from __future__ import absolute_import, unicode_literals
import random
from celery.decorators import task

@task(name="sum_two_numbers")
def add(x, y):
    return x + y

@task(name="multiply_two_numbers")
def mul(x, y):
    total = x * (y * random.randint(3, 100))
    return total

@task(name="sum_list_numbers")
def xsum(numbers):
    return sum(numbers)

Defer Tasks with Celery

  • 2つのターミナルを開いておく。2つとも、Virtualenvを起動させた状態にしておく。

  • 以下のコマンドで、celery を起動させる。

$ celery -A kirr<ここはsettings.pyがあるフォルダ名> worker -l info
// tasksとなっている箇所が、こちらで定義したtask
[tasks]
  . kirr.celery.debug_task
  . multiply_two_numbers
  . sum_list_numbers
  . sum_two_numbers
  • 別のターミナルでは、django shellを起動させて、挙動を確認してみる。
$ python manage.py shell
>>> from billing.tasks import add, mul, xsum
>>> add(123,30)
153
>>> mul(323,23)
66861
>>> xsum([3,2])
5
>>> add.delay(12,30)
<AsyncResult: cf403f0f-9167-48d4-9ca9-6777cdfb3122>
>>> mul.delay(323,23)
<AsyncResult: b87efba2-2787-44d0-96bc-dc9266bedcfc>
>>> xsum.delay([332,333])
<AsyncResult: f2b729c1-a0e2-48ff-b0bf-757388da52c1>
  • メソッド.delay(メソッドの引数)で設定すると、celeryの方に、答えが表示される!

  • さらに、この答えを、modelsに代入することが可能。tasks.pyを以下のように修正してみる。

from __future__ import absolute_import, unicode_literals
import random
from celery.decorators import task

from .models import BillingItem

@task(name="sum_two_numbers")
def add(x, y):
    return x + y

@task(name="multiply_two_numbers")
def mul(x, y):
    number_1 = x
    number_2 = (y * random.randint(3, 100))
    total = number_1 * number_2
    new_obj = BillingItem.objects.create(
            item_name='some item',
            number_1=number_1,
            number_2=number_2,
            total=total)
    return total

@task(name="sum_list_numbers")
def xsum(numbers):
    return sum(numbers)
  • 無事保存された!(^^)

Scheduled tasks

  • scheduleを設定する。celery.pyにコードを追記する。
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "kirr.settings")

app = Celery('proj')

# Using a string here means the worker don't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

from celery.schedules import crontab
app.conf.beat_schedule = {
    'add-every-minute-contrab': {
        'task': 'multiply_two_numbers', # taskの名前
        'schedule': crontab(), # crontabで、設定も可能。crontabは以下のサイト参照
        """
        http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html#crontab-schedules
        """
        'args': (16, 16),
    },
    'add-every-5-seconds': {
        'task': 'multiply_two_numbers',
        'schedule': 5.0,
        'args': (16, 16)
    },
    'add-every-30-seconds': {
        'task': 'tasks.add',# 直接taskのmoduleをimportして指定することも可能
        'schedule': 30.0,
        'args': (16, 16)
    },
}

@app.task(bind=True)
def debug_task(self):
  print('Request: {0!r}'.format(self.request))
  • scheduleされたtaskは以下のコマンドで、自動で実行される。beatを使う。
$ celery -A kirr beat -l info
  • お〜〜〜、自動で実行されてる。。。

  • さらに、このスケジュールは、admin画面から簡単に登録できる!すばらしい!

まとめ

  • celeryは、redisとセットで使うのがよさそう。
  • デプロイのときは、redisも一緒にデプロイして、redisのurlを指定する必要があり。
  • celeryには、beatとresultの2種類がある。
  • beatは、スケジュールされたtaskを順番に処理する。
  • resultは、実行を指示したときに、taskを順番に処理する