Заметки о том, как я писал SFU на Rust (1 часть)

от автора

Я работаю с Rust уже несколько лет. В основном — бэкенд, микросервисы, много асинхронного кода и многопототочных задач. А ещё я давно хотел разобраться в WebRTC. Не на уровне «вот туториал, скопируй и запусти», а чтобы реально понимать, что происходит с пакетами, как они передаются, как маршрутизируются.

В итоге я решил написать свой SFU.

Не для продакшена. Не чтобы конкурировать с LiveKit или Jitsi. Просто чтобы посмотреть, как оно работает изнутри. И написать его на Rust — потому что это мой основной инструмент, и мне было интересно, насколько он подходит для такого рода задач.

Эта статья — не гайд «как сделать SFU за 5 минут». Это скорее дневник разработки.

В конце я буду рад, если кто-то укажет на ошибки или предложит лучшие решения. Это и есть главная цель — разобраться глубже.

С чего всё началось

Пару месяцев назад я наткнулся на статью о том, как устроен WebRTC в браузере. В ней было много про SDP, ICE кандидаты, DTLS рукопожатие. Я прочитал и понял, что половину терминов не знаю.

Тогда я решил: единственный способ понять — попробовать собрать что-то рабочее самому.

Я начал с простого: взял библиотеку webrtc-rs, написал эхо-сервер, который принимал видео и отправлял его обратно. Заработало. Потом я захотел добавить второго пользователя и столкнулся с тем что мне необходимо понимание всех тех терминов, которых я не понял из той статьи. Поэтому придется все таки начать с базы, а именно теории.

Что такое SFU и зачем он нужен

WebRTC изначально задумывался как технология для прямого соединения между браузерами. Peer-to-peer. Один участник отправляет поток напрямую другому. Всё хорошо, пока участников двое.

Как только их становится трое, каждый должен отправлять видео каждому. Это уже три соединения на участника. Для четырёх — шесть. Для десяти — сорок пять. Это называется mesh-топология, и она не масштабируется — растёт нагрузка на клиенты и расход трафика.

SFU (Selective Forwarding Unit) решает эту проблему. Это сервер, который сидит между участниками. Каждый участник отправляет свой поток только один раз — на сервер. Сервер получает поток и рассылает его всем остальным участникам.

Какие фичи я хотел получить

Когда я проектировал свой SFU, я сразу определил, что хочу:

1. Переключение качества на лету

Не у всех участников хороший канал. Кто-то сидит с мобильного интернета, кто-то — с оптоволокна. SFU должен уметь отдавать 1080p тем, кто может его принять, и 360p — тем, у кого плохое соединение. И делать это без переподключения.

2. Адаптивность к состоянию канала

Даже если у участника хороший интернет, он может ухудшиться. SFU должен замечать это (потеря пакетов, задержки) и автоматически понижать качество, чтобы звонок не прервался.

3. Минимальная задержка

SFU не перекодирует видео (в отличие от MCU — Multipoint Control Unit). Он просто пересылает пакеты. Это даёт минимальную задержку и низкую нагрузку на сервер.

Как эти фичи повлияли на архитектуру

Эти три требования определили всё, что я делал дальше.

Переключение качества означало, что у меня должно быть несколько потоков одного видео с разным разрешением. Например, один участник отправляет видео в 1080p, 720p и 360p одновременно. SFU выбирает, какой поток отдать каждому подписчику.

Адаптивность означала, что я должен постоянно мониторить состояние канала каждого подписчика. Если ухудшается — менять поток на более низкое качество. Если улучшается — поднимать обратно.

Минимальная задержка означала, что я не могу перекодировать видео на сервере. Только пересылать RTP-пакеты. Это упрощало архитектуру, но накладывало ограничения: всё переключение качества должно происходить на уровне выбора готового потока, а не на уровне перекодирования.

Эти требования привели меня к следующей архитектуре:

1. Комната как маршрутизатор

Нужен компонент, который получает RTP-пакеты и рассылает их подписчикам без задержек. Комната выступает в роли такого маршрутизатора: она ретранслирует медиа от Publisher во все активные подписки без перекодирования.

2. User (Пользователь)

Каждый пользователь представлен компонентом User, который может выступать в двух ролях — издателя (Publisher) и подписчика (Subscriber). Роли опциональны: пользователь может только публиковать, только подписываться или делать и то, и другое одновременно. User объединяет эти роли и связывает их с конкретной комнатой.

3. Publisher (Издатель)

Нужен компонент, который управляет потоками издателя. Publisher хранит несколько видеотреков разного качества и аудиотрек, отправляет их в комнату и знает, какой слой качества активен для каждого подписчика.

4. QualityMonitor (Мониторинг качества)

Нужен компонент, который мониторит состояние канала каждого подписчика и принимает решения о смене качества. QualityMonitor получает обратную связь (потеря пакетов, задержки) и сигнализирует Publisher, когда нужно переключиться.

5. Подписки (VideoSubscription / AudioSubscription)

Нужна возможность переключать каналы без потери пакетов — чтобы при смене качества подписчик не заметил разрыва. VideoSubscription хранит доступные слои качества и текущий выходной трек. Это позволяет Publisher выбирать нужный слой, а комнате — ретранслировать его без перекодирования.

В этой частьи статьи я пройдусь лишь по базовой маршрутизации потоков, я покажу как я реализовал смену слоев исходя из качества соединения во второй части.

Упрощенная классовая диаграмма сервера

Упрощенная классовая диаграмма сервера

Архитектурный фундамент

Мне нужен был способ структурировать асинхронный код. Прямые tokio-таски, Arc<Mutex> быстро привели к гонкам данных: например, один обработчик читал список подписчиков, пока другой его модифицировал. Нужна была сериализация доступа к состоянию.

Брать Actix ради десятка сущностей было избыточно. Поэтому я написал минимальную обёртку над tokio::spawn + mpsc::channel в духе паттерна Active Object:

  • Каждый компонент живёт в своей таске

  • Всё общение — через сообщения в канал

  • Доступ к внутреннему состоянию — только изнутри таски, без мьютексов

Никакой иерархии супервизоров и перезапусков здесь нет — для моего случая это не требовалось. Падение таски WebRTC-пира означает, что пользователь отключился, и это корректно обрабатывается на уровне комнаты.

pub trait Actor: Sized + Send + 'static {    type Message: Sized + Send + 'static;    fn handle(&mut self, ctx: &mut Ctx<'_, Self>, m: Self::Message) -> impl Future<Output = ()> + Send;    fn starting(&mut self, ctx: &Ctx<'_, Self>) -> impl Future<Output = ()> + Send;    fn stopping(self, ctx: &Ctx<'_, Self>) -> impl Future<Output = ()> + Send;    fn stop(&mut self, ctx: &mut Ctx<'_, Self>) -> impl Future<Output = ()> + Send {        async move {            ctx.should_stop = true;        }    }    fn start(self) -> Addr<Self> {        Addr::spawn(self, 32)    }    fn start_with_capacity(self, capacity: usize) -> Addr<Self> {        Addr::spawn(self, capacity)    }}

Каждый актор живет в свой tokio таске, принимает сообщения по mpsc каналу. Чтобы отправить ему сообщение необходимо иметь его адрес.

pub struct Addr<A: Actor> {    id: Uuid,    requests: tokio::sync::mpsc::Sender<<A as Actor>::Message>,    terminate_call: Arc<Mutex<Option<tokio::sync::oneshot::Sender<()>>>>}impl<A: Actor> Addr<A> {    fn spawn(mut actor: A, capacity: usize) -> Self {        let (requests, messages) = tokio::sync::mpsc::channel(capacity);        let (terminate_call, terminate) = tokio::sync::oneshot::channel();        let addr = Addr { id: Uuid::new_v4(), requests, terminate_call: Arc::new(Mutex::new(Some(terminate_call))) };        tokio::spawn({            let ctx_addr = addr.clone();            async move {                let mut ctx = Ctx {addr: &ctx_addr, should_stop: false};                actor.starting(&ctx).await;                tokio::select! {                    _ = handle_messages(&mut actor, &mut ctx, messages) => {},                    _ = terminate => {}                }                actor.stopping(&ctx).await;            }        });        addr    }    pub async fn send(&self, m: <A as Actor>::Message) -> Result<(), SendError<<A as Actor>::Message>> {        self.requests.send(m).await?;        Ok(())    }}

Актор может быть остановлен (и дропнут) в двух случаях: не осталось ни одного отправителя сообщений или у адреса был вызван метод terminate. В этом случая любой вызов send будет возвращать ошибку посколько получатель сообщений был дропнут.

Сервер-клиент взаимодействие

Перед тем как перейти к реализации sfu вкратце опишу то как браузер будет взаимодействовать с нашим сервером.

Клиент-серверное взаимодействие

Клиент-серверное взаимодействие

Как видно браузер при открытие нашего фронтенда будет делать запрос на получение всех комнат. Пользователю предоставлена возможность как выбрать и зайти в нужную комнату так и создать свою POST запросом. Целью работы было разобраться именно с WEBRTC поэтому авторизации, хранения комнат в базе данных и прочего реализовано не было, все данные будет храниться в памяти нашего сервера.

Сценарии начального взаимодействия

Сценарии начального взаимодействия
Главная страница

Главная страница

Чтобы проверить, что клиент подключается, создадим новую комнату, запустим в OBS виртуальную камеру с гифкой и подключимся c других вкладок.

Погружаемся в удивительный мир WEBRTC

Итак мы смогли подключится к комнате с двух вкладок и теперь на обоих из них наша виртальная вебкамера. Думаю мы можем начать разбираться как это все работает.

По сути наш сервер будет заниматься пересылкой rtp пакетов от одного пользователя всем остальным в комнате, но, чтобы это было возможно, нам необходимо, чтобы браузер передал информацию о подключение пользователя серверу, а тот в свою очередь уже связал его с остальными и здесь нам понадобится вебсокет.

Подключения пользователя к серверу

Попав в комнату мы cоздаем вебсокет используя айди комнаты

function createWebSocket (room_id) {    const hostname = window.location.hostname;    if (hostname === 'localhost') {        return new WebSocket(`ws://${hostname}:8080/api/room/${room_id}`);    } else {        return new WebSocket(`wss://${hostname}/api/room/${room_id}`);    }};

В этот момент сервер пытается найти комнату по этому айди и добавить актор User в нее.

Actor User

В моей первой версии юзер представлял из себя God Object, он отвечал и за работу с вебсокетом и за передачу треков и за подписки на треки. Помимо того что это было просто очень сложно поддерживать, я периодически ловил баги с самим webrtc. По этому я принял решение распилить логику юзера на User, Publisher, Subscriber.

pub struct Publisher<S: SyncChannel> {    pub peer_id: Uuid,    pub pc: Arc<RTCPeerConnection>,    pub video_tracks: HashMap<StreamQuality, Track<VideoPacketForwarder>>,    pub audio_track: Option<Track<AudioPacketForwarder>>,    pub user: Addr<User<S>>,    pub room: Addr<Room<S>>,        pub qualify_monitor: WeakAddr<QualityMonitor<S>>,}
pub struct Subscriber<S: SyncChannel> {    pub user: Addr<User<S>>,    pub pc: Arc<RTCPeerConnection>,    pub audio_subscriptions: HashMap<Uuid, Addr<AudioSubscription<S>>>,    pub video_subscriptions: HashMap<Uuid, Addr<VideoSubscription>>,}

Как можно понять, теперь юзер отвечает за основную логику взаимодействия с комнатой и вебсокетом, он ничего не знает про RTCPeerConnection, которых в нашем случае два.
RTCPeerConnection паблишера отвечает за прием видео и аудио треков самого юзера, то есть за его вебкамеру и микрофон.
RTCPeerConnection подписчика отвечает за добавление чужих треков.

Обмен сетевыми адресами

В начале браузер и сервер должны обменяться сетевыми адресами.
ICE-кандидат это сетевой адрес, который устройство сообщает другой стороне для установления соединения. В нашем случае стороны — это браузер и SFU-сервер. ICE помогает найти рабочую пару адресов в обход NAT-ов и файрволов.
На сервере и фронте мы ожидаем евента icecandidate, получив его мы отправим результат по вебсокету. Это нужно будет сделать для обоих соединений (паблишера и подписчика — я покажу на одном).

Ждем евент на сервере:

self.pc.on_ice_candidate(Box::new(move |candidate| {    let addr = addr.clone();    Box::pin(async move {        if let Some(cand) = candidate {            if let Ok(candidate_json) = cand.to_json() {                let RTCIceCandidateInit {                       candidate, s                      dp_mid,                       sdp_mline_index, .. } = candidate_json;                addr.send(PublisherMessage::IceCandidate {                   candidate: IceCandidate {                         candidate,                         sdp_mid,                         sdp_mline_index                       }                   }).await;            }        }    })}));

При получении сообщения мы должны добавить его к нашему соединению на фронте.

async add_ice_candidate(message) {    const iceCandidate = new RTCIceCandidate({        candidate: message.candidate,        sdpMid: message.sdp_mid,        sdpMLineIndex: message.sdp_mline_index    });    await this.publisher_pc.addIceCandidate(iceCandidate);}

Ждем евент на фронте:

publisher_pc.onicecandidate = (event) => {    if (event.candidate && event.candidate.candidate) {        this.ws.send(JSON.stringify({            kind: "rtc",            target: "publisher",            type: "candidate",            candidate: event.candidate.candidate,        }));    }};

При получении сообщения мы должны добавить его к нашему соединению на сервере.

MessageType::Candidate { candidate } => {    let IceCandidate { candidate, sdp_mid, sdp_mline_index } = candidate;    let candidate_init = RTCIceCandidateInit {        candidate,        sdp_mid,        sdp_mline_index,        ..Default::default()    };    self.pc.add_ice_candidate(candidate_init).await?;},

Сценарий соединения с сервером

Подключившись по вебсокету теперь клиент должен ожидать от нас сообщения welcome содержащим его айди.

case 'welcome': {    this.peer_id = message.peer_id;    this.room_status.value = "Получение медиапотока...";    const video_stream = await this.getMedia({         video: {            width: { ideal: 1280 },            height: { ideal: 720 },            frameRate: { ideal: 30 }        }    });    const audio_stream = await this.getMedia({        audio: {            echoCancellation: true,                    noiseSuppression: true,                    autoGainControl: true,                     sampleRate: { ideal: 48000 },              sampleSize: { ideal: 16 },                 channelCount: { ideal: 1 }             }    });    await this.peer_connection.add_tracks(video_stream, audio_stream);    this.users.value.set(this.peer_id, {        stream: video_stream,        isLocal: true    });    this.room_status.value = "Подключено";    break;}

Получив сообщение мы запрашиваем у пользователя его микрофон и вебкамеру (для простоты я показал код без опциональности того и другого). Получив медиа стимы мы должны добавить их в наше соединения для паблишера.

async add_tracks(video_stream, audio_stream) {    const video_track = video_stream.getVideoTracks()[0];    this.publisher_pc.addTransceiver(video_track, {        direction: 'sendonly',    });    const audio_track = audio_stream.getAudioTracks()[0];    this.publisher_pc.addTransceiver(audio_track, { direction: 'sendonly' });    let offer = await this.publisher_pc.createOffer();    await this.publisher_pc.setLocalDescription(offer);    this.ws.send(JSON.stringify({        kind: "rtc",        target: "publisher",        type: "offer",        sdp: offer.sdp    }));}

Возможно отправление трека для меня было самой сложной в понимании частью пока я не просто не посмотрел что за оффер мы создаем.
Чтобы было наглядно понятно с чем мы имеем дело, я собрал нужную для понимания информацию в JSON.

{  "session_type": "offer",  "direction": "sendonly",  "security": {    "fingerprint_algorithm": "sha-256",    "fingerprint": "C6:EB:6A:00:AF:81:07:85:D1:E1:9A:C7:E3:C9:77:BE:A6:72:E2:46:BC:47:86:0F:51:03:48:0E:01:E8:97:5C"  },  "connectivity": {    "ice_ufrag": "d4ew",    "ice_pwd": "0/FjYTns3K/Ib8xQ8JvFnaPN",    "ice_options": "trickle"  },  "media_streams": {    "video": {      "supported_codecs": [        "VP8",        "H264",        "AV1",        "VP9"      ],      "features": {        "simulcast": "enabled",        "simulcast_layers": ["low", "mid", "high"]      }    },    "audio": {      "primary_codec": "opus",      "sample_rate_hz": 48000,      "channels": 2,      "fallback_codecs": [        "G722",        "PCMU",        "PCMA"      ]    }  }}

Получив Offer от клиента, SFU вызывает set_remote_description(offer). Именно этот вызов запускает два параллельных процесса:

  • Приём медиа: начинается DTLS-рукопожатие, и как только оно завершается, срабатывает колбэк on_track — сервер получает доступ к входящим RTP-пакетам, не дожидаясь отправки Answer

  • Сигнализация: SFU создаёт Answer и отправляет его обратно клиенту, чтобы тот завершил рукопожатие со своей стороны

Эти два процесса идут параллельно. SFU может начать ретранслировать пакеты другим участникам ещё до того, как клиент-издатель получит Answer, потому что UDP-поток идёт в одну сторону, а сигнализация — по вебсокету.

MessageType::Offer {sdp} => {    let offer_desc = RTCSessionDescription::offer(sdp)?;    self.pc.set_remote_description(offer_desc).await?;    let answer = self.pc.create_answer(None).await?;    self.pc.set_local_description(answer.clone()).await?;    let message = SignalMessage::Rtc {        target: Target::Publisher,        message_type: MessageType::Answer {sdp: answer.sdp }    };    self.user.send(UserMessage::SignalMessage(message)).await;    self.room.send(RoomMessage::SubscribeToPeers {       peer_id: self.peer_id     }).await;}

Сервер присылает на фронт answer (для наглядности представим его ввиде json):

{  "role": "answer",  "direction": "recvonly",  "connection": {    "bundle": true,    "dtls_setup": "active",    "fingerprint_hash": "sha-256 F1:5C:67:08:21:4B:C5:33:CD:AB:B4:66:EC:54:8D:70:5D:F6:41:CF:21:A4:CD:42:0F:E9:67:2D:D8:86:50:17",    "ice_trickle_enabled": true,    "ice_credentials": {      "ufrag": "IOOqnyJnAPErLLLV",      "pwd": "PIsWYhOYTLAoOIDfaEZtugybeZKLmwQi"    }  },  "media_streams": [    {      "type": "video",      "mid": 0,      "port": 9,      "protocol": "UDP/TLS/RTP/SAVPF",      "codecs": [        { "payload_type": 96, "name": "VP8", "clock_rate": 90000 },        { "payload_type": 103, "name": "H264", "clock_rate": 90000, "profile": "42001f", "packetization_mode": 1 },        { "payload_type": 107, "name": "H264", "clock_rate": 90000, "profile": "42001f", "packetization_mode": 0 },        { "payload_type": 109, "name": "H264", "clock_rate": 90000, "profile": "42e01f", "packetization_mode": 1 },        { "payload_type": 115, "name": "H264", "clock_rate": 90000, "profile": "42e01f", "packetization_mode": 0 },        { "payload_type": 45, "name": "AV1", "clock_rate": 90000, "profile": 0 },        { "payload_type": 98, "name": "VP9", "clock_rate": 90000, "profile_id": 0 },        { "payload_type": 121, "name": "ulpfec", "clock_rate": 90000 }      ],      "features": {        "rtcp_mux": true,        "rtcp_rsize": true,        "feedback_mechanisms": ["goog-remb", "transport-cc", "ccm fir", "nack", "nack pli"],        "simulcast": false      }    },    {      "type": "audio",      "mid": 1,      "port": 9,      "protocol": "UDP/TLS/RTP/SAVPF",      "codecs": [        { "payload_type": 111, "name": "opus", "clock_rate": 48000, "channels": 2, "fec": true },        { "payload_type": 9, "name": "G722", "clock_rate": 8000 },        { "payload_type": 0, "name": "PCMU", "clock_rate": 8000 },        { "payload_type": 8, "name": "PCMA", "clock_rate": 8000 }      ],      "features": {        "rtcp_mux": true,        "rtcp_rsize": true,        "feedback_mechanisms": ["transport-cc"]      }    }  ]}

При обработке коллбека on_track мы должны разослать трек другим пользователям в комнате.

self.pc.on_track(Box::new(move |track, _, _| {    let pc = pc.clone();    let addr = addr.clone();    let ssrc = track.ssrc();    let mime_type = track.codec().capability.mime_type;    let mime_type = MimeType::from_str(&mime_type).unwrap_or_default();    let kind = track.kind();    let rid = StreamQuality::from_str(track.rid());    Box::pin(async move {        match kind {            RTPCodecType::Audio | RTPCodecType::Unspecified => {                 let rtp_packet_forwarder =                     RtpPacketGatewayRouter::<AudioPacketForwarder>::spawn(                      track, StreamQuality::Audio, None                    );                addr.send(                    PublisherMessage::NewAudioTrack(Track {                       mime_type, addr: rtp_packet_forwarder                     })                ).await;            },            RTPCodecType::Video => {                let pli_sender = PliSender::new(pc.clone(), ssrc).start();                let quality = rid.unwrap_or(StreamQuality::High);                let rtp_packet_forwarder=                     RtpPacketGatewayRouter::<VideoPacketForwarder>::spawn(                      track, quality, Some(pli_sender)                    );                addr.send(                    PublisherMessage::NewVideoTrack {                        track: Track { mime_type, addr: rtp_packet_forwarder } ,                         quality                     }                ).await;            }           };    })}));

Актор комната пересылает трек и запоминает его, для того чтобы послать его при добавлении нового пользователя.

  RoomMessage::AddAudioTrack { peer_id, track: mut stream } => {      self.connect_new_audio_stream(peer_id, &mut stream).await;      let Some(peer) = self.peers.get_mut(&peer_id) else { return; };      peer.add_audio_track(stream);  },  RoomMessage::AddVideoTrack { peer_id, track: mut stream, quality } => {      self.connect_new_video_stream(peer_id, quality, &mut stream).await;      let Some(peer) = self.peers.get_mut(&peer_id) else { return; };      peer.add_stream_track(quality, stream);  }
async fn connect_new_audio_stream(&mut self,   peer_id: Uuid,   stream: &mut PeerTrack<AudioPacketForwarder>) {      let peers = self.peers.iter()          .filter(|(id, _)| **id != peer_id);      for (_, peer) in peers {          let Peer { user, .. } = peer;          user.send(UserMessage::ConnectAudio(ConnectionRequest {               peer_id,               gateway_router: stream.gateway_router.clone(),               codec_mime_type: stream.mime_type.clone()          })).await;      }}async fn connect_new_video_stream(&mut self,   peer_id: Uuid,   quality: StreamQuality,   stream: &mut PeerTrack<VideoPacketForwarder>) {    let peers = self.peers.iter()        .filter(|(id, _)| **id != peer_id);    for (_, peer) in peers {        let Peer { user, .. } = peer;        user.send(UserMessage::ConnectVideo{             quality,             request: ConnectionRequest {                 peer_id,                 gateway_router: stream.gateway_router.clone(),                 codec_mime_type: stream.mime_type.clone()            }        }).await;    }}

Подписка на чужие треки

Теперь мы можем перейти к подписке на другого пользователя. В отличие от сценария первого пользователя, второй вошедший при заходе в комнату сразу подписывается на всех остальных.

RoomMessage::SubscribeToPeers { peer_id } => {    self.connect_old_streams(peer_id).await;}
async fn connect_old_streams(&mut self, peer_id: Uuid) {    let Some(Peer { user, .. }) = self.peers.get(&peer_id) else { return ; };    let stream = self.peers.iter()        .filter(|(id, _)| **id != peer_id);    for (existed_peer_id, Peer { audio_track: audio_stream, video_tracks: video_streams, .. }) in stream {        if let Some(audio_stream) = audio_stream {            user.send(UserMessage::ConnectAudio(ConnectionRequest {                codec_mime_type: audio_stream.mime_type.clone(),                peer_id: *existed_peer_id,                gateway_router: audio_stream.gateway_router.clone()            })).await;        }        for (quailty, video_stream) in video_streams {            user.send(UserMessage::ConnectVideo {                quality,                request: ConnectionRequest {                    codec_mime_type: video_stream.mime_type.clone(),                    peer_id: *existed_peer_id,                    gateway_router: video_stream.gateway_router.clone()                }            }).await;        }    }}

Для того чтобы все работало нам нужно подключить чужой трек к RTCPeerConnection подписчика.

let track: Arc<TrackLocalStaticRTP> = Arc::new(TrackLocalStaticRTP::new(    RTCRtpCodecCapability {        mime_type: mime_type.to_string(),        ..Default::default()    },    format!("video_{peer_id}"),    format!("video_{peer_id}")));let active_track = pc.add_transceiver_from_track(        track.clone() as Arc<_>,        Some(RTCRtpTransceiverInit {           direction: RTCRtpTransceiverDirection::Sendonly,           send_encodings: vec![]       }))    .await?    .sender()    .await;

В этот момент у подключения вызовется евент negotiation_needed, мы обязаны обработать, сформировать оффер на сервере и переслать его на фронт всем остальным пользователям.

self.pc.on_negotiation_needed(Box::new(move || {    let pc = Arc::clone(&pc);    let addr = addr.clone();    Box::pin(async move {        if pc.signaling_state() != RTCSignalingState::Stable {            return;        }        match pc.create_offer(None).await {            Ok(offer) => {                if let Err(e) = pc.set_local_description(offer.clone()).await {                    tracing::error!("[User] set_local_description: {:?}", e);                    return;                }                addr.send(SubscriberMessage::Signal {                    offer: offer.sdp                }).await;            }            Err(e) => tracing::error!("[User] create_offer: {:?}", e),        }    })}));

В момент получения оффера на фронте нам важно последовательно ответить на каждый поэтому воспользуемся Promise Queue.

async create_answer(sdp) {    this.signaling_queue = this.signaling_queue.then(      () => this.create_answer_task(sdp)    );    return this.signaling_queue;}async create_answer_task(sdp) {    try {        await this.subscriber_pc.setRemoteDescription(new RTCSessionDescription({            type: 'offer',            sdp: sdp        }));        const answer = await this.subscriber_pc.createAnswer();        await this.subscriber_pc.setLocalDescription(answer);        this.ws.send(JSON.stringify({            kind: "rtc",            target: 'subscriber',            type: 'answer',            sdp: answer.sdp        }));    } catch (error) {        console.error("Ошибка при обработке SFU Offer:", error);    }}

Нам важна последовательность поскольку, если начать одновременно выполнять два асинхронных шага (например, параллельно обрабатывать два разных Offer от сервера), свойство signalingState у браузерного объекта связи уйдет в конфликт (например, изменится на have-remote-offer посреди выполнения первого процесса), и браузер выбросит неустранимую ошибку. Очередь гарантирует, что текущая транзакция изменения связи завершится до того, как начнется следующая.

Посланный answer мы также должны обработать на сервере вызвав set_remote_description у подключения «подписчика».

MessageType::Answer { sdp } => {    let answer_desc = RTCSessionDescription::answer(sdp)?;    self.pc.set_remote_description(answer_desc).await?;},

Когда подписчик применяет Offer от SFU через setRemoteDescription, браузер обнаруживает в SDP новые медиа-треки и немедленно генерирует события ontrack — ещё до отправки Answer. Поскольку аудио и видео это два разных трека мы должны проверить, не был ли добавлен стрим до этого, и если был, то просто добавляем трек к нему.

subscriber_pc.ontrack = ({ streams, track }) => {  const remote_stream = streams[0];  const user_id = remoteStream.id.split("_")[1];  let user = users.value.get(user_id);  if (!!user) {      user.stream.addTrack(track);      return;  }  users.value.set(user_id, {      stream: remoteStream,      isLocal: false  });};

Но не все так просто, да, мы добавили трек, но ничего не будет работать пока мы не настроим пересылку пакетов от паблишера первого юзера к подпискам второго и наоборот.

Акторы RtpPacketGatewayRouter и PacketForwarder

Для первой статьи я использую упрощенную версию, впрочем ее будет достаточно для того чтобы у нас работало и видео, и аудио.

impl<A: Actor> RtpPacketGatewayRouter<A>  {    pub fn spawn(track: Arc<TrackRemote>) -> Addr<Self> {        let this: Addr<RtpPacketGatewayRouter<A>> = Self {          subscriptions: HashSet::new()        }          .start_with_capacity(2048);        let receiver = this.clone();        tokio::spawn(async move {            loop {                let Ok((packet, _)) = track.read_rtp().await                    .map_err(|e| tracing::error!("[RtpPacketGatewayRouter] read_rtp {e}")) else {                         receiver.terminate().await;                        break;                    };                let Ok(_) = receiver.do_send(RtpPacketGatewayRouterMessage::RtpPacket(packet))                    .map_err(|_| tracing::error!("[RtpPacketGatewayRouter] send RtpPacketForwarderMessage::RtpPacket(packet)"))                 else { break; };            }        });        this    }}
async fn handle(&mut self, _ctx: &mut crate::actor::Ctx<'_, Self>, msg: Self::Message) {  match msg {      RtpPacketGatewayRouterMessage::Subscribe(sub) => {          self.subscriptions.insert(sub);          self.pc.write_rtcp(&[Box::new(FullIntraRequest {              media_ssrc: self.ssrc,              sender_ssrc: 0,              fir: [                  FirEntry { ssrc: self.ssrc, sequence_number: self.fir_sequence }              ].to_vec()            })]).await?;            self.fir_sequence = self.fir_sequence.wrapping_add(1);      },      RtpPacketGatewayRouterMessage::RtpPacket(packet) => {          self.subscriptions.iter().for_each(|sub| {             sub.do_send(packet.clone());           });      },      RtpPacketGatewayRouterMessage::Unsubscribe(sub) => {          self.subscriptions.remove(&sub);      }  }}

Актор по сути выполняет функцию роутера, он умеет добавлять новых подписчиков, удалять их, и главное пересылать пакеты, прочитанные из трека актуальным подписчикам. Во время пересылки нам приходится клонировать пакеты, но это не является проблемой поскольку сам пакет, это просто хедер + Bytes (ссылка на память + атомарный счетчик).

Во время добавления подписчика нам нужно отправить FullIntraRequest.

Full Intra Request (FIR) — это механизм обратной связи в протоколах WebRTC и RTP/RTCP, используемый при потоковой передаче видео. Когда принимающая сторона обнаруживает поврежденный или пропущенный кадр, она отправляет отправителю пакет FIR. Этот пакет заставляет отправителя немедленно сгенерировать и передать новый, независимый опорный кадр (Intra-кадр, I-frame или ключевой кадр).

Этот запрос критически важен для непрерывности видео и устойчивости к ошибкам, особенно в WebRTC. Декодер не может правильно обрабатывать последующие зависимые кадры (P-frames или B-frames) без наличия начального ключевого кадра. Принудительная генерация Intra-кадра позволяет быстро восстановить качество потока после потери пакетов или сбоев при передаче.

Актор PacketForwarder работает еще проще, по сути после того как его адрес отправят роутера ему нужно лишь получать новые пакеты и писать их в трек подписчика.

let forwarder = PacketForwarder { track: output_track.clone() }      .start_with_capacity(256);gateway_router    .send(RtpPacketGatewayRouterMessage::Subscribe(forwarder.clone()))    .await?;
pub struct PacketForwarder {    pub track: Arc<TrackLocalStaticRTP>,}impl PacketForwarder {    async fn forward(&self, r: Packet) -> Result<(), Error> {        self.track.write_rtp(&r).await?;        Ok(())    }}
impl Actor for PacketForwarder {    type Message = AudioPacketForwarderMessage;    async fn handle(&mut self, ctx: &mut crate::actor::Ctx<'_, Self>, packet: Self::Message) {        if let Err(e) = self.forward(packet.packet).await {            self.stop(ctx).await;        }    }}

Отключение пользователя

В случае отключения пользователя пользователя актор юзер будет остановлен, дропнет подписчика и паблишера, и перешлет сообщение комнате о том, что необходимо удалить юзера и оповестить остальным пользователей.

async fn stopping(self, _: &Ctx<'_, Self>) {    self.subscriber.try_terminate().await;    self.publisher.try_terminate().await;    let _ = self.room.send(RoomMessage::Leave {         peer_id: self.peer_id     }).await;}
RoomMessage::Leave { peer_id } => {    self.peers.remove(&peer_id);    for (_, Peer { user, .. }) in self.peers.iter() {        user.send(UserMessage::Unsubscribe { user_id: peer_id }).await;    }}

После чего остальные пользователи отключат акторов подписок на вышедшего юзера.

async fn disconnect_from_user(&mut self, speaker_id: Uuid) -> Result<(), Error> {    let audio_subscription = self.audio_subscriptions        .remove(&speaker_id)        .ok_or(Error::SystemError { message: "subscription not found".into() })?;    audio_subscription.terminate().await?;    let video_subscription = self.video_subscriptions        .remove(&speaker_id)        .ok_or(Error::SystemError { message: "subscription not found".into() })?;    video_subscription.terminate().await?;    Ok(())}

В результате отключения акторы подписки пошлют сообщение об отписке RtpPacketGatewayRouter и удалят трек из соединения подписчика.

Выход в свет

Для того чтобы проверить, что все действительно работает правильно, нам нужно убедиться, что все будет работать также не только в локальной сети, но и в глобальном интернете.

Чтобы протестировать наше сервер нам необязательно разворачивать его на впске, хоть нам и нужны протоколы SSL/TLS, из за того что браузеры (Chrome, Safari, Firefox) блокируют доступ к медиаустройствам на сайтах, работающих по незащищенному протоколу HTTP. Исключение составляет лишь localhost для целей локального тестирования. Мы можем воспользоваться tuna, который работает как полноценный аналог Ngrok. Основное назначение таких программ — создание безопасного туннеля, который дает прямой доступ из интернета к локальному серверу или приложению (localhost) без необходимости настраивать роутер и иметь «белый» IP-адрес.

tuna http 8080INFO[16:31:04] Web Interface: http://127.0.0.1:4040         INFO[16:31:04] Forwarding https://ua3buj-134-17-184-155.ru.tuna.am -> 127.0.0.1:8080
Братки на созвоне

Братки на созвоне

Заключение

В данной статье я постарался наглядно показать базовый набор функций SFU для передачи медиа потока от одного пользователя остальным в комнате.

В следующей статье я продемонстрирую, то как я реализовал Simulcast (переключение слоев качества в зависимости от соединения пользователя), обработку ключевого кадра, переподключение в случае смены сети, а также напишем тесты для более быстрого дебага нашего приложения.
Всем большое спасибо за то что прочитали до конца мой дебют, буду рад конструктивной критике!

ссылка на оригинал статьи https://habr.com/ru/articles/1055732/