- PVSM.RU - https://www.pvsm.ru -
Я попросил нашу команду маркетинга нарисовать иллюстрацию и долго объяснял, что такое вебхуки
Не так давно передо мной встала задача реализовать работу вебхуков в Личном кабинете владельца кассы компании Дримкас. Как оказалось, в сети практически нет описания и туториалов, как это сделать. Я расскажу, как мы это реализовали без тяжелых кронов по БД.
Статья будет полезна для middle node.js-разработчиков.
Для понимания специфики, придется начать издалека.
Дримкас производит онлайн-кассы и облачный сервис Кабинет Дримкас. Все кассовые аппараты в реальном времени отправляют данные о продажах по интернету в налоговую — это требование нового закона. Подключаясь к Кабинету, владелец кассы получает удаленный доступ к этой статистике продаж и другие инструменты.
Кабинет Дримкас позволяет владельцу кассы из веб-интерфейса следить за продажами, работать с отчетами, создавать, редактировать и автоматически загружать на все кассы товарную базу, подключать внешние товароучетные системы.
Вебхуки нам понадобились, когда мы подключали к Кабинету интернет-магазины. Для онлайн-торговли тоже нужна касса, только бумажный чек не печатается. Мы решили создать для них инструмент, чтобы они могли из обычного json-а с данными о покупке записать данные о продаже в ФН и передать их в ОФД.
Так как операция фискализации может затянуться на длительное время, превышающее обычный HTTP запрос, нам потребовалось дать возможность узнавать статус этого чека. Каждый раз стучаться в Кабинет за статусом чека не выгодно ни нам, ни Интернет магазину. А с вебхуками убиваем сразу двух зайцев: Кабинет делает запрос только один раз, а интернет-магазин получит чек, как только он будет готов.
Когда мы начали их внедрять, решили дать доступ к этому функционалу сервисам интеграторов. С их помощью сторонние сервисы, которые подключены к Кабинету, получают уведомления о продажах, открытии/закрытии смен, создании и редактировании товаров, внесении и изъятии денег. Мы не остановились до сих пор, и все важные события мы сразу переводим на вебхуки.
Мы пишем на node.js. В качестве веб фреймворка выбран koa. У нас две базы данных. Postrges с sequelize, где хранятся сильно связанные данные, например, кассы и пользователи. Для хранения несвязанных и неизменяемых данных — чеков, смен — мы используем MongoDB. Ещё повсеместно используются очереди на rabbitMQ, для сглаживания скачкообразных нагрузок. Плюс redis для кэша.
Для начала определим места, где хотим вызывать вебхуки. На уровне модели мы можем пользоваться хуками в mongoose и в большинстве случаев в sequelize.
Исторически так сложилось, что в нашей sequelize-модели нельзя создать товар сразу с данными. Мы создаем пустой товар и сразу его изменяем, поэтому пришлось руками во всех контроллерах добавить обработчики вызовов вебхуков.
Когда нет такой проблемы, всё достаточно просто. Пример из модели mongoose:
schema.static('setStatus', async function (_id, status, data) {
// логика изменения статуса
const res = await this.update({ _id }, { … });
await Webhook.send({ ... });
return res;
});
Чтобы определить понятие подписки на определенные события, мы используем битовые маски.
В бэкэнде мы храним всю информацию о типах событий одним числом, а фронту отправляем готовый json объект:
{
"types": {
"products": true,
"receipts": false,
"shifts": true,
"encashments": false,
"devices": false,
"operations": true
},
}
Чтобы упаковать число в json и извлечь его обратно, мы создаем виртуальные атрибуты в sequelize. В них устанавливаем геттеры и сеттеры. Виртуальные поля вычисляются на лету, изменяют на поля в таблице, но при этом не хранятся БД.
// Статические методы, которые хранятся в отдельном файле
import _ from 'lodash';
export const scopeBits = {
products: 0,
receipts: 1,
shifts: 2,
encashments: 3,
devices: 4,
operations: 5,
};
/**
* Этот маппинг появился, потому что мы захотели,
* чтобы по вебхуками прилетало название модели
* и тип операции в UPPER CASE и в единственном числе.
*/
/* eslint-disable key-spacing */
const typeToTypes = {
PRODUCT: { products: true },
RECEIPT: { receipts: true },
SHIFT: { shifts: true },
ENCASHMENT: { encashments: true },
DEVICE: { devices: true },
OPERATION: { operations: true },
};
/* eslint-enable key-spacing */
export function formMask(scope) {
if (_.isEmpty(scope)) {
return 0;
}
return _.reduce(Object.keys(scope), (mask, key) => {
if (scope[key]) {
mask |= 1 << scopeBits[key];
}
return mask;
}, 0);
}
export function formEvents(mask) {
return _.reduce(scopeBits, (memo, bit, scope) => {
if (mask & (1 << bit)) {
memo[scope] = true;
} else {
memo[scope] = false;
}
return memo;
}, {});
}
// В описании модели:
subscribes: {
type: DataTypes.INTEGER,
allowNull: false,
},
types: {
type: DataTypes.VIRTUAL(DataTypes.INTEGER, ['subscribes']),
get() {
return this.constructor.formEvents(this.get('subscribes'));
},
set(types) {
this.setDataValue('subscribes', this.constructor.formMask(types));
},
},
Пользователь управляет вебхуками из веб-интерфейса или через API. Поэтому нам нужны стандартные CRUD для этой модели.
import _ from 'lodash';
const editCols = ['url', 'types', 'isActive'];
export async function create(ctx) {
const fields = _.pick(ctx.request.body.fields, editCols);
fields.userId = ctx.state.user.id;
const webhook = await Webhook.create(fields);
ctx.body = { id: webhook.id };
ctx.status = 201;
}
Мы не вызываем вебхуки в статическом методе класса Webhook — это позволяет сберечь ресурсы основного сайта. Это работа воркеров — делать фоновые задачи, не мешая работе с REST-API.
Когда на сайте генерируется событие, мы оповещаем воркеров об этом:
import _ from 'lodash';
import { getClient } from '../../storage/redis';
import { stringify, getChannel } from '../../storage/rabbitmq';
/**
* получаем вебхуки, которых нужно вызвать для этого события у этого юзера
* types: { products: true, devices: false, ...}
*/
async function search({ userId, types }) {
const mask = formMask(types);
/**
* Ищем флаг нужного события через битовую операцию "и"
* и проверяем результат с самим числом, таким образом заложились на будущее,
* если вдруг захотим искать сразу по двум флагам
*/
return Webhook.sequelize.query(`SELECT id, url, subscribes
FROM "Webhook" WHERE subscribes & ? = ?
AND "userId" = ?
AND "isActive" = TRUE`,
{
type: Webhook.sequelize.QueryTypes.SELECT,
replacements: [mask, mask, userId],
},
);
}
/**
* Я не нашел в документации, как сделать этот запрос средствами sequelize
* Поэтому здесь использован сырой SQL-запрос
*/
/**
* Вставка в очередь задания «Дернуть вебхук»
* type=PRODUCT|DEVICE|ENCASHMENT|RECEIPT|OPERATION|...
* action=CREATE|UPDATE|DELETE
*/
export async function send({ userId, type, action, itemId }) {
// поиск по Redis
const client = getClient();
const key = `webhooks:${userId}:${type}`;
const isWebhooksExist = await client.existsAsync(key);
let webhooks;
if (!isWebhooksExist) {
// поиск в Postgres
const types = typeToTypes[type];
webhooks = await search({ userId, types });
// Кэшируем в Redis, даже если не нашли
await client.setAsync(key, JSON.stringify(webhooks), 'EX', 10);
} else {
webhooks = JSON.parse(await client.getAsync(key));
}
_.each(webhooks, (w) => {
const payload = stringify({
url: w.url,
itemId,
action,
type,
timestamp: Date.now(),
});
/**
* Ставим задачу в очередь. Устанавливаем время, какой URL вызвать и свойства:
* тип сущности (товар, чек, касса, ...), тип операции (создание, удаление, ..)
* и id сущности
*/
getChannel().sendToQueue('kab-webhooks-delayed-0', payload, { persistent: true });
});
}
Вкратце, что мы делаем: ищем в БД все вебхуки у данного пользователя, у которого есть подписка на текущее событие. Кэшируем их, даже если ничего не нашли — если пользователь загружает кучу товаров, будут лишние запросы в БД. Когда есть вебхук, кидаем в очередь задачу с временной меткой, ссылкой, идентификатором и типом события.
Тут есть нюанс: мы экономим ресурсы сайта, и кидаем в очередь только идентификатор объекта. Если есть возможность, лучше кидать сам объект. Когда создают объект и сразу удаляют его, в очередь падает два задания. Первое задание при выполнении не сможет вытащить из базы тело объекта. Если кидать всё тело объекта, таких проблем не будет.
У нас в стеке используется очереди сообщений. Мы выбрали 5 временных промежутков, и на каждый создали очередь. Если вызов не удался при первой попытке, вебхук переходит в следующую очередь. Когда воркер получает на вход задачу, он откладывает его выполнение на требуемое количество времени от 0 миллисекунд до суток. Через 24 часа мы вызываем вебхук в последний раз и удаляем.
Пример вебхука, который не могут принять в течение суток.
Каждая следующая задача в очереди не может быть вызвана раньше текущей, так как добавилась туда позже. Поэтому когда мы взяли из очереди задачу и увидели, что вызывать вебхук ещё рано, мы не завершаем эту задачу, чтобы не получить следующую.
import Bluebird from 'bluebird';
import request from 'request';
import { parse, getChannel, stringify } from '../../lib/storage/rabbitmq';
const requestPostAsync = Bluebird.promisify(request.post);
const times = {
0: 0,
'5sec': 5 * 1000,
'1min': 1 * 60 * 1000,
'1hour': 1 * 60 * 60 * 1000,
'3hours': 3 * 60 * 60 * 1000,
'1day': 24 * 60 * 60 * 1000,
};
const getBodyById = async ({ itemId, type, action }) => {
/** Достаем из БД актуальное состояние сущности */
};
const handle = async (channel, msg, waitENUM, nextQueue) => {
const task = parse(msg);
const { url, itemId, type, action, timestamp } = task;
const data = await getBodyById({ itemId, type, action });
const estimatedTime = Date.now() - (new Date(timestamp).getTime());
const wait = times[waitENUM];
if (estimatedTime < wait) {
await Bluebird.delay(wait - estimatedTime);
}
try {
const response = await requestPostAsync(url, {
body: {
action,
type,
data,
},
headers: {
'content-type': 'application/json',
},
json: true,
timeout: 20 * 1000,
});
if (response.statusCode < 200
|| response.statusCode >= 300) {
throw new Error();
}
channel.ack(msg);
} catch (err) {
if (nextQueue) {
getChannel().sendToQueue(nextQueue, stringify(task));
}
channel.nack(msg, false, false);
}
};
/* eslint-disable no-multi-spaces */
export default function startConsume(channel) {
channel.prefetch(2);
channel.consume('kab-webhooks-delayed-0', msg => handle(channel, msg, 0,
'kab-webhooks-delayed-1'), { noAck: false });
channel.consume('kab-webhooks-delayed-1', msg => handle(channel, msg, '5sec',
'kab-webhooks-delayed-2'), { noAck: false });
channel.consume('kab-webhooks-delayed-2', msg => handle(channel, msg, '1min',
'kab-webhooks-delayed-3'), { noAck: false });
channel.consume('kab-webhooks-delayed-3', msg => handle(channel, msg, '1hour',
'kab-webhooks-delayed-4'), { noAck: false });
channel.consume('kab-webhooks-delayed-4', msg => handle(channel, msg, '3hour',
'kab-webhooks-delayed-5'), { noAck: false });
channel.consume('kab-webhooks-delayed-5', msg => handle(channel, msg, '1day',
''), { noAck: false });}
/* eslint-enable no-multi-spaces */
Автор: ilnuribat
Источник [1]
Сайт-источник PVSM.RU: https://www.pvsm.ru
Путь до страницы источника: https://www.pvsm.ru/postgresql/265818
Ссылки в тексте:
[1] Источник: https://habrahabr.ru/post/332798/
Нажмите здесь для печати.