- PVSM.RU - https://www.pvsm.ru -

Собрать миллион «лайков» или очереди задач в Node.js

На прошлой неделе мы отметили одну круглую дату — в базе данных Likeastore [1] скопилось, ни много, ни мало — один миллион пользовательских «лайков».

Мы используем JavaScript, все текущие сервисы написаны на JavaScript/Node.js. В общем и целом, я не жалею о использовании Node.js в нашем проекте, он отлично зарекомендовал себя как лучшее средство реализвации HTTP API. Но для сбора «лайков», это должен быть daemon, который работает постоянно. Наверно, не самая типичная задача для Node.js — про специфику реализации и некоторые подводные камни, читаем далее.

Сollector

Коллектор (collector), ключевая компонента архитектуры Likeastore. Он собирает данные, через открытые API сервисов, обрабатывает ответы, сохраняет данные и свое текущее состояние. По сути, это «бесконечный цикл», который строит список выполняемых задач, запускает их, после этого процедура повторяется.

Для максимальной эффективности работы, мы запускаем 2 экземпляра коллектора, работающего в разных режимах: initial, normal. В initial режиме, коллектор только обрабатывает только что подключенные сети. Тем самым, пользователь максимально быстро получает «лайки», после подключения сети. После того, как все «лайки» были выгружены, сеть переходит в normal mode, обрабатывется уже другим инстансом, где время между сборами значительно выше.

var argv = require('optimist').argv;

var env = process.env.NODE_ENV = process.env.NODE_ENV || 'development';
var mode = process.env.COLLECTOR_MODE = process.env.COLLECTOR_MODE || argv.mode || 'normal';

var scheduler = require('./source/engine/scheduler');
scheduler(mode).run();

Scheduler

Планировщик, по сути является тем самым циклом while(true), написанным для Node.js. Признаюсь честно, переключение мышления [2] с «синхронного» в «асинхронный» режим, был не самым простым процессом для меня. Запуск бесконечного числа задач в Node.js казался не простой задачей, в результате раздумий родился этот вопрос на SO [3].

Одним из вариантов, было использование async.queue [4], которое казалось очевидным, но не лучшим для этой задачи. После нескольких попыткок, самым лучшим вариантом ассинхронного while(true) оказался setTimeout.

function scheduler (mode) {
	function schedulerLoop() {
		runCollectingTasks(restartScheduler);
	}

	function restartScheduler (err, results) {
		if (err) {
			logger.error(err);
		}

		// http://stackoverflow.com/questions/16072699/nodejs-settimeout-memory-leak
		var timeout = setTimeout(schedulerLoop, config.collector.schedulerRestart);
	}

	// ...

	return {
		run: function () {
			schedulerLoop();
		}
	};
}

Тут следует отметить тот самый подводный камень daemon'ов на Node.js — утечки памяти. Я заметил, что после продолжительной работы collector'а он начинал потреблять большое кол-во памяти в и самый неожиданный момент просто останавливался. Обратите внимание на комментарий в коде с вопросом на SO [5]. После того как я добавил var timeout =, ситуация улучшилась, но не радикально.

Другая причина открылась после эпичного поста [6] об утечках памяти и расследнования инженеров Joyent и Wallmart. С переходом на Node.js v0.10.22 коллектор стал работать еще стабильней, но гораздо меньшей переодичностью он останавливается, причину понять крайне тяжело.

Networks and states

Когда пользователь подключает новую сеть, мы кладем в коллекцию networks документ, который содержит: идентификатор пользователя, токен доступа и прочую служебную информацию. В этот же документ, коллектор денормализует свое текущее состояние (в каком режиме он работает, были ли ошибки, сколько их, какая текущая страница данных обрабатывается). Т.е. по сути, это один и тотже документ, на основе которого создается исполняемая задача.

function runCollectingTasks(callback) {
	prepareCollectingTasks(function (err, tasks) {
		if (err) {
			return callback(err);
		}

		runTasks(tasks, 'collecting', callback);
	});
}

function prepareCollectingTasks(callback) {
	networks.findByMode(mode, function (err, states) {
		if (err) {
			return callback({message: 'error during networks query', err: err});
		}

		if (!states) {
			return callback({message: 'failed to read networks states'});
		}

		callback(null, createCollectingTasks(states));
	});
}

Tasks

На основе состояния, мы создаем список исполняемых задач. Почти все открытые API популярных сервисов имеют ограничения по количеству реквестов на период времени. Задача коллектора сводится к тому, чтобы выполнить наиболее эффективное число запросов и не уйти в rate-limit.

К запуску разрешены только те задачи, которые были запланированы после текущего момента времени.

function createCollectingTasks(states) {
	var tasks = states.map(function (state) {
		return allowedToExecute(state) ? collectingTask(state) : null;
	}).filter(function (task) {
		return task !== null;
	});

	return tasks;
}

function allowedToExecute (state) {
	return moment().diff(state.scheduledTo) > 0;
}

function collectingTask(state) {
	return function (callback) { return executor(state, connectors, callback); };
}

Массив данных, преобразуется в массив функций, которые идут на вход runTasks. runTasks последовательно выполняет каждую задачу, через async.series.

function runTasks(tasks, type, callback) {
	async.series(tasks, function (err) {
		if (err) {
			// report error but continue execution to do not break execution chain..
			logger.error(err);
		}

		callback(null, tasks.length);
	});
}

Executor

Обобщенная функция, которая принимает текущее состояние, список существующих коннекторов и фунцию обратного вызова (я поубирал всякую обработку ошибок и логгирование, чтобы показать ее суть).

function executor(state, connectors, callback) {
	var service = state.service;
	var connector = connectors[service];
	var connectorStarted = moment();

	connector(state, user, connectorExecuted);

	function connectorExecuted(err, updatedState, results) {
		saveConnectorState(state, connectorStateSaved);

		function saveConnectorState (state, callback) {
			// ...
		}

		function connectorStateSaved (err) {
			// ...

			saveConnectorResults(results, connectorResultsSaved);
		}

		function saveConnectorResults(results, callback) {
			// ...

			connectorResultsSaved(results, connectorResultsSaved);
		}

		function connectorResultsSaved (err, saveDuration) {
			// ...

			callback(null);
		}
	}
}

Connectors

Коннектор (connector), это функция, которая осуществяет HTTP реквест к АПИ, обрабатывает его ответ, обновляет состояние задачи (планируемый следующий запуск) и возвращает собранные данные. Именно коннектор, различает в каком состоянии находится сбор «лайков», в зависимости от этого делает нужный реквест (строит нужный request URI).

На данный момент, у нас реализованно 9 коннекторов, код который более менее схожий. Первоначально я всегда искал готовые библиотеки доступа к API, но это оказалась неверная стратегия: надо выбирать из нескольких вариантов, они не работают либо имеют не удобный интерфейс, отстают от текущей версии API и т.д. Самым гибким и простым решением оказалось использование request [7] (лучший HTTP клиент для Node.js).

Приведу пример кода для GitHub (опять же сокращу детали, чтобы показать саму суть).

var API = 'https://api.github.com';

function connector(state, user, callback) {
	var accessToken = state.accessToken;

	if (!accessToken) {
		return callback('missing accessToken for user: ' + state.user);
	}

	initState(state);

	var uri = formatRequestUri(accessToken, state);
	var headers = { 'Content-Type': 'application/json', 'User-Agent': 'likeastore/collector'};

	request({uri: uri, headers: headers, timeout: config.collector.request.timeout, json: true}, function (err, response, body) {
		if (err) {
			return handleUnexpected(response, body, state, err, function (err) {
				callback (err, state);
			});
		}

		return handleResponse(response, body);
	});

	function initState(state) {
		// intialize state fields (page, errors, mode etc.) ...
	}

	function formatRequestUri(accessToken, state) {
		// create request URI based on values from state ...
	}

	function handleResponse(response, body) {
		var rateLimit = +response.headers['x-ratelimit-remaining'];

		var stars = body.map(function (r) {
			return {
				// transforms response into likeastore item
			};
		});

		return callback(null, scheduleTo(updateState(state, stars, rateLimit, false)), stars);
	}

	function updateState(state, data, rateLimit, failed) {
		state.lastExecution = moment().toDate();

		// update other state fields (next page, mode) ...

		return state;
	}
}

scheduleTo

Наконец, когда коннектор выполнился и состояние обновленно, нужно расчитать следующий момент запуска. Он расчитывается на основании ограничений API и режима работы коллектора (для initial mode пауза минимальна, для normal mode больше, как правило 15 минут, если коннектор уходи в rate limit то пауза максимальная).

function scheduleTo(state) {
	var currentMoment = moment();
	var service = state.service;

	var scheduleForMode = {
		initial: config.collector.quotes[service].runAfter,
		normal: config.collector.nextNormalRunAfter,
		rateLimit: config.collector.nextRateLimitRunAfter
	};

	var next = scheduleForMode[state.mode];
	var scheduledTo = currentMoment.add(next, 'milliseconds');

	state.scheduledTo = scheduledTo.toDate();

	return state;
}

Вот такой вот незамысловатый код, который «устоялся» примерно в сентябре прошлого года и все что мы добавляем с тех времен, это новые коннекторы, сам движок остается не изменным. Я задумываюсь о том, чтобы выделить отдельную библиотеку, для запуска очередей задач в Node.js, но не уверен на сколько это обобщенная задача.

Время идет, количество пользователей растет и на данный момент момент обработка 3 тысяч задач занимает около 30 минут, что довольно долго (стараемся держать время цикла не более 15 минут). Думаю, что в будущем архитектура коллектора изменится в сторону очередей сообщений и разделения коллекоторов не по режиму работу, а по другому признаку (тип сети, кластер пользователей) для более легкой горизонтальной маштабируемости.

Автор: alexbeletsky

Источник [8]


Сайт-источник PVSM.RU: https://www.pvsm.ru

Путь до страницы источника: https://www.pvsm.ru/news/56463

Ссылки в тексте:

[1] Likeastore: https://likeastore.com

[2] мышления: http://www.braintools.ru

[3] вопрос на SO: http://stackoverflow.com/questions/15886096/infinite-execution-of-tasks-for-nodejs-application

[4] async.queue: https://github.com/caolan/async#queue

[5] вопросом на SO: http://stackoverflow.com/questions/16072699/nodejs-settimeout-memory-leak

[6] эпичного поста: http://www.joyent.com/blog/walmart-node-js-memory-leak

[7] request: https://github.com/mikeal/request

[8] Источник: http://habrahabr.ru/post/214781/