diff --git a/pkg/sfu/peer.go b/pkg/sfu/peer.go index ebaa23708..c0db974a0 100644 --- a/pkg/sfu/peer.go +++ b/pkg/sfu/peer.go @@ -6,7 +6,6 @@ import ( "sync" "github.com/lucsky/cuid" - log "github.com/pion/ion-log" "github.com/pion/webrtc/v3" ) @@ -189,7 +188,7 @@ func (p *Peer) SetRemoteDescription(sdp webrtc.SessionDescription) error { if p.negotiationPending { p.negotiationPending = false - go p.subscriber.negotiate() + p.subscriber.negotiate() } return nil diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go index 619472763..4e48b6d3d 100644 --- a/pkg/sfu/receiver.go +++ b/pkg/sfu/receiver.go @@ -36,8 +36,7 @@ type Receiver interface { // WebRTCReceiver receives a video track type WebRTCReceiver struct { - sync.Mutex - rtcpMu sync.RWMutex + rtcpMu sync.Mutex closeOnce sync.Once peerID string @@ -50,6 +49,7 @@ type WebRTCReceiver struct { receiver *webrtc.RTPReceiver codec webrtc.RTPCodecParameters rtcpCh chan []rtcp.Packet + locks [3]sync.Mutex buffers [3]*buffer.Buffer upTracks [3]*webrtc.TrackRemote stats [3]*stats.Stream @@ -135,20 +135,20 @@ func (w *WebRTCReceiver) AddDownTrack(track *DownTrack, bestQualityFirst bool) { track.trackType = SimpleDownTrack } - w.Lock() + w.locks[layer].Lock() w.downTracks[layer] = append(w.downTracks[layer], track) - w.Unlock() + w.locks[layer].Unlock() } func (w *WebRTCReceiver) SubDownTrack(track *DownTrack, layer int) error { - w.Lock() + w.locks[layer].Lock() if dts := w.downTracks[layer]; dts != nil { w.downTracks[layer] = append(dts, track) } else { - w.Unlock() + w.locks[layer].Unlock() return errNoReceiverFound } - w.Unlock() + w.locks[layer].Unlock() return nil } @@ -179,7 +179,7 @@ func (w *WebRTCReceiver) OnCloseHandler(fn func()) { // DeleteDownTrack removes a DownTrack from a Receiver func (w *WebRTCReceiver) DeleteDownTrack(layer int, id string) { - w.Lock() + w.locks[layer].Lock() idx := -1 for i, dt := range w.downTracks[layer] { if dt.peerID == id { @@ -188,13 +188,13 @@ func (w *WebRTCReceiver) DeleteDownTrack(layer int, id string) { } } if idx == -1 { - w.Unlock() + w.locks[layer].Unlock() return } w.downTracks[layer][idx] = w.downTracks[layer][len(w.downTracks[layer])-1] w.downTracks[layer][len(w.downTracks[layer])-1] = nil w.downTracks[layer] = w.downTracks[layer][:len(w.downTracks[layer])-1] - w.Unlock() + w.locks[layer].Unlock() } func (w *WebRTCReceiver) SendRTCP(p []rtcp.Packet) { @@ -262,28 +262,42 @@ func (w *WebRTCReceiver) RetransmitPackets(track *DownTrack, packets []packetMet func (w *WebRTCReceiver) writeRTP(layer int) { defer func() { - w.closeTracks(layer) - w.nackWorker.Stop() - if w.onCloseHandler != nil { - w.closeOnce.Do(w.onCloseHandler) - } + w.closeOnce.Do(func() { + go w.closeTracks() + }) }() + var del []int for pkt := range w.buffers[layer].PacketChan() { - w.Lock() - for _, dt := range w.downTracks[layer] { + w.locks[layer].Lock() + for idx, dt := range w.downTracks[layer] { if err := dt.WriteRTP(pkt); err == io.EOF { - go w.DeleteDownTrack(layer, dt.id) + del = append(del, idx) + } + } + if len(del) > 0 { + for _, idx := range del { + w.downTracks[layer][idx] = w.downTracks[layer][len(w.downTracks[layer])-1] + w.downTracks[layer][len(w.downTracks[layer])-1] = nil + w.downTracks[layer] = w.downTracks[layer][:len(w.downTracks[layer])-1] } + del = del[:0] } - w.Unlock() + w.locks[layer].Unlock() } } // closeTracks close all tracks from Receiver -func (w *WebRTCReceiver) closeTracks(layer int) { - w.Lock() - defer w.Unlock() - for _, dt := range w.downTracks[layer] { - dt.Close() +func (w *WebRTCReceiver) closeTracks() { + for idx, layer := range w.downTracks { + w.locks[idx].Lock() + for _, dt := range layer { + dt.Close() + } + w.downTracks[idx] = w.downTracks[idx][:0] + w.locks[idx].Unlock() + } + w.nackWorker.Stop() + if w.onCloseHandler != nil { + w.onCloseHandler() } } diff --git a/pkg/sfu/sequencer.go b/pkg/sfu/sequencer.go index 3cca7333c..00c51840b 100644 --- a/pkg/sfu/sequencer.go +++ b/pkg/sfu/sequencer.go @@ -4,6 +4,8 @@ import ( "encoding/binary" "sync" "time" + + log "github.com/pion/ion-log" ) const ( @@ -66,7 +68,8 @@ func (p packetMeta) getVP8PayloadMeta() (uint8, uint16) { // Sequencer stores the packet sequence received by the down track type sequencer struct { - sync.RWMutex + sync.Mutex + init bool seq []byte step int headSN uint16 @@ -83,8 +86,9 @@ func newSequencer() *sequencer { func (n *sequencer) push(sn, offSn uint16, timeStamp uint32, layer uint8, head bool) packetMeta { n.Lock() defer n.Unlock() - if n.headSN == 0 { + if !n.init { n.headSN = offSn + n.init = true } step := 0 @@ -99,7 +103,14 @@ func (n *sequencer) push(sn, offSn uint16, timeStamp uint32, layer uint8, head b step = n.step n.headSN = offSn } else { - step = n.step - int(n.headSN-offSn) + 1 + step = n.step - int(n.headSN-offSn) + if step < 0 { + if step*-1 >= maxPacketMetaHistory { + log.Warnf("old packet received, can not be sequenced, head: %d received: %d", sn, offSn) + return packetMeta{} + } + step = maxPacketMetaHistory + step + } } off := step * packetMetaSize binary.BigEndian.PutUint16(n.seq[off:off+2], sn) diff --git a/pkg/sfu/session.go b/pkg/sfu/session.go index a654cc4a2..01bc5ed2b 100644 --- a/pkg/sfu/session.go +++ b/pkg/sfu/session.go @@ -51,8 +51,8 @@ func (s *Session) RemovePeer(pid string) { // Close session if no peers if len(s.peers) == 0 && s.onCloseHandler != nil && !s.closed.get() { - s.onCloseHandler() s.closed.set(true) + s.onCloseHandler() } } @@ -90,20 +90,22 @@ func (s *Session) AddDatachannel(owner string, dc *webrtc.DataChannel) { label := dc.Label() s.mu.Lock() - defer s.mu.Unlock() - s.fanOutDCs = append(s.fanOutDCs, label) s.peers[owner].subscriber.channels[label] = dc + peers := make([]*Peer, 0, len(s.peers)) + for _, p := range s.peers { + if p.id == owner { + continue + } + peers = append(peers, p) + } + s.mu.Unlock() dc.OnMessage(func(msg webrtc.DataChannelMessage) { s.onMessage(owner, label, msg) }) - for pid, p := range s.peers { - // Don't add to self - if owner == pid { - continue - } + for _, p := range peers { n, err := p.subscriber.AddDataChannel(label) if err != nil { @@ -111,28 +113,27 @@ func (s *Session) AddDatachannel(owner string, dc *webrtc.DataChannel) { continue } - pid := pid + pid := p.id n.OnMessage(func(msg webrtc.DataChannelMessage) { s.onMessage(pid, label, msg) }) - go p.subscriber.negotiate() + p.subscriber.negotiate() } } // Publish will add a Sender to all peers in current Session from given // Receiver func (s *Session) Publish(router Router, r Receiver) { - s.mu.RLock() - defer s.mu.RUnlock() + peers := s.Peers() - for pid, p := range s.peers { + for _, p := range peers { // Don't sub to self - if router.ID() == pid { + if router.ID() == p.id { continue } - log.Infof("Publishing track to peer %s", pid) + log.Infof("Publishing track to peer %s", p.id) if err := router.AddDownTracks(p.subscriber, r); err != nil { log.Errorf("Error subscribing transport to router: %s", err) @@ -144,28 +145,32 @@ func (s *Session) Publish(router Router, r Receiver) { // Subscribe will create a Sender for every other Receiver in the session func (s *Session) Subscribe(peer *Peer) { s.mu.RLock() - defer s.mu.RUnlock() + fdc := make([]string, len(s.fanOutDCs)) + copy(fdc, s.fanOutDCs) + peers := make([]*Peer, 0, len(s.peers)) + for _, p := range s.peers { + if p == peer { + continue + } + peers = append(peers, p) + } + s.mu.RUnlock() // Subscribe to fan out datachannels - for _, label := range s.fanOutDCs { + for _, label := range fdc { n, err := peer.subscriber.AddDataChannel(label) - if err != nil { log.Errorf("error adding datachannel: %s", err) continue } - - label := label + l := label n.OnMessage(func(msg webrtc.DataChannelMessage) { - s.onMessage(peer.id, label, msg) + s.onMessage(peer.id, l, msg) }) } // Subscribe to publisher streams - for pid, p := range s.peers { - if pid == peer.id { - continue - } + for _, p := range peers { err := p.publisher.GetRouter().AddDownTracks(peer.subscriber, nil) if err != nil { log.Errorf("Subscribing to router err: %v", err) @@ -177,10 +182,14 @@ func (s *Session) Subscribe(peer *Peer) { } // Transports returns peers in this session -func (s *Session) Peers() map[string]*Peer { +func (s *Session) Peers() []*Peer { s.mu.RLock() defer s.mu.RUnlock() - return s.peers + p := make([]*Peer, 0, len(s.peers)) + for _, peer := range s.peers { + p = append(p, peer) + } + return p } // OnClose is called when the session is closed