Line data Source code
1 : /*
2 : * Copyright (c) 2012 The WebRTC project authors. All Rights Reserved.
3 : *
4 : * Use of this source code is governed by a BSD-style license
5 : * that can be found in the LICENSE file in the root of the source
6 : * tree. An additional intellectual property rights grant can be found
7 : * in the file PATENTS. All contributing project authors may
8 : * be found in the AUTHORS file in the root of the source tree.
9 : */
10 :
11 : #include "webrtc/modules/pacing/paced_sender.h"
12 :
13 : #include <algorithm>
14 : #include <map>
15 : #include <queue>
16 : #include <set>
17 : #include <vector>
18 :
19 : #include "webrtc/base/checks.h"
20 : #include "webrtc/base/logging.h"
21 : #include "webrtc/modules/include/module_common_types.h"
22 : #include "webrtc/modules/pacing/alr_detector.h"
23 : #include "webrtc/modules/pacing/bitrate_prober.h"
24 : #include "webrtc/system_wrappers/include/clock.h"
25 : #include "webrtc/system_wrappers/include/critical_section_wrapper.h"
26 : #include "webrtc/system_wrappers/include/field_trial.h"
27 :
namespace {
// Minimum spacing, in milliseconds, between two consecutive packet bursts.
constexpr int64_t kMinPacketLimitMs = 5;

// Largest interval, in milliseconds, that a single Process() call may credit
// to the budgets. Caps the budget refill when Process() has not been called
// for a long time.
constexpr int64_t kMaxIntervalTimeMs = 30;

}  // namespace
37 :
38 : // TODO(sprang): Move at least PacketQueue and MediaBudget out to separate
39 : // files, so that we can more easily test them.
40 :
41 : namespace webrtc {
42 : namespace paced_sender {
43 : struct Packet {
44 0 : Packet(RtpPacketSender::Priority priority,
45 : uint32_t ssrc,
46 : uint16_t seq_number,
47 : int64_t capture_time_ms,
48 : int64_t enqueue_time_ms,
49 : size_t length_in_bytes,
50 : bool retransmission,
51 : uint64_t enqueue_order)
52 0 : : priority(priority),
53 : ssrc(ssrc),
54 : sequence_number(seq_number),
55 : capture_time_ms(capture_time_ms),
56 : enqueue_time_ms(enqueue_time_ms),
57 : bytes(length_in_bytes),
58 : retransmission(retransmission),
59 0 : enqueue_order(enqueue_order) {}
60 :
61 : RtpPacketSender::Priority priority;
62 : uint32_t ssrc;
63 : uint16_t sequence_number;
64 : int64_t capture_time_ms;
65 : int64_t enqueue_time_ms;
66 : size_t bytes;
67 : bool retransmission;
68 : uint64_t enqueue_order;
69 : std::list<Packet>::iterator this_it;
70 : };
71 :
72 : // Used by priority queue to sort packets.
73 : struct Comparator {
74 0 : bool operator()(const Packet* first, const Packet* second) {
75 : // Highest prio = 0.
76 0 : if (first->priority != second->priority)
77 0 : return first->priority > second->priority;
78 :
79 : // Retransmissions go first.
80 0 : if (second->retransmission != first->retransmission)
81 0 : return second->retransmission;
82 :
83 : // Older frames have higher prio.
84 0 : if (first->capture_time_ms != second->capture_time_ms)
85 0 : return first->capture_time_ms > second->capture_time_ms;
86 :
87 0 : return first->enqueue_order > second->enqueue_order;
88 : }
89 : };
90 :
91 : // Class encapsulating a priority queue with some extensions.
92 : class PacketQueue {
93 : public:
94 0 : explicit PacketQueue(Clock* clock)
95 0 : : bytes_(0),
96 : clock_(clock),
97 : queue_time_sum_(0),
98 0 : time_last_updated_(clock_->TimeInMilliseconds()) {}
99 0 : virtual ~PacketQueue() {}
100 :
101 0 : void Push(const Packet& packet) {
102 0 : if (!AddToDupeSet(packet))
103 0 : return;
104 :
105 0 : UpdateQueueTime(packet.enqueue_time_ms);
106 :
107 : // Store packet in list, use pointers in priority queue for cheaper moves.
108 : // Packets have a handle to its own iterator in the list, for easy removal
109 : // when popping from queue.
110 0 : packet_list_.push_front(packet);
111 0 : std::list<Packet>::iterator it = packet_list_.begin();
112 0 : it->this_it = it; // Handle for direct removal from list.
113 0 : prio_queue_.push(&(*it)); // Pointer into list.
114 0 : bytes_ += packet.bytes;
115 : }
116 :
117 0 : const Packet& BeginPop() {
118 0 : const Packet& packet = *prio_queue_.top();
119 0 : prio_queue_.pop();
120 0 : return packet;
121 : }
122 :
123 0 : void CancelPop(const Packet& packet) { prio_queue_.push(&(*packet.this_it)); }
124 :
125 0 : void FinalizePop(const Packet& packet) {
126 0 : RemoveFromDupeSet(packet);
127 0 : bytes_ -= packet.bytes;
128 0 : queue_time_sum_ -= (time_last_updated_ - packet.enqueue_time_ms);
129 0 : packet_list_.erase(packet.this_it);
130 0 : RTC_DCHECK_EQ(packet_list_.size(), prio_queue_.size());
131 0 : if (packet_list_.empty())
132 0 : RTC_DCHECK_EQ(0, queue_time_sum_);
133 0 : }
134 :
135 0 : bool Empty() const { return prio_queue_.empty(); }
136 :
137 0 : size_t SizeInPackets() const { return prio_queue_.size(); }
138 :
139 0 : uint64_t SizeInBytes() const { return bytes_; }
140 :
141 0 : int64_t OldestEnqueueTimeMs() const {
142 0 : auto it = packet_list_.rbegin();
143 0 : if (it == packet_list_.rend())
144 0 : return 0;
145 0 : return it->enqueue_time_ms;
146 : }
147 :
148 0 : void UpdateQueueTime(int64_t timestamp_ms) {
149 0 : RTC_DCHECK_GE(timestamp_ms, time_last_updated_);
150 0 : int64_t delta = timestamp_ms - time_last_updated_;
151 : // Use packet packet_list_.size() not prio_queue_.size() here, as there
152 : // might be an outstanding element popped from prio_queue_ currently in the
153 : // SendPacket() call, while packet_list_ will always be correct.
154 0 : queue_time_sum_ += delta * packet_list_.size();
155 0 : time_last_updated_ = timestamp_ms;
156 0 : }
157 :
158 0 : int64_t AverageQueueTimeMs() const {
159 0 : if (prio_queue_.empty())
160 0 : return 0;
161 0 : return queue_time_sum_ / packet_list_.size();
162 : }
163 :
164 : private:
165 : // Try to add a packet to the set of ssrc/seqno identifiers currently in the
166 : // queue. Return true if inserted, false if this is a duplicate.
167 0 : bool AddToDupeSet(const Packet& packet) {
168 0 : SsrcSeqNoMap::iterator it = dupe_map_.find(packet.ssrc);
169 0 : if (it == dupe_map_.end()) {
170 : // First for this ssrc, just insert.
171 0 : dupe_map_[packet.ssrc].insert(packet.sequence_number);
172 0 : return true;
173 : }
174 :
175 : // Insert returns a pair, where second is a bool set to true if new element.
176 0 : return it->second.insert(packet.sequence_number).second;
177 : }
178 :
179 0 : void RemoveFromDupeSet(const Packet& packet) {
180 0 : SsrcSeqNoMap::iterator it = dupe_map_.find(packet.ssrc);
181 0 : RTC_DCHECK(it != dupe_map_.end());
182 0 : it->second.erase(packet.sequence_number);
183 0 : if (it->second.empty()) {
184 0 : dupe_map_.erase(it);
185 : }
186 0 : }
187 :
188 : // List of packets, in the order the were enqueued. Since dequeueing may
189 : // occur out of order, use list instead of vector.
190 : std::list<Packet> packet_list_;
191 : // Priority queue of the packets, sorted according to Comparator.
192 : // Use pointers into list, to avoid moving whole struct within heap.
193 : std::priority_queue<Packet*, std::vector<Packet*>, Comparator> prio_queue_;
194 : // Total number of bytes in the queue.
195 : uint64_t bytes_;
196 : // Map<ssrc, set<seq_no> >, for checking duplicates.
197 : typedef std::map<uint32_t, std::set<uint16_t> > SsrcSeqNoMap;
198 : SsrcSeqNoMap dupe_map_;
199 : Clock* const clock_;
200 : int64_t queue_time_sum_;
201 : int64_t time_last_updated_;
202 : };
203 :
// Byte budget that is refilled according to a target bitrate and drained as
// bytes are sent.
//
// Overuse is carried over to the next interval as a debt, limited to what the
// target rate produces in |kWindowMs|. Underuse is not carried over: an
// unused positive budget is replaced, not accumulated, on the next refill.
//
// All budget arithmetic is done in 64 bits. With 32-bit arithmetic the debt
// limit (kWindowMs * target_rate_kbps / 8) overflows for target rates above
// roughly 4.29 million kbps, and the wrapped value can come out positive,
// turning a debt into a large budget.
class IntervalBudget {
 public:
  explicit IntervalBudget(int initial_target_rate_kbps)
      : target_rate_kbps_(initial_target_rate_kbps),
        bytes_remaining_(0) {}

  // Changes the target rate and re-applies the debt limit for the new rate.
  void set_target_rate_kbps(int target_rate_kbps) {
    target_rate_kbps_ = target_rate_kbps;
    bytes_remaining_ = std::max<int64_t>(-MaxDebtBytes(), bytes_remaining_);
  }

  // Refills the budget with what the target rate yields in |delta_time_ms|
  // (kbps * ms / 8 = bytes).
  void IncreaseBudget(int64_t delta_time_ms) {
    const int64_t bytes = target_rate_kbps_ * delta_time_ms / 8;
    if (bytes_remaining_ < 0) {
      // We overused last interval, compensate this interval.
      bytes_remaining_ = bytes_remaining_ + bytes;
    } else {
      // If we underused last interval we can't use it this interval.
      bytes_remaining_ = bytes;
    }
  }

  // Charges |bytes| to the budget. The budget may go negative, but never below
  // the debt limit.
  void UseBudget(size_t bytes) {
    bytes_remaining_ = std::max<int64_t>(
        bytes_remaining_ - static_cast<int64_t>(bytes), -MaxDebtBytes());
  }

  // Bytes that may still be sent; 0 while the budget is in debt.
  size_t bytes_remaining() const {
    return static_cast<size_t>(std::max<int64_t>(0, bytes_remaining_));
  }

  int target_rate_kbps() const { return target_rate_kbps_; }

 private:
  static const int kWindowMs = 500;

  // Largest debt allowed: the number of bytes the target rate produces in
  // |kWindowMs|, computed in 64 bits so that it cannot overflow.
  int64_t MaxDebtBytes() const {
    return static_cast<int64_t>(kWindowMs) * target_rate_kbps_ / 8;
  }

  int target_rate_kbps_;
  int64_t bytes_remaining_;
};
244 : } // namespace paced_sender
245 :
246 : const int64_t PacedSender::kMaxQueueLengthMs = 2000;
247 : const float PacedSender::kDefaultPaceMultiplier = 2.5f;
248 :
249 0 : PacedSender::PacedSender(Clock* clock, PacketSender* packet_sender)
250 : : clock_(clock),
251 : packet_sender_(packet_sender),
252 0 : alr_detector_(new AlrDetector()),
253 : critsect_(CriticalSectionWrapper::CreateCriticalSection()),
254 : paused_(false),
255 0 : media_budget_(new paced_sender::IntervalBudget(0)),
256 0 : padding_budget_(new paced_sender::IntervalBudget(0)),
257 0 : prober_(new BitrateProber()),
258 : estimated_bitrate_bps_(0),
259 : min_send_bitrate_kbps_(0u),
260 : max_padding_bitrate_kbps_(0u),
261 : pacing_bitrate_kbps_(0),
262 0 : time_last_update_us_(clock->TimeInMicroseconds()),
263 0 : packets_(new paced_sender::PacketQueue(clock)),
264 0 : packet_counter_(0) {
265 0 : UpdateBudgetWithElapsedTime(kMinPacketLimitMs);
266 0 : }
267 :
268 0 : PacedSender::~PacedSender() {}
269 :
270 0 : void PacedSender::CreateProbeCluster(int bitrate_bps) {
271 0 : CriticalSectionScoped cs(critsect_.get());
272 0 : prober_->CreateProbeCluster(bitrate_bps);
273 0 : }
274 :
275 0 : void PacedSender::Pause() {
276 0 : LOG(LS_INFO) << "PacedSender paused.";
277 0 : CriticalSectionScoped cs(critsect_.get());
278 0 : paused_ = true;
279 0 : }
280 :
281 0 : void PacedSender::Resume() {
282 0 : LOG(LS_INFO) << "PacedSender resumed.";
283 0 : CriticalSectionScoped cs(critsect_.get());
284 0 : paused_ = false;
285 0 : }
286 :
287 0 : void PacedSender::SetProbingEnabled(bool enabled) {
288 0 : RTC_CHECK_EQ(0, packet_counter_);
289 0 : CriticalSectionScoped cs(critsect_.get());
290 0 : prober_->SetEnabled(enabled);
291 0 : }
292 :
293 0 : void PacedSender::SetEstimatedBitrate(uint32_t bitrate_bps) {
294 0 : if (bitrate_bps == 0)
295 0 : LOG(LS_ERROR) << "PacedSender is not designed to handle 0 bitrate.";
296 0 : CriticalSectionScoped cs(critsect_.get());
297 0 : estimated_bitrate_bps_ = bitrate_bps;
298 0 : padding_budget_->set_target_rate_kbps(
299 0 : std::min(estimated_bitrate_bps_ / 1000, max_padding_bitrate_kbps_));
300 0 : pacing_bitrate_kbps_ =
301 0 : std::max(min_send_bitrate_kbps_, estimated_bitrate_bps_ / 1000) *
302 0 : kDefaultPaceMultiplier;
303 0 : alr_detector_->SetEstimatedBitrate(bitrate_bps);
304 0 : }
305 :
306 0 : void PacedSender::SetSendBitrateLimits(int min_send_bitrate_bps,
307 : int padding_bitrate) {
308 0 : CriticalSectionScoped cs(critsect_.get());
309 0 : min_send_bitrate_kbps_ = min_send_bitrate_bps / 1000;
310 0 : pacing_bitrate_kbps_ =
311 0 : std::max(min_send_bitrate_kbps_, estimated_bitrate_bps_ / 1000) *
312 0 : kDefaultPaceMultiplier;
313 0 : max_padding_bitrate_kbps_ = padding_bitrate / 1000;
314 0 : padding_budget_->set_target_rate_kbps(
315 0 : std::min(estimated_bitrate_bps_ / 1000, max_padding_bitrate_kbps_));
316 0 : }
317 :
318 0 : void PacedSender::InsertPacket(RtpPacketSender::Priority priority,
319 : uint32_t ssrc,
320 : uint16_t sequence_number,
321 : int64_t capture_time_ms,
322 : size_t bytes,
323 : bool retransmission) {
324 0 : CriticalSectionScoped cs(critsect_.get());
325 0 : RTC_DCHECK(estimated_bitrate_bps_ > 0)
326 0 : << "SetEstimatedBitrate must be called before InsertPacket.";
327 :
328 0 : int64_t now_ms = clock_->TimeInMilliseconds();
329 0 : prober_->OnIncomingPacket(bytes);
330 :
331 0 : if (capture_time_ms < 0)
332 0 : capture_time_ms = now_ms;
333 :
334 0 : packets_->Push(paced_sender::Packet(priority, ssrc, sequence_number,
335 : capture_time_ms, now_ms, bytes,
336 0 : retransmission, packet_counter_++));
337 0 : }
338 :
339 0 : int64_t PacedSender::ExpectedQueueTimeMs() const {
340 0 : CriticalSectionScoped cs(critsect_.get());
341 0 : RTC_DCHECK_GT(pacing_bitrate_kbps_, 0);
342 0 : return static_cast<int64_t>(packets_->SizeInBytes() * 8 /
343 0 : pacing_bitrate_kbps_);
344 : }
345 :
346 0 : rtc::Optional<int64_t> PacedSender::GetApplicationLimitedRegionStartTime()
347 : const {
348 0 : CriticalSectionScoped cs(critsect_.get());
349 0 : return alr_detector_->GetApplicationLimitedRegionStartTime();
350 : }
351 :
352 0 : size_t PacedSender::QueueSizePackets() const {
353 0 : CriticalSectionScoped cs(critsect_.get());
354 0 : return packets_->SizeInPackets();
355 : }
356 :
357 0 : int64_t PacedSender::QueueInMs() const {
358 0 : CriticalSectionScoped cs(critsect_.get());
359 :
360 0 : int64_t oldest_packet = packets_->OldestEnqueueTimeMs();
361 0 : if (oldest_packet == 0)
362 0 : return 0;
363 :
364 0 : return clock_->TimeInMilliseconds() - oldest_packet;
365 : }
366 :
367 0 : int64_t PacedSender::AverageQueueTimeMs() {
368 0 : CriticalSectionScoped cs(critsect_.get());
369 0 : packets_->UpdateQueueTime(clock_->TimeInMilliseconds());
370 0 : return packets_->AverageQueueTimeMs();
371 : }
372 :
373 0 : int64_t PacedSender::TimeUntilNextProcess() {
374 0 : CriticalSectionScoped cs(critsect_.get());
375 0 : if (prober_->IsProbing()) {
376 0 : int64_t ret = prober_->TimeUntilNextProbe(clock_->TimeInMilliseconds());
377 0 : if (ret >= 0)
378 0 : return ret;
379 : }
380 0 : int64_t elapsed_time_us = clock_->TimeInMicroseconds() - time_last_update_us_;
381 0 : int64_t elapsed_time_ms = (elapsed_time_us + 500) / 1000;
382 0 : return std::max<int64_t>(kMinPacketLimitMs - elapsed_time_ms, 0);
383 : }
384 :
385 0 : void PacedSender::Process() {
386 0 : int64_t now_us = clock_->TimeInMicroseconds();
387 0 : CriticalSectionScoped cs(critsect_.get());
388 0 : int64_t elapsed_time_ms = (now_us - time_last_update_us_ + 500) / 1000;
389 0 : time_last_update_us_ = now_us;
390 0 : int target_bitrate_kbps = pacing_bitrate_kbps_;
391 : // TODO(holmer): Remove the !paused_ check when issue 5307 has been fixed.
392 0 : if (!paused_ && elapsed_time_ms > 0) {
393 0 : size_t queue_size_bytes = packets_->SizeInBytes();
394 0 : if (queue_size_bytes > 0) {
395 : // Assuming equal size packets and input/output rate, the average packet
396 : // has avg_time_left_ms left to get queue_size_bytes out of the queue, if
397 : // time constraint shall be met. Determine bitrate needed for that.
398 0 : packets_->UpdateQueueTime(clock_->TimeInMilliseconds());
399 : int64_t avg_time_left_ms = std::max<int64_t>(
400 0 : 1, kMaxQueueLengthMs - packets_->AverageQueueTimeMs());
401 : int min_bitrate_needed_kbps =
402 0 : static_cast<int>(queue_size_bytes * 8 / avg_time_left_ms);
403 0 : if (min_bitrate_needed_kbps > target_bitrate_kbps)
404 0 : target_bitrate_kbps = min_bitrate_needed_kbps;
405 : }
406 :
407 0 : media_budget_->set_target_rate_kbps(target_bitrate_kbps);
408 :
409 0 : elapsed_time_ms = std::min(kMaxIntervalTimeMs, elapsed_time_ms);
410 0 : UpdateBudgetWithElapsedTime(elapsed_time_ms);
411 : }
412 :
413 0 : bool is_probing = prober_->IsProbing();
414 0 : int probe_cluster_id = PacketInfo::kNotAProbe;
415 0 : size_t bytes_sent = 0;
416 0 : size_t recommended_probe_size = 0;
417 0 : if (is_probing) {
418 0 : probe_cluster_id = prober_->CurrentClusterId();
419 0 : recommended_probe_size = prober_->RecommendedMinProbeSize();
420 : }
421 0 : while (!packets_->Empty()) {
422 : // Since we need to release the lock in order to send, we first pop the
423 : // element from the priority queue but keep it in storage, so that we can
424 : // reinsert it if send fails.
425 0 : const paced_sender::Packet& packet = packets_->BeginPop();
426 :
427 0 : if (SendPacket(packet, probe_cluster_id)) {
428 : // Send succeeded, remove it from the queue.
429 0 : bytes_sent += packet.bytes;
430 0 : packets_->FinalizePop(packet);
431 0 : if (is_probing && bytes_sent > recommended_probe_size)
432 0 : break;
433 : } else {
434 : // Send failed, put it back into the queue.
435 0 : packets_->CancelPop(packet);
436 0 : break;
437 : }
438 : }
439 :
440 : // TODO(holmer): Remove the paused_ check when issue 5307 has been fixed.
441 0 : if (packets_->Empty() && !paused_) {
442 : // We can not send padding unless a normal packet has first been sent. If we
443 : // do, timestamps get messed up.
444 0 : if (packet_counter_ > 0) {
445 : int padding_needed =
446 0 : static_cast<int>(is_probing ? (recommended_probe_size - bytes_sent)
447 0 : : padding_budget_->bytes_remaining());
448 :
449 0 : if (padding_needed > 0)
450 0 : bytes_sent += SendPadding(padding_needed, probe_cluster_id);
451 : }
452 : }
453 0 : if (is_probing && bytes_sent > 0)
454 0 : prober_->ProbeSent(clock_->TimeInMilliseconds(), bytes_sent);
455 0 : alr_detector_->OnBytesSent(bytes_sent, now_us / 1000);
456 0 : }
457 :
458 0 : bool PacedSender::SendPacket(const paced_sender::Packet& packet,
459 : int probe_cluster_id) {
460 : // TODO(holmer): Because of this bug issue 5307 we have to send audio
461 : // packets even when the pacer is paused. Here we assume audio packets are
462 : // always high priority and that they are the only high priority packets.
463 0 : if (packet.priority != kHighPriority) {
464 0 : if (paused_)
465 0 : return false;
466 0 : if (media_budget_->bytes_remaining() == 0 &&
467 : probe_cluster_id == PacketInfo::kNotAProbe) {
468 0 : return false;
469 : }
470 : }
471 0 : critsect_->Leave();
472 0 : const bool success = packet_sender_->TimeToSendPacket(
473 0 : packet.ssrc, packet.sequence_number, packet.capture_time_ms,
474 0 : packet.retransmission, probe_cluster_id);
475 0 : critsect_->Enter();
476 :
477 0 : if (success) {
478 : // TODO(holmer): High priority packets should only be accounted for if we
479 : // are allocating bandwidth for audio.
480 0 : if (packet.priority != kHighPriority) {
481 : // Update media bytes sent.
482 0 : UpdateBudgetWithBytesSent(packet.bytes);
483 : }
484 : }
485 :
486 0 : return success;
487 : }
488 :
489 0 : size_t PacedSender::SendPadding(size_t padding_needed, int probe_cluster_id) {
490 0 : critsect_->Leave();
491 : size_t bytes_sent =
492 0 : packet_sender_->TimeToSendPadding(padding_needed, probe_cluster_id);
493 0 : critsect_->Enter();
494 :
495 0 : if (bytes_sent > 0) {
496 0 : UpdateBudgetWithBytesSent(bytes_sent);
497 : }
498 0 : return bytes_sent;
499 : }
500 :
501 0 : void PacedSender::UpdateBudgetWithElapsedTime(int64_t delta_time_ms) {
502 0 : media_budget_->IncreaseBudget(delta_time_ms);
503 0 : padding_budget_->IncreaseBudget(delta_time_ms);
504 0 : }
505 :
506 0 : void PacedSender::UpdateBudgetWithBytesSent(size_t bytes_sent) {
507 0 : media_budget_->UseBudget(bytes_sent);
508 0 : padding_budget_->UseBudget(bytes_sent);
509 0 : }
510 : } // namespace webrtc
|