Skip to content

Commit 05cafb3

Browse files
committed
Close unhandled rtcp simulcast streams
handleIncomingSSRC will call streamsForSSRC which opens rtp/rtcp streams that if unhandled can be leaked resources. Now we will proactively open them before calling handleIncomingSSRC and close then later. In the future it would be better to do this inside handleIncomingSSRC to protect other callers.
1 parent a930500 commit 05cafb3

File tree

2 files changed

+31
-9
lines changed

2 files changed

+31
-9
lines changed

dtlstransport.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ type DTLSTransport struct {
5151

5252
srtpSession, srtcpSession atomic.Value
5353
srtpEndpoint, srtcpEndpoint *mux.Endpoint
54-
simulcastStreams []*srtp.ReadStreamSRTP
54+
simulcastStreams []simulcastStreamPair
5555
srtpReady chan struct{}
5656

5757
dtlsMatcher mux.MatchFunc
@@ -60,6 +60,11 @@ type DTLSTransport struct {
6060
log logging.LeveledLogger
6161
}
6262

63+
type simulcastStreamPair struct {
64+
srtp *srtp.ReadStreamSRTP
65+
srtcp *srtp.ReadStreamSRTCP
66+
}
67+
6368
// NewDTLSTransport creates a new DTLSTransport.
6469
// This constructor is part of the ORTC API. It is not
6570
// meant to be used together with the basic WebRTC API.
@@ -436,7 +441,8 @@ func (t *DTLSTransport) Stop() error {
436441
}
437442

438443
for i := range t.simulcastStreams {
439-
closeErrs = append(closeErrs, t.simulcastStreams[i].Close())
444+
closeErrs = append(closeErrs, t.simulcastStreams[i].srtp.Close())
445+
closeErrs = append(closeErrs, t.simulcastStreams[i].srtcp.Close())
440446
}
441447

442448
if t.conn != nil {
@@ -477,11 +483,11 @@ func (t *DTLSTransport) ensureICEConn() error {
477483
return nil
478484
}
479485

480-
func (t *DTLSTransport) storeSimulcastStream(s *srtp.ReadStreamSRTP) {
486+
func (t *DTLSTransport) storeSimulcastStream(srtpReadStream *srtp.ReadStreamSRTP, srtcpReadStream *srtp.ReadStreamSRTCP) {
481487
t.lock.Lock()
482488
defer t.lock.Unlock()
483489

484-
t.simulcastStreams = append(t.simulcastStreams, s)
490+
t.simulcastStreams = append(t.simulcastStreams, simulcastStreamPair{srtpReadStream, srtcpReadStream})
485491
}
486492

487493
func (t *DTLSTransport) streamsForSSRC(ssrc SSRC, streamInfo interceptor.StreamInfo) (*srtp.ReadStreamSRTP, interceptor.RTPReader, *srtp.ReadStreamSRTCP, interceptor.RTCPReader, error) {

peerconnection.go

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1670,26 +1670,42 @@ func (pc *PeerConnection) undeclaredRTPMediaProcessor() {
16701670
return
16711671
}
16721672

1673-
stream, ssrc, err := srtpSession.AcceptStream()
1673+
srtcpSession, err := pc.dtlsTransport.getSRTCPSession()
1674+
if err != nil {
1675+
pc.log.Warnf("undeclaredMediaProcessor failed to open SrtcpSession: %v", err)
1676+
return
1677+
}
1678+
1679+
srtpReadStream, ssrc, err := srtpSession.AcceptStream()
16741680
if err != nil {
16751681
pc.log.Warnf("Failed to accept RTP %v", err)
16761682
return
16771683
}
16781684

1685+
// open accompanying srtcp stream
1686+
srtcpReadStream, err := srtcpSession.OpenReadStream(ssrc)
1687+
if err != nil {
1688+
pc.log.Warnf("Failed to open RTCP stream for %d: %v", ssrc, err)
1689+
return
1690+
}
1691+
16791692
if pc.isClosed.get() {
1680-
if err = stream.Close(); err != nil {
1693+
if err = srtpReadStream.Close(); err != nil {
16811694
pc.log.Warnf("Failed to close RTP stream %v", err)
16821695
}
1696+
if err = srtcpReadStream.Close(); err != nil {
1697+
pc.log.Warnf("Failed to close RTCP stream %v", err)
1698+
}
16831699
continue
16841700
}
16851701

1702+
pc.dtlsTransport.storeSimulcastStream(srtpReadStream, srtcpReadStream)
1703+
16861704
if ssrc == 0 {
16871705
go pc.handleNonMediaBandwidthProbe()
16881706
continue
16891707
}
16901708

1691-
pc.dtlsTransport.storeSimulcastStream(stream)
1692-
16931709
if atomic.AddUint64(&simulcastRoutineCount, 1) >= simulcastMaxProbeRoutines {
16941710
atomic.AddUint64(&simulcastRoutineCount, ^uint64(0))
16951711
pc.log.Warn(ErrSimulcastProbeOverflow.Error())
@@ -1701,7 +1717,7 @@ func (pc *PeerConnection) undeclaredRTPMediaProcessor() {
17011717
pc.log.Errorf(incomingUnhandledRTPSsrc, ssrc, err)
17021718
}
17031719
atomic.AddUint64(&simulcastRoutineCount, ^uint64(0))
1704-
}(stream, SSRC(ssrc))
1720+
}(srtpReadStream, SSRC(ssrc))
17051721
}
17061722
}
17071723

0 commit comments

Comments
 (0)