Skip to content

Commit

Permalink
fix(sfu): fix deadlock (#419)
Browse files Browse the repository at this point in the history
* fix(sfu): Add deadlock finder

* fix(sfu): Add deadlock on receiver

* fix(lock): Dont keep lock on close

* fix(lock): Dont hold session lock during negotiations

* fix(lock): Fix sequencer OOR error

* fix(lock): Remove deadlock checker
  • Loading branch information
OrlandoCo authored Feb 9, 2021
1 parent c893fde commit 116cc2c
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 56 deletions.
3 changes: 1 addition & 2 deletions pkg/sfu/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"sync"

"github.com/lucsky/cuid"

log "github.com/pion/ion-log"
"github.com/pion/webrtc/v3"
)
Expand Down Expand Up @@ -189,7 +188,7 @@ func (p *Peer) SetRemoteDescription(sdp webrtc.SessionDescription) error {

if p.negotiationPending {
p.negotiationPending = false
go p.subscriber.negotiate()
p.subscriber.negotiate()
}

return nil
Expand Down
62 changes: 38 additions & 24 deletions pkg/sfu/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ type Receiver interface {

// WebRTCReceiver receives a video track
type WebRTCReceiver struct {
sync.Mutex
rtcpMu sync.RWMutex
rtcpMu sync.Mutex
closeOnce sync.Once

peerID string
Expand All @@ -50,6 +49,7 @@ type WebRTCReceiver struct {
receiver *webrtc.RTPReceiver
codec webrtc.RTPCodecParameters
rtcpCh chan []rtcp.Packet
locks [3]sync.Mutex
buffers [3]*buffer.Buffer
upTracks [3]*webrtc.TrackRemote
stats [3]*stats.Stream
Expand Down Expand Up @@ -135,20 +135,20 @@ func (w *WebRTCReceiver) AddDownTrack(track *DownTrack, bestQualityFirst bool) {
track.trackType = SimpleDownTrack
}

w.Lock()
w.locks[layer].Lock()
w.downTracks[layer] = append(w.downTracks[layer], track)
w.Unlock()
w.locks[layer].Unlock()
}

func (w *WebRTCReceiver) SubDownTrack(track *DownTrack, layer int) error {
w.Lock()
w.locks[layer].Lock()
if dts := w.downTracks[layer]; dts != nil {
w.downTracks[layer] = append(dts, track)
} else {
w.Unlock()
w.locks[layer].Unlock()
return errNoReceiverFound
}
w.Unlock()
w.locks[layer].Unlock()
return nil
}

Expand Down Expand Up @@ -179,7 +179,7 @@ func (w *WebRTCReceiver) OnCloseHandler(fn func()) {

// DeleteDownTrack removes a DownTrack from a Receiver
func (w *WebRTCReceiver) DeleteDownTrack(layer int, id string) {
w.Lock()
w.locks[layer].Lock()
idx := -1
for i, dt := range w.downTracks[layer] {
if dt.peerID == id {
Expand All @@ -188,13 +188,13 @@ func (w *WebRTCReceiver) DeleteDownTrack(layer int, id string) {
}
}
if idx == -1 {
w.Unlock()
w.locks[layer].Unlock()
return
}
w.downTracks[layer][idx] = w.downTracks[layer][len(w.downTracks[layer])-1]
w.downTracks[layer][len(w.downTracks[layer])-1] = nil
w.downTracks[layer] = w.downTracks[layer][:len(w.downTracks[layer])-1]
w.Unlock()
w.locks[layer].Unlock()
}

func (w *WebRTCReceiver) SendRTCP(p []rtcp.Packet) {
Expand Down Expand Up @@ -262,28 +262,42 @@ func (w *WebRTCReceiver) RetransmitPackets(track *DownTrack, packets []packetMet

func (w *WebRTCReceiver) writeRTP(layer int) {
defer func() {
w.closeTracks(layer)
w.nackWorker.Stop()
if w.onCloseHandler != nil {
w.closeOnce.Do(w.onCloseHandler)
}
w.closeOnce.Do(func() {
go w.closeTracks()
})
}()
var del []int
for pkt := range w.buffers[layer].PacketChan() {
w.Lock()
for _, dt := range w.downTracks[layer] {
w.locks[layer].Lock()
for idx, dt := range w.downTracks[layer] {
if err := dt.WriteRTP(pkt); err == io.EOF {
go w.DeleteDownTrack(layer, dt.id)
del = append(del, idx)
}
}
if len(del) > 0 {
for _, idx := range del {
w.downTracks[layer][idx] = w.downTracks[layer][len(w.downTracks[layer])-1]
w.downTracks[layer][len(w.downTracks[layer])-1] = nil
w.downTracks[layer] = w.downTracks[layer][:len(w.downTracks[layer])-1]
}
del = del[:0]
}
w.Unlock()
w.locks[layer].Unlock()
}
}

// closeTracks close all tracks from Receiver
func (w *WebRTCReceiver) closeTracks(layer int) {
w.Lock()
defer w.Unlock()
for _, dt := range w.downTracks[layer] {
dt.Close()
func (w *WebRTCReceiver) closeTracks() {
for idx, layer := range w.downTracks {
w.locks[idx].Lock()
for _, dt := range layer {
dt.Close()
}
w.downTracks[idx] = w.downTracks[idx][:0]
w.locks[idx].Unlock()
}
w.nackWorker.Stop()
if w.onCloseHandler != nil {
w.onCloseHandler()
}
}
17 changes: 14 additions & 3 deletions pkg/sfu/sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"encoding/binary"
"sync"
"time"

log "github.com/pion/ion-log"
)

const (
Expand Down Expand Up @@ -66,7 +68,8 @@ func (p packetMeta) getVP8PayloadMeta() (uint8, uint16) {

// Sequencer stores the packet sequence received by the down track
type sequencer struct {
sync.RWMutex
sync.Mutex
init bool
seq []byte
step int
headSN uint16
Expand All @@ -83,8 +86,9 @@ func newSequencer() *sequencer {
func (n *sequencer) push(sn, offSn uint16, timeStamp uint32, layer uint8, head bool) packetMeta {
n.Lock()
defer n.Unlock()
if n.headSN == 0 {
if !n.init {
n.headSN = offSn
n.init = true
}

step := 0
Expand All @@ -99,7 +103,14 @@ func (n *sequencer) push(sn, offSn uint16, timeStamp uint32, layer uint8, head b
step = n.step
n.headSN = offSn
} else {
step = n.step - int(n.headSN-offSn) + 1
step = n.step - int(n.headSN-offSn)
if step < 0 {
if step*-1 >= maxPacketMetaHistory {
log.Warnf("old packet received, can not be sequenced, head: %d received: %d", sn, offSn)
return packetMeta{}
}
step = maxPacketMetaHistory + step
}
}
off := step * packetMetaSize
binary.BigEndian.PutUint16(n.seq[off:off+2], sn)
Expand Down
63 changes: 36 additions & 27 deletions pkg/sfu/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ func (s *Session) RemovePeer(pid string) {

// Close session if no peers
if len(s.peers) == 0 && s.onCloseHandler != nil && !s.closed.get() {
s.onCloseHandler()
s.closed.set(true)
s.onCloseHandler()
}
}

Expand Down Expand Up @@ -90,49 +90,50 @@ func (s *Session) AddDatachannel(owner string, dc *webrtc.DataChannel) {
label := dc.Label()

s.mu.Lock()
defer s.mu.Unlock()

s.fanOutDCs = append(s.fanOutDCs, label)
s.peers[owner].subscriber.channels[label] = dc
peers := make([]*Peer, 0, len(s.peers))
for _, p := range s.peers {
if p.id == owner {
continue
}
peers = append(peers, p)
}
s.mu.Unlock()

dc.OnMessage(func(msg webrtc.DataChannelMessage) {
s.onMessage(owner, label, msg)
})

for pid, p := range s.peers {
// Don't add to self
if owner == pid {
continue
}
for _, p := range peers {
n, err := p.subscriber.AddDataChannel(label)

if err != nil {
log.Errorf("error adding datachannel: %s", err)
continue
}

pid := pid
pid := p.id
n.OnMessage(func(msg webrtc.DataChannelMessage) {
s.onMessage(pid, label, msg)
})

go p.subscriber.negotiate()
p.subscriber.negotiate()
}
}

// Publish will add a Sender to all peers in current Session from given
// Receiver
func (s *Session) Publish(router Router, r Receiver) {
s.mu.RLock()
defer s.mu.RUnlock()
peers := s.Peers()

for pid, p := range s.peers {
for _, p := range peers {
// Don't sub to self
if router.ID() == pid {
if router.ID() == p.id {
continue
}

log.Infof("Publishing track to peer %s", pid)
log.Infof("Publishing track to peer %s", p.id)

if err := router.AddDownTracks(p.subscriber, r); err != nil {
log.Errorf("Error subscribing transport to router: %s", err)
Expand All @@ -144,28 +145,32 @@ func (s *Session) Publish(router Router, r Receiver) {
// Subscribe will create a Sender for every other Receiver in the session
func (s *Session) Subscribe(peer *Peer) {
s.mu.RLock()
defer s.mu.RUnlock()
fdc := make([]string, len(s.fanOutDCs))
copy(fdc, s.fanOutDCs)
peers := make([]*Peer, 0, len(s.peers))
for _, p := range s.peers {
if p == peer {
continue
}
peers = append(peers, p)
}
s.mu.RUnlock()

// Subscribe to fan out datachannels
for _, label := range s.fanOutDCs {
for _, label := range fdc {
n, err := peer.subscriber.AddDataChannel(label)

if err != nil {
log.Errorf("error adding datachannel: %s", err)
continue
}

label := label
l := label
n.OnMessage(func(msg webrtc.DataChannelMessage) {
s.onMessage(peer.id, label, msg)
s.onMessage(peer.id, l, msg)
})
}

// Subscribe to publisher streams
for pid, p := range s.peers {
if pid == peer.id {
continue
}
for _, p := range peers {
err := p.publisher.GetRouter().AddDownTracks(peer.subscriber, nil)
if err != nil {
log.Errorf("Subscribing to router err: %v", err)
Expand All @@ -177,10 +182,14 @@ func (s *Session) Subscribe(peer *Peer) {
}

// Transports returns peers in this session
func (s *Session) Peers() map[string]*Peer {
func (s *Session) Peers() []*Peer {
s.mu.RLock()
defer s.mu.RUnlock()
return s.peers
p := make([]*Peer, 0, len(s.peers))
for _, peer := range s.peers {
p = append(p, peer)
}
return p
}

// OnClose is called when the session is closed
Expand Down

0 comments on commit 116cc2c

Please sign in to comment.