Использование RabbitMQ в django проектах без Celery, и что нового в Celery 3.0

в 17:40, , рубрики: celery, django, python, RabbitMQ, метки: , , ,

Думаю что большинство python программистов уже в какой-то степени знакомы с возможностями Celery. В 1-ой части я расскажу, как можно использовать RabbitMQ без celery, а во второй части — краткий обзор новых возможностей celery 3.0.
Об установке связки Django-Celery-RabbitMQ можно почитать тут.
Про использование RabbitMQ хорошо написано тут, и тут, ну и на сайте RabbitMQ.

Коротко напомню установку и настройку:
RabbitMQ:
sudo apt-get install rabbitmq-server
Добавим пользователя:

$ rabbitmqctl add_user myuser mypassword
$ rabbitmqctl add_vhost '/'
$ rabbitmqctl set_permissions -p myvhost myuser ".*" ".*" ".*"

Коротко настройки Celery, RabbitMQ:
in settings.py

import djcelery

os.environ["CELERY_LOADER"] = "django"
djcelery.setup_loader()
AMQP_HOST = 'localhost'
BROKER_HOST='localhost'
BROKER_PORT = 5672
BROKER_VHOST = "/"
BROKER_USER = "myuser"
BROKER_PASSWORD = "mypassword"

INSTALLED_APPS+='djcelery'

Утверждение: для того чтобы сделать одну небольшую задачу асинхронной вовсе не обязательно использовать celery. Вполне можно обойтись RabbitMQ.
Доказательство:
Начнём от противного:
Задача: проверить email на наличие письма от заданного отправителя, если письма нет, повторить проверку через минуту, если есть — пойти дальше ( распарсить его например...)
Используем poplib, email.
Напишем функцию, получающую email от наперёд заданного отправителя и обернём её декоратором task
Функция принимает email адрес, пароль и email адрес, от кого должно прийти письмо и возвращает статус (Ok, Error) и сообщение
in tasks.py

from celery.task import task, periodic_task
from celery.task.schedules import crontab

import poplib
import email

@task
def mail_content(user_mail, mail_pass, mail_from):
    mail_server = 'pop.'+user_mail.split('@')[1]
    mail_login = user_mail.split('@')[0]
    p = poplib.POP3(mail_server)
    print p.getwelcome()
    try:
        p.user(mail_login)
        p.pass_(mail_pass)
    except poplib.error_proto:
        return 'Error', 'Email is blocked'   
    try:
        print p.list()
    except:
        return 'Error', 'dont receive list of mails'
    numMessages = len(p.list()[1])
    print numMessages
    for i in range(numMessages):
        m = email.message_from_string("n".join(p.top(i+1, 1)[1]))
        try:
            this_email_from = m['From']
            if this_email_from.find(mail_from) >= 0:
                print this_email_from
                m = email.message_from_string('n'.join(p.retr(i+1)[1]))
                content = get_text_body(m)
                print content
                return 'Ok', content
            else:
                pass
        except Exception, e:
            return 'Error', unicode(e, 'utf8')
    raise mail_content.retry(exc=e, countdown=30)

Последняя строчка кода описывает перезапуск таска через 30 секунд, если письмо не было найдено.
Теперь мы можем запустить таск например так:

>>>res = mail_content.delay('user@domen', 'password', 'email_from@domen.email.from')

в этом случае выполнение начнётся немедленно, или так:

>>>res = mail_content.apply_async(('user@domen', 'password', 'email_from@domen.email.from'), countdown=30)

В этом случае выполнение начнется через 30 секунд.
(Предварительно нужно нужно запустить сервер celery:
python manage.py celeryd
и в другом окне запустить shell:
python manage.py shell,
А уже из шела вызывать эти команды)
Результат мы можем получить выполнив

>>>res.get()
(синхронно)
>>>res.info

(возвращает None, если нет ещё результата и результат, если он есть)
Но проверять есть ли результат не всегда удобно и всегда означает выполнение лишних действий.
Для вызова функции после выполнения задачи можно реализовать callback. Если у Вас установлена celery и вы можете функцию, принимающую результат сделать задачей (task), то можете перейти к следующему подразделу. Кто хочет обойтись без celery — способ организации callback на основе pika и rabbitMQ.
Для работы с AMQP установим пакет pika:

$ sudo pip install pika==0.9.5

Подробно Hello world с использованием этой библиотеки и RabbitMQ описано тут
in decorators.py:

import pika
import pickle
import time

importr settings

 def callback(function_to_decorate):
    user = settings.BROKER_USER
    broker_host = settings.BROKER_HOST
    password = settings.BROKER_PASSWORD
    credentials = pika.PlainCredentials(user, password)
    parameters = pika.ConnectionParameters(host=broker_host, credentials=credentials)
    def receiver(*args, **kw):
        (backend_function, data) = function_to_decorate(*args, **kw)
        pickled_obj = pickle.dumps(data)
        queue_name = str(time.time())
        print "call_backend", backend_function.__name__
        connection = pika.BlockingConnection(parameters)
        channel = connection.channel()
        channel.queue_declare( queue = queue_name)
        channel.basic_publish(exchange='', routing_key=queue_name, body=pickled_obj)
        channel.basic_consume( backend_function, queue=queue_name, no_ack = True)
        channel.queue_delete(queue=queue_name)
        connection.close()
    return receiver

Это декоратор, которым мы обернем функцию mail_content перед(!) оборачиванием декоратором @task
Декоратор возвращает нашу функцию mail_content с добавленными инструкциями отправки сообщения в rabbitmq
Не буду переписывать всю функцию в tasks.py, только что поменялось
in tasks.py:

from decorators import *
from tasks_backend import mail_analizer, mail_error

@task
@callback
def mail_content(...):
    ...
    if (...):
        ...
        return mail_analizer, (content,)
    return mail_error, ('error',)

Возвращаем первым аргументом функцию, вторым — список аргументов, которые хотим передать в функцию
in tasks_backend.py

import tasks
def mail_analizer(ch, method, properties, body):
    email_text = pickle.loads(body)
    if emai_text.find(u'Hello'):
        tasks.send_emails.delay(email_text)
    else:
        tasks.send_twitter_status.delay(email_text)

Приняли email, распознали его и запустили новые задачи.
Заметим, что аргументы не очень удобные, исправим это:
in decorators.py

def backend(function_to_decorate):
    def receive(ch, method, properties, body):
        data=pickle.loads(body)
        args = data
        function_to_decorate(*args)
    return receive

теперь можем переписать функцию mail_analizer так:

@backend
def mail_analizer(email_text):
     if emai_text.find(u'Hello'):
        tasks.send_emails.delay(email_text)
    else:
        tasks.send_twitter_status.delay(email_text)

Для запуска следующих функций используем декоратор

@callback

так-же как и в mail_content:

@backend
@callback
def mail_analizer(cont):
    print cont
    return send_twitter_status, (cont,)

Простой пример, построения цепочки функций с данным интерфейсом:

@callback
def first(*args):
    print first.__name__
    print args
    return senders, args
@backend
@callback
def senders(*args):
    print args
    return analizer, args
@backend
@callback
def analizer( *args):
    print args
    return ended_fun, args
@backend
def ended_fun(*args):
    print ended_fun.__name__
    print args

Первая функция обёрнута только декоратором

@callback

, т.к. она ничего не принимает из кролика, а последняя — только

@backend

— т.к. она ничего не передаёт.
Заметим, что функция может вызывать сама себя. Также заметим что функцию, которя обёрнута декоратором backend можно вызвать только из rabbitmq.
Для запуска используем функцию, которая обёрнута только callback.

@callback
def runer(*args):
    return test_func, (args)
@backend
@callback
def test_func( *args):
    print args
    return test_func, args

Окончательный вариант функций mail_content, email_analizer, run_email:

@backend
@call_backend
def mail_content(user_mail, mail_pass, mail_from):
    mail_server = 'pop.'+user_mail.split('@')[1]
    mail_login = user_mail.split('@')[0]
    p = poplib.POP3(mail_server)
    print p.getwelcome()
    try:
        p.user(mail_login)
        p.pass_(mail_pass)
    except poplib.error_proto:
        return mail_error, 'Email is blocked'
    try:
        print p.list()
    except:
        return mail_error, 'dont receive list of mails'
    numMessages = len(p.list()[1])
    print numMessages
    for i in range(numMessages):
        m = email.message_from_string("n".join(p.top(i+1, 1)[1]))
        try:
            this_from = m['From']
            this_from = this_from.decode('cp1251').split('<'.decode('cp1251'))[1]
            if this_from.find(mail_from) >= 0:
                print m['From']
                m = email.message_from_string('n'.join(p.retr(i+1)[1]))
                content = get_text_body(m)
                print content
                return email_analizer, (content, email_from)
            else:
                pass
        except Exception, e:
            return email_error, (unicode(e, 'utf8'),)
    return mail_content, (user_mail, mail_pass, mail_from)

@backend
@call_backend
def email_analizer(content, email_from):
    if content.find(u'Hello'):
        email_to = email_from
        text=u'Hello, my dear friend'
        return send_mail, (email_to, text)
    return send_twitter_status, (cont,)
    
@call_backend
def run_email():
    '''получаем из базы, например, email, password, email_from '''
    return  mail_content, (email, password, email_from)

Подитог:

я надеюсь, что не было ничего сложного. Можно использовать, вместо celery, если у Вас к примеру одна небольшая задача (task).

Как это можно сделать средствами celery 3.0

В celery 3.0 в задачу (task) можно передать имя задачи, которой нужно передать результат выполнения задачи
Пример из документации:

@celery.task
def add(x, y):
    return x + y
add.apply_async((2, 2), link=add.s(16))

где add — наша задача (task), add.s — подзадача( subtask), которая запускается после выполнения add(2, 2), первым аргументом в подзадачу приходит результат выполнения add(2, 2), вторым аргументом приходит 16. Итого получается (2+2)+16=20. Что такое subtask тут
Применительно к нашей задаче делаем из функции mail_analizer task, оставляем один аргумент — content, убираем декоратор @call_backend и вызываем так:

>>>mail_content.apply_async(mail_addres, mail_password, email_from, link=mail_analizer.s())
Также предусмотрена переменная link_error для случая, когда задача «рэйзит» ошибку.
Подробнее об этом тут
Помимо этого в celery 3.0 появилось:

Group

группа, принимает список задач, которые должны быть применены параллельно:
пример из документации:

>>> from celery import group
>>> res = group(add.s(i, i) for i in xrange(10))()
>>> res.get(timeout=1)
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
или так
>>> g = group(add.s(i, i) for i in xrange(10))
>>> g.apply_async()

chain:

Цепочки вызовов
Теперь задачи можно вызывать цепочками, например:

>>> from celery import chain
@task
def mul(x,y):
    return x*y
@task
def div(x,y):
    return x/y
# (2 + 2) * 8 / 2
>>> res = chain(add.subtask((2, 2)),
                mul.subtask((8, )),
                div.subtask((2,))).apply_async()
>>> res.get() == 16

>>> res.parent.get() == 32

>>> res.parent.parent.get() == 4
короткая запись
>>> (add.s(2, 2) | add.s(4) | add.s(8))().get()
16
</source
 <h5>immutable</h5>
Подзадачу можно определить как неизменяемую,  тогда эта подзадача будет вызываться только с определёнными при инициализации аргументами
<source lang="python">
>>> add.subtask((2, 2), immutable=True)
или
>>> add.si(2, 2)

chord

Аккорд:
принимает список задач, которые должны быть выполнены параллельно и задачу, которая принимает список результатов выполнения списка задач. Вот это загнул.

@task
def xsum(res_list):
    return sum(res_list)
>>> from celery import chord
>>> res = chord((add.s(i, i) for i in xrange(10)), xsum.s())()
>>> res.get()
90

используя chain(group) получим chord:

>>> c3 = (group(add.s(i, i) for i in xrange(10) | xsum.s()))
>>> res = c3()
>>> res.get()
90

map

Как map(fun, [1,2,3])

res=task.map([1,2])
выполнит
res=[task(1), task(2)]

starmap

res=add.starmap([(1,2), (2,4)])
выполнит
res=[add(1,2), add(2,4)]

chuncs

Разбивает длинные списки аргументов на разные таски,

>>> from proj.tasks import add

>>> res = add.chunks(zip(range(100), range(100)), 10)()
>>> res.get()
[[0, 2, 4, 6, 8, 10, 12, 14, 16, 18],
 [20, 22, 24, 26, 28, 30, 32, 34, 36, 38],
 [40, 42, 44, 46, 48, 50, 52, 54, 56, 58],
 [60, 62, 64, 66, 68, 70, 72, 74, 76, 78],
 [80, 82, 84, 86, 88, 90, 92, 94, 96, 98],
 [100, 102, 104, 106, 108, 110, 112, 114, 116, 118],
 [120, 122, 124, 126, 128, 130, 132, 134, 136, 138],
 [140, 142, 144, 146, 148, 150, 152, 154, 156, 158],
 [160, 162, 164, 166, 168, 170, 172, 174, 176, 178],
 [180, 182, 184, 186, 188, 190, 192, 194, 196, 198]]

Подитог:

Celery 3.0 дает много очень удобных плюшек, которые использовать одно удовольствие, если их использовать.

Итог:

Celery предоставляет много удобных инструментов, но для небольших задач, где 90% этих функций не надо, вполне можно обойтись очередью сообщений (rabbit), таким образом избавиться от необходимости настраивать celery, уменьшить нагрузку на сервер, избавится от дополнительных зависимостей проекта.
Всем спасибо за внимание.

Автор: student100500

Источник

Поделиться

* - обязательные к заполнению поля