- PVSM.RU - https://www.pvsm.ru -

Как воспроизводить WebRTC видео на Qt клиенте

Что ж... Недавно я увлекся C++, поэтому давайте разберемся в какой-нибудь технологии и напишем по ней статью. Мой выбор пал на WebRTC и клиент на Qt.

Результаты

Результаты

Начнем с теории и обозначим фронт работ.

Введение

Видео занимает значительную долю интернет-трафика. Ролики с котиками, созвоны с коллегами, скучные доклады с конференций - все это "тяжелый" контент, нагружающий сетевой канал. Чтобы накладные расходы на передачу были минимальны, поставщики видео зачастую предпочитают использовать UDP вместо TCP.

Идем дальше. Клиенты должны как-то коммуницировать: обмениваться данными о поддерживаемых кодеках, сообщать, куда отправлять трафик, слать отчеты о приеме-передаче (RR - Receiver Report, SR - Sender Report) и так далее. Здесь вводятся такие понятия, как SDP, STUN/TURN, ICE, Signalling Server, SFU/MCU.

Кратко:

  • SDP - дескриптор, описывающий медиасессию.

  • STUN/TURN - серверы, позволяющие клиентам находить друг друга и обмениваться трафиком.

  • ICE - протокол, поддерживающий соединение в рабочем состоянии.

  • Signalling Server - сервер, координирующий весь процесс. Обычно его совмещают с SFU/MCU - промежуточными узлами (middleboxes), которые транслируют трафик между клиентами. Мы, как разработчики, обитаем именно здесь.

Конечно можно строить полную mesh сетку между клиентами, но тогда логика становится в разы сложнее. Для более подробного ознакомления советую прочитать набор статей WebRTC For The Curious

Для примера - один поток 1080p 30fps (кодек vp8) потребляет примерно 600 Кб/сек. Но это только если ты не отправляешь опорные кадры (те, что кодируют картинку целиком, а не только изменения). Если же в сети наблюдаются большие потери пакетов, то получатель может и запросить эти самые опорные кадры заново (PLI запрос), что увеличит нагрузку на сеть в разы.

Также не забываем про джиттер, который будет будет нарушать порядок прихода этих пакетов. Для перезапроса утерянных пакетов используется RTCP фидбек механизм - NACK (Negative ack), а для борьбы с джитером - буфер, который так и называется - jitter buffer

Итого, выделяем пространство для работы:

  • Пишем Signaling / MCU Server

  • Пишем механизм запроса утерянных пакетов

  • Пишем jitterbuffer на клиенте

Сервер

Сервер мы будем писать на Go с использованием библиотеки Pion. На хабре уже есть инструкция [1] по написанию сервера, повторяться не будем, только уточним некоторые детали.

1) Добавим поддержку NACK. В Pion это делается просто, подключаем дефолтные интерцепторы.

type Controller interface {
	HandleConnection(c *common.SafeWebSocket)
	JoinRoom(peer *common.Peer, msg Msg) error
	LeaveRoom(peer *common.Peer, msg Msg) error
}

type controller struct {
	logger *zap.Logger

	roomRepo repository.RoomRepo
	api      *webrtc.API
}

func NewController(logger *zap.Logger, roomRepo repository.RoomRepo) Controller {
	settingEngine := webrtc.SettingEngine{}
	settingEngine.SetAnsweringDTLSRole(webrtc.DTLSRoleServer)
	mediaEngine := &webrtc.MediaEngine{}
	mediaEngine.RegisterDefaultCodecs()
	interseporRegistry := interceptor.Registry{}

  // Вот здесь!
	if err := webrtc.RegisterDefaultInterceptorsWithOptions(mediaEngine, &interseporRegistry,
		webrtc.WithNackGeneratorOptions(nack.GeneratorSize(8192)),
		webrtc.WithNackResponderOptions(nack.ResponderSize(8192)),
	); err != nil {
		logger.Error("failed to register interceptor", zap.Error(err))

		panic(err)
	}

	api := webrtc.NewAPI(
		webrtc.WithMediaEngine(mediaEngine),
		webrtc.WithSettingEngine(settingEngine),
		webrtc.WithInterceptorRegistry(&interseporRegistry),
	)

	ctrl := &controller{
		api:      api,
		logger:   logger,
		roomRepo: roomRepo,
	}

	go func() { // каждые две секунды отправляем RTCP запрос на I-frame
		ticker := time.NewTicker(2 * time.Second)
		for _ = range ticker.C {
			roomIds := ctrl.roomRepo.GetRooms()
			for _, roomId := range roomIds {
				go ctrl.dispatch(roomId)
			}
		}
	}()

	return ctrl
}
Скрытый текст

В моих тестах pion nack generator терял некоторые пакеты, если вам удастся найти причину этого дайте знать

2) Сымитируем потерю и задержку пакетов через tc утилитку

sudo tc qdisc add dev lo root netem delay 50ms 20ms loss 1%

Клиент

Для нашего разбора основным поставщиком видео будет браузер. Тут ничего интересного нет, WebRPC API за нас делает всю тяжелую работу. Исходный код можно посмотреть тут [2]

Запускает простой http сервер

python3 -m http.server 8080

Открываем несколько вкладок localhost:8080, и запускаем несколько видео потоков

Куда интереснее C++ клиент. К сожалению мы не можем (или не хотим 🙃 ) бандлить в наш игрушечный клиент целиком ядро хромиум, поэтому обойдемся инструментами попроще. К счастью за нас уже были написаны куча альтернатив. Мой выбор пал на libdatachannel [3], как легковесная и простая альтернатива libwebrtc от google. А в качестве GUI фреймворка используем Qt

Подключение клиента. Устанавливаем callback функции для нашего клиента

void ConferenceClient::connectClient(QString url, QString roomId)
{
    rtc::InitLogger(rtc::LogLevel::Debug);

    this->pc.onLocalDescription(this->pcOnLocalDescription(roomId));
    this->pc.onLocalCandidate(this->pcOnLocalCandidate());
    this->pc.onGatheringStateChange(this->pcOnGatheringStateChange());

    this->pc.onIceStateChange([](rtc::PeerConnection::IceState state) {
        std::cout << "Ice state changed: " << state << std::endl;
    });
    this->pc.onStateChange([](rtc::PeerConnection::State state) {
        std::cout << "state changed: " << state << std::endl;
    });

    this->ws.onOpen(this->wsOnOpen(roomId));
    this->ws.onMessage(this->wsOnMessage());

    this->pc.onTrack(this->pcOnTrack());
    this->ws.open(url.toStdString());
}

Обработка нового трека

std::function<void(std::shared_ptr<rtc::Track>)> ConferenceClient::pcOnTrack() {
    return [this](std::shared_ptr<rtc::Track> track) {
        auto mid = track->description().mid();

        this->track_index[mid]
            = {track, 0, "NO_VALUE", 0, 0, LRUCache<std::uint32_t, jitterbuffer>(256)};

        this->player->initMid(mid);
        bool isVideo = true;

        if (track->description().type() == "audio") {
            isVideo = false;

            track->setMediaHandler(std::make_shared<rtc::OpusRtpDepacketizer>());
            track->chainMediaHandler(std::make_shared<rtc::RtcpReceivingSession>());
            track->onFrame(this->trackOnFrame(mid, isVideo));
        } else {
            track->onMessage(this->pcOnMessage(mid));
        }

        track->onOpen([track]() { track->requestKeyframe(); });
        track->onClosed([this, mid]() { this->player->destroy(mid); });
    };
}

Прошу заметить, поскольку в libdatachannel нет механизмов jitter buffer и nack генерации, эти механизмы будем реализовывать самостоятельно. Следовательно, поэтому if блок (строка 18) для видео содержит обработку по пакетам

Отлично, осталось только собирать кадры по пакетам, положить их в нашу очередь проигрывания, и проигрывать с небольшой задержкой.

std::function<void(rtc::message_variant)> ConferenceClient::pcOnMessage(std::string mid)
{
    return [this, mid](rtc::message_variant message) {
        auto now = std::chrono::system_clock::now();
        auto duration = now.time_since_epoch();
        auto nowTs = std::chrono::duration_cast<std::chrono::milliseconds>(duration).count();

        std::string &codec = this->track_index[mid].codec;
        std::uint8_t &codecPT = this->track_index[mid].codecPT;

        auto &track_info = this->track_index[mid];
        auto track = track_info.track.lock();
        auto &frame_cache = this->track_index[mid].buff.value();

        try {
            auto msg = std::get<rtc::binary>(message);
            auto rtpHeader = reinterpret_cast<rtc::RtpHeader *>(msg.data());

            std::uint32_t pkgTs = rtpHeader->timestamp();
            if (pkgTs < track_info.lastCompletedTs) {
                return;
            }

            auto PT = rtpHeader->payloadType();
            // обработка ретранслированных пакетов
            if (track->description().rtpMap(PT)->format == MyApp::Rtx) { 
                auto osnPos = msg.begin() + rtpHeader->getSize()
                              + rtpHeader->getExtensionHeaderSize();

                rtpHeader->_seqNumber = ((uint8_t) *(osnPos)) | ((uint8_t) (*(osnPos + 1)) << 8);
                rtpHeader->_payloadType = codecPT;

                msg.erase(osnPos, osnPos + 2);
            }

            std::vector<std::byte> frame;

            if (!frame_cache.exist(pkgTs)
                && (track->description().rtpMap(PT)->format == MyApp::VP8CODEC
                    || track->description().rtpMap(PT)->format == MyApp::VP9CODEC)) {
                frame_cache.put(pkgTs, jitterbuffer());

                track_info.ssrc = rtpHeader->ssrc();

                track_info.frame_queue[pkgTs] = std::make_pair(nowTs, std::vector<std::byte>());

                codec = track->description().rtpMap(PT)->format;
                codecPT = PT;
            }

            jitterbuffer &buff = frame_cache.get(pkgTs);

            if (codec == MyApp::VP9CODEC) {
                frame = buff.addVp9Packet(std::move(msg), track_info.lastCompletedTs);
            } else if (codec == MyApp::VP8CODEC) {
                frame = buff.addVp8Packet(std::move(msg), track_info.lastCompletedTs);
            }

            if (frame.size() > 0) { // если удалось собрать кадр
                track_info.frame_queue[pkgTs].second = std::move(frame);
            }
        } catch (std::exception &e) {
            // std::cout << e.what() << std::endl;
            return;
        }

        if (!track_info.frame_queue.empty()) {
            auto it = track_info.frame_queue.begin();

            auto &[rtpTs, dataPair] = *it;
            auto &[creationTs, frame] = dataPair;

            if (!track_info.buff->exist(rtpTs)) {
                track_info.frame_queue.erase(it);
                return;
            }

            auto &jitterbuffer = track_info.buff->get(rtpTs);

            if (!frame.empty() && (nowTs - creationTs > PLAYER_DELAY)) {
                this->player->play(frame, mid, codec, true);
                track_info.lastCompletedTs = rtpTs;

                track_info.frame_queue.erase(it);
            }

            this->enforceNackPolicy(mid);
        }      
    };
}

Несколько деталей:

  1. Строка 20 - запоздалые пакеты от кадров которые уже были проиграны просто отбрасываем

  2. Строка 26 - мапинги rtx -> codec обговариваются в SDP при обмене оферами

  3. Строка 30 - данные нам приходят в big-endian, а машина использует little-endian. Не перепутай!

  4. Строка 60 - frame queue обладает типом std::map<std::uint32_t, std::pair<long, std::vectorstd::byte>> frame_queue, где uint32_t это rtp timestamp идентифицирующий кадр, а long в паре - это unix метка указывающая когда первый пакет из этого кадра к нам прибыл. Необходим для nack механизма. Map из STL как раз удобна тем, что хранит ключи в упорядоченном виде, поэтому map.begin() будет указывать на самый старый кадр

  5. Строка 80-81 - проигрывание кадров после небольшой задержки

И наконец - логика отправки NACK пакетов

void ConferenceClient::enforceNackPolicy(std::string mid)
{
    auto now = std::chrono::system_clock::now();
    auto duration = now.time_since_epoch();
    auto nowTs = std::chrono::duration_cast<std::chrono::milliseconds>(duration).count();
    auto &track_info = this->track_index[mid];

    std::vector<rtc::RtcpNackPart> nacks;
    std::vector<std::byte> nackMsg;

    auto it = track_info.frame_queue.begin();
    while (it != track_info.frame_queue.end()) {
        auto &[rtpTs, dataPair] = *it;
        auto &[creationTs, frame] = dataPair;

        if (nowTs - creationTs <= NACK_TIMEOUT_MS) {
            break;
        }

        if (!track_info.buff->exist(rtpTs)) {
            it = track_info.frame_queue.erase(it);
            continue;
        }

        auto &jitterbuffer = track_info.buff->get(rtpTs);
        if (nowTs - creationTs <= NACK_TIMEOUT_MS * (jitterbuffer.nackRequested + 1)) {
            it++;
            continue;
        }

        auto frameNacks = jitterbuffer.getPacketsToNack();
        if (frameNacks.size() == 0) {
            it++;
            continue;
        }
        jitterbuffer.nackRequested++;

        if (jitterbuffer.nackRequested > NACK_MAX_TRIES) {
            std::cout << "deleted frame[too much retries]: " << frameNacks[0].pid() << std::endl;
            it = track_info.frame_queue.erase(it);
            continue;
        }

        nacks.insert(nacks.end(), frameNacks.begin(), frameNacks.end());

        it++;
    }

    if (nacks.size() == 0) {
        return;
    }

    auto header = rtc::RtcpFbHeader{};
    header.setMediaSourceSSRC(track_info.ssrc);
    header.setPacketSenderSSRC(track_info.ssrc);
    header.header.prepareHeader(205, 1, 2 + uint16_t(nacks.size()));
    header.header._first |= (std::uint8_t) 0b00000001;

    const auto *headerPtr = reinterpret_cast<const std::byte *>(&header);

    nackMsg.insert(nackMsg.end(), headerPtr, headerPtr + sizeof(header));

    const std::byte *dataPtr = reinterpret_cast<const std::byte *>(nacks.data());
    const int dataSize = nacks.size() * sizeof(rtc::RtcpNackPart);
    nackMsg.insert(nackMsg.end(), dataPtr, dataPtr + dataSize);

    auto track = track_info.track.lock();

    track->send(nackMsg.data(), nackMsg.size());

    return;
}

Пояснения:

  1. Каждый запрос увеличиваем счетчик попыток

  2. Если (счетчик * базовый таймаут) > (nowTs - creationTs) запрашиваем утерянные пакеты

  3. Повторяем для каждого кадра в нашей очереди

  4. В конце агрегируем в одно сообщение и отправляем на сервер

Плеер

Тут я хочу поворчать. Этот блок занял у меня куда больше времени, чем я изначально планировал. Проблема в том что Qt Media Player - это неудобный кусок ..., который просто абстрагирует нижележащий проигрыватель и взамен предоставляет свой неудобный API. При том, что механизмы для работы с RTP видео потоком там отсутствуют как класс!

Минутка слабости кончилась, продолжим. В качестве плеера будем использовать libavcodec для декодирования кадров и QtVideoSink для их отображения. За примерами ffmpeg кода советую заглянуть в официальный репозиторий [4]. Сама функция проигрывания кадров следующая.

void videoPlayer::play(rtc::binary frame, std::string mid, std::string codec, bool isVideo)
{
    auto &dec = this->frames[mid];

    if (!this->mp[mid] && !this->audioSink[mid]) {
        std::call_once(*this->processed[mid], [this, mid, codec, isVideo]() {
            this->setupDecoder(mid, codec, isVideo); // устанавливаем декодер
            this->setupVisuals(mid, isVideo); // устанавливаем визуальную часть
        });
    }

    auto start = reinterpret_cast<const uint8_t *>(frame.data());
    auto end = start + frame.size();

    while (start < end) {
        auto ret = av_parser_parse2(dec.parser,
                                    dec.c,
                                    &(dec.pkt->data),
                                    &(dec.pkt->size),
                                    start,
                                    frame.size(),
                                    AV_NOPTS_VALUE,
                                    AV_NOPTS_VALUE,
                                    0);
        start += ret;
        if (dec.pkt->size) {
            this->decode(mid, isVideo);
        } else {
            break;
        }
    }
}

void videoPlayer::decode(std::string mid, bool isVideo)
{
    auto &dec = this->frames[mid];
    auto ret = avcodec_send_packet(dec.c, dec.pkt);
    if (ret < 0) {
        char errbuf[256];
        av_strerror(ret, errbuf, sizeof(errbuf));
        std::cout << "error occured: " << ret << " " << errbuf << std::endl;
    }
  while (ret >= 0) {
      ret = avcodec_receive_frame(dec.c, dec.frame);
      if (ret == AVERROR(EAGAIN) || ret == AVERROR_EOF)
          return;

      if (isVideo) {
          QVideoFrameFormat format(QSize(dec.frame->width, dec.frame->height),
                                   QVideoFrameFormat::PixelFormat::Format_YUV420P);

          QVideoFrame frame(format); 
          if (frame.map(QVideoFrame::WriteOnly)) {
              for (int i = 0; i < 3; ++i) {
                  uint8_t *src = dec.frame->data[i];
                  uint8_t *dst = frame.bits(i);

                  int size = dec.frame->width * dec.frame->height;

                  if (frame.bytesPerLine(i) == dec.frame->linesize[i]) {
                      if (i == 0) {
                          memcpy(dst, src, size);
                      } else {
                          memcpy(dst, src, size >> 2);
                      }
                  } else { // padding!! Have to copy line by line
                      int dstStride = frame.bytesPerLine(i);
                      int srcStride = dec.frame->linesize[i];

                      int planeHeight = (i == 0) ? frame.height() : frame.height() / 2;
                      int bytesToCopy = qMin(dstStride, srcStride);

                      for (int y = 0; y < planeHeight; y++) {
                          memcpy(dst + (y * dstStride), src + (y * srcStride), bytesToCopy);
                      }
                  }
              }
              frame.unmap();
          }

          this->mp[mid]->setVideoFrame(frame);
      } else {
          int channels = dec.c->channels;
          int samplesPerChannel = dec.frame->nb_samples;
          int bytesPerSample = av_get_bytes_per_sample((AVSampleFormat) dec.frame->format);

          if (!av_sample_fmt_is_planar((AVSampleFormat) dec.frame->format)) {
              this->audioDevice[mid]->write((const char *) dec.frame->data[0],
                                            samplesPerChannel * channels * bytesPerSample);
          } else {
              QByteArray buffer;
              buffer.reserve(samplesPerChannel * channels * bytesPerSample);

              for (int i = 0; i < samplesPerChannel; ++i) {
                  for (int ch = 0; ch < channels; ++ch) {
                      uint8_t *ptr = dec.frame->data[ch] + (i * bytesPerSample);
                      buffer.append((const char *) ptr, bytesPerSample);
                  }
              }
              this->audioDevice[mid]->write(buffer);
          }
      }
  }
}

Пояснение:

  • Строки 48-81 - Копирование декодированного YUV420 кадра из libavcodec в QtVideoFrame.

  • Строки 82-100 - Копирование декодированного аудио в QtMediaPlayer

Итак, у нас получилось собрать клиент. Результат смотри в картинке в начале статьи

Заключение

В текущий статье мы:

  • Разобрали как устроена технология WebRTC

  • Построили SFU сервер

  • Построили браузерный и Qt клиенты

Полезные ссылки

Автор: StillbornGoose

Источник [8]


Сайт-источник PVSM.RU: https://www.pvsm.ru

Путь до страницы источника: https://www.pvsm.ru/qt-2/447922

Ссылки в тексте:

[1] инструкция: https://habr.com/ru/articles/790348/

[2] тут: https://github.com/listnt/sfu-videoconference/blob/main/server/index.html

[3] libdatachannel: https://libdatachannel.org/

[4] репозиторий: https://github.com/libav/libav/tree/master/doc/examples

[5] Исходный код: https://github.com/listnt/sfu-videoconference/tree/main

[6] WebRTC For The Curious : https://webrtcforthecurious.com/

[7] FFmpeg: https://ffmpeg.org/doxygen/7.0/examples.html

[8] Источник: https://habr.com/ru/articles/1016020/?utm_source=habrahabr&utm_medium=rss&utm_campaign=1016020