- PVSM.RU - https://www.pvsm.ru -

RabbitMQ — SQL Server

Неделю или две назад я увидел сообщение [1] на форуме RabbitMQ Users [2], о том, как наладить отправку сообщений из SQL Server в RabbitMQ. Поскольку мы плотно с этим работаем в Derivco [3], я оставил там некоторые предложения, а также сказал, что пишу в блоге о том, как это можно сделать. Часть моего сообщения была не совсем верной — по крайней мере, до этого момента (сорри, Бро, был очень занят).

Потрясающая штука, этот ваш SQL Server. С его помощью очень легко поместить информацию в базу данных. Получить данные из базы с помощью запроса столь же просто. А вот получить только что обновленные или вставленные данные уже немного сложнее. Подумайте о событиях в реальном времени; совершена покупка — кого-то нужно уведомить об этом в тот же момент, как только это произошло. Возможно, кто-то скажет, что такие данные должны выталкиваться не из базы данных, а откуда-то еще. Безусловно, так оно и есть, но довольно часто у нас попросту нет выбора.

У нас [3] была задача: отправлять события из базы данных вовне для дальнейшей обработки и стоял вопрос — как это сделать?

SQL Server и внешние коммуникации

За время существования SQL Server было несколько попыток организовать коммуникации за пределы базы данных; SQL Server Notification Services (NS), которая появилась в SQL Server 2000, а после, в SQL Server 2005, появился SQL Server Service Broker (SSB). Я описывал их в своей книге A First Look at SQL Server 2005 for Developers [4], вместе с Бобом Бошеменом и Дэном Салливаном. NS появился в SQL Server 2000, как я уже говорил, и был перепроектирован в бета-версии SQL Server 2005. Однако из готовой к продаже (RTM) версии SQL Server 2005 NS был выпилен исключен полностью.

Примечание: Если Вы читали книгу, то найдете там ряд особенностей, которых не было в RTM версии.

SSB выжил, и в пакете дополнительных компонентов SQL Server 2008 Microsoft представила Service Broker External Activator [5] (ЕА). Он дает возможность через SSB осуществлять взаимодействие за пределами локальной базы данных. Теоретически звучит неплохо, но на практике — он громоздок и запутан. Мы провели несколько тестов и быстро поняли, что он не выполняет того, что нам нужно. Кроме того, SSB не дал нам той производительности, которая была необходима, поэтому нам пришлось выдумывать нечто иное.

SQLCLR

То, к чему мы пришли в результате, было основано на технологии SQLCLR. SQLCLR — это платформа .NET, встроенная в ядро SQL Server и с ее помощью можно выполнить .NET код внутри ядра. Поскольку мы выполняем .NET код, то имеем возможность делать почти всё, что и в обычном .NET приложении.

Примечание: выше я написал «почти», потому как на самом деле есть некоторые ограничения. В данном контексте эти ограничения почти не влияют на то, что мы собираемся сделать.

Принцип работы SQLCLR заключается в следующем: код компилируется в dll библиотеку, а затем эта библиотека регистрируется средствами SQL Server:

Создание сборки

CREATE ASSEMBLY [RabbitMQ.SqlServer]
AUTHORIZATION rmq
FROM 'F:some_pathRabbitMQSqlClr4.dll'
WITH PERMISSION_SET = UNSAFE;
GO

Фрагмент кода 1: Создание сборки по абсолютному пути

Код выполняет следующие действия:

  • CREATE ASSEMBLY — создает сборку с заданным именем (независимо от того, что это должно быть).
  • AUTHORIZATION — указывает на владельца сборки. В данном случае rmq является заранее определенной ролью SQL Server.
  • FROM — определяет, где расположена оригинальная сборка. В предложении FROM также можно указывать путь в двоичном или UNC форматах. Установочные файлы для этого проекта используют двоичное представление.
  • WITH PERMISSION_SET — устанавливает разрешения. UNSAFE является наименее строгим, и необходим в данном случае.

Примечание: независимо от того, роль или имя входа было использовано в предложении AUTHORIZATION, класс appdomain должен быть создан с тем же именем, как и при загрузке сборки в домен. Рекомендуется разделять сборки разными именами appdomain классов, чтобы при падении одной сборки не свалились остальные. Однако если сборки имеют зависимости друг от друга, их нельзя разделить в разные классы.

Когда сборка создана, делаем в ней обертки методов .NET:

CREATE PROCEDURE rmq.pr_clr_PostRabbitMsg @EndpointID int, @Message nvarchar(max)
AS
EXTERNAL NAME  [RabbitMQ.SqlServer].[RabbitMQSqlClr.RabbitMQSqlServer].[pr_clr_PostRabbitMsg];
GO

Фрагмент кода 2: обертка метода .NET

Код выполняет следующие действия:

  • Создает хранимую процедуру Т-SQL с именем rmq.pr_clr_PostRabbitMsg, принимающую два параметра; @EndpointID и @Message.
  • Вместо тела процедуры используется внешний источник, который состоит из:
    • Сборка с именем RabbitMQ.SqlServer, т. е. агрегат, который мы создали выше во фрагменте кода 1.
    • Полный тип (пространство имен и класс): RabbitMQSqlClr.RabbitMQSqlServer
    • Метод из приведенного выше пространства имен и класса: pr_clr_PostRabbitMsg.

При выполнении процедуры rmq.pr_clr_PostRabbitMsg, будет вызываться метод pr_clr_PostRabbitMsg.

Примечание: при создании процедуры, имя сборки не чувствительно к регистру, в отличие от полного имени типа и метода. Не обязательно чтобы имя создаваемой процедуры совпадало с именем метода. Однако конечные типы данных для параметров должны совпадать.

Как я говорил ранее, у нас в Derivco есть необходимость отправлять данные за пределы SQL Server, поэтому мы используем SQLCLR и RabbitMQ [6] (RMQ).

RabbitMQ

RMQ — это брокер сообщений с открытым исходным кодом, который реализует протокол AMQP (Advanced Message Queuing Protocol) и написан на языке Erlang.

Поскольку RMQ является брокером сообщений, для подключения к нему необходимы клиентские библиотеки AMQP. Приложение ссылается на клиентские библиотеки и с их помощью открывает соединение и отправляет сообщения — как, например, идет обращение через ADO.NET к SQL Server. Но в отличие от ADO.NET, где, скорее всего, соединение открывается каждый раз при обращении к базе данных, здесь соединение остается открытым в течение всего периода работы приложения.

Таким образом, для того, чтобы иметь возможность взаимодействовать из базы данных с RabbitMQ нам нужно приложение и клиентская библиотеки .NET для RabbitMQ.

Примечание: в последующей части данной статьи будут встречаться фрагменты кода RabbitMQ, но без детальных пояснений что они делают. Если вы новичок в работе с RabbitMQ, то предлагаю взглянуть на различные уроки по RabbitMQ [7], чтобы понимать назначение кода. Обучающий урок Hello World [8] по C# — хорошее начало. Одно из отличий между учебниками и примерами кода заключается в том, что в примерах не объявляются обменники. Предполагается, что они предопределены.

RabbitMQ.SqlServer

RabbitMQ.SqlServer — сборка, использующая клиентскую .NET библиотеку для RabbitMQ и предоставляет возможность отправки сообщений из базы данных в одну или несколько конечных точек RabbitMQ (VHosts и обменники). Код можно скачать/ответвить из моего репозитория RabbitMQ-SqlServer [9] на GitHub. Он содержит исходники сборок и установочные файлы (т.е. Вам не придется компилировать их самостоятельно).

Примечание: это просто пример, чтобы показать как SQL Server может взаимодействовать с RabbitMQ. Это НЕ готовый продукт и даже не часть его. Если этот код разрывает вам мозг [10] — не надо на меня пенять, ибо это просто пример.

Функциональность

При загрузке сборки, или при явном вызове ее инициализации, либо при косвенном обращении, в момент вызова процедуры-обертки, сборка загружает строку соединения в локальную базу данных, в которую она была установлена, как и конечные точки RabbitMQ, к которым она подключается:

Подключение

internal bool InternalConnect()
{
  try
  {
    connFactory = new ConnectionFactory();
    connFactory.Uri = connString;
    connFactory.AutomaticRecoveryEnabled = true;
    connFactory.TopologyRecoveryEnabled = true;
    RabbitConn = connFactory.CreateConnection();
 
 
    for (int x = 0; x < channels; x++)
    {
      var ch = RabbitConn.CreateModel();
      rabbitChannels.Push(ch);
    }
 
    return true;
  }
  catch(Exception ex)
  {
    return false;
  }
}

Фрагмент кода 3: подключение к конечной точке

В это же время часть подключения к конечной точкой также создает IModels на соединении, и они используются при отправке (добавлении в очередь) сообщений:

Отправка сообщения

internal bool Post(string exchange, byte[] msg, string topic)
{
  IModel value = null;
  int channelTryCount = 0;
  try
  {
    while ((!rabbitChannels.TryPop(out value)) && channelTryCount < 100)
    {
      channelTryCount += 1;
      Thread.Sleep(50);
    }
 
    if (channelTryCount == 100)
    {
      var errMsg = $"Channel pool blocked when trying to post message to Exchange: {exchange}.";
      throw new ApplicationException(errMsg);
      }
 
    value.BasicPublish(exchange, topic, false, null, msg);
    rabbitChannels.Push(value);
    return true;
 
  }
  catch (Exception ex)
  {
    if (value != null)
    {
      _rabbitChannels.Push(value);
    }
    throw;
  }
}

Метод Post вызывается из метода pr_clr_PostRabbitMsg(int endPointId, string msgToPost), который был представлен в качестве процедуры с помощью предложения CREATE PROCEDURE во фрагменте кода 2:

Способ вызова Post

public static void pr_clr_PostRabbitMsg(int endPointId, string msgToPost)
{
  try
  {
    if(endPointId == 0)
    {
      throw new ApplicationException("EndpointId cannot be 0");
    }
    if (!isInitialised)
    {
      pr_clr_InitialiseRabbitMq();
    }
    var msg = Encoding.UTF8.GetBytes(msgToPost);
    if (endPointId == -1)
    {
      foreach (var rep in remoteEndpoints)
      {
        var exch = rep.Value.Exchange;
        var topic = rep.Value.RoutingKey;
        foreach (var pub in rabbitPublishers.Values)
        {
          pub.Post(exch, msg, topic);
        }
      }
    }
    else
    {
      RabbitPublisher pub;
      if (rabbitPublishers.TryGetValue(endPointId, out pub))
      {
        pub.Post(remoteEndpoints[endPointId].Exchange, msg, remoteEndpoints[endPointId].RoutingKey);
      }
      else
      {
        throw new ApplicationException($"EndpointId: {endPointId}, does not exist");
      }
    }
  }
  catch
  {
    throw;
  }
}  

Фрагмент кода 5: Представление метода в виде процедуры

При выполнении метода предполагается, что вызывающий объект отправляет идентификатор конечной точки, в которую необходимо передать сообщение, и, собственно, само сообщение. Если в качестве идентификатора конечной точки передается значение -1, то мы перебираем все точки и отправляем сообщение каждой из них. Сообщение приходит в виде строки, из которой мы получаем байты с помощью Encoding.UTF8.GetBytes. В рабочей среде вызов Encoding.UTF8.GetBytes следует заменить на сериализацию.

Установка

Чтобы установить и запустить пример, нужны все файлы в папке srcSQL. Для установки выполните следующие действия:

  • Запустите скрипт 01.create_database_and_role.sql. Он создаст:
    • тестовую базу данных RabbitMQTest, где будет создана сборка.
    • роль rmq, которая будет назначена в качестве владельца сборки
    • схему, которая тоже будет назваться rmq. В этой схеме создаются различные объекты базы данных.

  • Запустите файл 02.create_database_objects.sql. Он создаст:
    • таблицу rmq.tb_RabbitSetting, в которой будет храниться строка подключения к локальной БД.
    • таблицу rmq.tb_RabbitEndpoint, в которой будет храниться одна или несколько конечных точек RabbitMQ.

  • В файле 03.create_localhost_connstring.sql измените значение переменной @connString на правильную строку подключения к базе RabbitMQTest, созданной на шаге 1 и запустите скрипт.

Прежде чем продолжить, необходимо иметь запущенный экземпляр брокера RabbitMQ и VHost (по умолчанию VHost представлен как /). Как правило, у нас есть несколько VHost, просто для изоляции. Этот хост также нуждается в обменнике, в примере мы используем amq.topic. Когда у вас готов брокер RabbitMQ, отредактируйте параметры процедуры rmq.pr_UpsertRabbitEndpoint, которая находится в файле 04.upsert_rabbit_endpoint.sql:

Конечная точка RabbitMQ

EXEC rmq.pr_UpsertRabbitEndpoint @Alias = 'rabbitEp1',
                                 @ServerName = 'RabbitServer',
                                 @Port = 5672,
                                 @VHost = 'testHost',
                                 @LoginName = 'rabbitAdmin',
                                 @LoginPassword = 'some_secret_password',
                                 @Exchange = 'amq.topic',
                                 @RoutingKey = '#',
                                 @ConnectionChannels = 5,
                                 @IsEnabled = 1

Фрагмент кода 6: создание конечной точки в RabbitMQ

На данном этапе настало время развернуть сборки. В развертываемых вариантах есть отличия для версий SQL Server, предшествующих SQL Server 2014 (2005, 2008, 2008R2, 2012), и для 2014 и выше. Разница заключается в поддерживаемой версии CLR. До SQL Server 2014 платформа .NET выполнялась в среде CLR версии 2, а в SQL Server 2014 и выше используется версия 4.

SQL Server 2005 — 2012

Давайте начнем с версий SQL Server, которые работают на CLR 2, так как там есть свои особенности. Нам нужно развернуть созданную сборку, а вместе с тем развернуть клиентскую .NET библиотеку RabbitMQ (RabbitMQ.Client). Из нашей сборки будем ссылаться на клиентскую библиотеку RabbitMQ. Т.к. мы планировали использовать CLR 2, то наша сборка и RabbitMQ.Client должны быть скомпилирована на основе .NET 3.5. Тут возникают проблемы.

Все последние версии библиотеки RabbitMQ.Client скомпилированы для среды CLR 4, поэтому они не могут использоваться в нашей сборке. Последняя версия клиентских библиотек для CLR 2 собрана на .NET 3.4.3. Но даже если мы попытаемся развернуть эту сборку, то получим сообщение об ошибке:

RabbitMQ — SQL Server - 1
Рисунок 1: Отсутствует сборка System.ServiceModel

Эта версия RabbitMQ.Client ссылается на сборку, не являющуюся частью CLR SQL Server. Это WCF сборка, и это одно из ограничений в SQLCLR, о которых я говорил выше: эта конкретная сборка предназначена для таких типов задач, которые не допускается выполнять в SQL Server. Последние версии RabbitMQ.Client не имеют этих зависимостей, поэтому могут использоваться без каких-либо проблем, если не считать досадные требования среды CLR 4. Что делать?

Как известно, RabbitMQ имеет открытый исходный код, ну, а мы ведь с вами разработчики, верно? ;) Так давайте же перекомпилим! В варианте до последних релизов (т.е. версии <3.5.0) RabbitMQ.Client я удалил ссылки на System.ServiceModel и перекомпилил. Мне пришлось изменить пару строчек кода, использующих функционал System.ServiceModel, но это были незначительные изменения.

В этом примере я не использовал версию клиента 3.4.3, а взял стабильный релиз 3.6.6 [11] и перекомпилировал с использованием .NET 3.5 (CLR 2). Это почти сработало :), за исключением того, что более поздние релизы RabbitMQ.Client используют Task'и, которые изначально не являются частью .NET 3.5.

К счастью, есть версия System.Threading.dll [12] для .NET 3.5, которая включает Task. Я скачал её, настроил ссылки и все поехало! Тут главная фишка в том, что System.Threading.dll должна быть установлена вместе со сборкой.

Примечание: исходник RabbitMQ.Client, из которого я собрал версию на .NET 3.5, есть в моем репозитории на GitHub RabbitMQ Client 3.6.6 .NET 3.5 [13]. Бинарник dll вместе с System.Threading.dll для .NET 3.5 также лежит в каталоге libNET3.5 репозитория (RabbitMQ-SqlServer) [9].

Для установки необходимых сборок (System.Threading, RabbitMQ.Client и RabbitMQ.SqlServer) запустите установочные скрипты из каталога srcsql в следующем порядке:

  1. 05.51.System.Threading.sql2k5-12.sql — System.Threading
  2. 05.52.RabbitMQ.Client.sql2k5-12.sql — RabbitMQ.Client
  3. 05.53.RabbitMQ.SqlServer.sql2k5-12.sql — RabbitMQ.SqlServer

SQL Server 2014+

В SQL Server 2014 и более поздних версиях сборка компилируется под .NET 4.ХХ (мой пример на 4.5.2), и вы можете ссылаться на любую из последних версий RabbitMQ.Client, которую можно получить с помощью NuGet [14]. В своем примере я использую 4.1.1. RabbitMQ.Client, которая так же есть в каталоге libNET4 репозитория (RabbitMQ-SqlServer) [9].

Для установки запустите скрипты из каталога srcsql в следующем порядке:

  1. 05.141.RabbitMQ.Client.sql2k14+.sql — RabbitMQ.Client
  2. 05.142.RabbitMQ.SqlServer.sql2k14+.sql — RabbitMQ.SqlServer

Обертки методов SQL

Чтобы создать процедуры, которые будут использоваться из нашей сборки (3.5 или 4), запустите скрипт 06.create_sqlclr_procedures.sql. Он создаст Т-SQL процедуры для трех .NET-методов:

  • rmq.pr_clr_InitialiseRabbitMq вызывает pr_clr_InitialiseRabbitMq. Используется для загрузки и инициализации сборки RabbitMQ.SqlServer.
  • rmq.pr_clr_ReloadRabbitEndpoints вызывает pr_clr_ReloadRabbitEndpoints. Загружает различные конечные точки RabbitMQ.
  • rmq.pr_clr_PostRabbitMsg вызывает pr_clr_PostRabbitMsg. Используется для отправки сообщения в RabbitMQ.

Скрипт также создает простую T-SQL процедуру — rmq.pr_PostRabbitMsg, которая применяется к rmq.pr_clr_PostRabbitMsg. Это процедура-обертка, которая знает, что делать с данными, обрабатывает исключения и т.д. В рабочей среде у нас есть несколько подобных процедур, обрабатывающих различные типы сообщений. Подробнее об этом читайте ниже.

Использование

Из всего вышеперечисленного видно, что для отправки сообщений в RabbitMQ мы вызываем rmq.pr_PostRabbitMsg или rmq.pr_clr_PostRabbitMsg, передав в параметрах идентификатор конечной точки и само сообщение в виде строки. Все это, конечно, круто, но хотелось бы видеть как это будет работать в реальности.

Что мы делаем в рабочих средах — в хранимых процедурах, обрабатывающих данные, которые должны быть отправлены в RabbitMQ, мы собираем данные для отправки и в блоке подключения вызываем процедуру, подобную rmq.pr_PostRabbitMsg. Ниже приведен очень упрощенный пример такой процедуры:

Процедура обработки данных

ALTER PROCEDURE dbo.pr_SomeProcessingStuff @id int
AS
BEGIN
  SET NOCOUNT ON;
  BEGIN TRY
    --создаем переменную для конечной точки
    DECLARE @endPointId int;
    --создаем переменную для сообщения
    DECLARE @msg nvarchar(max) = '{'
    --выполняем необходимые действия и собираем данные для сообщения
    SET @msg = @msg + '"Id":' + CAST(@id AS varchar(10)) + ','
    -- делаем что-то еще
    SET @msg = @msg + '"FName":"Hello",';
    SET @msg = @msg + '"LName":"World"';
    SET @msg = @msg + '}';
 
    --снова что-то делаем
    -- получаем идентификатор конечной точки откуда-то, по каким-то условиям
    SELECT @endPointId = 1;
    --здесь начинается блок подключения
    --вызываем процедуру для отправки сообщения
    EXEC rmq.pr_PostRabbitMsg @Message = @msg, @EndpointID = @endPointId;
  END TRY
  BEGIN CATCH
    DECLARE @errMsg nvarchar(max);
    DECLARE @errLine int;
    SELECT @errMsg = ERROR_MESSAGE(), @errLine = ERROR_LINE();
    RAISERROR('Error: %s at line: %d', 16, -1, @errMsg, @errLine);
  END CATCH
END

Во фрагменте кода 7 мы видим, как в процедуре захватываются и обрабатываются нужные данные и после обработки отправляются. Чтобы использовать эту процедуру, выполните скрипт 07.create_processing_procedure.sql из каталога srcSQL.

Давайте все это запустим

На этом этапе вы должны быть готовы отправить несколько сообщений. Перед тестированием убедитесь, что у вас в RabbitMQ есть очереди, привязанные к обменнику конечной точки в rmq.tb_RabbitEndpoint.

Итак, для запуска нужно проделать следующее:
Откройте файл 99.test_send_message.sql.
Выполните

EXEC rmq.pr_clr_InitialiseRabbitMq;

чтобы инициализировать сборку и загрузить конечные точки RabbitMQ. Это не обязательное действие, но рекомендуется предварительно загрузить сборку, после ее создания или изменения.

Выполните

EXEC dbo.pr_SomeProcessingStuff @id = 101

(можно использовать любой другой идентификатор, какой нравится).

Если все отработало без ошибок, то в очереди RabbitMQ должно появиться сообщение! Вот Вы и воспользовались SQLCLR для отправки сообщения в RabbitMQ.

Конгратулейшенс!

Автор: rt001

Источник [15]


Сайт-источник PVSM.RU: https://www.pvsm.ru

Путь до страницы источника: https://www.pvsm.ru/c-2/288607

Ссылки в тексте:

[1] сообщение: https://groups.google.com/forum/#!searchin/rabbitmq-users/sql%2420server%7Csort:relevance/rabbitmq-users/T8fA1KdVtYE/bkRz3JkOCwAJ

[2] RabbitMQ Users: https://groups.google.com/forum/#!forum/rabbitmq-users

[3] Derivco: http://www.nielsberglund.com/Derivco

[4] A First Look at SQL Server 2005 for Developers: https://www.amazon.com/First-Look-Server-2005-Developers/dp/0321180593/ref=sr_1_1?ie=UTF8&qid=1486701050&sr=8-1&keywords=A+First+Look+at+SQL+Server+2005+for+Developers

[5] Service Broker External Activator: https://blogs.msdn.microsoft.com/sql_service_broker/2008/11/21/announcing-service-broker-external-activator/

[6] RabbitMQ: http://www.rabbitmq.com/

[7] уроки по RabbitMQ: http://www.rabbitmq.com/getstarted.html

[8] Hello World: http://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html

[9] RabbitMQ-SqlServer: https://github.com/nberglund/RabbitMQ-SqlServer

[10] мозг: http://www.braintools.ru

[11] стабильный релиз 3.6.6: https://github.com/rabbitmq/rabbitmq-dotnet-client/tree/stable

[12] System.Threading.dll: https://www.nuget.org/packages/System.Threading.dll/

[13] RabbitMQ Client 3.6.6 .NET 3.5: https://github.com/nberglund/rabbitmq-dotnet-client-3.6.6-stable_net_3.5

[14] NuGet: https://www.nuget.org/packages/RabbitMQ.Client

[15] Источник: https://habr.com/post/419457/?utm_source=habrahabr&utm_medium=rss&utm_campaign=419457