Понимание событийной архитектуры Node.js

в 13:38, , рубрики: javascript, node.js, Блог компании Mail.Ru Group, никто не читает теги, Программирование, Разработка веб-сайтов

Понимание событийной архитектуры Node.js - 1

Большинство Node-объектов — вроде HTTP-запросов, ответов и потоков (streams) — реализуют модуль EventEmitter, благодаря которому они могут генерировать и прослушивать события.

const EventEmitter = require('events')

Простейшая форма управления по событиям — это callback-стиль некоторых популярных Node.js-функций, к примеру fs.readFile. По этой аналогии событие генерируется однократно (когда Node готов к вызову коллбэка), а коллбэк действует как обработчик события. Давайте сначала разберём эту базовую форму событийно-управляемой архитектуры.

Вызови меня, когда будешь готов, Node!

Изначально Node обрабатывал асинхронные события с помощью коллбэков. Это было давно, ещё до того как в JavaScript появилась нативная поддержка промисов и фича async/await. Коллбэки — это просто функции, которые вы передаёте другим функциям. Такое возможно в JavaScript, потому что функции — это объекты первого класса.

Важно понимать, что коллбэки не индикаторы асинхронного вызова в коде. Функция может вызывать коллбэк как синхронно, так и асинхронно. Например, хост-функция fileSize принимает коллбэк-функцию cb, причём вызывает её синхронно или асинхронно в зависимости от условия:

function fileSize (fileName, cb) {
  if (typeof fileName !== 'string') {
    return cb(new TypeError('argument should be string')); // Sync
  }
  fs.stat(fileName, (err, stats) => {
    if (err) { return cb(err); } // Async
    cb(null, stats.size); // Async
  });
}

Это плохой подход, приводящий к неожиданным ошибкам. Создавайте такие хост-функции, которые принимают коллбэки либо всегда синхронно, либо всегда асинхронно.

Давайте разберём простой пример типичной асинхронной Node-функции, написанной в коллбэк-стиле:

const readFileAsArray = function(file, cb) {
  fs.readFile(file, function(err, data) {
    if (err) {
      return cb(err);
    }
    const lines = data.toString().trim().split('n');
    cb(null, lines);
  });
};

readFileAsArray берёт путь файла и коллбэк-функцию. Считывает содержимое файла, разбивает на массив строк и вызывает применительно к этому массиву коллбэк-функцию. Вот как это можно использовать. Допустим, файл numbers.txt лежит в одной директории с таким контентом:

10
11
12
13
14
15

Если у нас есть задача посчитать числа в этом файле, то для упрощения кода можно воспользоваться readFileAsArray:

readFileAsArray('./numbers.txt', (err, lines) => {
  if (err) throw err;
  const numbers = lines.map(Number);
  const oddNumbers = numbers.filter(n => n%2 === 1);
  console.log('Odd numbers count:', oddNumbers.length);
});

Этот код читает в массиве строк числовой контент, парсит его как числа и выполняет подсчёт.
Здесь работает характерный для Node коллбэк-стиль. У коллбэка есть error-first-аргумент err, который может принимать значение null. Мы передаём этот коллбэк в качестве последнего аргумента хост-функции. Всегда делайте так в своих функциях, потому что пользователи наверняка будут на это рассчитывать. Пусть ваша хост-функция получает коллбэк в виде последнего аргумента, и пусть коллбэк ожидает в качестве своего первого аргумента error-объект.

Современные JS-альтернативы коллбэкам

В современном JavaScript есть такие объекты, как промисы. Они могут быть альтернативой коллбэкам в случае асинхронных API. Вместо передачи коллбэка в качестве аргумента и обработки ошибки в том же месте промис позволяет отдельно обрабатывать успешные и ошибочные ситуации, а также соединять несколько асинхронных вызовов в цепочки, а не делать их вложенными.

Если функция readFileAsArray поддерживает промисы, то мы можем использовать её следующим образом:

readFileAsArray('./numbers.txt')
  .then(lines => {
    const numbers = lines.map(Number);
    const oddNumbers = numbers.filter(n => n%2 === 1);
    console.log('Odd numbers count:', oddNumbers.length);
  })
  .catch(console.error);

Вместо передачи коллбэка мы вызываем функцию .then применительно к возвращаемому значению хост-функции. Обычно .then даёт нам доступ к тем же строкам массива, которые мы получаем в коллбэк-версии, поэтому можем работать как раньше. Для обработки ошибок добавим вызов .catch применительно к результату, что обеспечит нам доступ к ошибке, если она возникнет.

Благодаря новому объекту Promise в современном JavaScript стало легче реализовать поддержку промис-интерфейса хост-функцией. Вот функция readFileAsArray, модифицированная так, чтобы она поддерживала промис-интерфейс в дополнение к уже поддерживаемому коллбэк-интерфейсу:

const readFileAsArray = function(file, cb = () => {}) {
  return new Promise((resolve, reject) => {
    fs.readFile(file, function(err, data) {
      if (err) {
        reject(err);
        return cb(err);
      }
      const lines = data.toString().trim().split('n');
      resolve(lines);
      cb(null, lines);
    });
  });
};

Функция возвращает объект Promise, в который обёртывается асинхронный вызов fs.readFile. У промиса два аргумента: функции resolve и reject. Если нам нужно вызвать коллбэк с ошибкой, то используем промис-функцию reject, а для коллбэка с данными — промис-функцию resolve.

Единственное отличие заключается в том, что нам нужно иметь значение по умолчанию для коллбэк-аргумента на тот случай, если код используется с промис-интерфейсом. Например, в качестве аргумента можно использовать простую, по умолчанию пустую функцию () => {}.

Потребление промисов с помощью async/await

Добавление промис-интерфейса позволяет гораздо легче работать с вашим кодом, если нужно использовать асинхронную функцию в цикле. С коллбэками ситуация усложняется. Промисы немного улучшают положение дел, как и генератор функций. Иными словами, более свежая альтернатива для работы с асинхронным кодом — функция async. Она позволяет обращаться с асинхронным кодом как с синхронным, что сильно улучшает читабельность кода.

Вот как можно использовать функцию readFileAsArray с помощью async/await:

async function countOdd () {
  try {
    const lines = await readFileAsArray('./numbers');
    const numbers = lines.map(Number);
    const oddCount = numbers.filter(n => n%2 === 1).length;
    console.log('Odd numbers count:', oddCount);
  } catch(err) {
    console.error(err);
  }
}
countOdd();

Сначала создаём асинхронную функцию — обычную функцию со словом async в начале. Внутри неё мы вызываем функцию readFileAsArray, словно она возвращает переменную lines, и для этого мы используем ключевое слово await. Если вызов readFileAsArray был синхронным, то продолжаем код. Чтобы выполнить получившееся, мы исполняем функцию async. Так получается просто и читабельно. Для работы с ошибками нам нужно обернуть вызов async в выражение try/catch.

Благодаря фиче async/await нам не потребовался специальный API (вроде .then и .catch). Мы лишь иначе маркировали функции и взяли чистый JavaScript.

Мы можем использовать async/await с любой функцией, поддерживающей промис-интерфейс. Но не можем — с асинхронными функциями в коллбэк-стиле (например, setTimeout).

Модуль EventEmitter

EventEmitter — это модуль, содействующий коммуникации между объектами в Node. Он является ядром асинхронной событийно-управляемой архитектуры. Многие из встроенных в Node модулей наследуют от EventEmitter.

Его идея проста: emitter-объекты генерируют именованные события, которые приводят к вызову ранее зарегистрированных прослушивателей. Так что у эмиттера есть две основные функции:

  • Генерирование именованных событий.
  • Регистрация и дерегистрация функций-прослушивателей.

Для работы с EventEmitter нужно создать расширяющий его класс.

class MyEmitter extends EventEmitter {

}

Эмиттеры — это то, что мы инстанцируем из классов на основе EventEmitter:

const myEmitter = new MyEmitter();

В любой момент жизненного цикла эмиттеров мы можем воспользоваться функцией emit и сгенерировать любое именованное событие.

myEmitter.emit('something-happened');

Генерирование события — это сигнал того, что соблюдено какое-то условие. Обычно речь идёт об изменении состояния генерирующего объекта. С помощью метода on можно добавить функции-прослушиватели, которые будут исполняться каждый раз, когда эмиттеры генерируют свои ассоциированные именованные события.

События !== асинхронность

Взгляните на пример:

const EventEmitter = require('events');

class WithLog extends EventEmitter {
  execute(taskFunc) {
    console.log('Before executing');
    this.emit('begin');
    taskFunc();
    this.emit('end');
    console.log('After executing');
  }
}

const withLog = new WithLog();

withLog.on('begin', () => console.log('About to execute'));
withLog.on('end', () => console.log('Done with execute'));

withLog.execute(() => console.log('*** Executing task ***'));

Класс WithLog — это эмиттер. Он определяет один экземпляр функции execute. Она получает один аргумент — функцию задачи (task function) — и оборачивает её исполнение в log-выражения. События генерируются до и после исполнения.

Чтобы увидеть, в какой очерёдности всё работает, зарегистрируем прослушивателей для именованных событий и выполним пример задачи по запуску всей цепочки.

Результат:

Before executing
About to execute
*** Executing task ***
Done with execute
After executing

Что я хочу отметить касательно результата исполнения кода: здесь нет ничего асинхронного.

  • Сначала получаем строку «Before executing».
  • Затем именованное событие begin приводит к появлению строки «About to execute».
  • Далее реально исполняемая строка генерирует строку «*** Executing task ***».
  • Потом именованное событие end приводит к появлению строки «Done with execute».
  • В конце получаем строку «After executing».

Совсем как старые добрые коллбэки, не предполагающие, что события характерны для синхронного или асинхронного кода. Это важно, потому что если мы передаём в execute асинхронную taskFunc, то генерируемые события больше не будут точны.

Можно эмулировать эту ситуацию с помощью вызова setImmediate:

// ...

withLog.execute(() => {
  setImmediate(() => {
    console.log('*** Executing task ***')
  });
});

Теперь результат будет такой:

Before executing
About to execute
Done with execute
After executing
*** Executing task ***

Это неправильно. Строки после асинхронного вызова, приводящие к появлению вызовов «Done with execute» и «After executing», появляются в неправильной очерёдности.

Для генерирования события после завершения асинхронной функции нам нужно скомбинировать коллбэки (или промисы) с этой событийно-управляемой коммуникацией. Это демонстрируется на нижеприведённом примере.

Одно из преимуществ использования событий вместо обычных коллбэков — то, что мы можем много раз реагировать на один и тот же сигнал благодаря определению многочисленных прослушивателей. Чтобы сделать то же самое с помощью коллбэков, придётся написать больше логики внутри одного доступного коллбэка. События — прекрасный способ реализовать многочисленные внешние плагины, добавляющие функциональность к ядру приложения. Можно считать их «разъёмами» для кастомизации поведения при изменении состояния.

Асинхронные события

Давайте преобразуем наш синхронный пример в нечто асинхронное и немного более полезное.

const fs = require('fs');
const EventEmitter = require('events');

class WithTime extends EventEmitter {
  execute(asyncFunc, ...args) {
    this.emit('begin');
    console.time('execute');
    asyncFunc(...args, (err, data) => {
      if (err) {
        return this.emit('error', err);
      }

      this.emit('data', data);
      console.timeEnd('execute');
      this.emit('end');
    });
  }
}

const withTime = new WithTime();

withTime.on('begin', () => console.log('About to execute'));
withTime.on('end', () => console.log('Done with execute'));

withTime.execute(fs.readFile, __filename);

Класс WithTime исполняет asyncFunc и с помощью вызовов console.time и console.timeEnd сообщает о времени, затраченном этой asyncFunc. Он генерирует правильную последовательность событий до и после исполнения. Также он генерирует error/data-события для работы с обычными сигналами асинхронных вызовов.

Протестируем эмиттер withTime, передав ему вызов асинхронной функции fs.readFile. Вместо обработки данных из файла с помощью коллбэка мы теперь можем прослушивать data-событие.

Выполнив этот код, мы, как и ожидалось, получаем правильную последовательность событий, а также отчёт о времени выполнения:

About to execute
execute: 4.507ms
Done with execute

Обратите внимание, что для этого нам нужно было скомбинировать коллбэк с эмиттером. Если бы asynFunc также поддерживала и промисы, то всё то же самое можно было бы реализовать с помощью async/await:

class WithTime extends EventEmitter {
  async execute(asyncFunc, ...args) {
    this.emit('begin');
    try {
      console.time('execute');
      const data = await asyncFunc(...args);
      this.emit('data', data);
      console.timeEnd('execute');
      this.emit('end');
    } catch(err) {
      this.emit('error', err);
    }
  }
}

Не знаю, как для вас, но для меня это выглядит гораздо читабельнее, чем код на основе коллбэков или строк с .then/.catch. Фича async/await максимально приближает нас к JavaScript, что я считаю большим достижением.

Аргументы событий и ошибки

В предыдущем примере было два события, сгенерированных с дополнительными аргументами. Error-cобытие сгенерировано error-объектом.

this.emit('error', err);

Data-cобытие сгенерировано data-объектом.

this.emit('data', data);

После именованного события мы можем использовать столько аргументов, сколько нужно, и все они будут доступны внутри функций-прослушивателей, которые мы зарегистрировали для этих именованных событий.

Например, для работы с data-событием зарегистрированная функция-прослушиватель получит доступ к data-аргументу, который был передан сгенерированному событию. И этот data-объект — именно то, что предоставляет asyncFunc.

withTime.on('data', (data) => {
  // do something with data
});

Обычно событие error специальное. В примере с коллбэками — если мы не обрабатываем error-событие с помощью прослушивателя, то Node-процесс завершается.

Чтобы продемонстрировать это поведение, снова вызовем исполнение метода с плохим аргументом:

class WithTime extends EventEmitter {
  execute(asyncFunc, ...args) {
    console.time('execute');
    asyncFunc(...args, (err, data) => {
      if (err) {
        return this.emit('error', err); // Not Handled
      }

      console.timeEnd('execute');
    });
  }
}

const withTime = new WithTime();

withTime.execute(fs.readFile, ''); // BAD CALL
withTime.execute(fs.readFile, __filename);

Первый вызов исполнения (execute call) приведёт к ошибке. Node-процесс упадёт или завершится:

events.js:163
      throw er; // Unhandled 'error' event
      ^
Error: ENOENT: no such file or directory, open ''

Это падение повлияет на второй вызов исполнения, который может вообще не быть выполнен.

Если зарегистрировать прослушивателя для специального события error, то поведение Node-процесса изменится. Например:

withTime.on('error', (err) => {
  // do something with err, for example log it somewhere
  console.log(err)
});

В данном случае будет сообщено об ошибке первого вызова исполнения, но Node-процесс не упадёт и не завершится. Второй вызов исполнения нормально закончится:

{ Error: ENOENT: no such file or directory, open '' errno: -2, code: 'ENOENT', syscall: 'open', path: '' }
execute: 4.276ms

Обратите внимание, что сейчас Node ведёт себя иначе с функциями на основе промисов, он лишь выдаёт предупреждение, но в конце концов это изменится:

UnhandledPromiseRejectionWarning: Unhandled promise rejection (rejection id: 1): Error: ENOENT: no such file or directory, open ''
DeprecationWarning: Unhandled promise rejections are deprecated. In the future, promise rejections that are not handled will terminate the Node.js process with a non-zero exit code.

Другой способ обработки исключений из-за сгенерированных ошибок — регистрация прослушивателя глобального события процесса uncaughtException. Однако глобальная ловля ошибок при таком событии — идея плохая.

Стандартный совет относительно uncaughtException: избегайте его использования. Но если вам это необходимо (например, для отчёта о случившемся или для очисток), то позвольте процессу в любом случае завершиться:

process.on('uncaughtException', (err) => {
  // something went unhandled.
  // Do any cleanup and exit anyway!

  console.error(err); // don't do just that.

  // FORCE exit the process too.
  process.exit(1);
});

Однако представим, что одновременно произошло несколько error-событий. Это означает, что прослушиватель uncaughtException запущен несколько раз, что может стать проблемой при очистке кода. Такое бывает, к примеру, когда многочисленные вызовы приводят к завершению работы базы данных.

Модуль EventEmitter предоставляет метод once. Он сигнализирует о том, что хватит и одного вызова прослушивателя. Метод практично использовать с uncaughtException, потому что при первом непойманном исключении мы начнём выполнять чистку, зная, что в любом случае процесс завершится.

Порядок прослушивателей

Если для одного события зарегистрировать несколько прослушивателей, то они станут вызываться в каком-то порядке. Первый зарегистрированный будет и первым вызванным.

// प्रथम
withTime.on('data', (data) => {
  console.log(`Length: ${data.length}`);
});

// दूसरा
withTime.on('data', (data) => {
  console.log(`Characters: ${data.toString().length}`);
});

withTime.execute(fs.readFile, __filename);

Если выполнить этот код, то сначала в лог будет занесена строка «Length», а потом «Characters», потому что именно в таком порядке мы определили их прослушивателей.

Если нужно определить нового прослушивателя, но чтобы он вызывался первым, можно воспользоваться методом prependListener:

// प्रथम
withTime.on('data', (data) => {
  console.log(`Length: ${data.length}`);
});

// दूसरा
withTime.prependListener('data', (data) => {
  console.log(`Characters: ${data.toString().length}`);
});

withTime.execute(fs.readFile, __filename);

В этом случае в логе сначала появится строка «Characters».

И наконец, если вам нужно убрать прослушивателя, то воспользуйтесь методом removeListener.

На этом всё.

Автор: AloneCoder

Источник

Поделиться

* - обязательные к заполнению поля