Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add interceptor to aggregate CCFB reports #300

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions internal/test/mock_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
// RTCPWithError is used to send a batch of rtcp packets or an error on a channel
type RTCPWithError struct {
Packets []rtcp.Packet
Attr interceptor.Attributes
Err error
}

Expand Down Expand Up @@ -107,21 +108,21 @@
go func() {
buf := make([]byte, 1500)
for {
i, _, err := s.rtcpReader.Read(buf, interceptor.Attributes{})
i, attr, err := s.rtcpReader.Read(buf, interceptor.Attributes{})
if err != nil {
if !errors.Is(err, io.EOF) {
s.rtcpInModified <- RTCPWithError{Err: err}
s.rtcpInModified <- RTCPWithError{Attr: attr, Err: err}

Check warning on line 114 in internal/test/mock_stream.go

View check run for this annotation

Codecov / codecov/patch

internal/test/mock_stream.go#L114

Added line #L114 was not covered by tests
}
return
}

pkts, err := rtcp.Unmarshal(buf[:i])
if err != nil {
s.rtcpInModified <- RTCPWithError{Err: err}
s.rtcpInModified <- RTCPWithError{Attr: attr, Err: err}
return
}

s.rtcpInModified <- RTCPWithError{Packets: pkts}
s.rtcpInModified <- RTCPWithError{Attr: attr, Packets: pkts}
}
}()
go func() {
Expand Down
59 changes: 59 additions & 0 deletions pkg/ccfb/ccfb_receiver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package ccfb

import (
"time"

"github.com/pion/interceptor/internal/ntp"
"github.com/pion/rtcp"
)

type acknowledgement struct {
seqNr uint16
arrived bool
arrival time.Time
ecn rtcp.ECN
}

func convertCCFB(ts time.Time, feedback *rtcp.CCFeedbackReport) (time.Time, map[uint32][]acknowledgement) {
if feedback == nil {
return time.Time{}, nil
}
result := map[uint32][]acknowledgement{}
referenceTime := ntp.ToTime32(feedback.ReportTimestamp, ts)
for _, rb := range feedback.ReportBlocks {
result[rb.MediaSSRC] = convertMetricBlock(referenceTime, rb.BeginSequence, rb.MetricBlocks)
}
return referenceTime, result
}

func convertMetricBlock(reference time.Time, seqNrOffset uint16, blocks []rtcp.CCFeedbackMetricBlock) []acknowledgement {
reports := make([]acknowledgement, len(blocks))
for i, mb := range blocks {
if mb.Received {
arrival := time.Time{}

// RFC 8888 states: If the measurement is unavailable or if the
// arrival time of the RTP packet is after the time represented by
// the RTS field, then an ATO value of 0x1FFF MUST be reported for
// the packet. In that case, we set a zero time.Time value.
if mb.ArrivalTimeOffset != 0x1FFF {
delta := time.Duration((float64(mb.ArrivalTimeOffset) / 1024.0) * float64(time.Second))
arrival = reference.Add(-delta)
}
reports[i] = acknowledgement{
seqNr: seqNrOffset + uint16(i), // nolint:gosec
arrived: true,
arrival: arrival,
ecn: mb.ECN,
}
} else {
reports[i] = acknowledgement{
seqNr: seqNrOffset + uint16(i), // nolint:gosec
arrived: false,
arrival: time.Time{},
ecn: 0,
}
}
}
return reports
}
193 changes: 193 additions & 0 deletions pkg/ccfb/ccfb_receiver_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
package ccfb

import (
"fmt"
"testing"
"time"

"github.com/pion/interceptor/internal/ntp"
"github.com/pion/rtcp"
"github.com/stretchr/testify/assert"
)

func TestConvertCCFB(t *testing.T) {
timeZero := time.Now()
cases := []struct {
ts time.Time
feedback *rtcp.CCFeedbackReport
expect map[uint32][]acknowledgement
expectTS time.Time
}{
{},
{
ts: timeZero.Add(2 * time.Second),
feedback: &rtcp.CCFeedbackReport{
SenderSSRC: 1,
ReportBlocks: []rtcp.CCFeedbackReportBlock{
{
MediaSSRC: 2,
BeginSequence: 17,
MetricBlocks: []rtcp.CCFeedbackMetricBlock{
{
Received: true,
ECN: 0,
ArrivalTimeOffset: 512,
},
},
},
},
ReportTimestamp: ntp.ToNTP32(timeZero.Add(time.Second)),
},
expect: map[uint32][]acknowledgement{
2: {
{
seqNr: 17,
arrived: true,
arrival: timeZero.Add(500 * time.Millisecond),
ecn: 0,
},
},
},
expectTS: timeZero.Add(time.Second),
},
}
for i, tc := range cases {
t.Run(fmt.Sprintf("%v", i), func(t *testing.T) {
resTS, res := convertCCFB(tc.ts, tc.feedback)

assert.InDelta(t, tc.expectTS.UnixNano(), resTS.UnixNano(), float64(time.Millisecond.Nanoseconds()))

// Can't directly check equality since arrival timestamp conversions
// may be slightly off due to ntp conversions.
assert.Equal(t, len(tc.expect), len(res))
for i, acks := range tc.expect {
for j, ack := range acks {
assert.Equal(t, ack.seqNr, res[i][j].seqNr)
assert.Equal(t, ack.arrived, res[i][j].arrived)
assert.Equal(t, ack.ecn, res[i][j].ecn)
assert.InDelta(t, ack.arrival.UnixNano(), res[i][j].arrival.UnixNano(), float64(time.Millisecond.Nanoseconds()))
}
}
})
}
}

func TestConvertMetricBlock(t *testing.T) {
cases := []struct {
ts time.Time
reference time.Time
seqNrOffset uint16
blocks []rtcp.CCFeedbackMetricBlock
expected []acknowledgement
}{
{
ts: time.Time{},
reference: time.Time{},
seqNrOffset: 0,
blocks: []rtcp.CCFeedbackMetricBlock{},
expected: []acknowledgement{},
},
{
ts: time.Time{}.Add(2 * time.Second),
reference: time.Time{}.Add(time.Second),
seqNrOffset: 3,
blocks: []rtcp.CCFeedbackMetricBlock{
{
Received: true,
ECN: 0,
ArrivalTimeOffset: 512,
},
{
Received: false,
ECN: 0,
ArrivalTimeOffset: 0,
},
{
Received: true,
ECN: 0,
ArrivalTimeOffset: 0,
},
},
expected: []acknowledgement{
{
seqNr: 3,
arrived: true,
arrival: time.Time{}.Add(500 * time.Millisecond),
ecn: 0,
},
{
seqNr: 4,
arrived: false,
arrival: time.Time{},
ecn: 0,
},
{
seqNr: 5,
arrived: true,
arrival: time.Time{}.Add(time.Second),
ecn: 0,
},
},
},
{
ts: time.Time{}.Add(2 * time.Second),
reference: time.Time{}.Add(time.Second),
seqNrOffset: 3,
blocks: []rtcp.CCFeedbackMetricBlock{
{
Received: true,
ECN: 0,
ArrivalTimeOffset: 512,
},
{
Received: false,
ECN: 0,
ArrivalTimeOffset: 0,
},
{
Received: true,
ECN: 0,
ArrivalTimeOffset: 0,
},
{
Received: true,
ECN: 0,
ArrivalTimeOffset: 0x1FFF,
},
},
expected: []acknowledgement{
{
seqNr: 3,
arrived: true,
arrival: time.Time{}.Add(500 * time.Millisecond),
ecn: 0,
},
{
seqNr: 4,
arrived: false,
arrival: time.Time{},
ecn: 0,
},
{
seqNr: 5,
arrived: true,
arrival: time.Time{}.Add(time.Second),
ecn: 0,
},
{
seqNr: 6,
arrived: true,
arrival: time.Time{},
ecn: 0,
},
},
},
}

for i, tc := range cases {
t.Run(fmt.Sprintf("%v", i), func(t *testing.T) {
res := convertMetricBlock(tc.reference, tc.seqNrOffset, tc.blocks)
assert.Equal(t, tc.expected, res)
})
}
}
29 changes: 29 additions & 0 deletions pkg/ccfb/duplicate_ack_filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package ccfb

// DuplicateAckFilter is a helper to remove duplicate acks from a Report.
type DuplicateAckFilter struct {
highestAckedBySSRC map[uint32]int64
}

// NewDuplicateAckFilter creates a new DuplicateAckFilter
func NewDuplicateAckFilter() *DuplicateAckFilter {
return &DuplicateAckFilter{
highestAckedBySSRC: make(map[uint32]int64),
}
}

// Filter filters duplicate acks. It filters out all acks for packets with a
// sequence number smaller than the highest seen sequence number for each SSRC.
func (f *DuplicateAckFilter) Filter(reports Report) {
for ssrc, prs := range reports.SSRCToPacketReports {
n := 0
for _, report := range prs {
if highest, ok := f.highestAckedBySSRC[ssrc]; !ok || report.SeqNr > highest {
f.highestAckedBySSRC[ssrc] = report.SeqNr
prs[n] = report
n++
}
}
reports.SSRCToPacketReports[ssrc] = prs[:n]
}
}
Loading
Loading