Как просто написать распределенный веб-сервис на Python + AMQP

в 11:15, , рубрики: AMQP, python, RabbitMQ, Программирование

Привет. Я уже довольно давно пишу на Python. Недавно пришлось разбираться с RabbitMQ. Мне понравилось. Потому что он без всяких проблем (понятно, что с некоторыми тонкостями) собирается в кластер. Тут я подумал: а неплохо бы его использовать в качестве очереди сообщений в кусочке API проекта, над которым я работаю. Сам API написан на tornado, основная мысль была в исключении блокирующего кода из API. Все синхронные операции выполнялись в пуле тредов.

Первое, что я решил, это сделать отдельный процесс(ы) «worker», который бы брал на себя всю синхронную работу. Задумал, чтобы «worker» был максимально прост, и делал задачи из очереди одну за другой. Скажем, выбрал из базы что-нибудь, ответил, взял на себя следующую задачу и так далее. Самих «worker»ов можно запустить много и тогда AMQP выступает уже в роли некоего подобия IPC.

Спустя некоторое время из этого вырос модуль, который берет на себя всю рутину связанную с AMQP и передачей сообщений туда и назад, а также сжимает их gzipом, если данных слишком много. Так родился crew. Собственно, используя его, мы с вами напишем простой API, который будет состоять из сервера на tornado и простых и незамысловатых «worker» процессов. Забегая вперед скажу, что весь код доступен на github, а то, о чем я буду рассказывать дальше, собрано в папке example.

Подготовка

Итак, давайте разберемся по порядку. Первое, что нам нужно будет сделать — это установить RabbitMQ. Как это делать я описывать не буду. Скажу лишь то, что на той-же убунте он ставится и работает из коробки. У меня на маке единственное, что пришлось сделать, это поставить LaunchRocket, который собрал все сервисы, что были установлены через homebrew и вывел в GUI:

LaunchRocket

Дальше создадим наш проект virtualenv и установим сам модуль через pip:

mkdir -p api
cd api
virtualenv env
source env/bin/activate
pip install crew tornado

В зависимостях модуля умышленно не указан tornado, так как на хосте с workerом его может и не быть. А на веб-части обычно создают requirements.txt, где указаны все остальные зависимости.

Код я буду писать частями, чтобы не нарушать порядок повествования. То, что у нас получится в итоге, можно посмотреть тут.

Пишем код

Сам tornado сервер состоит из двух частей. В первой части мы определяем обработчики запросов handlers, а во второй запускается event-loop. Давайте напишем сервер и создадим наш первый метод api.

Файл master.py:

# encoding: utf-8

import tornado.ioloop
import tornado.gen
import tornado.web
import tornado.options


class MainHandler(tornado.web.RequestHandler):
    @tornado.gen.coroutine
    def get(self):
        # Вызываем задачу test c приоритетом 100
        resp = yield self.application.crew.call('test', priority=100)
        self.write("{0}: {1}".format(type(resp).__name__, str(resp)))


application = tornado.web.Application(
    [
        ('/', MainHandler),
    ],
    autoreload=True,
    debug=True,
)


if __name__ == "__main__":
    tornado.options.parse_command_line()
    application.listen(8888)
    tornado.ioloop.IOLoop.instance().start()

Благодаря coroutine в торнадо, код выглядит просто. Можно написать тоже самое без coroutine.

Файл master.py:

class MainHandler(tornado.web.RequestHandler):
    def get(self):
        # Вызываем задачу test c приоритетом 100
        self.application.crew.call('test', priority=100, callback=self._on_response)

    def _on_response(resp, headers):
        self.write("{0}: {1}".format(type(resp).__name__, str(resp)))

Наш сервер готов. Но если мы его запустим, и сходим на /, то не дождемся ответа, его некому обрабатывать.

Теперь напишем простой worker:

Файл worker.py:

# encoding: utf-8

from crew.worker import run, context, Task

@Task('test')
def long_task(req):
    context.settings.counter += 1
    return 'Wake up Neo.n'

run(
    counter=0,      # This is a part of this worker context
)

Итак, как видно в коде, есть простая функция, обернутая декоратором Task(«test»), где test — это уникальный идентификатор задачи. В вашем worker не может быть двух задач с одинаковыми идентификаторами. Конечно, правильно было бы назвать задачу «crew.example.test» (так обычно и называю в продакшн среде), но для нашего примера достаточно просто «test».

Сразу бросается в глаза context.settings.counter. Это некий контекст, который инициализируется в worker процессе при вызове функции run. Также в контексте уже есть context.headers — это заголовки ответа для отделения метаданных от ответа. В примере с callback-функцией именно этот словарь передается в _on_response.

Заголовки сбрасываются после каждого ответа, а вот context.settings — нет. Я использую context.settings для передачи в функции worker(ы) соединения с базой данных и вообще любого другого объекта.

Также worker обрабатывает ключи запуска, их не много:

$ python worker.py --help
Usage: worker.py [options]

Options:
  -h, --help            show this help message and exit
  -v, --verbose         make lots of noise
  --logging=LOGGING     Logging level
  -H HOST, --host=HOST  RabbitMQ host
  -P PORT, --port=PORT  RabbitMQ port

URL подключения к базе и прочие переменные можно брать из переменный окружения. Поэтому worker в параметрах ждет только как ему соединиться c AMQP (хост и порт) и уровень логирования.

Итак, запускаем все и проверяем:

$ python master.py & python worker.py

image

Работает, но что случилось за ширмой?

При запуске tornado-сервера tornado подключился к RabbitMQ, создал Exchange DLX и начал слушать очередь DLX. Это Dead-Letter-Exchange — специальная очередь, в которую попадают задачи, которые не взял ни один worker за определенный timeout. Также он создал очередь с уникальным идентификатором, куда будут поступать ответы от workerов.

После запуска worker создал по очереди на каждую обернутую декоратором Task очередь и подписался на них. При поступлении задачи воркер main-loop создает один поток, контролируя в главном потоке время исполнения задачи и выполняет обернутую функцию. После return из обернутой функции сериализует его и ставит в очередь ответов сервера.

После поступления запроса tornado-сервер cтавит задачу в соответствующую очередь, указывая при этом идентификатор своей уникальной очереди, в которую должен поступить ответ. Если ни один воркер не взял задачу, тогда RabbitMQ перенаправляет задачу в exchange DLX и tornado-сервер получает сообщение о том, что истек таймаут пребывания очереди, генерируя исключение.

Зависшая задача

Чтобы продемонстрировать, как работает механизм завершения задач, которые повисли в процессе выполнения, напишем еще один веб-метод и задачу в worker.

В файл master.py добавим:

class FastHandler(tornado.web.RequestHandler):
    @tornado.gen.coroutine
    def get(self):
        try:
            resp = yield self.application.crew.call(
                'dead', persistent=False, priority=255, expiration=3,
            )
            self.write("{0}: {1}".format(type(resp).__name__, str(resp)))
        except TimeoutError:
            self.write('Timeout')
        except ExpirationError:
            self.write('All workers are gone')

И добавим его в список хендлеров:

application = tornado.web.Application(
    [
        (r"/", MainHandler),
        (r"/stat", StatHandler),
    ],
    autoreload=True,
    debug=True,
)

А в worker.py:

@Task('dead')
def infinite_loop_task(req):
    while True:
        pass

Как видно из приведенного выше примера, задача уйдет в бесконечный цикл. Однако, если задача не выполнится за 3 секунды (считая время получения из очереди), main-loop в воркере пошлет потоку исключение SystemExit. И да, вам придется обработать его.

Контекст

Как уже упоминалось выше, контекст — это такой специальный объект, который импортируется и имеет несколько встроенных переменных.

Давайте сделаем простую статистику по ответам нашего worker.

В файл master.py добавим следующий handler:

class StatHandler(tornado.web.RequestHandler):

    @tornado.gen.coroutine
    def get(self):
        resp = yield self.application.crew.call('stat', persistent=False, priority=0)
        self.write("{0}: {1}".format(type(resp).__name__, str(resp)))

Также зарегистрируем в списке обработчиков запросов:

application = tornado.web.Application(
    [
        (r"/", MainHandler),
        (r"/fast", FastHandler),
        (r"/stat", StatHandler),
    ],
    autoreload=True,
    debug=True,
)

Этот handler не очень отличается от предыдущих, просто возвращает значение, которое ему передал worker.

Теперь сама задача.

В файл worker.py добавим:

@Task('stat')
def get_counter(req):
    context.settings.counter += 1
    return 'I'm worker "%s". And I serve %s tasks' % (context.settings.uuid, context.settings.counter)

Функция возвращает строку, с информацией о количестве задач, обработанных workerом.

PubSub и Long polling

Теперь реализуем пару обработчиков. Один при запросе будет просто висеть и ждать, а второй будет принимать POST данные. После передачи последних первый будет их отдавать.

master.py:

class LongPoolingHandler(tornado.web.RequestHandler):
    LISTENERS = []

    @tornado.web.asynchronous
    def get(self):
        self.LISTENERS.append(self.response)

    def response(self, data):
        self.finish(str(data))

    @classmethod
    def responder(cls, data):
        for cb in cls.LISTENERS:
            cb(data)

        cls.LISTENERS = []

class PublishHandler(tornado.web.RequestHandler):

    @tornado.gen.coroutine
    def post(self, *args, **kwargs):
        resp = yield self.application.crew.call('publish', self.request.body)
        self.finish(str(resp))

...

application = tornado.web.Application(
    [
        (r"/", MainHandler),
        (r"/stat", StatHandler),
        (r"/fast", FastHandler),
        (r'/subscribe', LongPoolingHandler),
        (r'/publish', PublishHandler),
    ],
    autoreload=True,
    debug=True,
)

application.crew = Client()
application.crew.subscribe('test', LongPoolingHandler.responder)

if __name__ == "__main__":
    application.crew.connect()
    tornado.options.parse_command_line()
    application.listen(8888)
    tornado.ioloop.IOLoop.instance().start()

Напишем задачу publish.

worker.py:

@Task('publish')
def publish(req):
    context.pubsub.publish('test', req)

Если же вам не нужно передавать управление в worker, можно просто публиковать прямо из tornado-сервера

class PublishHandler2(tornado.web.RequestHandler):

    def post(self, *args, **kwargs):
        self.application.crew.publish('test', self.request.body)

Параллельное выполнение заданий

Часто бывает ситуация, когда мы можем выполнить несколько заданий параллельно. В crew есть для этого небольшой синтаксический сахар:

class Multitaskhandler(tornado.web.RequestHandler):

    @tornado.gen.coroutine
    def get(self, *args, **kwargs):
        with self.application.crew.parallel() as mc:
            # mc - multiple calls
            mc.call('test')
            mc.call('stat')
            test_result, stat_result = yield mc.result()
            self.set_header('Content-Type', 'text/plain')
            self.write("Test result: {0}nStat result: {1}".format(test_result, stat_result))

В этом случае, задаче будут поставлены две задачи параллельно и выход из with будет произведен по окончании последней.

Но нужно быть осторожным, так как какая-то задача может вызвать исключение. Оно будет приравнено непосредственно переменной. Таким образом, вам нужно проверить, не является ли test_result и stat_result экземплярами класса Exception.

Планы на будущее

Когда eigrad предложил написать прослойку, которой можно запустить любое wsgi приложение с помощью crew, мне эта идея сразу понравилась. Только представьте, запросы хлынут не на ваше wsgi приложение, а будут равномерно поступать через очередь на wsgi-worker.

Я никогда не писал wsgi сервер и даже не знаю, с чего начать. Но вы можете мне помочь, pull-requestы я принимаю.

Также думаю дописать client для еще одного популярного асинхронного фреймворка, для twisted. Но пока разбираюсь с ним, да и свободного времени не хватает.

Благодарности

Спасибо разработчикам RabbitMQ и AMQP. Замечательные идеи.

Также спасибо вам, читатели. Надеюсь, что вы не зря потратили время.

Автор: orlovdl

Источник


  1. vlch:

    Спасибо автору! Подробная статья и большое количество грамотного кода, то что нужно

  2. vlch:

    Спасибо автору! Не так много подробных статей по tornado с грамотным нетривиальным кодом

* - обязательные к заполнению поля


https://ajax.googleapis.com/ajax/libs/jquery/3.4.1/jquery.min.js