- PVSM.RU - https://www.pvsm.ru -

RabbitMQ tutorial 3 — Публикация/Подписка

Хочу продолжить серию [1] перевода уроков с официального сайта [2]. Примеры будут на php, но их можно реализовать на большинстве популярных ЯП [3].

Публикация/Подписка

В предыдущей статье было рассмотрено создание рабочей очереди сообщений. Было сделано допущение, что каждое сообщение будет направлено одному обработчику(worker). В этой статье усложним задачу – отправим сообщение нескольким подписчикам. Этот паттерн известен как "publish/subscribe [4]" (публикация/подписка).
Чтобы понять этот шаблон, создадим простую систему логирования. Она будет состоять из двух программ – первая будет создавать логи, вторая считывать и печатать их.
В нашей систему логирования каждая программа подписчик будет получать каждое сообщение. Благодаря этому, мы сможем запустить одного подписчика на сохранение логов на диск, а потом в любое время сможем создать другого подписчика для отображения логов на экран.
По существу, каждое сообщение будет транслироваться каждому подписчику.

Точки обмена(exchanges)

В предыдущих статьях для отправки и принятия сообщений мы работали с очередью. Теперь рассмотрим расширенную модель отправки сообщений Rabbit.
Напомним термины предыдущей статьи:

  • Producer (поставщик) ‒ программа, отправляющая сообщения
  • Queue (очередь) – буффер, хранящий сообщение
  • Consumer (подписчик) ‒ программа, принимающая сообщения.

Основная идея в модели отправки сообщений Rabbit – Поставщик(producer) никогда не отправляет сообщения напрямую в очередь. Фактически, довольно часто поставщик не знает, дошло ли его сообщение до конкретной очереди.
Вместо этого поставщик отправляет сообщение в точку доступа. В точке доступа нет ничего сложного. Точка доступа выполняет две функции:
— получает сообщения от поставщика
— отправляет эти сообщения в очередь.
Точка доступа точно знает, что делать с поступившими сообщениями. Отправить сообщение в конкретную очередь, либо в несколько очередей, либо не отправлять никому и удалить его. Эти правила описываются в типе точки доступа (exchange type).

image

Существуют несколько типов: direct, topic, headers и fanout. Мы остановимся на последнем типе fanout. Создадим точку с доступа с этим типом и назовем её – logs:

$channel->exchange_declare('logs', 'fanout', false, false, false);

Тип fanout – очень прост. Он копирует все сообщения которые поступают к нему во все очереди, которые ему доступны. Это то что нам нужно для нашей системы логирования.

Просмотр списка точек доступа:

Чтобы посмотреть все точки доступа на сервере, необходимо выполнить команду rabbitmqctl:

$ sudo rabbitmqctl list_exchanges
Listing exchanges ...
        direct
amq.direct      direct
amq.fanout      fanout
amq.headers     headers
amq.match       headers
amq.rabbitmq.log        topic
amq.rabbitmq.trace      topic
amq.topic       topic
logs    fanout
...done.

Мы видим список точек доступа с наименованием amq.* и точку доступа без имени, которая используется по умолчанию (она не подходит для выполнения нашей задачи).

Наименование точек доступа.

В предыдущих статьях мы ничего не знали о точках доступа, но всё-таки могли отправлять письма в очередь. Это было возможно, потому что использовали точку доступа по умолчанию, которая идентифицируется пустой строкой “”.
Вспомним как раньше мы отправляли письма:

$channel->basic_publish($msg, '', 'hello');

Здесь используется точка доступа по умолчанию или безымянная точка доступа: сообщение направляется в очередь, идентифицированную через ключ “routing_key”. Ключ “routing_key” передается через третий параметр функции basic_publish.

Теперь мы можем отправить сообщение в нашу именованную точку доступа.

$channel->exchange_declare('logs', 'fanout', false, false, false);
$channel->basic_publish($msg, 'logs');

Временные очереди:

Всё это время мы использовали наименование очередей (“hello“ или “task_queue”). Возможность давать наименования помогает указать обработчикам (workers) определенную очередь, а также делить очередь между продюсерами и подписчиками.
Но наша система логирования требует, чтобы в очередь поступали все сообщения, а не только часть. Также мы хотим, чтобы сообщения были актуальными, а не старыми. Для этого нам понадобиться 2 вещи:
Каждый раз когда мы соединяемся с Rabbit, мы создаем новую очередь, или даем создать серверу случайное наименование.
Каждый раз когда подписчик отключается от Rabbit, мы удаляем очередь.
В php-amqplib клиенте, когда мы обращаемся к очереди без наименовании, мы создаем временную очередь и автоматически сгенерированным наименованием:

list($queue_name, ,) = $channel->queue_declare("");

Метод вернет автоматически сгенерированное имя очереди. Она может быть такой – ‘amq.gen-JzTY20BRgKO-HjmUJj0wLg.’.
Когда заявленное соединение оборвется, очередь автоматически удалиться.

Переплеты(Bindings)

image

Итак, у нас есть точка доступа с типом fanout и очередь. Сейчас нам нужно сказать точке доступа, чтобы она отправила сообщение в очередь. Отношение между точкой доступа и очередью называется bindings.

$channel->queue_bind($queue_name, 'logs');

С этого момента, сообщения для нашей очереди проходят через точку доступа
Посмотреть список binding-ов можно используя команду rabbitmqctl list_bindings

Отправка во все очереди:

image
Программа продюсер, которая создает сообщения, не изменилась с предыдущей статьи. Единственное важное отличие – теперь мы направляем сообщения в нашу именованную точку доступа ‘logs’, вместо точки доступа по умолчанию. Нам нужно было указать имя очереди при отправки сообщения. Но для точки доступа с типом fanout в этом нет необходимости. Рассмотрим код скрипта emit_log.php:

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLibConnectionAMQPConnection;
use PhpAmqpLibMessageAMQPMessage;

$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->exchange_declare('logs', 'fanout', false, false, false);

$data = implode(' ', array_slice($argv, 1));
if(empty($data)) $data = "info: Hello World!";
$msg = new AMQPMessage($data);

$channel->basic_publish($msg, 'logs');

echo " [x] Sent ", $data, "n";

$channel->close();
$connection->close();

?>

(emit_log.php source) [5]

Как вы видите, после установки соединения мы создаем точку доступа. Этот шаг необходим, так как использование несуществующей точки доступа – запрещено.
Сообщение в точке доступа будут потеряны, так как ни одна очередь не связана с точкой доступа. Но это хорошо для нас: пока нет ни одного подписчика нашей точки доступа, все сообщения могут безопасно удалятся.
Код подписчика receive_logs.php:

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLibConnectionAMQPConnection;

$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->exchange_declare('logs', 'fanout', false, false, false);

list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

$channel->queue_bind($queue_name, 'logs');

echo ' [*] Waiting for logs. To exit press CTRL+C', "n";

$callback = function($msg){
  echo ' [x] ', $msg->body, "n";
};

$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

while(count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();

?>

(receive_logs.php source) [6]

Если вы хотите сохранить логи в файл, вам потребуется открыть консоль и набрать:

$ php receive_logs.php > logs_from_rabbit.log

Если вы хотите отобразить логи на экран, откройте еще одно окно и наберите:
$ php receive_logs.php

Ну и конечно запуск продюсера сообщений:
$ php emit_log.php

С помощью команды rabbitmqctl list_bindings мы можем удостовериться, что код правильно создал очередь и связал её с точкой доступа. С двумя открытыми программами receive_logs.php у вас должно получиться следующее:

$ sudo rabbitmqctl list_bindings
Listing bindings ...
logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue []
logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue []
...done.

Здесь отображено, что данные точки доступа logs отправляются в две очереди, имена которых созданы автоматически. Это именно то, чего мы добивались.
В следующей статье будет описано, как прослушать только часть сообщений.

Автор: dlux66

Источник [7]


Сайт-источник PVSM.RU: https://www.pvsm.ru

Путь до страницы источника: https://www.pvsm.ru/php-2/47601

Ссылки в тексте:

[1] серию: http://habrahabr.ru/post/150134/

[2] официального сайта: http://www.rabbitmq.com/tutorials/tutorial-two-python.html

[3] ЯП: http://www.rabbitmq.com/devtools.html

[4] publish/subscribe: http://en.wikipedia.org/wiki/Publish%E2%80%93subscribe_pattern

[5] (emit_log.php source): https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/php/emit_log.php

[6] (receive_logs.php source): https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/php/receive_logs.php

[7] Источник: http://habrahabr.ru/post/200870/