- PVSM.RU - https://www.pvsm.ru -

Потоковая обработка данных на С

Привет!

Кратко о том что такое потоковая обработка данных и в чем её отличие от пакетной.

Пакет данных, это часть информации поступающая в систему которая содержит законченный или не полный фрагмент данных. Большинство механизмов цифровой передачи информации в современных системах построены на пакетной передаче. Отличие потоковых и пакетных систем обработки в том, когда эти данные обрабатываются:

  • Потоковая обработка данных подразумевает обработку данных и реакцию на них непосредственно в момент получения с минимальными задержками.

  • Пакетная же в свою очередь занимается обработкой спустя установленные промежутки времени.

При проектировании систем потоковой обработки мы ожидаем получить ряд свойств:

  1. Минимальные задержки в обработке, получении и отправки данных

  2. Работу в режиме реального времени(предсказуемость времени выполнения)

  3. Возможность с минимальными простоями менять конфигурацию системы

Вышеизложенные свойства накладывают определенные ограничения на подобного рода системы:

  • Мы не можем использовать языки со "сборкой мусора", текущие подходы к освобождению ресурсов подразумевают произвольную остановку рабочих потоков приложения.

  • По тем же причинам не подходят языки в которых трансляция и оптимизация иcходного кода производится в процессе выполнения.

Абстракция

Для того чтобы лучше понять потоковую обработку данных, лучше всего представлять её в виде простых структур данных таких как списки, деревья, и графы.

Пусть единицей обработки данных будет узел графа(Нода)

Каждый узел может иметь тип:

  • INPUT - принимает поток данных из источника(Диск, сеть, БД, или любой другой механизм межпроцессного взаимодействия)

  • OUTPUT - отправляет или сохраняет данные любым из возможных способов аналогично INPUT

  • PROCESSING - обрабатывает данные

Путем переупорядочивания узлов мы можем получать разные свойства системы, необходимые для решения поставленной задачи.

Список

Самой простой структурой будет односвязный список, данные идут в одном направлении от источника к назначению. В односвязном списке просто добавлять и удалять новые узлы,но он не подходит для реализации сложных сценариев.

Список

Список

Дерево

В древовидных структурах, появляется возможность ветвления, в качестве листьев могут быть несколько OUTPUT узлов.

Дерево

Дерево

Граф

Самый распространенный и гибкий тип, это граф, есть возможность цикличных путей, петель, и даже не связных подграфов обработки.

Граф

Граф

Выбор подходящей структуры

При выборе структуры нужно отталкиваться от технического задания. Будет ли несколько источников? В зависимости от входных данных нужно ли иметь несколько сценариев обработки? Сколько будет источников данных?

Пример 1
Входное изображение всегда JPEG, декодируем его и передаем в обработку, подойдёт односвязный список.

Пример 2
Входными данными может быть поток h264 или HEVC, в зависимости от типа входного видеопоток нужно перенаправить данные в разные узлы, тут может подойти древовидная структура.

Пример 3
Более сложным сценарием может быть обработка сетевых пакетов.
Допустим на вход подается ICMP/UDP/TCP. Датаграммы и ICMP пойдут в одни узлы,
а TCP потребует хранения состояния соединения, появляется необходимость обхода узлов обработки в разных направлениях. В подобных сценариях подойдет граф.

Общее описание архитектуры системы

Структура приложений выглядит примерно так:

  • /plugins - хранит плагины в виде разделяемых библиотек(на Linux это .so)

  • /app - хранит исполняемый файл приложения

  • /config - хранит описание подключаемых модулей(опционально)

В текстовом виде алгоритм работы можно описать так.
Во время запуска приложения, определяется количество подключаемых узлов обработки, производится их валидация и инициализация, затем идет этап планирования - строится граф выполнения, то есть какой узел какому передает данные. После подготовительного этапа, запускаются рабочие потоки в которых планировщик перекладывает данные от узла к узлу и запускает обработку.

Важным отличием систем, будет этап планирования. Часто планирование выполняется в процессе выполнения, каждая нода во время обработки данных передаёт информацию о том, куда дальше передать данные на обработку. Этот подход позволяет более гибко строить граф обработки, и реализовывать очень нетривиальные сценарии.

Задача

Ниже на синтетическом примере будет разобран первый тип - односвязный список.

Написать модуль для платформы видео аналитики, который будет принимать кадры в формате JPEG из сети, и сохранять на диск в BMP формате, так же нужно сохранять файл с результатами аналитики в виде CSV. В качестве тестового плагина обработки, добавить плагин который будет менять значения пикселей R и G местами.Так же модуль должен иметь возможность добавления нового функционала в граф обработки/аналитики.

Для начала перед проектированием системы нужно обозначить границы ответственности системы. Декодирование данных и модули аналитики, будет основной зоной ответственности, получение и отправка это второстепенные модули так как на задержки сети, сбои в работе API, повлиять сложно.

Структура тестового приложения

app/
├── include/
│   └── infra.h
├── plugins/
│   ├── input_plugin.c
|   ├── processing_plugin.c
|   └── output_plugin.c
├── config/
|   └── conf.json
├──  core/
|   └── main.c
└── MakeFile

app - корневая директория

include - общие заголовочные файлы

plugins - исходные файлы плагинов

config - шаблоны конфигов

core - содержит main.c

В директории include создаем общий заголовочный файл infra.h, в котором определим общие для всех модулей типы данных.
Определим в виде перечисления три типа узлов.

infra.h

typedef enum NODE_TYPE_C {
    INPUT_NODE,
    OUTPUT_NODE,
    PROCESSING_NODE

} NODE_TYPE_T;

Определим тип данных который мы будем передавать между узлами.
infra.h

// Пиксели изображения всегда в формате RGB
typedef struct matrix_s {
  unsigned int weight_;
  unsigned int height_;
  unsigned char *data_;

} matrix_t;

typedef struct data_s {
  matrix_t matrix_; // Изображение
  char *metadata_; // Метаданные для дальнейшего хранения результатов аналитики

} data_t;

Каждый узел обработки имеет два указателя prev, next указывающие на то куда передавать поток данных.Так же может потребоваться предварительная инициализацию плагина, для этого в качестве прототипов функций используются указатели, для определения сигнатуры и последующего полиморфного поведения узлов.

infra.h

typedef void (*init_function_t)(void); // Прототип функции инициализации
typedef unsigned short (*processing_function_t)(data_t**, unsigned short); // Прототип функции обработки

Ниже определена структура узла системы.

infra.h

typedef struct node_s {
  char *name_; // Уникальное имя узла
  NODE_TYPE_T type_; // Тип INPUT/OUTPUT/PROCESSING
  char *prev_; // Имя предыдущего узла, для INPUT: NULL
  char *next_; // Имя следующего узла, для OUTPUT: NULL

  init_function_t init; // Метод инициализации ноды
  processing_function_t processing; // Метод обработки ноды
  
} node_t;

Основной механизм подключения и удаления узлов в Linux - через разделяемые библиотеки(.so). В разных источниках такие библиотеки называют модулями или плагинами, ниже под этими понятиями будет подразумеваться разделяемая библиотека. В С/Linux для связывания во время выполнения, определения функций находятся в заголовочном файле dlfcn.h.

В каждом плагине нужно инициализировать структуру ноды которая будет хранить все необходимые поля и функцию для экспорта данной структуры. Для удобства определения заведем макрос которым, будем генерировать нужные сигнатуры. Чтобы избежать конфликтов в процессе динамического связывания все объявляемые глобальные переменные и функции нужно помечать как static.

infra.h

typedef node_t* (*get_node_structure)(void); // Сигнатура функции экспорта для получения структуры node_t из плагина

#define REGISTER_NODE(plugin_name, type, processing_function, init_function, prev, next) 
    static node_t node = {.name_ = plugin_name, .type_ = type, 
                          .init = init_function, .processing = processing_function, 
                          .prev_ = prev, .next_ = next};  
node_t* getnode_structure() { return &node; }

Плагин получения потока данных(для простоты опущены части кода обращения к API).

input_plugin.c

static void init(void) {

init_internal(); // Инициализируем внутренне состояние, подключаемся к API и т.д.
  log("Input plugin loaded");
}

static unsigned short processing(data_t **data, unsigned short count) {
  if(data == NULL) return 0;

  unsigned short i = 0;
  for (; i < 5; ++i) { // Получаем изображения по сети, не более 5, чтобы не тормозить обработку, в реальных условиях настройки количество батчей нужно выносить в API
    unsigned char *jpeg_image = get_image(); // Получаем изображения
    
    if (jpeg_image == NULL) break;
    decode_image(data[i], jpeg_image); // Декодируем и сохраняем
  
    free(jpeg_image);
  }

  return i; // Хорошей практикой для планировщика будет возвращать количество получаемых данных
}

REGISTER_NODE("input-node", INPUT_NODE, processing, init, NULL, "processing-node"); // Регистрируем ноду, определяем в какой узел передавать данные

Плагин обработки потока данных, строится аналогиным образом.
processing_plugin.c

static void init(void) {
    log("Processing plugin loaded");
}

static unsigned short processing(data_t **data, unsigned short count) {
  unsigned short i = 0; 
  for (; i < count; ++i) {
    unsigned int sz = data[i].width_ * data[i].height_; // Вычисляем размер матрицы
    
    for (unsigned int j = 0; j < sz; j += 3) { // Так как j будет всегда будет указывать на R, меняем местами значение пикселей
      unsigned char pix = data[i].data_[j];
      data[i].data_[j] = data[i].data_[j + 1];
      data[i].data_[j + 1] = pix;
    }
  }

  return i;  //  Возвращаем количество обработанных изображения
}

REGISTER_NODE("processing-node", PROCESSING_NODE, processing, init, "input-node", "output-node");

Плагин отправки данных(для упрощения опущены обращения к API).
output_plugin.c

static void init(void) {
    log("Output plugin loaded");
}

static unsigned short processing(data_t **data, unsigned short count) {
  unsigned short i = 0;

  for (; i < count; ++i) {
    save_data(data[i]); // Сохраняем изображение в формате BMP и metadata_ в csv
    free(data[i].data_);
    free(data[i].metadata_);
  }

  free(data);
  return i;
}

REGISTER_NODE("output-node", OUTPUT_NODE, processing, init, "pricessing-node", NULL);

Основной цикл обработки.
main.c

#define MAX_NODES (256) // Фиксируем максимальное количество узлов
node_t *nodes[MAX_NODES]; // Определяем массив узлов
memset(nodes, NULL, MAX_NODES);
/*Читаем из конфига или командной строки, 
 список разделяемых библиотек экспортируем из них узлы.*/
fill_nodes(argc, argv, nodes); // В нашем случае если прочитаем по порядку в массиве будут лежать три зарегистрированные ноды["input-node", "processing-node", "ouput-node"]
init_nodes(nodes); // Вызываем функции инициализации

// Заполняем порядок, ищем какая нода на какую ссылается по имени, и заполняем индексы
// Определяем порядок, какая нада какой будет передавать данные
unsigned char order[MAX_NODES]; 
unsigned char order_counter = fill_order(&order, nodes); // [0, 1, 2] - если прочитали в порядке как указано выше

// Обход узлов списка, запуск обработки
while(1) {
  data_t **data = NULL;
  unsigned short count = 0;

  for (unsigned short idx = 0; idx < order_counter; ++idx) {
        count = nodes[order[idx]]->processing(data, count);
        if (count == 0) {
          if (nodes[order[idx]].type_ != INPUT_NODE) {
            // Обработки исключительной ситуации
          }
          continue;
        }
    }
}

Критический недостаток данной реализации новое считвание не происходит пока данные не пройдут по всем нодам, иллюстрация ниже.

Еденичный опрос истоника

Еденичный опрос истоника

В реальных же условиях поток данных не прерывно считывается из источника(ов), это позволяет не прерывно опрашивать источник уменьшая задержки, как это показанно на иллюстрации ниже.

Множественный опрос источника

Множественный опрос источника

Так же данный пример не учитывает многопоточную среду выполнения, что является одним из основных драйверов роста производительности на современных архитектурах.

Вывод

В качестве итогов, будут перечисленны плюсы данных подходов к разработке систем потоковой обработки данных:

  1. Грамотно проектирования система передачи информации о модулях системе и планировщику, позволяет динамично менять порядок обработки не прерывая поток выполнения, или прерывая с минимальными потерями данных. Как пример приложение всегда может держать открытый сокет куда будет передаваться обновленная конфигурация графа и список плагинов с подключаемыми узлами.

  2. Простота в масштабировании подобных архитектур в многоядерные среды. Каждый поток выполнения держит свой граф узлов, и работает с данными не конкурируя за ресурсы с другими обработчиками.

  3. В современных серверных архитектурах достаточные объёмы данных помещаются в кеши, при хранении состояния между нодами, минимизируются промахи по кешам.

  4. Данные можно обрабатывать пачками(батчами), что дает возможность дальнейших оптимизаций, например векторизации.

Ниже в комментариях напишите была ли статья полезна и о вашем опыте работы с аналогичными системами, и проектированием их. Особенно интересен ваш опыт работы с GStreamer или VPP.

Автор: uEvg

Источник [1]


Сайт-источник PVSM.RU: https://www.pvsm.ru

Путь до страницы источника: https://www.pvsm.ru/linux/449639

Ссылки в тексте:

[1] Источник: https://habr.com/ru/articles/1023522/?utm_source=habrahabr&utm_medium=rss&utm_campaign=1023522