Асинхронные циклы и Stream API в Node.js 10

в 19:27, , рубрики: javascript, node.js, nodejs, Stream, streaming api, Программирование

Асинхронные циклы и Stream API в Node.js 10 - 1

В этом месяце выходит десятая версия Node.js, в которой нас ждет изменение поведения потоков (readable-stream), вызванное появлением асинхронных циклов for-await-of. Давайте разберемся что это такое и к чему нам готовиться.

Конструкция for-await-of.

Для начала давайте разберемся с тем как работают асинхронные циклы на простом примере. Для наглядности добавим завершившиеся промисы.

const promises = [
    Promise.resolve(1),
    Promise.resolve(2),
    Promise.resolve(3),
];

Обычный цикл пройдется по массиву promises и вернет сами значения:

for (const value of promises) {
    console.log(value);
}
// > Promise({resolved: 1})
// > Promise({resolved: 2})
// > Promise({resolved: 3})

Асинхронный цикл дождется разрешения промиса и вернет возвращаемое промисом значение:

for await (const value of promises) {
    console.log(value);
}
// > 1
// > 2
// > 3

ReadableStream и for-await-of

Объект ReadableStream получил свойство Symbol.asyncIterator, что позволяет ему также быть переданным в for-await-of цикл. Возьмем для примера fs.createReadableStream:

const readStream = fs.createReadStream(file);

const chunks = [];
for await (const chunk of readStream) {
    chunks.push(chunk);
}

console.log(Buffer.concat(chunks));

Как видно из примера теперь мы избавились от вызовов on('data', ... и on('end', ..., а сам код стал выглядеть нагляднее и предсказуемее.

Асинхронные генераторы

В некоторых случаях может понадобится дополнительная обработка получаемых данных, для этого используются асинхронные генераторы. Мы можем реализовать поиск по файлу регулярного выражения:

async function * search(needle, chunks) {
    let pos = 0;

    for await (const chunk of chunks) {
        let string = chunk.toString();

        while (string.length) {
            const match = string.match(needle);

            if (! match) {
                pos += string.length;
                break;
            }

            yield {
              index: pos + match.index,
              value: match[0],
            };

            string = string.slice(match.index + match[0].length);
            pos += match.index;
        }
    }
}

Посмотрим что получилось:

const stream = fs.createReadStream(file);

for await (const {index, value} of search(/(a|b)c/, stream)) {
    console.log('found "%s" at %s', value, index);
}

Согласитесь, достаточно удобно, мы на лету превратили строки в объекты и нам не понадобилось использовать TransformStream и думать, как перехватывать ошибки, которые могут возникнуть в двух разных стримах и т.п.

Пример Unix-like потоков

Задача с чтением файла достаточно распространенная, но не исчерпывающая. Давайте рассмотрим случаи, когда требуется потоковая обработка вывода наподобие unix-конвейеров. Для этого воспользуемся асинхронными генераторами, через которые пропустим результат выполнения команды ls.

Сначала мы создадим дочерний процесс const subproc = spawn('ls') и затем будем читать стандартный вывод:

for await (const chunk of subproc.stdout) {
    // ...
}

А так как stdout генерирует вывод в виде объектов Buffer, то первым делом добавим генератор, который будет приводить вывод из типа Buffer в String:

async function *toString(chunks) {
    for await (const chunk of chunks) {
        yield chunk.toString();
    }
}

Далее сделаем простой генератор, который будет разбивать вывод построчно. Тут важно учесть, что порция данных, передаваемая из createReadStream, имеет ограниченную максимальную длинну, а это значит, что нам может прийти как строка целиком или кусок очень длинной строки, так и несколько строк одновременно:

async function *chunksToLines(chunks) {
    let previous = '';
    for await (const chunk of chunks) {
        previous += chunk;
        while (true) {
            const i = previous.indexOf('n');
            if (i < 0) {
                break;
            }
            yield previous.slice(0, i + 1);
            previous = previous.slice(i + 1);
        }
    }

    if (previous.length > 0) {
        yield previous;
    }
}

Так как каждое найденное значение все еще содержит перенос строки, создадим генератор для очистки значения от висящих пробельных символов:

async function *trim(values) {
    for await (const value of values) {
        yield value.trim();
    }
}

Последним действием будет непосредственно построчный вывод в консоль:

async function print(values) {
    for await (const value of values) {
        console.log(value);
    }
}

Объединим полученный код:

async function main() {
    const subproc = spawn('ls');

    await print(trim(chunksToLines(toString(subproc.stdout))));

    console.log('DONE');
}

Как видим, код получился несколько трудночитаемым. Если мы захотим добавить еще несколько вызовов или параметры, то в результате получим кашу. Чтобы избежать и сделать код более линейным, давайте добавим функцию pipe:

function pipe(value, ...fns) {
    let result = value;

    for (const fn of fns) {
        result = fn(result);
    }

    return result;
}

Теперь вызов можно привести к следующему виду:

async function main() {
    const subproc = spawn('ls');

    await pipe(
        subproc.stdout,
        toString,
        chunksToLines,
        trim,
        print,
    );

    console.log('DONE');
}

Оператор |>

Нужно иметь в виду, что в скором времени в стандарт JS должен войти новый оператор конвейера |> позволяющий делать тоже самое что сейчас делает pipe:

async function main() {
    const subproc = spawn('ls');

    await subproc.stdout
    |> toString
    |> chunksToLines
    |> trim
    |> print;

    console.log('DONE');
}

Вывод

Как видите асинхронные циклы и итераторы сделали язык еще более выразительным, ёмким и понятным. Ад из коллбеков уходит все дальше в прошлое и скоро станет страшилкой, которой мы будем пугать внуков. А генераторы похоже займут свое место в иерархии JS и будут применяться по назначению.

Основу для данной статьи составил материал Акселя Раушмайера Using async iteration natively in Node.js.

В продолжение темы

Автор: Rumkin

Источник

Поделиться