Неделю или две назад я увидел сообщение[1] на форуме RabbitMQ Users[2], о том, как наладить отправку сообщений из SQL Server в RabbitMQ. Поскольку мы плотно с этим работаем в Derivco[3], я оставил там некоторые предложения, а также сказал, что пишу в блоге о том, как это можно сделать. Часть моего сообщения была не совсем верной — по крайней мере, до этого момента (сорри, Бро, был очень занят).
Потрясающая штука, этот ваш SQL Server. С его помощью очень легко поместить информацию в базу данных. Получить данные из базы с помощью запроса столь же просто. А вот получить только что обновленные или вставленные данные уже немного сложнее. Подумайте о событиях в реальном времени; совершена покупка — кого-то нужно уведомить об этом в тот же момент, как только это произошло. Возможно, кто-то скажет, что такие данные должны выталкиваться не из базы данных, а откуда-то еще. Безусловно, так оно и есть, но довольно часто у нас попросту нет выбора.
У нас[3] была задача: отправлять события из базы данных вовне для дальнейшей обработки и стоял вопрос — как это сделать?
SQL Server и внешние коммуникации
За время существования SQL Server было несколько попыток организовать коммуникации за пределы базы данных; SQL Server Notification Services (NS), которая появилась в SQL Server 2000, а после, в SQL Server 2005, появился SQL Server Service Broker (SSB). Я описывал их в своей книге A First Look at SQL Server 2005 for Developers[4], вместе с Бобом Бошеменом и Дэном Салливаном. NS появился в SQL Server 2000, как я уже говорил, и был перепроектирован в бета-версии SQL Server 2005. Однако из готовой к продаже (RTM) версии SQL Server 2005 NS был выпилен исключен полностью.
Примечание: Если Вы читали книгу, то найдете там ряд особенностей, которых не было в RTM версии.
SSB выжил, и в пакете дополнительных компонентов SQL Server 2008 Microsoft представила Service Broker External Activator[5] (ЕА). Он дает возможность через SSB осуществлять взаимодействие за пределами локальной базы данных. Теоретически звучит неплохо, но на практике — он громоздок и запутан. Мы провели несколько тестов и быстро поняли, что он не выполняет того, что нам нужно. Кроме того, SSB не дал нам той производительности, которая была необходима, поэтому нам пришлось выдумывать нечто иное.
SQLCLR
То, к чему мы пришли в результате, было основано на технологии SQLCLR. SQLCLR — это платформа .NET, встроенная в ядро SQL Server и с ее помощью можно выполнить .NET код внутри ядра. Поскольку мы выполняем .NET код, то имеем возможность делать почти всё, что и в обычном .NET приложении.
Примечание: выше я написал «почти», потому как на самом деле есть некоторые ограничения. В данном контексте эти ограничения почти не влияют на то, что мы собираемся сделать.
Принцип работы SQLCLR заключается в следующем: код компилируется в dll библиотеку, а затем эта библиотека регистрируется средствами SQL Server:
Создание сборки
CREATE ASSEMBLY [RabbitMQ.SqlServer]
AUTHORIZATION rmq
FROM 'F:some_pathRabbitMQSqlClr4.dll'
WITH PERMISSION_SET = UNSAFE;
GO
Фрагмент кода 1: Создание сборки по абсолютному пути
Код выполняет следующие действия:
CREATE ASSEMBLY — создает сборку с заданным именем (независимо от того, что это должно быть).
AUTHORIZATION — указывает на владельца сборки. В данном случае rmq является заранее определенной ролью SQL Server.
FROM — определяет, где расположена оригинальная сборка. В предложении FROM также можно указывать путь в двоичном или UNC форматах. Установочные файлы для этого проекта используют двоичное представление.
WITH PERMISSION_SET — устанавливает разрешения. UNSAFE является наименее строгим, и необходим в данном случае.
Примечание: независимо от того, роль или имя входа было использовано в предложении AUTHORIZATION, класс appdomain должен быть создан с тем же именем, как и при загрузке сборки в домен. Рекомендуется разделять сборки разными именами appdomain классов, чтобы при падении одной сборки не свалились остальные. Однако если сборки имеют зависимости друг от друга, их нельзя разделить в разные классы.
Когда сборка создана, делаем в ней обертки методов .NET:
CREATE PROCEDURE rmq.pr_clr_PostRabbitMsg @EndpointID int, @Message nvarchar(max)
AS
EXTERNAL NAME [RabbitMQ.SqlServer].[RabbitMQSqlClr.RabbitMQSqlServer].[pr_clr_PostRabbitMsg];
GO
Фрагмент кода 2: обертка метода .NET
Код выполняет следующие действия:
Создает хранимую процедуру Т-SQL с именем rmq.pr_clr_PostRabbitMsg, принимающую два параметра; @EndpointID и @Message.
Вместо тела процедуры используется внешний источник, который состоит из:
Сборка с именем RabbitMQ.SqlServer, т. е. агрегат, который мы создали выше во фрагменте кода 1.
Полный тип (пространство имен и класс): RabbitMQSqlClr.RabbitMQSqlServer
Метод из приведенного выше пространства имен и класса: pr_clr_PostRabbitMsg.
При выполнении процедуры rmq.pr_clr_PostRabbitMsg, будет вызываться метод pr_clr_PostRabbitMsg.
Примечание: при создании процедуры, имя сборки не чувствительно к регистру, в отличие от полного имени типа и метода. Не обязательно чтобы имя создаваемой процедуры совпадало с именем метода. Однако конечные типы данных для параметров должны совпадать.
Как я говорил ранее, у нас в Derivco есть необходимость отправлять данные за пределы SQL Server, поэтому мы используем SQLCLR и RabbitMQ[6] (RMQ).
RabbitMQ
RMQ — это брокер сообщений с открытым исходным кодом, который реализует протокол AMQP (Advanced Message Queuing Protocol) и написан на языке Erlang.
Поскольку RMQ является брокером сообщений, для подключения к нему необходимы клиентские библиотеки AMQP. Приложение ссылается на клиентские библиотеки и с их помощью открывает соединение и отправляет сообщения — как, например, идет обращение через ADO.NET к SQL Server. Но в отличие от ADO.NET, где, скорее всего, соединение открывается каждый раз при обращении к базе данных, здесь соединение остается открытым в течение всего периода работы приложения.
Таким образом, для того, чтобы иметь возможность взаимодействовать из базы данных с RabbitMQ нам нужно приложение и клиентская библиотеки .NET для RabbitMQ.
Примечание: в последующей части данной статьи будут встречаться фрагменты кода RabbitMQ, но без детальных пояснений что они делают. Если вы новичок в работе с RabbitMQ, то предлагаю взглянуть на различные уроки по RabbitMQ[7], чтобы понимать назначение кода. Обучающий урок Hello World[8] по C# — хорошее начало. Одно из отличий между учебниками и примерами кода заключается в том, что в примерах не объявляются обменники. Предполагается, что они предопределены.
RabbitMQ.SqlServer
RabbitMQ.SqlServer — сборка, использующая клиентскую .NET библиотеку для RabbitMQ и предоставляет возможность отправки сообщений из базы данных в одну или несколько конечных точек RabbitMQ (VHosts и обменники). Код можно скачать/ответвить из моего репозитория RabbitMQ-SqlServer[9] на GitHub. Он содержит исходники сборок и установочные файлы (т.е. Вам не придется компилировать их самостоятельно).
Примечание: это просто пример, чтобы показать как SQL Server может взаимодействовать с RabbitMQ. Это НЕ готовый продукт и даже не часть его. Если этот код разрывает вам мозг[10] — не надо на меня пенять, ибо это просто пример.
Функциональность
При загрузке сборки, или при явном вызове ее инициализации, либо при косвенном обращении, в момент вызова процедуры-обертки, сборка загружает строку соединения в локальную базу данных, в которую она была установлена, как и конечные точки RabbitMQ, к которым она подключается:
Подключение
internal bool InternalConnect()
{
try
{
connFactory = new ConnectionFactory();
connFactory.Uri = connString;
connFactory.AutomaticRecoveryEnabled = true;
connFactory.TopologyRecoveryEnabled = true;
RabbitConn = connFactory.CreateConnection();
for (int x = 0; x < channels; x++)
{
var ch = RabbitConn.CreateModel();
rabbitChannels.Push(ch);
}
return true;
}
catch(Exception ex)
{
return false;
}
}
Фрагмент кода 3: подключение к конечной точке
В это же время часть подключения к конечной точкой также создает IModels на соединении, и они используются при отправке (добавлении в очередь) сообщений:
Отправка сообщения
internal bool Post(string exchange, byte[] msg, string topic)
{
IModel value = null;
int channelTryCount = 0;
try
{
while ((!rabbitChannels.TryPop(out value)) && channelTryCount < 100)
{
channelTryCount += 1;
Thread.Sleep(50);
}
if (channelTryCount == 100)
{
var errMsg = $"Channel pool blocked when trying to post message to Exchange: {exchange}.";
throw new ApplicationException(errMsg);
}
value.BasicPublish(exchange, topic, false, null, msg);
rabbitChannels.Push(value);
return true;
}
catch (Exception ex)
{
if (value != null)
{
_rabbitChannels.Push(value);
}
throw;
}
}
Метод Post вызывается из метода pr_clr_PostRabbitMsg(int endPointId, string msgToPost), который был представлен в качестве процедуры с помощью предложения CREATE PROCEDURE во фрагменте кода 2:
Способ вызова Post
public static void pr_clr_PostRabbitMsg(int endPointId, string msgToPost)
{
try
{
if(endPointId == 0)
{
throw new ApplicationException("EndpointId cannot be 0");
}
if (!isInitialised)
{
pr_clr_InitialiseRabbitMq();
}
var msg = Encoding.UTF8.GetBytes(msgToPost);
if (endPointId == -1)
{
foreach (var rep in remoteEndpoints)
{
var exch = rep.Value.Exchange;
var topic = rep.Value.RoutingKey;
foreach (var pub in rabbitPublishers.Values)
{
pub.Post(exch, msg, topic);
}
}
}
else
{
RabbitPublisher pub;
if (rabbitPublishers.TryGetValue(endPointId, out pub))
{
pub.Post(remoteEndpoints[endPointId].Exchange, msg, remoteEndpoints[endPointId].RoutingKey);
}
else
{
throw new ApplicationException($"EndpointId: {endPointId}, does not exist");
}
}
}
catch
{
throw;
}
}
Фрагмент кода 5: Представление метода в виде процедуры
При выполнении метода предполагается, что вызывающий объект отправляет идентификатор конечной точки, в которую необходимо передать сообщение, и, собственно, само сообщение. Если в качестве идентификатора конечной точки передается значение -1, то мы перебираем все точки и отправляем сообщение каждой из них. Сообщение приходит в виде строки, из которой мы получаем байты с помощью Encoding.UTF8.GetBytes. В рабочей среде вызов Encoding.UTF8.GetBytes следует заменить на сериализацию.
Установка
Чтобы установить и запустить пример, нужны все файлы в папке srcSQL. Для установки выполните следующие действия:
Запустите скрипт 01.create_database_and_role.sql. Он создаст:
тестовую базу данных RabbitMQTest, где будет создана сборка.
роль rmq, которая будет назначена в качестве владельца сборки
схему, которая тоже будет назваться rmq. В этой схеме создаются различные объекты базы данных.
Запустите файл 02.create_database_objects.sql. Он создаст:
таблицу rmq.tb_RabbitSetting, в которой будет храниться строка подключения к локальной БД.
таблицу rmq.tb_RabbitEndpoint, в которой будет храниться одна или несколько конечных точек RabbitMQ.
В файле 03.create_localhost_connstring.sql измените значение переменной @connString на правильную строку подключения к базе RabbitMQTest, созданной на шаге 1 и запустите скрипт.
Прежде чем продолжить, необходимо иметь запущенный экземпляр брокера RabbitMQ и VHost (по умолчанию VHost представлен как /). Как правило, у нас есть несколько VHost, просто для изоляции. Этот хост также нуждается в обменнике, в примере мы используем amq.topic. Когда у вас готов брокер RabbitMQ, отредактируйте параметры процедуры rmq.pr_UpsertRabbitEndpoint, которая находится в файле 04.upsert_rabbit_endpoint.sql:
Фрагмент кода 6: создание конечной точки в RabbitMQ
На данном этапе настало время развернуть сборки. В развертываемых вариантах есть отличия для версий SQL Server, предшествующих SQL Server 2014 (2005, 2008, 2008R2, 2012), и для 2014 и выше. Разница заключается в поддерживаемой версии CLR. До SQL Server 2014 платформа .NET выполнялась в среде CLR версии 2, а в SQL Server 2014 и выше используется версия 4.
SQL Server 2005 — 2012
Давайте начнем с версий SQL Server, которые работают на CLR 2, так как там есть свои особенности. Нам нужно развернуть созданную сборку, а вместе с тем развернуть клиентскую .NET библиотеку RabbitMQ (RabbitMQ.Client). Из нашей сборки будем ссылаться на клиентскую библиотеку RabbitMQ. Т.к. мы планировали использовать CLR 2, то наша сборка и RabbitMQ.Client должны быть скомпилирована на основе .NET 3.5. Тут возникают проблемы.
Все последние версии библиотеки RabbitMQ.Client скомпилированы для среды CLR 4, поэтому они не могут использоваться в нашей сборке. Последняя версия клиентских библиотек для CLR 2 собрана на .NET 3.4.3. Но даже если мы попытаемся развернуть эту сборку, то получим сообщение об ошибке:
Рисунок 1: Отсутствует сборка System.ServiceModel
Эта версия RabbitMQ.Client ссылается на сборку, не являющуюся частью CLR SQL Server. Это WCF сборка, и это одно из ограничений в SQLCLR, о которых я говорил выше: эта конкретная сборка предназначена для таких типов задач, которые не допускается выполнять в SQL Server. Последние версии RabbitMQ.Client не имеют этих зависимостей, поэтому могут использоваться без каких-либо проблем, если не считать досадные требования среды CLR 4. Что делать?
Как известно, RabbitMQ имеет открытый исходный код, ну, а мы ведь с вами разработчики, верно? ;) Так давайте же перекомпилим! В варианте до последних релизов (т.е. версии <3.5.0) RabbitMQ.Client я удалил ссылки на System.ServiceModel и перекомпилил. Мне пришлось изменить пару строчек кода, использующих функционал System.ServiceModel, но это были незначительные изменения.
В этом примере я не использовал версию клиента 3.4.3, а взял стабильный релиз 3.6.6[11] и перекомпилировал с использованием .NET 3.5 (CLR 2). Это почти сработало :), за исключением того, что более поздние релизы RabbitMQ.Client используют Task'и, которые изначально не являются частью .NET 3.5.
К счастью, есть версия System.Threading.dll[12] для .NET 3.5, которая включает Task. Я скачал её, настроил ссылки и все поехало! Тут главная фишка в том, что System.Threading.dll должна быть установлена вместе со сборкой.
Примечание: исходник RabbitMQ.Client, из которого я собрал версию на .NET 3.5, есть в моем репозитории на GitHub RabbitMQ Client 3.6.6 .NET 3.5[13]. Бинарник dll вместе с System.Threading.dll для .NET 3.5 также лежит в каталоге libNET3.5 репозитория (RabbitMQ-SqlServer)[9].
Для установки необходимых сборок (System.Threading, RabbitMQ.Client и RabbitMQ.SqlServer) запустите установочные скрипты из каталога srcsql в следующем порядке:
В SQL Server 2014 и более поздних версиях сборка компилируется под .NET 4.ХХ (мой пример на 4.5.2), и вы можете ссылаться на любую из последних версий RabbitMQ.Client, которую можно получить с помощью NuGet[14]. В своем примере я использую 4.1.1. RabbitMQ.Client, которая так же есть в каталоге libNET4репозитория (RabbitMQ-SqlServer)[9].
Для установки запустите скрипты из каталога srcsql в следующем порядке:
Чтобы создать процедуры, которые будут использоваться из нашей сборки (3.5 или 4), запустите скрипт 06.create_sqlclr_procedures.sql. Он создаст Т-SQL процедуры для трех .NET-методов:
rmq.pr_clr_InitialiseRabbitMq вызывает pr_clr_InitialiseRabbitMq. Используется для загрузки и инициализации сборки RabbitMQ.SqlServer.
rmq.pr_clr_ReloadRabbitEndpoints вызывает pr_clr_ReloadRabbitEndpoints. Загружает различные конечные точки RabbitMQ.
rmq.pr_clr_PostRabbitMsg вызывает pr_clr_PostRabbitMsg. Используется для отправки сообщения в RabbitMQ.
Скрипт также создает простую T-SQL процедуру — rmq.pr_PostRabbitMsg, которая применяется к rmq.pr_clr_PostRabbitMsg. Это процедура-обертка, которая знает, что делать с данными, обрабатывает исключения и т.д. В рабочей среде у нас есть несколько подобных процедур, обрабатывающих различные типы сообщений. Подробнее об этом читайте ниже.
Использование
Из всего вышеперечисленного видно, что для отправки сообщений в RabbitMQ мы вызываем rmq.pr_PostRabbitMsg или rmq.pr_clr_PostRabbitMsg, передав в параметрах идентификатор конечной точки и само сообщение в виде строки. Все это, конечно, круто, но хотелось бы видеть как это будет работать в реальности.
Что мы делаем в рабочих средах — в хранимых процедурах, обрабатывающих данные, которые должны быть отправлены в RabbitMQ, мы собираем данные для отправки и в блоке подключения вызываем процедуру, подобную rmq.pr_PostRabbitMsg. Ниже приведен очень упрощенный пример такой процедуры:
Процедура обработки данных
ALTER PROCEDURE dbo.pr_SomeProcessingStuff @id int
AS
BEGIN
SET NOCOUNT ON;
BEGIN TRY
--создаем переменную для конечной точки
DECLARE @endPointId int;
--создаем переменную для сообщения
DECLARE @msg nvarchar(max) = '{'
--выполняем необходимые действия и собираем данные для сообщения
SET @msg = @msg + '"Id":' + CAST(@id AS varchar(10)) + ','
-- делаем что-то еще
SET @msg = @msg + '"FName":"Hello",';
SET @msg = @msg + '"LName":"World"';
SET @msg = @msg + '}';
--снова что-то делаем
-- получаем идентификатор конечной точки откуда-то, по каким-то условиям
SELECT @endPointId = 1;
--здесь начинается блок подключения
--вызываем процедуру для отправки сообщения
EXEC rmq.pr_PostRabbitMsg @Message = @msg, @EndpointID = @endPointId;
END TRY
BEGIN CATCH
DECLARE @errMsg nvarchar(max);
DECLARE @errLine int;
SELECT @errMsg = ERROR_MESSAGE(), @errLine = ERROR_LINE();
RAISERROR('Error: %s at line: %d', 16, -1, @errMsg, @errLine);
END CATCH
END
Во фрагменте кода 7 мы видим, как в процедуре захватываются и обрабатываются нужные данные и после обработки отправляются. Чтобы использовать эту процедуру, выполните скрипт 07.create_processing_procedure.sql из каталога srcSQL.
Давайте все это запустим
На этом этапе вы должны быть готовы отправить несколько сообщений. Перед тестированием убедитесь, что у вас в RabbitMQ есть очереди, привязанные к обменнику конечной точки в rmq.tb_RabbitEndpoint.
Итак, для запуска нужно проделать следующее:
Откройте файл 99.test_send_message.sql.
Выполните
EXEC rmq.pr_clr_InitialiseRabbitMq;
чтобы инициализировать сборку и загрузить конечные точки RabbitMQ. Это не обязательное действие, но рекомендуется предварительно загрузить сборку, после ее создания или изменения.
Выполните
EXEC dbo.pr_SomeProcessingStuff @id = 101
(можно использовать любой другой идентификатор, какой нравится).
Если все отработало без ошибок, то в очереди RabbitMQ должно появиться сообщение! Вот Вы и воспользовались SQLCLR для отправки сообщения в RabbitMQ.
[4] A First Look at SQL Server 2005 for Developers: https://www.amazon.com/First-Look-Server-2005-Developers/dp/0321180593/ref=sr_1_1?ie=UTF8&qid=1486701050&sr=8-1&keywords=A+First+Look+at+SQL+Server+2005+for+Developers
[5] Service Broker External Activator: https://blogs.msdn.microsoft.com/sql_service_broker/2008/11/21/announcing-service-broker-external-activator/
[6] RabbitMQ: http://www.rabbitmq.com/
[7] уроки по RabbitMQ: http://www.rabbitmq.com/getstarted.html