- PVSM.RU - https://www.pvsm.ru -
На прошлой неделе мы отметили одну круглую дату — в базе данных Likeastore [1] скопилось, ни много, ни мало — один миллион пользовательских «лайков».
Мы используем JavaScript, все текущие сервисы написаны на JavaScript/Node.js. В общем и целом, я не жалею о использовании Node.js в нашем проекте, он отлично зарекомендовал себя как лучшее средство реализвации HTTP API. Но для сбора «лайков», это должен быть daemon
, который работает постоянно. Наверно, не самая типичная задача для Node.js — про специфику реализации и некоторые подводные камни, читаем далее.
Коллектор (collector), ключевая компонента архитектуры Likeastore. Он собирает данные, через открытые API сервисов, обрабатывает ответы, сохраняет данные и свое текущее состояние. По сути, это «бесконечный цикл», который строит список выполняемых задач, запускает их, после этого процедура повторяется.
Для максимальной эффективности работы, мы запускаем 2 экземпляра коллектора, работающего в разных режимах: initial, normal. В initial режиме, коллектор только обрабатывает только что подключенные сети. Тем самым, пользователь максимально быстро получает «лайки», после подключения сети. После того, как все «лайки» были выгружены, сеть переходит в normal mode, обрабатывется уже другим инстансом, где время между сборами значительно выше.
var argv = require('optimist').argv;
var env = process.env.NODE_ENV = process.env.NODE_ENV || 'development';
var mode = process.env.COLLECTOR_MODE = process.env.COLLECTOR_MODE || argv.mode || 'normal';
var scheduler = require('./source/engine/scheduler');
scheduler(mode).run();
Планировщик, по сути является тем самым циклом while(true)
, написанным для Node.js. Признаюсь честно, переключение
Одним из вариантов, было использование async.queue [4], которое казалось очевидным, но не лучшим для этой задачи. После нескольких попыткок, самым лучшим вариантом ассинхронного while(true) оказался setTimeout.
function scheduler (mode) {
function schedulerLoop() {
runCollectingTasks(restartScheduler);
}
function restartScheduler (err, results) {
if (err) {
logger.error(err);
}
// http://stackoverflow.com/questions/16072699/nodejs-settimeout-memory-leak
var timeout = setTimeout(schedulerLoop, config.collector.schedulerRestart);
}
// ...
return {
run: function () {
schedulerLoop();
}
};
}
Тут следует отметить тот самый подводный камень daemon'ов на Node.js — утечки памяти. Я заметил, что после продолжительной работы collector'а он начинал потреблять большое кол-во памяти в и самый неожиданный момент просто останавливался. Обратите внимание на комментарий в коде с вопросом на SO [5]. После того как я добавил var timeout =
, ситуация улучшилась, но не радикально.
Другая причина открылась после эпичного поста [6] об утечках памяти и расследнования инженеров Joyent и Wallmart. С переходом на Node.js v0.10.22 коллектор стал работать еще стабильней, но гораздо меньшей переодичностью он останавливается, причину понять крайне тяжело.
Когда пользователь подключает новую сеть, мы кладем в коллекцию networks
документ, который содержит: идентификатор пользователя, токен доступа и прочую служебную информацию. В этот же документ, коллектор денормализует свое текущее состояние (в каком режиме он работает, были ли ошибки, сколько их, какая текущая страница данных обрабатывается). Т.е. по сути, это один и тотже документ, на основе которого создается исполняемая задача.
function runCollectingTasks(callback) {
prepareCollectingTasks(function (err, tasks) {
if (err) {
return callback(err);
}
runTasks(tasks, 'collecting', callback);
});
}
function prepareCollectingTasks(callback) {
networks.findByMode(mode, function (err, states) {
if (err) {
return callback({message: 'error during networks query', err: err});
}
if (!states) {
return callback({message: 'failed to read networks states'});
}
callback(null, createCollectingTasks(states));
});
}
На основе состояния, мы создаем список исполняемых задач. Почти все открытые API популярных сервисов имеют ограничения по количеству реквестов на период времени. Задача коллектора сводится к тому, чтобы выполнить наиболее эффективное число запросов и не уйти в rate-limit.
К запуску разрешены только те задачи, которые были запланированы после текущего момента времени.
function createCollectingTasks(states) {
var tasks = states.map(function (state) {
return allowedToExecute(state) ? collectingTask(state) : null;
}).filter(function (task) {
return task !== null;
});
return tasks;
}
function allowedToExecute (state) {
return moment().diff(state.scheduledTo) > 0;
}
function collectingTask(state) {
return function (callback) { return executor(state, connectors, callback); };
}
Массив данных, преобразуется в массив функций, которые идут на вход runTasks
. runTasks
последовательно выполняет каждую задачу, через async.series
.
function runTasks(tasks, type, callback) {
async.series(tasks, function (err) {
if (err) {
// report error but continue execution to do not break execution chain..
logger.error(err);
}
callback(null, tasks.length);
});
}
Обобщенная функция, которая принимает текущее состояние, список существующих коннекторов и фунцию обратного вызова (я поубирал всякую обработку ошибок и логгирование, чтобы показать ее суть).
function executor(state, connectors, callback) {
var service = state.service;
var connector = connectors[service];
var connectorStarted = moment();
connector(state, user, connectorExecuted);
function connectorExecuted(err, updatedState, results) {
saveConnectorState(state, connectorStateSaved);
function saveConnectorState (state, callback) {
// ...
}
function connectorStateSaved (err) {
// ...
saveConnectorResults(results, connectorResultsSaved);
}
function saveConnectorResults(results, callback) {
// ...
connectorResultsSaved(results, connectorResultsSaved);
}
function connectorResultsSaved (err, saveDuration) {
// ...
callback(null);
}
}
}
Коннектор (connector), это функция, которая осуществяет HTTP реквест к АПИ, обрабатывает его ответ, обновляет состояние задачи (планируемый следующий запуск) и возвращает собранные данные. Именно коннектор, различает в каком состоянии находится сбор «лайков», в зависимости от этого делает нужный реквест (строит нужный request URI).
На данный момент, у нас реализованно 9 коннекторов, код который более менее схожий. Первоначально я всегда искал готовые библиотеки доступа к API, но это оказалась неверная стратегия: надо выбирать из нескольких вариантов, они не работают либо имеют не удобный интерфейс, отстают от текущей версии API и т.д. Самым гибким и простым решением оказалось использование request [7] (лучший HTTP клиент для Node.js).
Приведу пример кода для GitHub (опять же сокращу детали, чтобы показать саму суть).
var API = 'https://api.github.com';
function connector(state, user, callback) {
var accessToken = state.accessToken;
if (!accessToken) {
return callback('missing accessToken for user: ' + state.user);
}
initState(state);
var uri = formatRequestUri(accessToken, state);
var headers = { 'Content-Type': 'application/json', 'User-Agent': 'likeastore/collector'};
request({uri: uri, headers: headers, timeout: config.collector.request.timeout, json: true}, function (err, response, body) {
if (err) {
return handleUnexpected(response, body, state, err, function (err) {
callback (err, state);
});
}
return handleResponse(response, body);
});
function initState(state) {
// intialize state fields (page, errors, mode etc.) ...
}
function formatRequestUri(accessToken, state) {
// create request URI based on values from state ...
}
function handleResponse(response, body) {
var rateLimit = +response.headers['x-ratelimit-remaining'];
var stars = body.map(function (r) {
return {
// transforms response into likeastore item
};
});
return callback(null, scheduleTo(updateState(state, stars, rateLimit, false)), stars);
}
function updateState(state, data, rateLimit, failed) {
state.lastExecution = moment().toDate();
// update other state fields (next page, mode) ...
return state;
}
}
Наконец, когда коннектор выполнился и состояние обновленно, нужно расчитать следующий момент запуска. Он расчитывается на основании ограничений API и режима работы коллектора (для initial mode пауза минимальна, для normal mode больше, как правило 15 минут, если коннектор уходи в rate limit то пауза максимальная).
function scheduleTo(state) {
var currentMoment = moment();
var service = state.service;
var scheduleForMode = {
initial: config.collector.quotes[service].runAfter,
normal: config.collector.nextNormalRunAfter,
rateLimit: config.collector.nextRateLimitRunAfter
};
var next = scheduleForMode[state.mode];
var scheduledTo = currentMoment.add(next, 'milliseconds');
state.scheduledTo = scheduledTo.toDate();
return state;
}
Вот такой вот незамысловатый код, который «устоялся» примерно в сентябре прошлого года и все что мы добавляем с тех времен, это новые коннекторы, сам движок остается не изменным. Я задумываюсь о том, чтобы выделить отдельную библиотеку, для запуска очередей задач в Node.js, но не уверен на сколько это обобщенная задача.
Время идет, количество пользователей растет и на данный момент момент обработка 3 тысяч задач занимает около 30 минут, что довольно долго (стараемся держать время цикла не более 15 минут). Думаю, что в будущем архитектура коллектора изменится в сторону очередей сообщений и разделения коллекоторов не по режиму работу, а по другому признаку (тип сети, кластер пользователей) для более легкой горизонтальной маштабируемости.
Автор: alexbeletsky
Источник [8]
Сайт-источник PVSM.RU: https://www.pvsm.ru
Путь до страницы источника: https://www.pvsm.ru/news/56463
Ссылки в тексте:
[1] Likeastore: https://likeastore.com
[2] мышления: http://www.braintools.ru
[3] вопрос на SO: http://stackoverflow.com/questions/15886096/infinite-execution-of-tasks-for-nodejs-application
[4] async.queue: https://github.com/caolan/async#queue
[5] вопросом на SO: http://stackoverflow.com/questions/16072699/nodejs-settimeout-memory-leak
[6] эпичного поста: http://www.joyent.com/blog/walmart-node-js-memory-leak
[7] request: https://github.com/mikeal/request
[8] Источник: http://habrahabr.ru/post/214781/
Нажмите здесь для печати.