RabbitMQ — SQL Server

в 6:58, , рубрики: .net, AMQP, big data, C#, clr, Microsoft SQL Server, sql server; rabbitmq; c#; .net

Неделю или две назад я увидел сообщение на форуме RabbitMQ Users, о том, как наладить отправку сообщений из SQL Server в RabbitMQ. Поскольку мы плотно с этим работаем в Derivco, я оставил там некоторые предложения, а также сказал, что пишу в блоге о том, как это можно сделать. Часть моего сообщения была не совсем верной — по крайней мере, до этого момента (сорри, Бро, был очень занят).

Потрясающая штука, этот ваш SQL Server. С его помощью очень легко поместить информацию в базу данных. Получить данные из базы с помощью запроса столь же просто. А вот получить только что обновленные или вставленные данные уже немного сложнее. Подумайте о событиях в реальном времени; совершена покупка — кого-то нужно уведомить об этом в тот же момент, как только это произошло. Возможно, кто-то скажет, что такие данные должны выталкиваться не из базы данных, а откуда-то еще. Безусловно, так оно и есть, но довольно часто у нас попросту нет выбора.

У нас была задача: отправлять события из базы данных вовне для дальнейшей обработки и стоял вопрос — как это сделать?

SQL Server и внешние коммуникации

За время существования SQL Server было несколько попыток организовать коммуникации за пределы базы данных; SQL Server Notification Services (NS), которая появилась в SQL Server 2000, а после, в SQL Server 2005, появился SQL Server Service Broker (SSB). Я описывал их в своей книге A First Look at SQL Server 2005 for Developers, вместе с Бобом Бошеменом и Дэном Салливаном. NS появился в SQL Server 2000, как я уже говорил, и был перепроектирован в бета-версии SQL Server 2005. Однако из готовой к продаже (RTM) версии SQL Server 2005 NS был выпилен исключен полностью.

Примечание: Если Вы читали книгу, то найдете там ряд особенностей, которых не было в RTM версии.

SSB выжил, и в пакете дополнительных компонентов SQL Server 2008 Microsoft представила Service Broker External Activator (ЕА). Он дает возможность через SSB осуществлять взаимодействие за пределами локальной базы данных. Теоретически звучит неплохо, но на практике — он громоздок и запутан. Мы провели несколько тестов и быстро поняли, что он не выполняет того, что нам нужно. Кроме того, SSB не дал нам той производительности, которая была необходима, поэтому нам пришлось выдумывать нечто иное.

SQLCLR

То, к чему мы пришли в результате, было основано на технологии SQLCLR. SQLCLR — это платформа .NET, встроенная в ядро SQL Server и с ее помощью можно выполнить .NET код внутри ядра. Поскольку мы выполняем .NET код, то имеем возможность делать почти всё, что и в обычном .NET приложении.

Примечание: выше я написал «почти», потому как на самом деле есть некоторые ограничения. В данном контексте эти ограничения почти не влияют на то, что мы собираемся сделать.

Принцип работы SQLCLR заключается в следующем: код компилируется в dll библиотеку, а затем эта библиотека регистрируется средствами SQL Server:

Создание сборки

CREATE ASSEMBLY [RabbitMQ.SqlServer]
AUTHORIZATION rmq
FROM 'F:some_pathRabbitMQSqlClr4.dll'
WITH PERMISSION_SET = UNSAFE;
GO

Фрагмент кода 1: Создание сборки по абсолютному пути

Код выполняет следующие действия:

  • CREATE ASSEMBLY — создает сборку с заданным именем (независимо от того, что это должно быть).
  • AUTHORIZATION — указывает на владельца сборки. В данном случае rmq является заранее определенной ролью SQL Server.
  • FROM — определяет, где расположена оригинальная сборка. В предложении FROM также можно указывать путь в двоичном или UNC форматах. Установочные файлы для этого проекта используют двоичное представление.
  • WITH PERMISSION_SET — устанавливает разрешения. UNSAFE является наименее строгим, и необходим в данном случае.

Примечание: независимо от того, роль или имя входа было использовано в предложении AUTHORIZATION, класс appdomain должен быть создан с тем же именем, как и при загрузке сборки в домен. Рекомендуется разделять сборки разными именами appdomain классов, чтобы при падении одной сборки не свалились остальные. Однако если сборки имеют зависимости друг от друга, их нельзя разделить в разные классы.

Когда сборка создана, делаем в ней обертки методов .NET:

CREATE PROCEDURE rmq.pr_clr_PostRabbitMsg @EndpointID int, @Message nvarchar(max)
AS
EXTERNAL NAME  [RabbitMQ.SqlServer].[RabbitMQSqlClr.RabbitMQSqlServer].[pr_clr_PostRabbitMsg];
GO

Фрагмент кода 2: обертка метода .NET

Код выполняет следующие действия:

  • Создает хранимую процедуру Т-SQL с именем rmq.pr_clr_PostRabbitMsg, принимающую два параметра; @EndpointID и @Message.
  • Вместо тела процедуры используется внешний источник, который состоит из:
    • Сборка с именем RabbitMQ.SqlServer, т. е. агрегат, который мы создали выше во фрагменте кода 1.
    • Полный тип (пространство имен и класс): RabbitMQSqlClr.RabbitMQSqlServer
    • Метод из приведенного выше пространства имен и класса: pr_clr_PostRabbitMsg.

При выполнении процедуры rmq.pr_clr_PostRabbitMsg, будет вызываться метод pr_clr_PostRabbitMsg.

Примечание: при создании процедуры, имя сборки не чувствительно к регистру, в отличие от полного имени типа и метода. Не обязательно чтобы имя создаваемой процедуры совпадало с именем метода. Однако конечные типы данных для параметров должны совпадать.

Как я говорил ранее, у нас в Derivco есть необходимость отправлять данные за пределы SQL Server, поэтому мы используем SQLCLR и RabbitMQ (RMQ).

RabbitMQ

RMQ — это брокер сообщений с открытым исходным кодом, который реализует протокол AMQP (Advanced Message Queuing Protocol) и написан на языке Erlang.

Поскольку RMQ является брокером сообщений, для подключения к нему необходимы клиентские библиотеки AMQP. Приложение ссылается на клиентские библиотеки и с их помощью открывает соединение и отправляет сообщения — как, например, идет обращение через ADO.NET к SQL Server. Но в отличие от ADO.NET, где, скорее всего, соединение открывается каждый раз при обращении к базе данных, здесь соединение остается открытым в течение всего периода работы приложения.

Таким образом, для того, чтобы иметь возможность взаимодействовать из базы данных с RabbitMQ нам нужно приложение и клиентская библиотеки .NET для RabbitMQ.

Примечание: в последующей части данной статьи будут встречаться фрагменты кода RabbitMQ, но без детальных пояснений что они делают. Если вы новичок в работе с RabbitMQ, то предлагаю взглянуть на различные уроки по RabbitMQ, чтобы понимать назначение кода. Обучающий урок Hello World по C# — хорошее начало. Одно из отличий между учебниками и примерами кода заключается в том, что в примерах не объявляются обменники. Предполагается, что они предопределены.

RabbitMQ.SqlServer

RabbitMQ.SqlServer — сборка, использующая клиентскую .NET библиотеку для RabbitMQ и предоставляет возможность отправки сообщений из базы данных в одну или несколько конечных точек RabbitMQ (VHosts и обменники). Код можно скачать/ответвить из моего репозитория RabbitMQ-SqlServer на GitHub. Он содержит исходники сборок и установочные файлы (т.е. Вам не придется компилировать их самостоятельно).

Примечание: это просто пример, чтобы показать как SQL Server может взаимодействовать с RabbitMQ. Это НЕ готовый продукт и даже не часть его. Если этот код разрывает вам мозг — не надо на меня пенять, ибо это просто пример.

Функциональность

При загрузке сборки, или при явном вызове ее инициализации, либо при косвенном обращении, в момент вызова процедуры-обертки, сборка загружает строку соединения в локальную базу данных, в которую она была установлена, как и конечные точки RabbitMQ, к которым она подключается:

Подключение

internal bool InternalConnect()
{
  try
  {
    connFactory = new ConnectionFactory();
    connFactory.Uri = connString;
    connFactory.AutomaticRecoveryEnabled = true;
    connFactory.TopologyRecoveryEnabled = true;
    RabbitConn = connFactory.CreateConnection();
 
 
    for (int x = 0; x < channels; x++)
    {
      var ch = RabbitConn.CreateModel();
      rabbitChannels.Push(ch);
    }
 
    return true;
  }
  catch(Exception ex)
  {
    return false;
  }
}

Фрагмент кода 3: подключение к конечной точке

В это же время часть подключения к конечной точкой также создает IModels на соединении, и они используются при отправке (добавлении в очередь) сообщений:

Отправка сообщения

internal bool Post(string exchange, byte[] msg, string topic)
{
  IModel value = null;
  int channelTryCount = 0;
  try
  {
    while ((!rabbitChannels.TryPop(out value)) && channelTryCount < 100)
    {
      channelTryCount += 1;
      Thread.Sleep(50);
    }
 
    if (channelTryCount == 100)
    {
      var errMsg = $"Channel pool blocked when trying to post message to Exchange: {exchange}.";
      throw new ApplicationException(errMsg);
      }
 
    value.BasicPublish(exchange, topic, false, null, msg);
    rabbitChannels.Push(value);
    return true;
 
  }
  catch (Exception ex)
  {
    if (value != null)
    {
      _rabbitChannels.Push(value);
    }
    throw;
  }
}

Метод Post вызывается из метода pr_clr_PostRabbitMsg(int endPointId, string msgToPost), который был представлен в качестве процедуры с помощью предложения CREATE PROCEDURE во фрагменте кода 2:

Способ вызова Post

public static void pr_clr_PostRabbitMsg(int endPointId, string msgToPost)
{
  try
  {
    if(endPointId == 0)
    {
      throw new ApplicationException("EndpointId cannot be 0");
    }
    if (!isInitialised)
    {
      pr_clr_InitialiseRabbitMq();
    }
    var msg = Encoding.UTF8.GetBytes(msgToPost);
    if (endPointId == -1)
    {
      foreach (var rep in remoteEndpoints)
      {
        var exch = rep.Value.Exchange;
        var topic = rep.Value.RoutingKey;
        foreach (var pub in rabbitPublishers.Values)
        {
          pub.Post(exch, msg, topic);
        }
      }
    }
    else
    {
      RabbitPublisher pub;
      if (rabbitPublishers.TryGetValue(endPointId, out pub))
      {
        pub.Post(remoteEndpoints[endPointId].Exchange, msg, remoteEndpoints[endPointId].RoutingKey);
      }
      else
      {
        throw new ApplicationException($"EndpointId: {endPointId}, does not exist");
      }
    }
  }
  catch
  {
    throw;
  }
}  

Фрагмент кода 5: Представление метода в виде процедуры

При выполнении метода предполагается, что вызывающий объект отправляет идентификатор конечной точки, в которую необходимо передать сообщение, и, собственно, само сообщение. Если в качестве идентификатора конечной точки передается значение -1, то мы перебираем все точки и отправляем сообщение каждой из них. Сообщение приходит в виде строки, из которой мы получаем байты с помощью Encoding.UTF8.GetBytes. В рабочей среде вызов Encoding.UTF8.GetBytes следует заменить на сериализацию.

Установка

Чтобы установить и запустить пример, нужны все файлы в папке srcSQL. Для установки выполните следующие действия:

  • Запустите скрипт 01.create_database_and_role.sql. Он создаст:
    • тестовую базу данных RabbitMQTest, где будет создана сборка.
    • роль rmq, которая будет назначена в качестве владельца сборки
    • схему, которая тоже будет назваться rmq. В этой схеме создаются различные объекты базы данных.

  • Запустите файл 02.create_database_objects.sql. Он создаст:
    • таблицу rmq.tb_RabbitSetting, в которой будет храниться строка подключения к локальной БД.
    • таблицу rmq.tb_RabbitEndpoint, в которой будет храниться одна или несколько конечных точек RabbitMQ.

  • В файле 03.create_localhost_connstring.sql измените значение переменной @connString на правильную строку подключения к базе RabbitMQTest, созданной на шаге 1 и запустите скрипт.

Прежде чем продолжить, необходимо иметь запущенный экземпляр брокера RabbitMQ и VHost (по умолчанию VHost представлен как /). Как правило, у нас есть несколько VHost, просто для изоляции. Этот хост также нуждается в обменнике, в примере мы используем amq.topic. Когда у вас готов брокер RabbitMQ, отредактируйте параметры процедуры rmq.pr_UpsertRabbitEndpoint, которая находится в файле 04.upsert_rabbit_endpoint.sql:

Конечная точка RabbitMQ

EXEC rmq.pr_UpsertRabbitEndpoint @Alias = 'rabbitEp1',
                                 @ServerName = 'RabbitServer',
                                 @Port = 5672,
                                 @VHost = 'testHost',
                                 @LoginName = 'rabbitAdmin',
                                 @LoginPassword = 'some_secret_password',
                                 @Exchange = 'amq.topic',
                                 @RoutingKey = '#',
                                 @ConnectionChannels = 5,
                                 @IsEnabled = 1

Фрагмент кода 6: создание конечной точки в RabbitMQ

На данном этапе настало время развернуть сборки. В развертываемых вариантах есть отличия для версий SQL Server, предшествующих SQL Server 2014 (2005, 2008, 2008R2, 2012), и для 2014 и выше. Разница заключается в поддерживаемой версии CLR. До SQL Server 2014 платформа .NET выполнялась в среде CLR версии 2, а в SQL Server 2014 и выше используется версия 4.

SQL Server 2005 — 2012

Давайте начнем с версий SQL Server, которые работают на CLR 2, так как там есть свои особенности. Нам нужно развернуть созданную сборку, а вместе с тем развернуть клиентскую .NET библиотеку RabbitMQ (RabbitMQ.Client). Из нашей сборки будем ссылаться на клиентскую библиотеку RabbitMQ. Т.к. мы планировали использовать CLR 2, то наша сборка и RabbitMQ.Client должны быть скомпилирована на основе .NET 3.5. Тут возникают проблемы.

Все последние версии библиотеки RabbitMQ.Client скомпилированы для среды CLR 4, поэтому они не могут использоваться в нашей сборке. Последняя версия клиентских библиотек для CLR 2 собрана на .NET 3.4.3. Но даже если мы попытаемся развернуть эту сборку, то получим сообщение об ошибке:

RabbitMQ — SQL Server - 1
Рисунок 1: Отсутствует сборка System.ServiceModel

Эта версия RabbitMQ.Client ссылается на сборку, не являющуюся частью CLR SQL Server. Это WCF сборка, и это одно из ограничений в SQLCLR, о которых я говорил выше: эта конкретная сборка предназначена для таких типов задач, которые не допускается выполнять в SQL Server. Последние версии RabbitMQ.Client не имеют этих зависимостей, поэтому могут использоваться без каких-либо проблем, если не считать досадные требования среды CLR 4. Что делать?

Как известно, RabbitMQ имеет открытый исходный код, ну, а мы ведь с вами разработчики, верно? ;) Так давайте же перекомпилим! В варианте до последних релизов (т.е. версии <3.5.0) RabbitMQ.Client я удалил ссылки на System.ServiceModel и перекомпилил. Мне пришлось изменить пару строчек кода, использующих функционал System.ServiceModel, но это были незначительные изменения.

В этом примере я не использовал версию клиента 3.4.3, а взял стабильный релиз 3.6.6 и перекомпилировал с использованием .NET 3.5 (CLR 2). Это почти сработало :), за исключением того, что более поздние релизы RabbitMQ.Client используют Task'и, которые изначально не являются частью .NET 3.5.

К счастью, есть версия System.Threading.dll для .NET 3.5, которая включает Task. Я скачал её, настроил ссылки и все поехало! Тут главная фишка в том, что System.Threading.dll должна быть установлена вместе со сборкой.

Примечание: исходник RabbitMQ.Client, из которого я собрал версию на .NET 3.5, есть в моем репозитории на GitHub RabbitMQ Client 3.6.6 .NET 3.5. Бинарник dll вместе с System.Threading.dll для .NET 3.5 также лежит в каталоге libNET3.5 репозитория (RabbitMQ-SqlServer).

Для установки необходимых сборок (System.Threading, RabbitMQ.Client и RabbitMQ.SqlServer) запустите установочные скрипты из каталога srcsql в следующем порядке:

  1. 05.51.System.Threading.sql2k5-12.sql — System.Threading
  2. 05.52.RabbitMQ.Client.sql2k5-12.sql — RabbitMQ.Client
  3. 05.53.RabbitMQ.SqlServer.sql2k5-12.sql — RabbitMQ.SqlServer

SQL Server 2014+

В SQL Server 2014 и более поздних версиях сборка компилируется под .NET 4.ХХ (мой пример на 4.5.2), и вы можете ссылаться на любую из последних версий RabbitMQ.Client, которую можно получить с помощью NuGet. В своем примере я использую 4.1.1. RabbitMQ.Client, которая так же есть в каталоге libNET4 репозитория (RabbitMQ-SqlServer).

Для установки запустите скрипты из каталога srcsql в следующем порядке:

  1. 05.141.RabbitMQ.Client.sql2k14+.sql — RabbitMQ.Client
  2. 05.142.RabbitMQ.SqlServer.sql2k14+.sql — RabbitMQ.SqlServer

Обертки методов SQL

Чтобы создать процедуры, которые будут использоваться из нашей сборки (3.5 или 4), запустите скрипт 06.create_sqlclr_procedures.sql. Он создаст Т-SQL процедуры для трех .NET-методов:

  • rmq.pr_clr_InitialiseRabbitMq вызывает pr_clr_InitialiseRabbitMq. Используется для загрузки и инициализации сборки RabbitMQ.SqlServer.
  • rmq.pr_clr_ReloadRabbitEndpoints вызывает pr_clr_ReloadRabbitEndpoints. Загружает различные конечные точки RabbitMQ.
  • rmq.pr_clr_PostRabbitMsg вызывает pr_clr_PostRabbitMsg. Используется для отправки сообщения в RabbitMQ.

Скрипт также создает простую T-SQL процедуру — rmq.pr_PostRabbitMsg, которая применяется к rmq.pr_clr_PostRabbitMsg. Это процедура-обертка, которая знает, что делать с данными, обрабатывает исключения и т.д. В рабочей среде у нас есть несколько подобных процедур, обрабатывающих различные типы сообщений. Подробнее об этом читайте ниже.

Использование

Из всего вышеперечисленного видно, что для отправки сообщений в RabbitMQ мы вызываем rmq.pr_PostRabbitMsg или rmq.pr_clr_PostRabbitMsg, передав в параметрах идентификатор конечной точки и само сообщение в виде строки. Все это, конечно, круто, но хотелось бы видеть как это будет работать в реальности.

Что мы делаем в рабочих средах — в хранимых процедурах, обрабатывающих данные, которые должны быть отправлены в RabbitMQ, мы собираем данные для отправки и в блоке подключения вызываем процедуру, подобную rmq.pr_PostRabbitMsg. Ниже приведен очень упрощенный пример такой процедуры:

Процедура обработки данных

ALTER PROCEDURE dbo.pr_SomeProcessingStuff @id int
AS
BEGIN
  SET NOCOUNT ON;
  BEGIN TRY
    --создаем переменную для конечной точки
    DECLARE @endPointId int;
    --создаем переменную для сообщения
    DECLARE @msg nvarchar(max) = '{'
    --выполняем необходимые действия и собираем данные для сообщения
    SET @msg = @msg + '"Id":' + CAST(@id AS varchar(10)) + ','
    -- делаем что-то еще
    SET @msg = @msg + '"FName":"Hello",';
    SET @msg = @msg + '"LName":"World"';
    SET @msg = @msg + '}';
 
    --снова что-то делаем
    -- получаем идентификатор конечной точки откуда-то, по каким-то условиям
    SELECT @endPointId = 1;
    --здесь начинается блок подключения
    --вызываем процедуру для отправки сообщения
    EXEC rmq.pr_PostRabbitMsg @Message = @msg, @EndpointID = @endPointId;
  END TRY
  BEGIN CATCH
    DECLARE @errMsg nvarchar(max);
    DECLARE @errLine int;
    SELECT @errMsg = ERROR_MESSAGE(), @errLine = ERROR_LINE();
    RAISERROR('Error: %s at line: %d', 16, -1, @errMsg, @errLine);
  END CATCH
END

Во фрагменте кода 7 мы видим, как в процедуре захватываются и обрабатываются нужные данные и после обработки отправляются. Чтобы использовать эту процедуру, выполните скрипт 07.create_processing_procedure.sql из каталога srcSQL.

Давайте все это запустим

На этом этапе вы должны быть готовы отправить несколько сообщений. Перед тестированием убедитесь, что у вас в RabbitMQ есть очереди, привязанные к обменнику конечной точки в rmq.tb_RabbitEndpoint.

Итак, для запуска нужно проделать следующее:
Откройте файл 99.test_send_message.sql.
Выполните

EXEC rmq.pr_clr_InitialiseRabbitMq;

чтобы инициализировать сборку и загрузить конечные точки RabbitMQ. Это не обязательное действие, но рекомендуется предварительно загрузить сборку, после ее создания или изменения.

Выполните

EXEC dbo.pr_SomeProcessingStuff @id = 101

(можно использовать любой другой идентификатор, какой нравится).

Если все отработало без ошибок, то в очереди RabbitMQ должно появиться сообщение! Вот Вы и воспользовались SQLCLR для отправки сообщения в RabbitMQ.

Конгратулейшенс!

Автор: rt001

Источник


* - обязательные к заполнению поля


https://ajax.googleapis.com/ajax/libs/jquery/3.4.1/jquery.min.js