Skip to content

Commit 271a101

Browse files
committed
Add CCFB interceptor unit tests
1 parent 0c4e7a6 commit 271a101

File tree

5 files changed

+387
-36
lines changed

5 files changed

+387
-36
lines changed

pkg/ccfb/ccfb_receiver.go

+16-4
Original file line numberDiff line numberDiff line change
@@ -20,23 +20,35 @@ type acknowledgementList struct {
2020
}
2121

2222
func convertCCFB(ts time.Time, feedback *rtcp.CCFeedbackReport) map[uint32]acknowledgementList {
23+
if feedback == nil {
24+
return nil
25+
}
2326
result := map[uint32]acknowledgementList{}
24-
referenceTime := ntp.ToTime(uint64(feedback.ReportTimestamp) << 16)
27+
referenceTime := ntp.ToTime32(feedback.ReportTimestamp, ts)
2528
for _, rb := range feedback.ReportBlocks {
2629
result[rb.MediaSSRC] = convertMetricBlock(ts, referenceTime, rb.BeginSequence, rb.MetricBlocks)
2730
}
2831
return result
2932
}
3033

31-
func convertMetricBlock(ts time.Time, referenceTime time.Time, seqNrOffset uint16, blocks []rtcp.CCFeedbackMetricBlock) acknowledgementList {
34+
func convertMetricBlock(ts time.Time, reference time.Time, seqNrOffset uint16, blocks []rtcp.CCFeedbackMetricBlock) acknowledgementList {
3235
reports := make([]acknowledgement, len(blocks))
3336
for i, mb := range blocks {
3437
if mb.Received {
35-
delta := time.Duration((float64(mb.ArrivalTimeOffset) / 1024.0) * float64(time.Second))
38+
arrival := time.Time{}
39+
40+
/// RFC 8888 states: If the measurement is unavailable or if the
41+
//arrival time of the RTP packet is after the time represented by
42+
//the RTS field, then an ATO value of 0x1FFF MUST be reported for
43+
//the packet. In that case, we set a zero time.Time value.
44+
if mb.ArrivalTimeOffset != 0x1FFF {
45+
delta := time.Duration((float64(mb.ArrivalTimeOffset) / 1024.0) * float64(time.Second))
46+
arrival = reference.Add(-delta)
47+
}
3648
reports[i] = acknowledgement{
3749
seqNr: seqNrOffset + uint16(i),
3850
arrived: true,
39-
arrival: referenceTime.Add(-delta),
51+
arrival: arrival,
4052
ecn: mb.ECN,
4153
}
4254
} else {

pkg/ccfb/ccfb_receiver_test.go

+202
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,202 @@
1+
package ccfb
2+
3+
import (
4+
"fmt"
5+
"testing"
6+
"time"
7+
8+
"github.com/pion/interceptor/internal/ntp"
9+
"github.com/pion/rtcp"
10+
"github.com/stretchr/testify/assert"
11+
)
12+
13+
func TestConvertCCFB(t *testing.T) {
14+
timeZero := time.Now()
15+
cases := []struct {
16+
ts time.Time
17+
feedback *rtcp.CCFeedbackReport
18+
expect map[uint32]acknowledgementList
19+
}{
20+
{},
21+
{
22+
ts: timeZero.Add(2 * time.Second),
23+
feedback: &rtcp.CCFeedbackReport{
24+
SenderSSRC: 1,
25+
ReportBlocks: []rtcp.CCFeedbackReportBlock{
26+
{
27+
MediaSSRC: 2,
28+
BeginSequence: 17,
29+
MetricBlocks: []rtcp.CCFeedbackMetricBlock{
30+
{
31+
Received: true,
32+
ECN: 0,
33+
ArrivalTimeOffset: 512,
34+
},
35+
},
36+
},
37+
},
38+
ReportTimestamp: ntp.ToNTP32(timeZero.Add(time.Second)),
39+
},
40+
expect: map[uint32]acknowledgementList{
41+
2: {
42+
ts: timeZero.Add(2 * time.Second),
43+
acks: []acknowledgement{
44+
{
45+
seqNr: 17,
46+
arrived: true,
47+
arrival: timeZero.Add(500 * time.Millisecond),
48+
ecn: 0,
49+
},
50+
},
51+
},
52+
},
53+
},
54+
}
55+
for i, tc := range cases {
56+
t.Run(fmt.Sprintf("%v", i), func(t *testing.T) {
57+
res := convertCCFB(tc.ts, tc.feedback)
58+
59+
// Can't directly check equality since arrival timestamp conversions
60+
// may be slightly off due to ntp conversions.
61+
assert.Equal(t, len(tc.expect), len(res))
62+
for i, ee := range tc.expect {
63+
assert.Equal(t, ee.ts, res[i].ts)
64+
for j, ack := range ee.acks {
65+
assert.Equal(t, ack.seqNr, res[i].acks[j].seqNr)
66+
assert.Equal(t, ack.arrived, res[i].acks[j].arrived)
67+
assert.Equal(t, ack.ecn, res[i].acks[j].ecn)
68+
assert.InDelta(t, ack.arrival.UnixNano(), res[i].acks[j].arrival.UnixNano(), float64(time.Millisecond.Nanoseconds()))
69+
}
70+
}
71+
})
72+
}
73+
}
74+
75+
func TestConvertMetricBlock(t *testing.T) {
76+
cases := []struct {
77+
ts time.Time
78+
reference time.Time
79+
seqNrOffset uint16
80+
blocks []rtcp.CCFeedbackMetricBlock
81+
expected acknowledgementList
82+
}{
83+
{
84+
ts: time.Time{},
85+
reference: time.Time{},
86+
seqNrOffset: 0,
87+
blocks: []rtcp.CCFeedbackMetricBlock{},
88+
expected: acknowledgementList{
89+
ts: time.Time{},
90+
acks: []acknowledgement{},
91+
},
92+
},
93+
{
94+
ts: time.Time{}.Add(2 * time.Second),
95+
reference: time.Time{}.Add(time.Second),
96+
seqNrOffset: 3,
97+
blocks: []rtcp.CCFeedbackMetricBlock{
98+
{
99+
Received: true,
100+
ECN: 0,
101+
ArrivalTimeOffset: 512,
102+
},
103+
{
104+
Received: false,
105+
ECN: 0,
106+
ArrivalTimeOffset: 0,
107+
},
108+
{
109+
Received: true,
110+
ECN: 0,
111+
ArrivalTimeOffset: 0,
112+
},
113+
},
114+
expected: acknowledgementList{
115+
ts: time.Time{}.Add(2 * time.Second),
116+
acks: []acknowledgement{
117+
{
118+
seqNr: 3,
119+
arrived: true,
120+
arrival: time.Time{}.Add(500 * time.Millisecond),
121+
ecn: 0,
122+
},
123+
{
124+
seqNr: 4,
125+
arrived: false,
126+
arrival: time.Time{},
127+
ecn: 0,
128+
},
129+
{
130+
seqNr: 5,
131+
arrived: true,
132+
arrival: time.Time{}.Add(time.Second),
133+
ecn: 0,
134+
},
135+
},
136+
},
137+
},
138+
{
139+
ts: time.Time{}.Add(2 * time.Second),
140+
reference: time.Time{}.Add(time.Second),
141+
seqNrOffset: 3,
142+
blocks: []rtcp.CCFeedbackMetricBlock{
143+
{
144+
Received: true,
145+
ECN: 0,
146+
ArrivalTimeOffset: 512,
147+
},
148+
{
149+
Received: false,
150+
ECN: 0,
151+
ArrivalTimeOffset: 0,
152+
},
153+
{
154+
Received: true,
155+
ECN: 0,
156+
ArrivalTimeOffset: 0,
157+
},
158+
{
159+
Received: true,
160+
ECN: 0,
161+
ArrivalTimeOffset: 0x1FFF,
162+
},
163+
},
164+
expected: acknowledgementList{
165+
ts: time.Time{}.Add(2 * time.Second),
166+
acks: []acknowledgement{
167+
{
168+
seqNr: 3,
169+
arrived: true,
170+
arrival: time.Time{}.Add(500 * time.Millisecond),
171+
ecn: 0,
172+
},
173+
{
174+
seqNr: 4,
175+
arrived: false,
176+
arrival: time.Time{},
177+
ecn: 0,
178+
},
179+
{
180+
seqNr: 5,
181+
arrived: true,
182+
arrival: time.Time{}.Add(time.Second),
183+
ecn: 0,
184+
},
185+
{
186+
seqNr: 6,
187+
arrived: true,
188+
arrival: time.Time{},
189+
ecn: 0,
190+
},
191+
},
192+
},
193+
},
194+
}
195+
196+
for i, tc := range cases {
197+
t.Run(fmt.Sprintf("%v", i), func(t *testing.T) {
198+
res := convertMetricBlock(tc.ts, tc.reference, tc.seqNrOffset, tc.blocks)
199+
assert.Equal(t, tc.expected, res)
200+
})
201+
}
202+
}

pkg/ccfb/history.go

+45-31
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
package ccfb
22

33
import (
4+
"container/list"
45
"errors"
5-
"log"
66
"time"
77

88
"github.com/pion/interceptor/internal/sequencenumber"
@@ -30,63 +30,77 @@ type sentPacket struct {
3030
}
3131

3232
type history struct {
33-
inflight []sentPacket
34-
sentSeqNr *sequencenumber.Unwrapper
35-
ackedSeqNr *sequencenumber.Unwrapper
33+
size int
34+
evictList *list.List
35+
seqNrToPacket map[int64]*list.Element
36+
sentSeqNr *sequencenumber.Unwrapper
37+
ackedSeqNr *sequencenumber.Unwrapper
3638
}
3739

38-
func newHistory() *history {
40+
func newHistory(size int) *history {
3941
return &history{
40-
inflight: []sentPacket{},
41-
sentSeqNr: &sequencenumber.Unwrapper{},
42-
ackedSeqNr: &sequencenumber.Unwrapper{},
42+
size: size,
43+
evictList: list.New(),
44+
seqNrToPacket: make(map[int64]*list.Element),
45+
sentSeqNr: &sequencenumber.Unwrapper{},
46+
ackedSeqNr: &sequencenumber.Unwrapper{},
4347
}
4448
}
4549

4650
func (h *history) add(seqNr uint16, size uint16, departure time.Time) error {
4751
sn := h.sentSeqNr.Unwrap(seqNr)
48-
if len(h.inflight) > 0 && sn < h.inflight[len(h.inflight)-1].seqNr {
49-
return errors.New("sequence number went backwards")
52+
last := h.evictList.Back()
53+
if last != nil {
54+
if p, ok := last.Value.(sentPacket); ok && sn < p.seqNr {
55+
return errors.New("sequence number went backwards")
56+
}
5057
}
51-
h.inflight = append(h.inflight, sentPacket{
58+
ent := h.evictList.PushBack(sentPacket{
5259
seqNr: sn,
5360
size: size,
5461
departure: departure,
5562
})
63+
h.seqNrToPacket[sn] = ent
64+
65+
if h.evictList.Len() > h.size {
66+
h.removeOldest()
67+
}
68+
5669
return nil
5770
}
5871

5972
func (h *history) getReportForAck(al acknowledgementList) PacketReportList {
60-
reports := []PacketReport{}
61-
log.Printf("highest sent: %v", h.inflight[len(h.inflight)-1].seqNr)
73+
var reports []PacketReport
6274
for _, pr := range al.acks {
6375
sn := h.ackedSeqNr.Unwrap(pr.seqNr)
64-
i := h.index(sn)
65-
if i > -1 {
66-
reports = append(reports, PacketReport{
67-
SeqNr: sn,
68-
Size: h.inflight[i].size,
69-
Departure: h.inflight[i].departure,
70-
Arrived: pr.arrived,
71-
Arrival: pr.arrival,
72-
ECN: pr.ecn,
73-
})
74-
} else {
75-
panic("got feedback for unknown packet")
76+
ent, ok := h.seqNrToPacket[sn]
77+
// Ignore report for unknown packets (migth have been dropped from
78+
// history)
79+
if ok {
80+
if ack, ok := ent.Value.(sentPacket); ok {
81+
reports = append(reports, PacketReport{
82+
SeqNr: sn,
83+
Size: ack.size,
84+
Departure: ack.departure,
85+
Arrived: pr.arrived,
86+
Arrival: pr.arrival,
87+
ECN: pr.ecn,
88+
})
89+
}
7690
}
77-
log.Printf("processed ack for seq nr %v, arrived: %v", sn, pr.arrived)
7891
}
92+
7993
return PacketReportList{
8094
Timestamp: al.ts,
8195
Reports: reports,
8296
}
8397
}
8498

85-
func (h *history) index(seqNr int64) int {
86-
for i := range h.inflight {
87-
if h.inflight[i].seqNr == seqNr {
88-
return i
99+
func (h *history) removeOldest() {
100+
if ent := h.evictList.Front(); ent != nil {
101+
v := h.evictList.Remove(ent)
102+
if sp, ok := v.(sentPacket); ok {
103+
delete(h.seqNrToPacket, sp.seqNr)
89104
}
90105
}
91-
return -1
92106
}

0 commit comments

Comments
 (0)