- PVSM.RU - https://www.pvsm.ru -
Сегодня я наткнулся на статью [1] за авторством @enamored_poc [2]. Увидев заголовок, я был в предвкушении: наконец-то кто-то взялся за этот гайд — в своё время я как раз искал что-то подобное. Однако, дочитав статью до конца, понял, что автор по сути просто пересказал раздел Bigger applications из официальной документации и лишь добавил пару замечаний оттуда же.
С одной стороны уже есть куча видео, статей, где можно изучить как пишутся правильно сервисы по всем заветам дядюшки Боба (автора "Чистой Архитектуры"). Но, с другой стороны, всё это обычно показывают на примере стандартного интернет-магазина или списка TODO, и в какой-то момент перестаёшь понимать, а зачем всё это вообще нужно.
Поэтому в этом гайде мы возьмём достаточно нетривиальную тему, которую я регулярно встречаю в вакансиях и на «галерах», и попробуем реализовать её на практике.
Владелец сети магазинов хочет загружать видео с камер видеонаблюдения на некоторый сервер. Это видео должно сохраняться в хранилище и покадрово обрабатываться нейросетями. В результате мы должны получить метаданные о том, сколько людей было в кадре, координаты их местоположения в кадре, а также их пол. Но больше всего владельцу нужно получать сводную информацию о том, сколько людей и какого пола фиксируется в разные моменты времени, чтобы он мог например менять выкладку продуктов. После сдачи MVP-версии заказчик может выдать нам контракт на дальнейшее развитие системы: анализ видео-трафика в реальном времени, распознавание лиц, выявление нарушителей, и, не дай бог, слежкой за сотрудниками и т.д.
Немного мотивации: в этой задачке (правда, не в этой части) мы затронем работу с RabbitMQ, Redis, S3, Postgres, а также пощупаем YOLO, opencv. Тем самым закроем большинство типичных требований к современным вакансиям. А еще эту задачку можно переложить на любую предметную область, связанную с видеоаналитикой.
Да, всё начинается с анализа предметной области, а не с создания структуры папок. Так что разложим задачку по полочкам.
На вход поступает видеофайл; при этом в будущем этот видеофайл будет загружаться в потоковом режиме или напрямую транслироваться с камеры видеонаблюдения.
Эти видеофайлы должны где-то и как-то храниться. Мы пока не можем спрогнозировать, сколько таких видео будет храниться, какие серверы готов выделить заказчик, какие требования будут к времени записи и чтения, будут ли SSD на этих серверах.
Все видеофайлы должны быть разбиты на кадры и скормлены нейросетям. Сейчас мы ничего не знаем о нейросетях (мы перекладыватели JSON, а не аналитики): о том, как быстро они способны обработать все кадры, как часто эти кадры нужно подсовывать и многое другое. Зато мы можем предположить, что пользователю не нужно ждать результата обработки в рамках одного запроса, так как его интересуют только аналитические данные.
Для каждого кадра нам необходимо выявить человека, его координаты в кадре, время и пол. В будущем заказчик хочет и распознавание воришек, а может, потом захочет следить и за кассирами, или за тем, какие тележки берут. Всё это тоже должно куда-то и в каком-то виде сохраняться. Мы слышали про NoSQL и Postgres, но пока не знаем, что лучше применить для будущих аналитических задач.
Вероятно, нужно подготовить REST API, которое позволит делать запросы о количестве людей в кадре и их поле, а также показывать красивые графики и таблицы для заказчика. Держим в голове, что позже появится ещё множество данных, которые он захочет получать.
Выглядит жутко, куча неизвестных... Хм.. С чего бы начать?? Может набросать endpoint-ы для загрузки видео? Или сделать endpoint "загрузить картинку", а потом запустить нейросеть, которую советует ChatGPT, все-равно же разбивать покадрово видео придется? А может спроектировать БД, где будут сохранятся методанные с видео, и уже от данных плясать?
НЕТ! На самом деле у нас уже есть всё необходимое, чтобы проектировать архитектуру, а все «неизвестные» — это всего лишь детали, которые будут сбивать вас с пути. Даже слово «endpoint» здесь лишнее, не говоря уже о FastAPI. Так что же делать дальше?
Как уже было сказано выше, у нас достаточно информации, чтобы выделить ключевые сущности домена. Начнём с самой базовой.
Источник записи. Нам важно сохранять информацию о том, откуда поступает видеофайл: связан ли он с одной конкретной камерой, группой камер или, например, с целой сетью магазинов. Пользователь может захотеть загружать как одиночные записи, так и десятки небольших роликов с одной и той же камеры.
from dataclasses import dataclass, field
from typing import Optional, Dict, Any
from uuid import UUID, uuid4
@dataclass(frozen=True)
class RecordingSourceId:
"""Уникальный идентификатор источника записи внутри нашей системы."""
value: UUID
@staticmethod
def new() -> "RecordingSourceId":
return RecordingSourceId(uuid4())
def __str__(self) -> str:
return str(self.value)
@dataclass(frozen=True)
class RecordingSourceExtra:
"""
Необязательные данные об источнике.
Сейчас здесь почти ничего нет, но именно сюда
позже поедут метаданные о местоположении камеры
"""
floor: Optional[int] = None
location_label: Optional[str] = None
meta: Dict[str, Any] = field(default_factory=dict)
class RecordingSource:
"""
Источник записи (камера/логический канал).
На уровне домена нам пока важно только то,
что есть его идентификатор и необязательные доп.данные.
"""
idx: RecordingSourceId
extra: RecordingSourceExtra = RecordingSourceExtra()
def __init__(
self,
idx: RecordingSourceId
extra: RecordingSourceExtra = RecordingSourceExtra()
) -> None:
self.idx: RecordingSourceId = idx
self.source_id: RecordingSourceExtra = extra
Видеофайл. Некоторый файл; при этом нам сейчас не важно, передан ли он нам сразу или записывается в потоковом режиме. Самое главное, что он точно должен содержать: уникальный идентификатор, время начала, уникальный идентификатор источника. При этом такие атрибуты, как время окончания, длительность, кодеки, размер кадра, уже будут опциональными и сейчас нам не особенно нужны.
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional, Dict, Any
from uuid import UUID, uuid4
from domain.entity.video_source import RecordingSourceId
@dataclass(frozen=True)
class VideoFileId:
"""Уникальный идентификатор видеозаписи внутри нашей системы."""
value: UUID
@staticmethod
def new() -> "VideoFileId":
return VideoFileId(uuid4())
def __str__(self) -> str:
return str(self.value)
@dataclass
class VideoFileExtra:
"""
Необязательные тех.детали видео.
Сейчас они нам не критичны, но сюда можно будет
добавить конец записи, кодеки, размер кадра и т.п.
"""
ended_at: Optional[datetime] = None
duration_seconds: Optional[float] = None
codec: Optional[str] = None
frame_width: Optional[int] = None
frame_height: Optional[int] = None
meta: Dict[str, Any] = field(default_factory=dict)
class VideoFile:
"""
Видеофайл/видеозапись.
Нас сейчас интересует:
- уникальный идентификатор видео;
- время начала записи;
- источник, с которого это видео пришло.
Остальное — опционально и уезжает в VideoFileExtra.
"""
def __init__(
self,
idx: VideoFileId,
source_id: "RecordingSourceId",
started_at: datetime,
extra: Optional[VideoFileExtra] = None,
) -> None:
self.idx: VideoFileId = idx
self.source_id: RecordingSourceId = source_id
self.started_at: datetime = started_at
self.extra: VideoFileExtra = extra or VideoFileExtra()
Кадр. Неважно, хранится ли он в виде картинки или нет, но у него точно будут следующие параметры: время, идентификатор видеофайла, сами данные в каком-то виде. Это всё, что нам пока нужно для идентификации; остальные атрибуты сейчас роли не играют.
from dataclasses import dataclass
from datetime import datetime
from typing import Any
from domain.entity.video_file import VideoFileIdx
@dataclass(frozen=True)
class FrameData:
"""
Важный value object: «сырые» данные кадра.
Сейчас нам не важно, это байты картинки, JPEG/PNG, numpy-массив
или что-то ещё — главное, что это единый объект данных кадра.
"""
value: Any
class Frame:
"""
Один кадр видеозаписи.
Для идентификации кадра в домене нам сейчас достаточно:
- времени кадра;
- идентификатора видеозаписи;
- самих данных кадра (в каком-то виде).
Остальные атрибуты (номер кадра, размер, формат и т.п.)
считаем деталями реализации и пока в модель не тащим.
"""
def __init__(
self,
video_idx: "VideoFileIdx",
captured_at: datetime,
data: FrameData,
) -> None:
self.video_idx = video_idx
self.captured_at = captured_at
self.data = data
Объект. Как мы помним, заказчик на данном этапе требует просто находить людей в кадре, но в будущем это может быть что угодно (тележка, телефон в руках у кассира и т.д.). В кадрах мы будем идентифицировать именно объекты: у каждого объекта будет класс, дополнительные признаки (сейчас это только «пол»), а также ссылки на кадры, в которых этот объект был зафиксирован (заказчику ведь неинтересно, как клёво мы умеем работать с одной картинкой — ему нужны агрегированные данные).
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Dict, Any, List, Optional
from uuid import UUID, uuid4
from domain.entity.video_file import VideoFileIdx
@dataclass(frozen=True)
class ObjectIdx:
"""Уникальный идентификатор объекта (человек, тележка, телефон и т.д.)."""
value: UUID
@staticmethod
def new() -> "ObjectIdx":
return ObjectIdx(uuid4())
def __str__(self) -> str:
return str(self.value)
@dataclass(frozen=True)
class ObjectClass:
"""
Класс объекта:
- сейчас нас интересует 'person',
- в будущем: 'cart', 'phone', 'employee', 'thief' и т.д.
"""
value: str # например: "person", "cart", "phone"
@dataclass
class ObjectAttributes:
"""
Дополнительные признаки объекта.
Их мы будем заполнять пока в виде словаря,
но потом мы сможем лучше конкретизировать эти объекты.
Главное, что нам из-за этих изменений не придется переделывать
саму сущность.
"""
meta: Dict[str, Any] = field(default_factory=dict)
@dataclass(frozen=True)
class Coordinate:
"""
Координаты объекта в кадре
"""
x: int
y: int
width: int
height: int
@dataclass(frozen=True)
class FrameRef:
"""
Ссылка на кадр, в котором объект был зафиксирован.
Мы не навязываем отдельный идентификатор кадра, а ссылаемся
через пару (video_idx, captured_at), чего достаточно в домене,
чтобы однозначно указать кадр.
"""
video_idx: "VideoFileIdx"
captured_at: datetime
coordinate: Coordinate
# === Сущность ===
class DetectedObject:
"""
Объект, обнаруженный в видеопотоке.
На уровне домена нас интересует:
- уникальный идентификатор объекта;
- его класс (person/cart/phone/…);
- дополнительные признаки (сейчас — пол);
- список кадров, в которых этот объект встречается
(для дальнейшей агрегации и аналитики).
"""
def __init__(
self,
idx: ObjectIdx,
object_class: ObjectClass,
attributes: Optional[ObjectAttributes] = None,
frames: Optional[List[FrameRef]] = None,
) -> None:
self.idx = idx
self.object_class = object_class
self.attributes = attributes or ObjectAttributes()
self.frames: List[FrameRef] = frames or []
Отчет. Результат для заказчика. Конечно, его можно формировать «на лету», но вдруг камеры и нейросети будут работать супербыстро, и мы получим 140 кадров в секунду. Тогда для формирования любого отчёта нам придётся каждый раз пережевывать большой объём данных. Поэтому набросаем, что нам нужно заранее: временные промежутки, вид отчёта (количество людей, половой состав). Ещё не хватает пруфов: заказчику нужно будет доказать работоспособность нашего сервиса, поэтому добавляем ссылки на источники, видеофайлы и объекты.
from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, Any, List
from uuid import UUID, uuid4
from domain.entity.recording_source import RecordingSourceIdx
from domain.entity.video_file import VideoFileIdx
from domain.entity.detected_object import ObjectIdx
@dataclass(frozen=True)
class ReportIdx:
"""Уникальный идентификатор отчёта."""
value: UUID
@staticmethod
def new() -> "ReportIdx":
return ReportIdx(uuid4())
def __str__(self) -> str:
return str(self.value)
@dataclass(frozen=True)
class ReportType:
"""
Тип отчёта:
- сейчас достаточно, например, 'people_count', 'gender_distribution';
- в будущем: 'traffic_heatmap', 'theft_suspects', 'queue_length' и т.д.
"""
value: str
@dataclass(frozen=True)
class TimeRange:
"""Временной промежуток, за который строится отчёт."""
start: datetime
end: datetime
@dataclass
class ReportData:
"""
Агрегированные данные отчёта.
Сейчас оставляем это в виде произвольного словаря,
чтобы не блокировать развитие. Когда домен стабилизируется,
из meta можно будет вытащить отдельные данные
"""
meta: Dict[str, Any] = field(default_factory=dict)
# === Сущность ===
class Report:
"""
Отчёт для заказчика.
"""
def __init__(
self,
idx: ReportIdx,
report_type: ReportType,
time_range: TimeRange,
data: ReportData,
source_idx_list: List["RecordingSourceIdx"],
video_idx_list: List["VideoFileIdx"],
object_idx_list: List["ObjectIdx"],
) -> None:
self.idx = idx
self.report_type = report_type
self.time_range = time_range
self.data = data
self.source_idx_list = list(source_idx_list)
self.video_idx_list = list(video_idx_list)
self.object_idx_list = list(object_idx_list)
Ну всё, вроде бы понятно, ведь у нас есть набор сущностей. Следующий шаг: написать вокруг этого эндпоинты или даже набросать БД, раз всё это уже можно сохранять. Опять нет!
Сначала попробуем выделить всё, что происходит вокруг этих сущностей: какие действия с ними выполняются и какие задачи решают разные части системы. На этом этапе мы будем определять сервисы. А чтобы описать их поведение абстрактно, нам поможет ABC из стандартной библиотеки, позволяющий задавать интерфейсы через абстрактные базовые классы. Начнём:
Сервис сохранения файлов. Задача нетривиальная: можно записывать файл в потоковом режиме, можно сразу копировать его на диск, но это детали — сейчас они нам не нужны. Самое главное — результат: файл сохранён или нет.
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Optional, BinaryIO
from domain.entity.video_file import VideoFileIdx
@dataclass(frozen=True)
class FileLocation:
"""
Абстрактное местоположение файла.
Это может быть путь на диске, ключ в объектном хранилище,
URL и т.д. Конкретику решит инфраструктура.
"""
value: str
@dataclass(frozen=True)
class FileSaveResult:
"""
Результат сохранения файла.
Главное — удалось или нет. Остальное опционально.
"""
success: bool
location: Optional[FileLocation] = None
error_message: Optional[str] = None
class FileStorage(ABC):
"""
Абстрактный сервис сохранения файлов.
Детали (стриминг, буферизация, ретраи, конкретное хранилище)
остаются за пределами домена. Здесь нас интересует только:
«попробуй сохранить» → FileSaveResult.
"""
@abstractmethod
def save_video_content(
self,
video_idx: "VideoFileIdx",
content: BinaryIO,
) -> FileSaveResult:
"""
Сохранить бинарное содержимое видеозаписи,
связав его с доменной сущностью VideoFile.
Аргумент `content` — абстрактный поток байт:
это может быть открытый файл, сетевой стрим и т.п.
"""
raise NotImplementedError
Сервис нарезки кадров из видео. Опять же, мы не знаем, какие-либо библиотеки существуют для этого, какой размер кадра нам нужен на выходе и т.д. Но зато мы знаем, что в результате работы сервиса мы должны получить исчерпывающие данные для формирования сущности «Кадр».
from abc import ABC, abstractmethod
from typing import Iterable
from domain.entity.video_file import VideoFileIdx
from domain.entity.frame import Frame
class FrameExtractor(ABC):
"""
Абстрактный сервис нарезки кадров из видео.
На уровне домена нас интересует только то, что
на вход подаётся идентификатор видеозаписи, а на выходе
мы получаем набор сущностей Frame, достаточный для
дальнейшей обработки и аналитики.
"""
@abstractmethod
def extract_frames(self, video_idx: "VideoFileIdx") -> Iterable["Frame"]:
"""
Извлечь кадры для указанного видео.
Конкретная реализация сама решает:
- как прочитать файл (локальный диск, object storage и т.п.);
- как именно и с какими параметрами вырезать кадры.
"""
raise NotImplementedError
Сервис обработки кадров. Мы не знаем, какие именно нейросети нужны; возможно, придётся запускать несколько моделей друг за другом. Но мы точно можем сказать, что на выходе должны быть получены списки классов, признаков и координат в кадре. Эти данные, в свою очередь, нужны для создания и обновления сущности «Объект».
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import List
from domain.entity.frame import Frame
from domain.entity.detected_object import ObjectClass, ObjectAttributes, Coordinate
@dataclass(frozen=True)
class RawDetection:
"""
Результат работы нейросетей по одному объекту на кадре.
На уровне сервиса обработки кадров нам достаточно:
- класса объекта;
- набора признаков (пока в виде ObjectAttributes);
- координат объекта в кадре.
"""
object_class: "ObjectClass"
attributes: "ObjectAttributes"
coordinate: "Coordinate"
class FrameProcessor(ABC):
"""
Абстрактный сервис обработки кадров нейросетями.
Мы не знаем:
- какие именно нейросети будут использоваться;
- сколько их будет и в каком порядке они запустятся;
- на каком железе это всё будет крутиться.
"""
@abstractmethod
def process_frame(self, frame: "Frame") -> List[RawDetection]:
"""
Обработать кадр и вернуть список найденных объектов
с их классами, признаками и координатами.
"""
raise NotImplementedError
Сервис классификации объектов. Проблема в том, что кадров у нас много, и объекты на кадрах нужно как-то связать. Мы пока не знаем как, но, как подсказывает логика, следует анализировать предыдущие кадры и пытаться понять, нужно ли создавать новый объект или можно обновить существующий.
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import List
from domain.entity.frame import Frame
from domain.entity.detected_object import ObjectIdx, FrameRef
from domain.service.frame_processor import RawDetection
@dataclass(frozen=True)
class ObjectClassificationDecision:
"""
Результат классификации одного «сыро обнаруженного» объекта.
На уровне домена нас интересует:
- какой доменной сущности DetectedObject он соответствует
(существующий объект или новый);
- в каком кадре и с какими координатами он был обнаружен;
- исходные данные детекции (класс, признаки, координаты).
"""
existing_object_idx: Optional["ObjectIdx"] # None, если объект новый
frame_ref: "FrameRef"
raw_detection: "RawDetection"
@property
def is_new(self) -> bool:
"""True, если по этой детекции нужно создать новый объект."""
return self.existing_object_idx is None
class ObjectClassifier(ABC):
"""
Абстрактный сервис классификации/сопоставления объектов по кадрам.
Его задача — только принять решение:
- к какому уже существующему ObjectIdx отнести детекцию;
- по каким детекциям нужно создать НОВЫЙ объект.
Как именно это делается (трекеры, эвклидово расстояние, нейросети и т.п.) —
детализация инфраструктуры, домен этого не знает.
"""
@abstractmethod
def classify_objects(
self,
frame: "Frame",
detections: List["RawDetection"],
) -> List[ObjectClassificationDecision]:
"""
На основании набора детекций на кадре решить,
какие из них принадлежат уже существующим объектам,
а для каких потребуется создать новый объект.
"""
raise NotImplementedError
Сервис анализа найденных объектов. Заказчику всё-таки нужно получать аналитические данные, и мы уже определились, что формировать их «по запросу» не стоит. Поэтому нам нужен сервис, который анализирует уникальные объекты и старается добыть данные, необходимые для построения отчёта.
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Dict, Any, List
from domain.entity.detected_object import DetectedObject, ObjectIdx
from domain.entity.video_file import VideoFileIdx
from domain.entity.recording_source import RecordingSourceIdx
@dataclass
class ObjectAnalyticsResult:
"""
Результат анализа уникальных объектов.
Здесь мы собираем всё, что нужно для дальнейшего построения отчёта:
- агрегированные метрики (meta),
- «пруфы» — ссылки на источники, видео и объекты.
"""
meta: Dict[str, Any] = field(default_factory=dict)
source_idx_list: List["RecordingSourceIdx"] = field(default_factory=list)
video_idx_list: List["VideoFileIdx"] = field(default_factory=list)
object_idx_list: List["ObjectIdx"] = field(default_factory=list)
class ObjectAnalyzer(ABC):
"""
Абстрактный сервис анализа найденных (уже классифицированных) объектов.
Его задача:
- взять набор уникальных объектов (DetectedObject),
- «переварить» их,
- выдать агрегированные данные, которые потом
пойдут в построение доменной сущности отчёта.
ВАЖНО: сервис ничего не знает о том, как будут выглядеть
конкретные отчёты, он лишь добывает сырые аналитические данные
и пруфы.
"""
@abstractmethod
def analyze_objects(
self,
objects: List["DetectedObject"],
) -> ObjectAnalyticsResult:
"""
Проанализировать набор уникальных объектов и вернуть
агрегированную информацию, пригодную для построения отчёта.
"""
raise NotImplementedError
Теперь поговорим о сохранении, извлечении и поиске сущностей. И здесь нас снова выручает модуль abc, с помощью которого мы напишем абстрактный интерфейс для работы с хранилищем (Repository):
Репозиторий источников. Нам нужно всё-таки вести учёт источников и связывать их с видеофайлами.
from abc import ABC, abstractmethod
from typing import Optional, List
from domain.entity.recording_source import RecordingSource, RecordingSourceIdx
class RecordingSourceRepository(ABC):
"""
Абстрактный репозиторий источников записи.
Детали хранилища (Postgres, NoSQL, файлики, in-memory)
здесь не определяются.
"""
@abstractmethod
def save(self, source: "RecordingSource") -> None:
"""
Сохранить или обновить источник записи.
"""
raise NotImplementedError
@abstractmethod
def get_by_idx(self, idx: "RecordingSourceIdx") -> Optional["RecordingSource"]:
"""
Получить источник по его доменному идентификатору.
"""
raise NotImplementedError
@abstractmethod
def list_all(self) -> List["RecordingSource"]:
"""
Вернуть все известные источники.
(Пагинация уже как домашнее задание)
"""
raise NotImplementedError
Репозиторий видеофайлов. Должен позволять сохранять важные данные о сущности «Видеофайл», включая её идентификатор, чтобы мы могли легко находить видеофайлы и работать с ними из сервиса сохранения видеофайлов.
from abc import ABC, abstractmethod
from typing import Optional, List
from domain.entity.video_file import VideoFile, VideoFileIdx
from domain.entity.recording_source import RecordingSourceIdx
class VideoFileRepository(ABC):
"""
Абстрактный репозиторий видеозаписей.
Задачи:
- сохранить/обновить сущность VideoFile;
- уметь по доменному идентификатору VideoFileIdx быстро её найти;
- при необходимости находить все видео, привязанные к источнику.
"""
@abstractmethod
def save(self, video: "VideoFile") -> None:
"""
Сохранить или обновить видеозапись.
"""
raise NotImplementedError
@abstractmethod
def get_by_idx(self, idx: "VideoFileIdx") -> Optional["VideoFile"]:
"""
Найти видеозапись по её доменному идентификатору.
"""
raise NotImplementedError
@abstractmethod
def list_by_source_idx(self, source_idx: "RecordingSourceIdx") -> List["VideoFile"]:
"""
Получить все видеозаписи, пришедшие с указанного источника.
"""
raise NotImplementedError
Репозиторий кадров. ММы хотим сохранять пруфы нашей работы, поэтому не забываем и про хранилище кадров, которые уже были проанализированы. При этом, нужно ли сохранять сами бинарные данные кадров, можно решить позже.
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Optional, List
from domain.entity.frame import Frame
from domain.entity.video_file import VideoFileIdx
class FrameRepository(ABC):
"""
Абстрактный репозиторий кадров.
Задачи:
- сохранять проанализированные кадры как «пруф» работы системы;
- уметь находить кадры по видеозаписи и времени;
- уметь получать набор кадров по видеозаписи и/или диапазону времени.
"""
@abstractmethod
def save(self, frame: "Frame") -> None:
"""
Сохранить или обновить кадр.
"""
raise NotImplementedError
@abstractmethod
def get_by_video_and_time(
self,
video_idx: "VideoFileIdx",
captured_at: datetime,
) -> Optional["Frame"]:
"""
Найти конкретный кадр по видеозаписи и времени съёмки.
Эта пара (video_idx, captured_at).
"""
raise NotImplementedError
@abstractmethod
def list_by_video_idx(
self,
video_idx: "VideoFileIdx",
*,
start_time: Optional[datetime] = None,
end_time: Optional[datetime] = None,
) -> List["Frame"]:
"""
Получить кадры для указанного видео.
"""
raise NotImplementedError
Репозиторий объектов. Нужен набор методов для сохранения объектов и их поиска по различным параметрам, которые могут потребоваться сервисам.
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional, List, Dict, Any
from domain.entity.detected_object import DetectedObject, ObjectIdx, ObjectClass
from domain.entity.recording_source import RecordingSourceIdx
from domain.entity.video_file import VideoFileIdx
@dataclass
class ObjectSearchCriteria:
"""
Критерии поиска объектов.
Это абстрактный фильтр, который можно постепенно расширять
по мере появления новых сценариев в сервисах.
"""
object_classes: Optional[List["ObjectClass"]] = None
source_idx: Optional["RecordingSourceIdx"] = None
video_idx: Optional["VideoFileIdx"] = None
# Временной диапазон, в котором объект хоть раз появлялся
appeared_from: Optional[datetime] = None
appeared_to: Optional[datetime] = None
# Фильтрация по признакам (ключи/значения в ObjectAttributes.meta)
attributes: Dict[str, Any] = field(default_factory=dict)
class ObjectRepository(ABC):
"""
Абстрактный репозиторий объектов (DetectedObject).
"""
@abstractmethod
def save(self, obj: "DetectedObject") -> None:
"""
Сохранить или обновить объект.
"""
raise NotImplementedError
@abstractmethod
def get_by_idx(self, idx: "ObjectIdx") -> Optional["DetectedObject"]:
"""
Получить объект по доменному идентификатору.
"""
raise NotImplementedError
@abstractmethod
def search(
self,
criteria: ObjectSearchCriteria,
*,
limit: Optional[int] = None,
offset: Optional[int] = None,
) -> List["DetectedObject"]:
"""
Поиск объектов по набору критериев.
"""
raise NotImplementedError
Репозиторий отчетов. Сохраняем отчёты; при этом мы пока даже не выбираем временные промежутки их хранения — нам важен только результат. Также не забываем про методы извлечения отчётов.
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Optional, List
from domain.entity.report import Report, ReportIdx, ReportType
class ReportRepository(ABC):
"""
Абстрактный репозиторий отчётов.
"""
@abstractmethod
def save(self, report: "Report") -> None:
"""
Сохранить или обновить отчёт.
"""
raise NotImplementedError
@abstractmethod
def get_by_idx(self, idx: "ReportIdx") -> Optional["Report"]:
"""
Получить отчёт по его доменному идентификатору.
"""
raise NotImplementedError
@abstractmethod
def list_by_type(
self,
report_type: "ReportType",
*,
start_time: Optional[datetime] = None,
end_time: Optional[datetime] = None,
limit: Optional[int] = None,
offset: Optional[int] = None,
) -> List["Report"]:
"""
Получить отчёты указанного типа.
Временной промежуток задаёт интересующий нас диапазон
по времени, за который построены отчёты.
Реализация сама решает:
- как интерпретировать start_time/end_time относительно
поля TimeRange у отчёта;
- как оптимизировать выборку и пагинацию.
"""
raise NotImplementedError
@abstractmethod
def list_all(
self,
*,
limit: Optional[int] = None,
offset: Optional[int] = None,
) -> List["Report"]:
"""
Вернуть все отчёты (с возможной пагинацией).
"""
raise NotImplementedError
Вообще, сущности хорошо бы не создавать «голыми руками». Поэтому лучше сразу позаботиться о фабриках и не конструировать их напрямую. Да, сначала это кажется лишней работой, но по мере усложнения сущностей вы всё равно к этому придёте: формат входных данных будет меняться. Давайте сразу заложим фабрики для всех ключевых сущностей.
from dataclasses import dataclass
from typing import Optional
from domain.entity.recording_source import (
RecordingSource,
RecordingSourceIdx,
RecordingSourceExtra,
)
@dataclass
class RecordingSourceFactory:
"""
Фабрика источников записи.
"""
def create(
self,
extra: Optional["RecordingSourceExtra"] = None,
) -> "RecordingSource":
idx = RecordingSourceIdx.new()
return RecordingSource(
idx=idx,
extra=extra or "RecordingSourceExtra"()
)
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
from domain.entity.video_file import VideoFile, VideoFileIdx, VideoFileExtra
from domain.entity.recording_source import RecordingSourceIdx
@dataclass
class VideoFileFactory:
"""
Фабрика видеозаписей.
"""
def create(
self,
source_idx: RecordingSourceIdx,
started_at: datetime,
extra: Optional["VideoFileExtra"] = None,
) -> "VideoFile":
video_idx = VideoFileIdx.new()
return VideoFile(
idx=video_idx,
source_idx=source_idx,
started_at=started_at,
extra=extra or VideoFileExtra(),
)
from dataclasses import dataclass
from datetime import datetime
from typing import Any
from domain.entity.frame import Frame, FrameData
from domain.entity.video_file import VideoFileIdx
@dataclass
class FrameFactory:
"""
Фабрика кадров.
"""
def create(
self,
video_idx: VideoFileIdx,
captured_at: datetime,
raw_data: Any,
) -> "Frame":
frame_data = FrameData(value=raw_data)
return Frame(
video_idx=video_idx,
captured_at=captured_at,
data=frame_data,
)
from dataclasses import dataclass
from typing import Optional
from domain.entity.detected_object import (
DetectedObject,
ObjectIdx,
ObjectClass,
ObjectAttributes,
FrameRef,
)
@dataclass
class DetectedObjectFactory:
"""
Фабрика доменных объектов (DetectedObject).
"""
def create_new(
self,
object_class: ObjectClass,
attributes: Optional[ObjectAttributes] = None,
first_frame: Optional[FrameRef] = None,
) -> DetectedObject:
idx = ObjectIdx.new()
frames = [first_frame] if first_frame is not None else None
return DetectedObject(
idx=idx,
object_class=object_class,
attributes=attributes or ObjectAttributes(),
frames=frames,
)
from dataclasses import dataclass
from domain.entity.report import (
Report,
ReportIdx,
ReportType,
TimeRange,
ReportData,
)
from domain.service.object_analyzer import ObjectAnalyticsResult
@dataclass
class ReportFactory:
"""
Фабрика отчётов.
"""
def create_from_analytics(
self,
report_type: ReportType,
time_range: TimeRange,
analytics: ObjectAnalyticsResult,
) -> Report:
idx = ReportIdx.new()
data = ReportData(meta=analytics.meta)
return Report(
idx=idx,
report_type=report_type,
time_range=time_range,
data=data,
source_idx_list=analytics.source_idx_list,
video_idx_list=analytics.video_idx_list,
object_idx_list=analytics.object_idx_list,
)
Вроде всё готово, но наши сущности сейчас — это не просто «мешки с данными», которые сами по себе ничего не делают. Можно, конечно, добавить им простые сеттеры и геттеры, но тогда они останутся просто контейнерами для данных и не будут отражать поведение объектов и их возможности. А нам ведь нужно в будущем поддерживать систему, поэтому лучше сразу думать о том, какие операции должны жить внутри самих сущностей.
Видеоисточник.
class RecordingSource:
# старый код
def update_extra(self, extra: RecordingSourceExtra) -> None:
"""
Обновить дополнительные данные об источнике.
"""
self.extra = extra
Видеофайл.
class VideoFile:
# старый код
def mark_ended(self, ended_at: Optional[datetime] = None) -> None:
"""
Пометить видеозапись как завершённую.
"""
new_ended_at = ended_at or datetime.utcnow()
if new_ended_at < self.started_at:
raise ValueError("ended_at cannot be earlier than started_at")
if self.extra.ended_at is not None and new_ended_at <= self.extra.ended_at:
return
self.extra.ended_at = new_ended_at
self.extra.duration_seconds = (new_ended_at - self.started_at).total_seconds()
def update_duration(self, duration_seconds: float) -> None:
"""
Обновить длительность видеозаписи в секундах.
"""
if duration_seconds < 0:
raise ValueError("duration_seconds cannot be negative")
self.extra.duration_seconds = duration_seconds
def update_technical_info(
self,
*,
codec: Optional[str] = None,
frame_width: Optional[int] = None,
frame_height: Optional[int] = None,
meta: Optional[Dict[str, Any]] = None,
) -> None:
"""
Обновить технические параметры видеозаписи.
"""
if frame_width is not None and frame_width <= 0:
raise ValueError("frame_width must be positive")
if frame_height is not None and frame_height <= 0:
raise ValueError("frame_height must be positive")
if codec is not None:
self.extra.codec = codec
if frame_width is not None:
self.extra.frame_width = frame_width
if frame_height is not None:
self.extra.frame_height = frame_height
if meta:
self.extra.meta.update(meta)
def update_extra(self, extra: VideoFileExtra) -> None:
"""
Полностью заменить объект с дополнительными данными.
"""
self.extra = extra
Кадр
class Frame:
# старый код
def update_data(self, data: FrameData) -> None:
"""
Заменить данные кадра.
Смысл:
- кадр мог быть перекодирован,
- к нему могла быть применена анонимизация/маскирование,
- данные могли быть уменьшены (даунскейл, JPEG и т.п.).
Вместо прямого присваивания снаружи, всё изменение «сырых» данных
кадра проходит через этот метод.
"""
self.data = data
def is_in_time_range(self, start: datetime, end: datetime) -> bool:
"""
Проверить, попадает ли кадр во временной интервал [start, end).
"""
return start <= self.captured_at < end
Объект
class DetectedObject:
# старый код
def add_frame(self, frame_ref: FrameRef) -> None:
"""
Зафиксировать, что объект был замечен ещё в одном кадре.
"""
self.frames.append(frame_ref)
def last_seen_at(self) -> Optional[datetime]:
"""
Вернуть момент времени, когда объект был замечен последним.
"""
if not self.frames:
return None
return max(ref.captured_at for ref in self.frames)
def was_seen_in_range(self, start: datetime, end: datetime) -> bool:
"""
Проверить, встречался ли объект в интервале [start, end).
"""
return any(start <= ref.captured_at < end for ref in self.frames)
def update_attributes(self, meta: Dict[str, Any]) -> None:
"""
Обновить признаки объекта (attributes.meta).
"""
self.attributes.meta.update(meta)
Отчет
class Report:
# старый код
def covers_moment(self, moment: datetime) -> bool:
"""
Проверить, относится ли указанный момент ко времени этого отчёта.
"""
return self.time_range.start <= moment < self.time_range.end
def overlaps_with(self, start: datetime, end: datetime) -> bool:
"""
Проверить, пересекается ли отчёт с временным интервалом [start, end).
"""
r_start = self.time_range.start
r_end = self.time_range.end
return not (end <= r_start or r_end <= start)
def add_source_idx(self, source_idx: "RecordingSourceIdx") -> None:
"""
Добавить источник в список «пруфов» отчёта.
"""
if source_idx not in self.source_idx_list:
self.source_idx_list.append(source_idx)
def add_video_idx(self, video_idx: "VideoFileIdx") -> None:
"""
Добавить видеозапись в список «пруфов» отчёта.
"""
if video_idx not in self.video_idx_list:
self.video_idx_list.append(video_idx)
def add_object_idx(self, object_idx: "ObjectIdx") -> None:
"""
Добавить объект в список «пруфов» отчёта.
"""
if object_idx not in self.object_idx_list:
self.object_idx_list.append(object_idx)
На данном этапе мы ужасно далеки от создания чего-то работоспособного. Тогда зачем всё это нужно?
Мы уже понимаем, что нам нужно делать, и можем прогнозировать, как все компоненты будут взаимодействовать друг с другом.
У нас сейчас нет привязки к конкретным технологиям, мы даже не приступали к их анализу, но ЗАТО мы уже можем поэтапно разобрать обработку одного видеофайла.
Наша архитектура может быть легко расширена под новые требования, когда заказчик согласится продолжить работу с нами.
Мы можем без труда написать реализации наших абстрактных сервисов и репозиториев, используя встроенные типы данных (dict, list) или модули для работы с файлами.
Таким образом, у нас появился домен (Domain), который описывает предметную область нашей задачки и является центром всей системы: именно здесь задаются бизнес-правила и определяется поведение объектов. Далее нам предстоит описать, как компоненты домена взаимодействуют друг с другом (слой Application), — но об этом уже в следующей статье (если, конечно, она вам зайдёт).
P.S. Я намеренно полностью или почти опустил такие важные компоненты, как объекты-значения, доменные события, агрегаты, bounded context и т.п. По моему мнению, это тяжеловато даже для многих специалистов, а у нас всё-таки гайд для новичков. Но какие-то моменты постараюсь осветить в следующей статье.
P.P.S. Это моя первая статья, так что не бейте сильно. С удовольствием почитаю ваши пожелания и всё исправлю/дополню. Сразу оговорюсь: кода много, а старые проекты куда-то затерялись, поэтому в кодогенерации не обошлось без помощи GPT. Заранее спасибо за понимание.
Автор: lackyboye14
Источник [3]
Сайт-источник PVSM.RU: https://www.pvsm.ru
Путь до страницы источника: https://www.pvsm.ru/arhitektura/437823
Ссылки в тексте:
[1] на статью: https://habr.com/ru/articles/970798/
[2] @enamored_poc: https://www.pvsm.ru/users/enamored_poc
[3] Источник: https://habr.com/ru/articles/972642/?utm_source=habrahabr&utm_medium=rss&utm_campaign=972642
Нажмите здесь для печати.