Conveyor — поэлементная обработка данных с временной задержкой

в 10:55, , рубрики: conveyor, io.js, javascript, node.js

Что и зачем

Понадобилось мне однажды спарсить с одного сайта информацию. Взял я в руки Node.js и взялся за дело.
Сайт состоял из разделов, каждый раздел состоял из страниц. Для обработки одного раздела приходилось делать много запросов, по количеству страниц.

В тот момент пришлось столкнуться с ограничениями: сайт стал выдавал ошибку при слишком частых запросах (чаще нескольких запросов в секунду). Ну, не проблема, подумал я и решил это известным способом, сделав эдакий «асинхронный цикл». То есть в конце обработки одной страницы стал запускать таймер для обработки следующей.

Далее я вспомнил, что надо парсить разные разделы этого сайта и понял, что становится уже слишком неудобно. Поэтому сделал инструмент Conveyor, умеющий обрабатывать некие «элементы данных» (т.е. применять функцию-обработчик к заданных объектам) с временной задержкой между обработкой. Удобным это оказалось и для «тяжёлых» вычислений, которые могут долго выполняться в цикле.

Код Conveyor лежит на гитхабе, поставить можно через npm (называется dataconveyor). Более структурированная справка также на гитхабе. Использовать его можно как угодно и где угодно, без ограничений.

Ниже описание инструмента Conveyor.

Как использовать

Для начала следует создать экземпляр объекта Conveyor, задав ему обработчик данных:

var conveyor = new Conveyor(function(element) {
    console.log(element);
}, {
    period: 100
});

Здесь мы создаём объект, который будет записывать данные в консоль с интервалом 100 мс. После инициализации следует задать данные:

conveyor.add(12);
conveyor.add("Ahoj, Habr!");
conveyor.add([firstElement, secondElement]);

Следует отметить, что в случае с массивом элементы firstElement и secondElement будут обрабатываться по отдельности, а не массив целиком. Новые данные можно добавлять в процессе обработки данных, т.е. conveyor.add() можно использовать и внутри обработчика, установленного в конструкторе.

Итак, когда мы добавили данные для обработки (они, кстати, начинают обрабатываться сразу после добавления), мы можем задать функцию, которая будет вызвана после запуска обработчиков всех событий и выжидания интервала:

conveyor.whenStop(function() {
    console.log('Done.');
});

Вот таким нехитрым образом мы можем запускать обработку данных с нужной нам частотой. Это решило задачу с загрузкой информации из многих страниц. Но обнаружилась другая проблема.

Сделав функцию типа parseAllPages() (которая загружает информацию со всех страниц одного раздела) я не предусмотрел, что мне захочется вызвать её для разных разделов одновременно и асинхронно. Для загрузки информации из различных категорий я запускал эту условную функцию parseAllPages() в другом элементе Conveyor. Но несколько Conveyor-ов, не синхронизованы между собой и потому могут выполнять больше запросов в секунду, чем допустимо ограничениями.

Для устранения недостатка в параметры Conveyor был добавлен флаг (булев параметр) useQueue (по умолчанию false), взведение которого означает последовательную обработку данных (следующий элемент будет обработан только после того, как обработан предыдущий). Такой тип обработки позволяет синхронизировать несколько взаимосвязанных объектов Conveyor. Пример:

var categoriesConveyor = new Conveyor(function (category, cb) {
    parseAllPages(category, function() {
        cb();
    }
}, {
    period: 100,
    useQueue: true
});

То есть категории я обрабатывал последовательно, а страницы внутри категории — не последовательно. Ну а дальше по описанному алгоритму.

Также реализована функция Conveyor.wait(count) на случай, если элементы для обработки будут добавляться позднее вызова функции whenStop. То есть функция из whenStop не будет вызвана, пока не будет вызвана count раз функция conveyor.add(). Или, если данных добавлять уже не нужно, можно вызвать функцию Conveyor.unwait(count). Счетчик ожидаемых элементов также можно задать при инициализации Conveyor, указав значение параметру expectedElementsCounter.

А если нужно остановить обработку (игнорируя необработанные элементы), следует вызвать функцию Conveyor.forceStop().

Эта штука очень помогла мне. Надеюсь, что кому-нибудь она также будет полезной.

Буду благодарен за фидбэк. Особенно полезен он будет по codestyle в js.

Автор: topa

Источник

Поделиться

* - обязательные к заполнению поля