- PVSM.RU - https://www.pvsm.ru -
Что ж... Недавно я увлекся C++, поэтому давайте разберемся в какой-нибудь технологии и напишем по ней статью. Мой выбор пал на WebRTC и клиент на Qt.
Начнем с теории и обозначим фронт работ.
Видео занимает значительную долю интернет-трафика. Ролики с котиками, созвоны с коллегами, скучные доклады с конференций - все это "тяжелый" контент, нагружающий сетевой канал. Чтобы накладные расходы на передачу были минимальны, поставщики видео зачастую предпочитают использовать UDP вместо TCP.
Идем дальше. Клиенты должны как-то коммуницировать: обмениваться данными о поддерживаемых кодеках, сообщать, куда отправлять трафик, слать отчеты о приеме-передаче (RR - Receiver Report, SR - Sender Report) и так далее. Здесь вводятся такие понятия, как SDP, STUN/TURN, ICE, Signalling Server, SFU/MCU.
Кратко:
SDP - дескриптор, описывающий медиасессию.
STUN/TURN - серверы, позволяющие клиентам находить друг друга и обмениваться трафиком.
ICE - протокол, поддерживающий соединение в рабочем состоянии.
Signalling Server - сервер, координирующий весь процесс. Обычно его совмещают с SFU/MCU - промежуточными узлами (middleboxes), которые транслируют трафик между клиентами. Мы, как разработчики, обитаем именно здесь.
Конечно можно строить полную mesh сетку между клиентами, но тогда логика становится в разы сложнее. Для более подробного ознакомления советую прочитать набор статей WebRTC For The Curious
Для примера - один поток 1080p 30fps (кодек vp8) потребляет примерно 600 Кб/сек. Но это только если ты не отправляешь опорные кадры (те, что кодируют картинку целиком, а не только изменения). Если же в сети наблюдаются большие потери пакетов, то получатель может и запросить эти самые опорные кадры заново (PLI запрос), что увеличит нагрузку на сеть в разы.
Также не забываем про джиттер, который будет будет нарушать порядок прихода этих пакетов. Для перезапроса утерянных пакетов используется RTCP фидбек механизм - NACK (Negative ack), а для борьбы с джитером - буфер, который так и называется - jitter buffer
Итого, выделяем пространство для работы:
Пишем Signaling / MCU Server
Пишем механизм запроса утерянных пакетов
Пишем jitterbuffer на клиенте
Сервер мы будем писать на Go с использованием библиотеки Pion. На хабре уже есть инструкция [1] по написанию сервера, повторяться не будем, только уточним некоторые детали.
1) Добавим поддержку NACK. В Pion это делается просто, подключаем дефолтные интерцепторы.
type Controller interface {
HandleConnection(c *common.SafeWebSocket)
JoinRoom(peer *common.Peer, msg Msg) error
LeaveRoom(peer *common.Peer, msg Msg) error
}
type controller struct {
logger *zap.Logger
roomRepo repository.RoomRepo
api *webrtc.API
}
func NewController(logger *zap.Logger, roomRepo repository.RoomRepo) Controller {
settingEngine := webrtc.SettingEngine{}
settingEngine.SetAnsweringDTLSRole(webrtc.DTLSRoleServer)
mediaEngine := &webrtc.MediaEngine{}
mediaEngine.RegisterDefaultCodecs()
interseporRegistry := interceptor.Registry{}
// Вот здесь!
if err := webrtc.RegisterDefaultInterceptorsWithOptions(mediaEngine, &interseporRegistry,
webrtc.WithNackGeneratorOptions(nack.GeneratorSize(8192)),
webrtc.WithNackResponderOptions(nack.ResponderSize(8192)),
); err != nil {
logger.Error("failed to register interceptor", zap.Error(err))
panic(err)
}
api := webrtc.NewAPI(
webrtc.WithMediaEngine(mediaEngine),
webrtc.WithSettingEngine(settingEngine),
webrtc.WithInterceptorRegistry(&interseporRegistry),
)
ctrl := &controller{
api: api,
logger: logger,
roomRepo: roomRepo,
}
go func() { // каждые две секунды отправляем RTCP запрос на I-frame
ticker := time.NewTicker(2 * time.Second)
for _ = range ticker.C {
roomIds := ctrl.roomRepo.GetRooms()
for _, roomId := range roomIds {
go ctrl.dispatch(roomId)
}
}
}()
return ctrl
}
В моих тестах pion nack generator терял некоторые пакеты, если вам удастся найти причину этого дайте знать
2) Сымитируем потерю и задержку пакетов через tc утилитку
sudo tc qdisc add dev lo root netem delay 50ms 20ms loss 1%
Для нашего разбора основным поставщиком видео будет браузер. Тут ничего интересного нет, WebRPC API за нас делает всю тяжелую работу. Исходный код можно посмотреть тут [2]
Запускает простой http сервер
python3 -m http.server 8080
Открываем несколько вкладок localhost:8080, и запускаем несколько видео потоков
Куда интереснее C++ клиент. К сожалению мы не можем (или не хотим 🙃 ) бандлить в наш игрушечный клиент целиком ядро хромиум, поэтому обойдемся инструментами попроще. К счастью за нас уже были написаны куча альтернатив. Мой выбор пал на libdatachannel [3], как легковесная и простая альтернатива libwebrtc от google. А в качестве GUI фреймворка используем Qt
Подключение клиента. Устанавливаем callback функции для нашего клиента
void ConferenceClient::connectClient(QString url, QString roomId)
{
rtc::InitLogger(rtc::LogLevel::Debug);
this->pc.onLocalDescription(this->pcOnLocalDescription(roomId));
this->pc.onLocalCandidate(this->pcOnLocalCandidate());
this->pc.onGatheringStateChange(this->pcOnGatheringStateChange());
this->pc.onIceStateChange([](rtc::PeerConnection::IceState state) {
std::cout << "Ice state changed: " << state << std::endl;
});
this->pc.onStateChange([](rtc::PeerConnection::State state) {
std::cout << "state changed: " << state << std::endl;
});
this->ws.onOpen(this->wsOnOpen(roomId));
this->ws.onMessage(this->wsOnMessage());
this->pc.onTrack(this->pcOnTrack());
this->ws.open(url.toStdString());
}
Обработка нового трека
std::function<void(std::shared_ptr<rtc::Track>)> ConferenceClient::pcOnTrack() {
return [this](std::shared_ptr<rtc::Track> track) {
auto mid = track->description().mid();
this->track_index[mid]
= {track, 0, "NO_VALUE", 0, 0, LRUCache<std::uint32_t, jitterbuffer>(256)};
this->player->initMid(mid);
bool isVideo = true;
if (track->description().type() == "audio") {
isVideo = false;
track->setMediaHandler(std::make_shared<rtc::OpusRtpDepacketizer>());
track->chainMediaHandler(std::make_shared<rtc::RtcpReceivingSession>());
track->onFrame(this->trackOnFrame(mid, isVideo));
} else {
track->onMessage(this->pcOnMessage(mid));
}
track->onOpen([track]() { track->requestKeyframe(); });
track->onClosed([this, mid]() { this->player->destroy(mid); });
};
}
Прошу заметить, поскольку в libdatachannel нет механизмов jitter buffer и nack генерации, эти механизмы будем реализовывать самостоятельно. Следовательно, поэтому if блок (строка 18) для видео содержит обработку по пакетам
Отлично, осталось только собирать кадры по пакетам, положить их в нашу очередь проигрывания, и проигрывать с небольшой задержкой.
std::function<void(rtc::message_variant)> ConferenceClient::pcOnMessage(std::string mid)
{
return [this, mid](rtc::message_variant message) {
auto now = std::chrono::system_clock::now();
auto duration = now.time_since_epoch();
auto nowTs = std::chrono::duration_cast<std::chrono::milliseconds>(duration).count();
std::string &codec = this->track_index[mid].codec;
std::uint8_t &codecPT = this->track_index[mid].codecPT;
auto &track_info = this->track_index[mid];
auto track = track_info.track.lock();
auto &frame_cache = this->track_index[mid].buff.value();
try {
auto msg = std::get<rtc::binary>(message);
auto rtpHeader = reinterpret_cast<rtc::RtpHeader *>(msg.data());
std::uint32_t pkgTs = rtpHeader->timestamp();
if (pkgTs < track_info.lastCompletedTs) {
return;
}
auto PT = rtpHeader->payloadType();
// обработка ретранслированных пакетов
if (track->description().rtpMap(PT)->format == MyApp::Rtx) {
auto osnPos = msg.begin() + rtpHeader->getSize()
+ rtpHeader->getExtensionHeaderSize();
rtpHeader->_seqNumber = ((uint8_t) *(osnPos)) | ((uint8_t) (*(osnPos + 1)) << 8);
rtpHeader->_payloadType = codecPT;
msg.erase(osnPos, osnPos + 2);
}
std::vector<std::byte> frame;
if (!frame_cache.exist(pkgTs)
&& (track->description().rtpMap(PT)->format == MyApp::VP8CODEC
|| track->description().rtpMap(PT)->format == MyApp::VP9CODEC)) {
frame_cache.put(pkgTs, jitterbuffer());
track_info.ssrc = rtpHeader->ssrc();
track_info.frame_queue[pkgTs] = std::make_pair(nowTs, std::vector<std::byte>());
codec = track->description().rtpMap(PT)->format;
codecPT = PT;
}
jitterbuffer &buff = frame_cache.get(pkgTs);
if (codec == MyApp::VP9CODEC) {
frame = buff.addVp9Packet(std::move(msg), track_info.lastCompletedTs);
} else if (codec == MyApp::VP8CODEC) {
frame = buff.addVp8Packet(std::move(msg), track_info.lastCompletedTs);
}
if (frame.size() > 0) { // если удалось собрать кадр
track_info.frame_queue[pkgTs].second = std::move(frame);
}
} catch (std::exception &e) {
// std::cout << e.what() << std::endl;
return;
}
if (!track_info.frame_queue.empty()) {
auto it = track_info.frame_queue.begin();
auto &[rtpTs, dataPair] = *it;
auto &[creationTs, frame] = dataPair;
if (!track_info.buff->exist(rtpTs)) {
track_info.frame_queue.erase(it);
return;
}
auto &jitterbuffer = track_info.buff->get(rtpTs);
if (!frame.empty() && (nowTs - creationTs > PLAYER_DELAY)) {
this->player->play(frame, mid, codec, true);
track_info.lastCompletedTs = rtpTs;
track_info.frame_queue.erase(it);
}
this->enforceNackPolicy(mid);
}
};
}
Несколько деталей:
Строка 20 - запоздалые пакеты от кадров которые уже были проиграны просто отбрасываем
Строка 26 - мапинги rtx -> codec обговариваются в SDP при обмене оферами
Строка 30 - данные нам приходят в big-endian, а машина использует little-endian. Не перепутай!
Строка 60 - frame queue обладает типом std::map<std::uint32_t, std::pair<long, std::vectorstd::byte>> frame_queue, где uint32_t это rtp timestamp идентифицирующий кадр, а long в паре - это unix метка указывающая когда первый пакет из этого кадра к нам прибыл. Необходим для nack механизма. Map из STL как раз удобна тем, что хранит ключи в упорядоченном виде, поэтому map.begin() будет указывать на самый старый кадр
Строка 80-81 - проигрывание кадров после небольшой задержки
И наконец - логика отправки NACK пакетов
void ConferenceClient::enforceNackPolicy(std::string mid)
{
auto now = std::chrono::system_clock::now();
auto duration = now.time_since_epoch();
auto nowTs = std::chrono::duration_cast<std::chrono::milliseconds>(duration).count();
auto &track_info = this->track_index[mid];
std::vector<rtc::RtcpNackPart> nacks;
std::vector<std::byte> nackMsg;
auto it = track_info.frame_queue.begin();
while (it != track_info.frame_queue.end()) {
auto &[rtpTs, dataPair] = *it;
auto &[creationTs, frame] = dataPair;
if (nowTs - creationTs <= NACK_TIMEOUT_MS) {
break;
}
if (!track_info.buff->exist(rtpTs)) {
it = track_info.frame_queue.erase(it);
continue;
}
auto &jitterbuffer = track_info.buff->get(rtpTs);
if (nowTs - creationTs <= NACK_TIMEOUT_MS * (jitterbuffer.nackRequested + 1)) {
it++;
continue;
}
auto frameNacks = jitterbuffer.getPacketsToNack();
if (frameNacks.size() == 0) {
it++;
continue;
}
jitterbuffer.nackRequested++;
if (jitterbuffer.nackRequested > NACK_MAX_TRIES) {
std::cout << "deleted frame[too much retries]: " << frameNacks[0].pid() << std::endl;
it = track_info.frame_queue.erase(it);
continue;
}
nacks.insert(nacks.end(), frameNacks.begin(), frameNacks.end());
it++;
}
if (nacks.size() == 0) {
return;
}
auto header = rtc::RtcpFbHeader{};
header.setMediaSourceSSRC(track_info.ssrc);
header.setPacketSenderSSRC(track_info.ssrc);
header.header.prepareHeader(205, 1, 2 + uint16_t(nacks.size()));
header.header._first |= (std::uint8_t) 0b00000001;
const auto *headerPtr = reinterpret_cast<const std::byte *>(&header);
nackMsg.insert(nackMsg.end(), headerPtr, headerPtr + sizeof(header));
const std::byte *dataPtr = reinterpret_cast<const std::byte *>(nacks.data());
const int dataSize = nacks.size() * sizeof(rtc::RtcpNackPart);
nackMsg.insert(nackMsg.end(), dataPtr, dataPtr + dataSize);
auto track = track_info.track.lock();
track->send(nackMsg.data(), nackMsg.size());
return;
}
Пояснения:
Каждый запрос увеличиваем счетчик попыток
Если (счетчик * базовый таймаут) > (nowTs - creationTs) запрашиваем утерянные пакеты
Повторяем для каждого кадра в нашей очереди
В конце агрегируем в одно сообщение и отправляем на сервер
Тут я хочу поворчать. Этот блок занял у меня куда больше времени, чем я изначально планировал. Проблема в том что Qt Media Player - это неудобный кусок ..., который просто абстрагирует нижележащий проигрыватель и взамен предоставляет свой неудобный API. При том, что механизмы для работы с RTP видео потоком там отсутствуют как класс!
Минутка слабости кончилась, продолжим. В качестве плеера будем использовать libavcodec для декодирования кадров и QtVideoSink для их отображения. За примерами ffmpeg кода советую заглянуть в официальный репозиторий [4]. Сама функция проигрывания кадров следующая.
void videoPlayer::play(rtc::binary frame, std::string mid, std::string codec, bool isVideo)
{
auto &dec = this->frames[mid];
if (!this->mp[mid] && !this->audioSink[mid]) {
std::call_once(*this->processed[mid], [this, mid, codec, isVideo]() {
this->setupDecoder(mid, codec, isVideo); // устанавливаем декодер
this->setupVisuals(mid, isVideo); // устанавливаем визуальную часть
});
}
auto start = reinterpret_cast<const uint8_t *>(frame.data());
auto end = start + frame.size();
while (start < end) {
auto ret = av_parser_parse2(dec.parser,
dec.c,
&(dec.pkt->data),
&(dec.pkt->size),
start,
frame.size(),
AV_NOPTS_VALUE,
AV_NOPTS_VALUE,
0);
start += ret;
if (dec.pkt->size) {
this->decode(mid, isVideo);
} else {
break;
}
}
}
void videoPlayer::decode(std::string mid, bool isVideo)
{
auto &dec = this->frames[mid];
auto ret = avcodec_send_packet(dec.c, dec.pkt);
if (ret < 0) {
char errbuf[256];
av_strerror(ret, errbuf, sizeof(errbuf));
std::cout << "error occured: " << ret << " " << errbuf << std::endl;
}
while (ret >= 0) {
ret = avcodec_receive_frame(dec.c, dec.frame);
if (ret == AVERROR(EAGAIN) || ret == AVERROR_EOF)
return;
if (isVideo) {
QVideoFrameFormat format(QSize(dec.frame->width, dec.frame->height),
QVideoFrameFormat::PixelFormat::Format_YUV420P);
QVideoFrame frame(format);
if (frame.map(QVideoFrame::WriteOnly)) {
for (int i = 0; i < 3; ++i) {
uint8_t *src = dec.frame->data[i];
uint8_t *dst = frame.bits(i);
int size = dec.frame->width * dec.frame->height;
if (frame.bytesPerLine(i) == dec.frame->linesize[i]) {
if (i == 0) {
memcpy(dst, src, size);
} else {
memcpy(dst, src, size >> 2);
}
} else { // padding!! Have to copy line by line
int dstStride = frame.bytesPerLine(i);
int srcStride = dec.frame->linesize[i];
int planeHeight = (i == 0) ? frame.height() : frame.height() / 2;
int bytesToCopy = qMin(dstStride, srcStride);
for (int y = 0; y < planeHeight; y++) {
memcpy(dst + (y * dstStride), src + (y * srcStride), bytesToCopy);
}
}
}
frame.unmap();
}
this->mp[mid]->setVideoFrame(frame);
} else {
int channels = dec.c->channels;
int samplesPerChannel = dec.frame->nb_samples;
int bytesPerSample = av_get_bytes_per_sample((AVSampleFormat) dec.frame->format);
if (!av_sample_fmt_is_planar((AVSampleFormat) dec.frame->format)) {
this->audioDevice[mid]->write((const char *) dec.frame->data[0],
samplesPerChannel * channels * bytesPerSample);
} else {
QByteArray buffer;
buffer.reserve(samplesPerChannel * channels * bytesPerSample);
for (int i = 0; i < samplesPerChannel; ++i) {
for (int ch = 0; ch < channels; ++ch) {
uint8_t *ptr = dec.frame->data[ch] + (i * bytesPerSample);
buffer.append((const char *) ptr, bytesPerSample);
}
}
this->audioDevice[mid]->write(buffer);
}
}
}
}
Пояснение:
Строки 48-81 - Копирование декодированного YUV420 кадра из libavcodec в QtVideoFrame.
Строки 82-100 - Копирование декодированного аудио в QtMediaPlayer
Итак, у нас получилось собрать клиент. Результат смотри в картинке в начале статьи
В текущий статье мы:
Разобрали как устроена технология WebRTC
Построили SFU сервер
Построили браузерный и Qt клиенты
Исходный код [5]
Набор статей WebRTC For The Curious [6]
Более подробная инструкция по написанию SFU на Go [1]
Документация libdatachannel [3]
Примеры работы с FFmpeg [7]
Автор: StillbornGoose
Источник [8]
Сайт-источник PVSM.RU: https://www.pvsm.ru
Путь до страницы источника: https://www.pvsm.ru/qt-2/447922
Ссылки в тексте:
[1] инструкция: https://habr.com/ru/articles/790348/
[2] тут: https://github.com/listnt/sfu-videoconference/blob/main/server/index.html
[3] libdatachannel: https://libdatachannel.org/
[4] репозиторий: https://github.com/libav/libav/tree/master/doc/examples
[5] Исходный код: https://github.com/listnt/sfu-videoconference/tree/main
[6] WebRTC For The Curious : https://webrtcforthecurious.com/
[7] FFmpeg: https://ffmpeg.org/doxygen/7.0/examples.html
[8] Источник: https://habr.com/ru/articles/1016020/?utm_source=habrahabr&utm_medium=rss&utm_campaign=1016020
Нажмите здесь для печати.