Применение принципов функционального программирования при проектировании ERP

в 22:01, , рубрики: big data, Deno, ERP-системы, nosql, NoSQL ERP, TypeScript, Анализ и проектирование систем, Функциональная СУБД, функциональное программирование

Привет!

В этой статье мы попробуем взглянуть на архитектуру учетных систем (ERP, CRM, WMS, MES, B2B, ...) с позиций функционального программирования. Существующие системы сложны. Они базируются на реляционной схеме данных, и имеют огромный мутабельный стейт в виде сотен связаных таблиц. При этом единственным «источником правды» в таких системах является хронологически-упорядоченный журнал первичных документов (отпечатков событий реального мира), которые, очевидно, должны быть иммутабельными (и это правило соблюдается в аудируемых системах, где корректировки «задним числом» запрещены). Журнал документов составляет от силы 20% объема БД, а все остальное — промежуточные абстракции и агрегаты, с которыми удобно работать на языке SQL, но которые требуют постоянной синхронизации с документами, и между собой.

Если вернуться к истокам (устранить избыточность данных и отказаться от хранения агрегатов), а все бизнес-алгоритмы реализовать в виде функций, применяемых непосредственно к потоку первичных документов — мы получим функциональную СУБД, и построенную на ней функциональную ERP. Проблема производительности решается благодаря мемоизации, а объем функционального кода будет вполне соизмерим с объемом декларативного SQL, и не сложнее для понимания. В данной статье мы продемонстрируем подход, разработав простейшую файловую СУБД на языке TypeScript и рантайме Deno (аналог Node.js), а также протестируем производительность сверток на примере типичных бизнес-задач.

Почему это актуально

1) Мутабельный стейт + избыточность данных — это плохо, особенно когда необходимо обеспечивать его постоянную синхронизацию с потоком документов. Это источник потенциальных расхождений учетных данных (баланс не сходится) и трудно обнаруживаемых побочных эффектов.

2) Жесткая реляционная схема хранения исходных и промежуточных данных дорого обходится в Big Data, гетерогенных системах, и в условиях быстрых изменений — то есть по сути везде. Мы предлагаем хранить документы в исходном виде, упорядочив по времени, разрешив связи «от новых к старым» и никогда наоборот. Это позволит рассчитывать большинство агрегатов однопроходными алгоритмами прямо из документов, а все остальные таблицы — не нужны.

3) SQL устарел, так как предполагает доступность любых данных в любой момент, а в распределенных системах это очевидно не так — при разработке алгоритмов Big Data нужно быть готовым к тому, что часть необходимых данных появится позже, а часть уже появлялась раньше. Это требует небольшого переосмысления языка запросов, и сознательной заботы о кэшировании.

4) Современные ЯП позволяют создать отзывчивую систему, оперирующую миллионами записей локально на ноутбуке, где РСУБД просто не установится. А если говорить о серверах — предлагаемая схема имеет больше возможностей для параллелизма, в том числе на кластерах типа SPARK.

Предыстория вопроса

Проработав достаточно долго с различным бизнес-ПО (системы учета, планирования, WMS), практически везде сталкивался с двумя проблемами — сложность внесения изменений в схему данных, и частое падение производительности, когда эти изменения таки вносились. Вообще, эти системы имеют сложную структуру, поскольку к ним предъявляются противоречивые требования:

1) Аудируемость. Нужно хранить все первичные документы в неизменном виде. Разделение на справочники и операции весьма условно, во взрослых системах справочники огранизованы с версионированием, где каждое изменение оформляется специальным документом. Таким образом, исходные документы — это иммутабельная часть системы, и она является единственным «источником правды», а все остальные данные могут быть восстановлены из нее.

2) Производительность запросов. Например, при создании строки заказа на продажу система должна рассчитать цену товара с учетом скидок, для чего необходимо извлечь статус клиента, его текущий баланс, историю покупок, текущие акции в регионе, и т.д. Естественно, вся необходимая информация не может быть вычислена «на лету», а должна быть доступной в полу-готовом виде. Поэтому существующие системы хранят удобные абстракции над строками документов (проводки), а также заранее рассчитанные агрегаты (регистры накопления, временные срезы, текущие остатки, сводные проводки, и т.д.). Их объем составляет до 80% размера БД, структура таблиц жестко фиксирована, при любых изменениях в алгоритмах — программист должен позаботиться о правильном обновлении агрегатов. По сути агрегаты это и есть мутабельное состояние системы.

3) Транзакционная производительность. При проведении любого документа нужно пересчитать все агретаты, а это обычно блокирующая операция. Поэтому алгоритмы обновления агрегатов — самая болезненная точка системы, и при внесении большого количества изменений имеется существенный риск что-то сломать, и тогда данные «разъедутся», то есть агрегаты перестанут соответствовать документам. Эта ситуация — бич всех проектов внедрения и последующей поддержки.

Устанавливаем основы новой архитектуры

1) Хранение. Основа БД — хронологически упорядоченный журнал документов, отражающих свершившиеся факты реального мира. Справочники это тоже документы, просто длительного действия. И документ, и каждая версия записи справочника — иммутабельны. Никаких других данных в виде проводок / регистров / остатков в системе не хранится (сильное провокационное утверждение, в жизни бывает по разному, но нужно стремиться к совершенству). Документ имеет ряд системных атрибутов:

{
    "sys": {
        "code": "partner.1",  // человеко-читаемый код, идентификатор группы
        "ts": 1578263624612,  // временная метка записи
        "id": "partner.1^1578263624612",  // составной уникальный глобальный ID
        "cache": 1  // признак необходимости принудительного кэширования
    },
    ...
}

Документы с одинаковым code и разными ts образуют историческую группу, где актуальной считается последняя запись, остальные — историческими. Если установлен атрибут cache, последняя запись из группы попадают в так называемый топ-кэш, и одновременно все записи попадают в фулл-кэш, таким образом мы можем быстро извлечь запись справочника как по id, так и по code.

Документы могут дописываться в конец журнала, и никогда в середину. Изменение или удаление (отмена) старого документа — это всегда новый документ, записываемый в журнал с текущим ts. Таким образом, причинно-следственная связь определяется положением документа в журнале, скачки в прошлое или будущее запрещены (при этом даты принятия к учету, даты исполнения планов могут быть любыми, но с точки зрения ядра системы это просто атрибуты).

2) Связи. Документы могут ссылаться друг на друга по id. В отличие от «sql foreign key» — указывать тип сущности, на которую ссылаемся, нет необходимости, так как сущности лежат вперемешку, а id уникален. Связи от ранних документов к более поздним запрещены. Это означает, что в любом пользовательском алгоритме при обработке текущего документа могут быть востребованы связанные документы, уже встречавшиеся в выборке ранее (и по идее они должны быть кэшированы — либо ядром, либо пользовательским алгоритмом).

3) Горизонт иммутабельности. Часть документов, с которыми ведется активная работа (т.н. открытые документы) не может быть зафиксирована, поэтому вводится понятие горизонта иммутабельности, а база данных разделяется на 2 физических хранилища — иммутабельное хранилище и текущее хранилище. Все документы в первом хранилище имеют временную метку меньше горизонта, они неизменны, а результаты всех сверток кэшируются и переиспользуются. Все что позднее — называется текущим периодом, и при каждом запросе второе хранилище сканируется заново. Такая схема дает линейное время. Горизонт иммутабельности — термин, хорошо знакомый коллегам из 1С, и бухгалтерам. Производительность системы зависит исключительно от размера бардака текущего периода, и в этом вопросе мировая бизнес-практика беспощадна — чем он меньше, тем лучше.

4) Алгоритмы. Журнал документов может храниться в любом виде — последовательный файл, документная БД, таблица РСУБД, поступать из внешнего стрима — главное чтобы они были извлекаемы в прямом либо обратном хронологическом порядке. Любой бизнес-алгоритм — это композиция функций filter(), reduce(), get(), gettop(), примененная к потоку документов. Ввиду отсутствия семантики JOIN, у пользователя остается возможность использовать вложенные подзапросы к БД, либо пытаться ограничиться одним проходом, помещая в пользовательский кэш все, что может потребоваться в будущем. Естественно, помогает системный кэш, хранящий как отдельные документы, к которым уже были запросы по id / code, так и результаты расчетов, выполненных ранее (при полном совпадении входных параметров этих расчетов).

5) Мемоизация, или кэширование. Результаты запросов и расчетов попадают в кэш в случаях:
— документ имеет атрибут cache, при первом reduce() он записывается в фулл-кэш, и обновляет запись в топ-кэше;
— документ извлечен запросом по id / code, и он находится в иммутабельном хранилище;
reduce() завершил расчет по иммутабельному хранилищу, промежуточный результат клонируется и записывается в кэш, а расчет продолжается по текущему хранилищу.
Мы видим, что в отличие от жестко-структурированного «кэша» в системах, основанных на РСУБД, мы имеем адаптивный кэш, наполняемый по мере работы системы. Необходимость экономить память заставляет ограничивать объем кэшируемой информации, поэтому, например, результат функции filter() не кэшируется, а результат reduce() — обязательно. Пользователю даются ограниченные инструменты управления кэшем.

6) Поиск бывает 3-х видов. Первый — когда при обработке текущего документа нам нужно найти несколько связанных документов по неточным критериям. В этом случае либо подзапрос, либо в своем reduce() заранее сохраням все что может потенциально потребоваться, и когда потребовалось — ищем в этой выборке. Второй вид поиска — когда нам нужно получить актуальные элементы справочника, без исторических данных (т.н. текущий срез). В этом случае используется топ-кэш, который как раз хранит такие элементы. В третьем случае это fullscan по базе в обратной хронологической последовательности. Насколько целесообразно кэшировать результаты пользовательских поисков — вопрос дискуссионный, в какой-то мере очевидно необходимо, например с ограничением объема выборки.

7) Добавление документов и перемещение горизонта. При добавлении нового документа актуализируется только топ-кэш. При перемещении документа из текущего хранилища в иммутабельное — по идее нужно до-обновить все кэшированные агрегаты, эта операция тяжелая (зависит от количества переносимых документов и количества результатов в кэше), и должна выполняться по аналогии с закрытием учетного периода — в часы наименьшей нагрузки.

Простая функциональная СУБД

Итак, попробуем что-нибудь написать. Язык TypeScript выбран за идеальное сочетание скриптового динамизма и типизации, рантайм Deno — за удобную поддержку TypeScript и WASM, а также наличия Rust API, что теоретически дает нам шанс ускорить некоторые алгоритмы (хотя это неточно).
Документы в нашей СУБД будут храниться в виде 2-х последовательных файлов, содержащих объекты JSON, разделенные символом "x01", так как это позволяет написать быстрый потоковый парсер. API чтения состоит пока всего из 3-х функций:

type Document = any
type Result = any

public async get(id: string): Promise<Document | undefined>

public async gettop(code: string): Promise<Document | undefined>

public async reduce(
    filter: (result: Result, doc: Document) => Promise<boolean>, 
    reducer: (result: Result, doc: Document) => Promise<void>,
    result: Result
): Promise<Result>

Первая функция возвращает документ по id, вторая возвращает последний документ с заданным code, третья осуществляет фильтрацию и свертку, принимая на вход функцию фильтрации, функцию свертки и начальное значение аккумулятора. Мы сознательно не использовуем цепочку filter().reduce(), так как хотим кэшировать итоговый результат, а в случае цепочки — кэшировать отдельно результат фильтрации расточительно, а кэшировать результат свертки без знания условий фильтрации — бессмысленно. Поэтому reduce() получает на вход сразу все необходимое для расчета, и использует составной хэш от трех параметров в качестве ключа мемоизации.

Собственно, весь пользовательский алгортм представляет собой реализацию колбэков filter и reducer, а аккумулятор-результат может быть любым сериализуемым объектом. Обратите внимание, что оба колбэка возвращают промис, то есть внутри них разрешены вложенные запросы get() и reduce(). Благодаря промисам вложенный цикл (например по строкам текущего документ) можно параллелить (см. второй тест).

Исходные данные

Рассмотрим простейшую систему учета покупок и продаж. Нам нужны справочники контрагентов и номенклатур, а также документы покупки и продажи. Если мы хотим считать себестоимость расходов и маржу, нужен еще один документ — сопоставление приходов с расходами, но это уже тема отдельной статьи.

Партнеры и номенклатуры

{
    "sys": {
        "code": "partner.1",
        "ts": 1578263624612,
        "id": "partner.1^1578263624612",
        "cache": 1     
    },
    "type": "partner.retail",
    "name": "Рога и копыта ООО"
}
{
    "sys": {
        "code": "invent.1",
        "ts": 1578263624612,
        "id": "invent.1^1578263624612",
        "cache": 1     
    },
    "type": "invent.material",
    "name": "Гвоздь строительный 20мм"
}

Атрибут type — пользовательский, его иерархия никак не используется ядром, а лишь в пользовательских алгоритмах. Также не имеет значение семантика атрибута code — для ядра это просто строка.

Покупки и продажи

{
    "sys": {
        "code": "purch.1",
        "ts": 1578263624613,
        "id": "purch.1^1578263624613"  
    },
    "type": "purch",
    "date": "2020-01-07",
    "partner": "partner.3^1578263624612",
    "lines": [
        {
            "invent": "invent.0^1578263624612",
            "qty": 2,
            "price": 232.62838134273366
        },
        {
            "invent": "invent.1^1578263624917",
            "qty": 24,
            "price": 174.0459600393788
        }
    ]
}

Документы отличаются только типом (purch | sale), cтроки хранятся прямо в документе (в реляционной схеме они лежали бы в отдельной таблице).

Реализация алгоритмов

Анализ продаж
Считаем общую сумму продаж, средний чек, и среднее количество строк на документ.

import { FuncDB } from "./FuncDB.ts"
const db = FuncDB.open('./sample_database/')

let res = await db.reduce(
    async (_, doc) => doc.type == 'sale',  // фильтруем только продажи
    async (result, doc) => {
        result.doccou++
        doc.lines.forEach(line => {  // цикл по строкам документа
            result.linecou++
            result.amount += line.price * line.qty
        })
    },
    {amount: 0, doccou: 0, linecou: 0}  // инициализируем аккумулятор
)

console.log(`
    amount total = ${res.amount}
    amount per document = ${res.amount / res.doccou}
    lines per document = ${res.linecou / res.doccou}`
)

Обороты в разрезе номенклатур и партнеров
По сути это сводная таблица, поэтому в качестве аккумулятора используем Map.

class ResultRow { // строка результирующей таблицы
    invent_name = ''
    partner_name = ''
    debit_qty = 0
    debit_amount = 0
    credit_qty = 0
    credit_amount = 0
}

let res = await db.reduce(
    async (_, doc) => doc.type == 'purch' || doc.type == 'sale',
    async (result, doc) => {
        // поскольку внутри цикла у нас await - параллелим обработку строк
        const promises = doc.lines.map(async (line) => {
            const key = line.invent + doc.partner
            let row = result.get(key)
            if (row === undefined) {
                row = new ResultRow()
                // наименования получаем подзапросами к базе (они кэшируются)
                const invent = await db.get(line.invent)
                const partner = await db.get(doc.partner)
                row.invent_name = invent ? invent.name : line.invent + ' not found'
                row.partner_name = partner ? partner.name : doc.partner + ' not found'
                result.set(key, row)
            }
            if (doc.type == 'purch') {
                row.debit_qty += line.qty
                row.debit_amount += line.qty * line.price
            } else if (doc.type == 'sale') {
                row.credit_qty += line.qty
                row.credit_amount += line.qty * line.price
            }
        })
        await Promise.all(promises)
    },
    new Map<string, ResultRow>() // результирующая таблица (аккумулятор)
)

Мы видим, что половину кода составляет извлечение наименований подзапросами. Это легко исправить, написав сервисную функцию, но для общего понимания оставлю так. Обратите внимание, что мы параллелим обработку строк — в случае если номенклатуры нет в кэше — запускается fullscan, результата которого в нашем случае ждать необязательно.

Бенчмаркинг

Тестируем на сгенерированных данных:
иммутабельное хранилище: 100 номенклатур + 100 контрагентов + 100 тыс. документов
текущее хранилище: 10 номенклатур + 10 контрагентов + 10 тыс. документов
Использую доисторический ноутбук с процессором Intel Celeron CPU N2830 @ 2.16 GHz

В первом тесте демонстрируем кэширование — сначала запускаем анализ продаж, затем добавляем новый документ продажи, и снова запускаем расчет. Видно что второй раз иммутабельное хранилище не обрабатывается, и расчет происходит в 10 раз быстрее.

Результаты - 100 тыс. документов за 11.1 секунды:

file: database_immutable.json:
 100200 docs parsed (0 errors)
 50018 docs processed (0 errors)
11.098s elapsed
file: database_current.json:
 10020 docs parsed (0 errors)
 4987 docs processed (0 errors)
1.036s elapsed
amount total = 623422871.2641689
amount per document = 11389.839613851627
lines per document = 3.6682561432355896

file: database_current.json:
 10021 docs parsed (0 errors)
 4988 docs processed (0 errors)
1.034s elapsed
amount total = 623433860.2641689
amount per document = 11389.832290707558
lines per document = 3.6682073954983925

Если честно, я рассчитывал минимум на миллион документов в секунду. Разберемся, где у нас основная задержка на примере обработки первого файла:
8.8s — чтение файла и извлечение строковых JSON, разделенных символом "x01"
1.9s — парсинг JSON в объекты
0.4s — кэширование + пользовательский алгоритм
Заглянув в исходники Deno, я понял, основная задержка возникает при декодировании unicode, ведь V8 в качестве байто-дробилки подходит плохо. Это значит, что переписать критические куски на WASM/Rust будет очень просто, а если в качестве хранилища использовать нормальную объектную БД, то и парсинга JSON можно избежать, и тогда достичь миллиона записей в секунду — более чем реально. И это я не говорю про нормальное железо.

Второй тест мы запускаем сначала на свеже-сгенерированной базе, затем после прогона первого теста. Второй раз производительность упала в 3 раза, потому что в первом тесте мы добавили документ продажи, ссылающийся на несуществующего партнера и номенклатуру, и, не найдя их в кэше, система вынуждена дважды запускать fullscan. Но эта ситуация по сути аварийная, а нормальная выглядит так:

Результаты - 100 тыс. документов за 13.3 секунды:

file: database_immutable.json:
 100200 docs parsed (0 errors)
 100000 docs processed (0 errors)
13.307s elapsed
file: database_current.json:
 10020 docs parsed (0 errors)
 10000 docs processed (0 errors)
1.296s elapsed

Наконец-то пошла рабочая нагрузка, мы делаем 2 вложенных асинхронных запроса, и получаем затраты на пользовательский алгоритм — 2.6s. Допускаю, что оборачивание любой функции в промис накладно, а в нашем случае запрос к кэшу выполняется синхронно, и возможно это место стоит оптимизировать.

Резюме

В целом я доволен результатом, схема получилась вполне рабочая, проект можно развивать, если у кого есть мысли — пишите. Буду благодарен за ссылку на публичные обфусцированные данные, приближенные к реальности (счета-фактуры, EDI, или что-то подобное), необходимые для полноценного тестирования.

Полный код на гитхабе

Спасибо за внимание.

Автор: Евгений Баладжа

Источник


* - обязательные к заполнению поля


https://ajax.googleapis.com/ajax/libs/jquery/3.4.1/jquery.min.js