diff --git a/modules/pacing/pacing_controller.cc b/modules/pacing/pacing_controller.cc index 98123c8e88..e997f959c5 100644 --- a/modules/pacing/pacing_controller.cc +++ b/modules/pacing/pacing_controller.cc @@ -93,7 +93,8 @@ PacingController::PacingController(Clock* clock, paused_(false), media_debt_(DataSize::Zero()), padding_debt_(DataSize::Zero()), - media_rate_(DataRate::Zero()), + pacing_rate_(DataRate::Zero()), + adjusted_media_rate_(DataRate::Zero()), padding_rate_(DataRate::Zero()), prober_(field_trials_), probing_send_failure_(false), @@ -198,21 +199,22 @@ void PacingController::SetPacingRates(DataRate pacing_rate, << " kbps, padding = " << padding_rate.kbps() << " kbps."; } - media_rate_ = pacing_rate; + pacing_rate_ = pacing_rate; padding_rate_ = padding_rate; + MaybeUpdateMediaRateDueToLongQueue(CurrentTime()); - RTC_LOG(LS_VERBOSE) << "bwe:pacer_updated pacing_kbps=" << media_rate_.kbps() + RTC_LOG(LS_VERBOSE) << "bwe:pacer_updated pacing_kbps=" << pacing_rate_.kbps() << " padding_budget_kbps=" << padding_rate.kbps(); } void PacingController::EnqueuePacket(std::unique_ptr packet) { - RTC_DCHECK(media_rate_ > DataRate::Zero()) + RTC_DCHECK(pacing_rate_ > DataRate::Zero()) << "SetPacingRate must be called before InsertPacket."; RTC_CHECK(packet->packet_type()); prober_.OnIncomingPacket(DataSize::Bytes(packet->payload_size())); - Timestamp now = CurrentTime(); + const Timestamp now = CurrentTime(); if (packet_queue_->Empty()) { // If queue is empty, we need to "fast-forward" the last process time, // so that we don't use passed time as budget for sending the first new @@ -228,6 +230,9 @@ void PacingController::EnqueuePacket(std::unique_ptr packet) { } packet_queue_->Push(now, std::move(packet)); seen_first_packet_ = true; + + // Queue length has increased, check if we need to change the pacing rate. + MaybeUpdateMediaRateDueToLongQueue(now); } void PacingController::SetAccountForAudioPackets(bool account_for_audio) { @@ -249,10 +254,8 @@ void PacingController::SetSendBurstInterval(TimeDelta burst_interval) { } TimeDelta PacingController::ExpectedQueueTime() const { - RTC_DCHECK_GT(media_rate_, DataRate::Zero()); - return TimeDelta::Millis( - (QueueSizeData().bytes() * 8 * rtc::kNumMillisecsPerSec) / - media_rate_.bps()); + RTC_DCHECK_GT(adjusted_media_rate_, DataRate::Zero()); + return QueueSizeData() / adjusted_media_rate_; } size_t PacingController::QueueSizePackets() const { @@ -343,11 +346,11 @@ Timestamp PacingController::NextSendTime() const { return last_send_time_ + kCongestedPacketInterval; } - if (media_rate_ > DataRate::Zero() && !packet_queue_->Empty()) { + if (adjusted_media_rate_ > DataRate::Zero() && !packet_queue_->Empty()) { // If packets are allowed to be sent in a burst, the // debt is allowed to grow up to one packet more than what can be sent // during 'send_burst_period_'. - TimeDelta drain_time = media_debt_ / media_rate_; + TimeDelta drain_time = media_debt_ / adjusted_media_rate_; next_send_time = last_process_time_ + ((send_burst_interval_ > drain_time) ? TimeDelta::Zero() : drain_time); @@ -355,9 +358,9 @@ Timestamp PacingController::NextSendTime() const { // If we _don't_ have pending packets, check how long until we have // bandwidth for padding packets. Both media and padding debts must // have been drained to do this. - RTC_DCHECK_GT(media_rate_, DataRate::Zero()); - TimeDelta drain_time = - std::max(media_debt_ / media_rate_, padding_debt_ / padding_rate_); + RTC_DCHECK_GT(adjusted_media_rate_, DataRate::Zero()); + TimeDelta drain_time = std::max(media_debt_ / adjusted_media_rate_, + padding_debt_ / padding_rate_); if (drain_time.IsZero() && (!media_debt_.IsZero() || !padding_debt_.IsZero())) { @@ -380,7 +383,7 @@ Timestamp PacingController::NextSendTime() const { } void PacingController::ProcessPackets() { - Timestamp now = CurrentTime(); + const Timestamp now = CurrentTime(); Timestamp target_send_time = now; if (ShouldSendKeepalive(now)) { @@ -420,27 +423,6 @@ void PacingController::ProcessPackets() { TimeDelta elapsed_time = UpdateTimeAndGetElapsed(target_send_time); if (elapsed_time > TimeDelta::Zero()) { - DataRate target_rate = media_rate_; - DataSize queue_size_data = QueueSizeData(); - if (queue_size_data > DataSize::Zero()) { - // Assuming equal size packets and input/output rate, the average packet - // has avg_time_left_ms left to get queue_size_bytes out of the queue, if - // time constraint shall be met. Determine bitrate needed for that. - packet_queue_->UpdateAverageQueueTime(now); - if (drain_large_queues_) { - TimeDelta avg_time_left = - std::max(TimeDelta::Millis(1), - queue_time_limit_ - packet_queue_->AverageQueueTime()); - DataRate min_rate_needed = queue_size_data / avg_time_left; - if (min_rate_needed > target_rate) { - target_rate = min_rate_needed; - RTC_LOG(LS_VERBOSE) << "bwe:large_pacing_queue pacing_rate_kbps=" - << target_rate.kbps(); - } - } - } - - media_rate_ = target_rate; UpdateBudgetWithElapsedTime(elapsed_time); } @@ -556,6 +538,11 @@ void PacingController::ProcessPackets() { prober_.ProbeSent(CurrentTime(), data_sent); } } + + // Queue length has probably decreased, check if pacing rate needs to updated. + // Poll the time again, since we might have enqueued new fec/padding packets + // with a later timestamp than `now`. + MaybeUpdateMediaRateDueToLongQueue(CurrentTime()); } DataSize PacingController::PaddingToAdd(DataSize recommended_probe_size, @@ -630,7 +617,7 @@ std::unique_ptr PacingController::GetPendingPacket( // is not more than would be reduced to zero at the target sent time. // If we allow packets to be sent in a burst, packet are allowed to be // sent early. - TimeDelta flush_time = media_debt_ / media_rate_; + TimeDelta flush_time = media_debt_ / adjusted_media_rate_; if (now + flush_time > target_send_time) { return nullptr; } @@ -656,13 +643,13 @@ void PacingController::OnPacketSent(RtpPacketMediaType packet_type, } void PacingController::UpdateBudgetWithElapsedTime(TimeDelta delta) { - media_debt_ -= std::min(media_debt_, media_rate_ * delta); + media_debt_ -= std::min(media_debt_, adjusted_media_rate_ * delta); padding_debt_ -= std::min(padding_debt_, padding_rate_ * delta); } void PacingController::UpdateBudgetWithSentData(DataSize size) { media_debt_ += size; - media_debt_ = std::min(media_debt_, media_rate_ * kMaxDebtInTime); + media_debt_ = std::min(media_debt_, adjusted_media_rate_ * kMaxDebtInTime); UpdatePaddingBudgetWithSentData(size); } @@ -675,4 +662,28 @@ void PacingController::SetQueueTimeLimit(TimeDelta limit) { queue_time_limit_ = limit; } +void PacingController::MaybeUpdateMediaRateDueToLongQueue(Timestamp now) { + adjusted_media_rate_ = pacing_rate_; + if (!drain_large_queues_) { + return; + } + + DataSize queue_size_data = QueueSizeData(); + if (queue_size_data > DataSize::Zero()) { + // Assuming equal size packets and input/output rate, the average packet + // has avg_time_left_ms left to get queue_size_bytes out of the queue, if + // time constraint shall be met. Determine bitrate needed for that. + packet_queue_->UpdateAverageQueueTime(now); + TimeDelta avg_time_left = + std::max(TimeDelta::Millis(1), + queue_time_limit_ - packet_queue_->AverageQueueTime()); + DataRate min_rate_needed = queue_size_data / avg_time_left; + if (min_rate_needed > pacing_rate_) { + adjusted_media_rate_ = min_rate_needed; + RTC_LOG(LS_VERBOSE) << "bwe:large_pacing_queue pacing_rate_kbps=" + << pacing_rate_.kbps(); + } + } +} + } // namespace webrtc diff --git a/modules/pacing/pacing_controller.h b/modules/pacing/pacing_controller.h index b3949b6ae1..b8cbc8861e 100644 --- a/modules/pacing/pacing_controller.h +++ b/modules/pacing/pacing_controller.h @@ -145,7 +145,7 @@ class PacingController { // Sets the pacing rates. Must be called once before packets can be sent. void SetPacingRates(DataRate pacing_rate, DataRate padding_rate); - DataRate pacing_rate() const { return media_rate_; } + DataRate pacing_rate() const { return adjusted_media_rate_; } // Currently audio traffic is not accounted by pacer and passed through. // With the introduction of audio BWE audio traffic will be accounted for @@ -217,6 +217,7 @@ class PacingController { void OnPacketSent(RtpPacketMediaType packet_type, DataSize packet_size, Timestamp send_time); + void MaybeUpdateMediaRateDueToLongQueue(Timestamp now); Timestamp CurrentTime() const; @@ -241,9 +242,17 @@ class PacingController { mutable Timestamp last_timestamp_; bool paused_; + // Amount of outstanding data for media and padding. DataSize media_debt_; DataSize padding_debt_; - DataRate media_rate_; + + // The target pacing rate, signaled via SetPacingRates(). + DataRate pacing_rate_; + // The media send rate, which might adjusted from pacing_rate_, e.g. if the + // pacing queue is growing too long. + DataRate adjusted_media_rate_; + // The padding target rate. We aim to fill up to this rate with padding what + // is not already used by media. DataRate padding_rate_; BitrateProber prober_; diff --git a/modules/pacing/pacing_controller_unittest.cc b/modules/pacing/pacing_controller_unittest.cc index b9ec80e874..79ab5ee5f4 100644 --- a/modules/pacing/pacing_controller_unittest.cc +++ b/modules/pacing/pacing_controller_unittest.cc @@ -1096,45 +1096,6 @@ TEST_F(PacingControllerTest, InactiveFromStart) { 2 * PacingController::kPausedProcessInterval); } -TEST_F(PacingControllerTest, ExpectedQueueTimeMs) { - uint32_t ssrc = 12346; - uint16_t sequence_number = 1234; - const size_t kNumPackets = 60; - const size_t kPacketSize = 1200; - const int32_t kMaxBitrate = kPaceMultiplier * 30000; - auto pacer = std::make_unique(&clock_, &callback_, trials_); - pacer->SetPacingRates(kTargetRate * kPaceMultiplier, DataRate::Zero()); - EXPECT_TRUE(pacer->OldestPacketEnqueueTime().IsInfinite()); - - pacer->SetPacingRates(DataRate::BitsPerSec(30000 * kPaceMultiplier), - DataRate::Zero()); - for (size_t i = 0; i < kNumPackets; ++i) { - SendAndExpectPacket(pacer.get(), RtpPacketMediaType::kVideo, ssrc, - sequence_number++, clock_.TimeInMilliseconds(), - kPacketSize); - } - - // Queue in ms = 1000 * (bytes in queue) *8 / (bits per second) - TimeDelta queue_time = - TimeDelta::Millis(1000 * kNumPackets * kPacketSize * 8 / kMaxBitrate); - EXPECT_EQ(queue_time, pacer->ExpectedQueueTime()); - - const Timestamp time_start = clock_.CurrentTime(); - while (pacer->QueueSizePackets() > 0) { - AdvanceTimeUntil(pacer->NextSendTime()); - pacer->ProcessPackets(); - } - TimeDelta duration = clock_.CurrentTime() - time_start; - - EXPECT_EQ(TimeDelta::Zero(), pacer->ExpectedQueueTime()); - - // Allow for aliasing, duration should be within one pack of max time limit. - const TimeDelta deviation = - duration - PacingController::kMaxExpectedQueueLength; - EXPECT_LT(deviation.Abs(), - TimeDelta::Millis(1000 * kPacketSize * 8 / kMaxBitrate)); -} - TEST_F(PacingControllerTest, QueueTimeGrowsOverTime) { uint32_t ssrc = 12346; uint16_t sequence_number = 1234; @@ -1756,7 +1717,7 @@ TEST_F(PacingControllerTest, } } -TEST_F(PacingControllerTest, AccountsForAudioEnqueuTime) { +TEST_F(PacingControllerTest, AccountsForAudioEnqueueTime) { const uint32_t kSsrc = 12345; const DataRate kPacingDataRate = DataRate::KilobitsPerSec(125); const DataRate kPaddingDataRate = DataRate::Zero(); @@ -2063,5 +2024,45 @@ TEST_F(PacingControllerTest, RespectsTargetRateWhenSendingPacketsInBursts) { EXPECT_EQ(number_of_bursts, 4); } +TEST_F(PacingControllerTest, RespectsQueueTimeLimit) { + static constexpr DataSize kPacketSize = DataSize::Bytes(100); + static constexpr DataRate kNominalPacingRate = DataRate::KilobitsPerSec(200); + static constexpr TimeDelta kPacketPacingTime = + kPacketSize / kNominalPacingRate; + static constexpr TimeDelta kQueueTimeLimit = TimeDelta::Millis(1000); + + PacingController pacer(&clock_, &callback_, trials_); + pacer.SetPacingRates(kNominalPacingRate, /*padding_rate=*/DataRate::Zero()); + pacer.SetQueueTimeLimit(kQueueTimeLimit); + + // Fill pacer up to queue time limit. + static constexpr int kNumPackets = kQueueTimeLimit / kPacketPacingTime; + for (int i = 0; i < kNumPackets; ++i) { + pacer.EnqueuePacket(video_.BuildNextPacket(kPacketSize.bytes())); + } + EXPECT_EQ(pacer.ExpectedQueueTime(), kQueueTimeLimit); + EXPECT_EQ(pacer.pacing_rate(), kNominalPacingRate); + + // Double the amount of packets in the queue, the queue time limit should + // effectively double the pacing rate in response. + for (int i = 0; i < kNumPackets; ++i) { + pacer.EnqueuePacket(video_.BuildNextPacket(kPacketSize.bytes())); + } + EXPECT_EQ(pacer.ExpectedQueueTime(), kQueueTimeLimit); + EXPECT_EQ(pacer.pacing_rate(), 2 * kNominalPacingRate); + + // Send all the packets, should take as long as the queue time limit. + Timestamp start_time = clock_.CurrentTime(); + while (pacer.QueueSizePackets() > 0) { + AdvanceTimeUntil(pacer.NextSendTime()); + pacer.ProcessPackets(); + } + EXPECT_EQ(clock_.CurrentTime() - start_time, kQueueTimeLimit); + + // We're back in a normal state - pacing rate should be back to previous + // levels. + EXPECT_EQ(pacer.pacing_rate(), kNominalPacingRate); +} + } // namespace } // namespace webrtc