Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 30 additions & 4 deletions src/frontend/linkshell.cc
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
/* -*-mode:c++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */

#include <getopt.h>
#include <map>

#include "infinite_packet_queue.hh"
#include "drop_tail_packet_queue.hh"
#include "drop_head_packet_queue.hh"
#include "codel_packet_queue.hh"
#include "pie_packet_queue.hh"
#include "red_packet_queue.hh"
#include "link_queue.hh"
#include "packetshell.cc"
#include "tokenize.hh"
#include "parsed_arguments.hh"

using namespace std;

Expand All @@ -24,15 +28,15 @@ void usage_error( const string & program_name )
cerr << " --uplink-queue=QUEUE_TYPE --downlink-queue=QUEUE_TYPE" << endl;
cerr << " --uplink-queue-args=QUEUE_ARGS --downlink-queue-args=QUEUE_ARGS" << endl;
cerr << endl;
cerr << " QUEUE_TYPE = infinite | droptail | drophead | codel | pie" << endl;
cerr << " QUEUE_TYPE = infinite | droptail | drophead | codel | pie | red" << endl;
cerr << " QUEUE_ARGS = \"NAME=NUMBER[, NAME2=NUMBER2, ...]\"" << endl;
cerr << " (with NAME = bytes | packets | target | interval | qdelay_ref | max_burst)" << endl;
cerr << " target, interval, qdelay_ref, max_burst are in milli-second" << endl << endl;

throw runtime_error( "invalid arguments" );
}

unique_ptr<AbstractPacketQueue> get_packet_queue( const string & type, const string & args, const string & program_name )
unique_ptr<AbstractPacketQueue> get_packet_queue( const string & type, ParsedArguments args, const string & program_name )
{
if ( type == "infinite" ) {
return unique_ptr<AbstractPacketQueue>( new InfinitePacketQueue( args ) );
Expand All @@ -44,6 +48,8 @@ unique_ptr<AbstractPacketQueue> get_packet_queue( const string & type, const str
return unique_ptr<AbstractPacketQueue>( new CODELPacketQueue( args ) );
} else if ( type == "pie" ) {
return unique_ptr<AbstractPacketQueue>( new PIEPacketQueue( args ) );
} else if ( type == "red" ) {
return unique_ptr<AbstractPacketQueue>( new REDPacketQueue( args ) );
} else {
cerr << "Unknown queue type: " << type << endl;
}
Expand All @@ -68,6 +74,26 @@ string shell_quote( const string & arg )
return ret;
}

ParsedArguments parse_queue_args( const string & arg) {
map<string, string> argMap = map<string, string>();
if (arg.size() == 0) {
return ParsedArguments( argMap );
}
vector<string> argList = split(arg, ",");

for (size_t i = 0;i < argList.size();i++) {
string s = argList[i];
vector<string> argParts = split(s, "=");
if (argParts.size() != 2) {
throw runtime_error("Queue args passed in wrong format");
}

argMap.insert(pair<string, string>(argParts[0], argParts[1]));
}

return ParsedArguments( argMap );
}

int main( int argc, char *argv[] )
{
try {
Expand Down Expand Up @@ -183,11 +209,11 @@ int main( int argc, char *argv[] )

link_shell_app.start_uplink( "[link] ", command,
"Uplink", uplink_filename, uplink_logfile, repeat, meter_uplink, meter_uplink_delay,
get_packet_queue( uplink_queue_type, uplink_queue_args, argv[ 0 ] ),
get_packet_queue( uplink_queue_type, parse_queue_args(uplink_queue_args), argv[ 0 ] ),
command_line );

link_shell_app.start_downlink( "Downlink", downlink_filename, downlink_logfile, repeat, meter_downlink, meter_downlink_delay,
get_packet_queue( downlink_queue_type, downlink_queue_args, argv[ 0 ] ),
get_packet_queue( downlink_queue_type, parse_queue_args(downlink_queue_args), argv[ 0 ] ),
command_line );

return link_shell_app.wait_for_exit();
Expand Down
1 change: 1 addition & 0 deletions src/packet/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@ libpacket_a_SOURCES = packetshell.hh packetshell.cc queued_packet.hh \
abstract_packet_queue.hh dropping_packet_queue.hh dropping_packet_queue.cc infinite_packet_queue.hh \
drop_tail_packet_queue.hh drop_head_packet_queue.hh \
codel_packet_queue.cc codel_packet_queue.hh \
red_packet_queue.cc red_packet_queue.hh \
pie_packet_queue.cc pie_packet_queue.hh \
bindworkaround.hh
1 change: 1 addition & 0 deletions src/packet/abstract_packet_queue.hh
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <string>

#include "queued_packet.hh"
#include "parsed_arguments.hh"

class AbstractPacketQueue
{
Expand Down
9 changes: 3 additions & 6 deletions src/packet/codel_packet_queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,16 @@

using namespace std;

CODELPacketQueue::CODELPacketQueue( const string & args )
CODELPacketQueue::CODELPacketQueue( ParsedArguments & args )
: DroppingPacketQueue(args),
target_ ( get_arg( args, "target") ),
interval_ ( get_arg( args, "interval") ),
target_ ( args.get_int_arg("target") ),
interval_ ( args.get_int_arg( "interval") ),
first_above_time_ ( 0 ),
drop_next_( 0 ),
count_ ( 0 ),
lastcount_ ( 0 ),
dropping_ ( 0 )
{
if ( target_ == 0 || interval_ == 0 ) {
throw runtime_error( "CoDel queue must have target and interval arguments." );
}
}

//NOTE: CoDel makes drop decisions at dequeueing.
Expand Down
2 changes: 1 addition & 1 deletion src/packet/codel_packet_queue.hh
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ private:
uint64_t control_law ( uint64_t t, uint32_t count );

public:
CODELPacketQueue( const std::string & args );
CODELPacketQueue( ParsedArguments & args );

void enqueue( QueuedPacket && p ) override;

Expand Down
38 changes: 3 additions & 35 deletions src/packet/dropping_packet_queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@

using namespace std;

DroppingPacketQueue::DroppingPacketQueue( const string & args )
: packet_limit_( get_arg( args, "packets" ) ),
byte_limit_( get_arg( args, "bytes" ) )
DroppingPacketQueue::DroppingPacketQueue( ParsedArguments & args )
: packet_limit_( args.get_int_arg( "packets", 0 ) ),
byte_limit_( args.get_int_arg( "bytes", 0 ) )
{
if ( packet_limit_ == 0 and byte_limit_ == 0 ) {
throw runtime_error( "Dropping queue must have a byte or packet limit." );
Expand Down Expand Up @@ -98,35 +98,3 @@ string DroppingPacketQueue::to_string( void ) const

return ret;
}

unsigned int DroppingPacketQueue::get_arg( const string & args, const string & name )
{
auto offset = args.find( name );
if ( offset == string::npos ) {
return 0; /* default value */
} else {
/* extract the value */

/* advance by length of name */
offset += name.size();

/* make sure next char is "=" */
if ( args.substr( offset, 1 ) != "=" ) {
throw runtime_error( "could not parse queue arguments: " + args );
}

/* advance by length of "=" */
offset++;

/* find the first non-digit character */
auto offset2 = args.substr( offset ).find_first_not_of( "0123456789" );

auto digit_string = args.substr( offset ).substr( 0, offset2 );

if ( digit_string.empty() ) {
throw runtime_error( "could not parse queue arguments: " + args );
}

return myatoi( digit_string );
}
}
9 changes: 7 additions & 2 deletions src/packet/dropping_packet_queue.hh
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include <queue>
#include <cassert>
#include <map>

#include "abstract_packet_queue.hh"
#include "exception.hh"
Expand All @@ -31,7 +32,7 @@ protected:
const unsigned int size_in_packets ) const;

public:
DroppingPacketQueue( const std::string & args );
DroppingPacketQueue( ParsedArguments & args );

virtual void enqueue( QueuedPacket && p ) = 0;

Expand All @@ -41,10 +42,14 @@ public:

std::string to_string( void ) const override;

static unsigned int get_arg( const std::string & args, const std::string & name );
static std::string parse_number_arg(const std::string & args, const std::string & name, bool isfloat);

unsigned int size_bytes( void ) const override;
unsigned int size_packets( void ) const override;

static unsigned int get_int_arg( const std::map<std::string, std::string> & args, const std::string & name );
static unsigned int get_arg( const std::string & args, const std::string & name );
static double get_float_arg( const std::map<std::string, std::string> & args, const std::string & name );
};

#endif /* DROPPING_PACKET_QUEUE_HH */
3 changes: 2 additions & 1 deletion src/packet/infinite_packet_queue.hh
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "queued_packet.hh"
#include "abstract_packet_queue.hh"
#include "exception.hh"
#include <map>

class InfinitePacketQueue : public AbstractPacketQueue
{
Expand All @@ -17,7 +18,7 @@ private:
int queue_size_in_bytes_ = 0, queue_size_in_packets_ = 0;

public:
InfinitePacketQueue( const std::string & args )
InfinitePacketQueue( ParsedArguments & args )
{
if ( not args.empty() ) {
throw std::runtime_error( "InfinitePacketQueue does not take arguments." );
Expand Down
6 changes: 3 additions & 3 deletions src/packet/pie_packet_queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ using namespace std;

#define DQ_COUNT_INVALID (uint32_t)-1

PIEPacketQueue::PIEPacketQueue( const string & args )
PIEPacketQueue::PIEPacketQueue( ParsedArguments & args )
: DroppingPacketQueue(args),
qdelay_ref_ ( get_arg( args, "qdelay_ref" ) ),
max_burst_ ( get_arg( args, "max_burst" ) ),
qdelay_ref_ ( args.get_int_arg( "qdelay_ref" ) ),
max_burst_ ( args.get_int_arg( "max_burst" ) ),
alpha_ ( 0.125 ),
beta_ ( 1.25 ),
t_update_ ( 30 ),
Expand Down
2 changes: 1 addition & 1 deletion src/packet/pie_packet_queue.hh
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ private:
void calculate_drop_prob ( void );

public:
PIEPacketQueue( const std::string & args );
PIEPacketQueue( ParsedArguments & args );

void enqueue( QueuedPacket && p ) override;

Expand Down
91 changes: 91 additions & 0 deletions src/packet/red_packet_queue.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
#include "red_packet_queue.hh"
#include <algorithm>
#include "timestamp.hh"

using namespace std;

REDPacketQueue::REDPacketQueue( ParsedArguments & args)
: DroppingPacketQueue(args),
wq_(args.get_float_arg("wq")),
min_thresh_(args.get_float_arg("minthresh")),
max_thresh_(args.get_float_arg("maxthresh")),
transmission_time_(args.get_int_arg("transmission_time")),
time_at_zero_q_(0),
prng_( random_device()() ),
drop_dist_ (0, 1),
current_random_val_(0),
count_(0)
{

}

QueuedPacket REDPacketQueue::dequeue( void )
{
auto packet = DroppingPacketQueue::dequeue();
if (size_packets() == 0) {
time_at_zero_q_ = timestamp();
}

return packet;
}

void REDPacketQueue::enqueue( QueuedPacket && p )
{
auto instantaneous_queue_size = size_packets();

/* If the queue is empty, decay the weighted average by
* the amount of time that the queue has been
* empty. */
if (size_packets() == 0) {
weighted_average_ = powf((1 - wq_), (timestamp() - time_at_zero_q_)/transmission_time_) * weighted_average_;
} else {
weighted_average_ = (instantaneous_queue_size * wq_ ) + (1- wq_) * weighted_average_;
}


/* ratio is how full the queue is percentage-wise */
auto ratio = (weighted_average_)/packet_limit_;
bool accept_packet = true;

/*
* Logic for enqueuing packets:
* - If ratio is between the min and max threshold,
* begin counting the number of packets that have
* passed through the queue. We compute a drop
* probability based on `ratio`, and if the
* number of packets in the range exceeds
* some random number divided by that drop probability,
* drop the packet, and reset teh count.
* - If ratio is greater than the max_thresh_, drop
* the packet.
* - If it is under max thresh, don't
* drop the packet.
* - See: http://www.icir.org/floyd/papers/early.twocolumn.pdf
* for more details
*/
if (ratio >= min_thresh_ && ratio <= max_thresh_) {
count_++;
float drop_probability = (ratio - min_thresh_)/(max_thresh_ - min_thresh_);

if (count_ > 0 && count_ >= current_random_val_/drop_probability ) {
accept_packet = false;
count_ = 0;
}

if (count_ == 0) {
current_random_val_ = drop_dist_(prng_);
}
} else if (ratio > max_thresh_ ) {
accept_packet = false;
count_ = 0;
} else {
count_ = -1;
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems a bit expensive to have to compute a new 64-bit floating-point random number for every enqueued packet. Can you tell me how much CPU this ends up consuming for, e.g., traffic at 200 Mbps? Do we have to use this much precision? (And what do you think about the algorithm in the RED93 paper that only requires a new random number every time a packet is dropped or the moving average is updated?)


if ( accept_packet && good_with( size_bytes() + p.contents.size(),
size_packets() + 1 ) ) {
accept( move( p ) );
}

assert( good() );
}
50 changes: 50 additions & 0 deletions src/packet/red_packet_queue.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/* -*-mode:c++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */

#ifndef RED_PACKET_QUEUE_HH
#define RED_PACKET_QUEUE_HH

#include <string>
#include "util.hh"
#include <iostream>

#include <fstream>
#include <memory>
#include <deque>
#include <random>
#include "dropping_packet_queue.hh"

/*
Random Early Detection (RED) AQM Implementation.
See section 3 of https://tools.ietf.org/html/rfc2309#ref-Jacobson88
for a description of the queuing discipline.
j

*/
class REDPacketQueue : public DroppingPacketQueue
{
private:
//Configuration parameters
double wq_, min_thresh_, max_thresh_;
uint32_t transmission_time_;
uint64_t time_at_zero_q_;
std::default_random_engine prng_;
std::uniform_real_distribution<float> drop_dist_;
float current_random_val_;
uint32_t count_;

const std::string & type( void ) const override
{
static const std::string type_ { "red" };
return type_;
}

double weighted_average_ = 0;
unsigned int max_queue_depth_packets() const;

public:
REDPacketQueue( ParsedArguments & args );
QueuedPacket dequeue( void ) override;
void enqueue( QueuedPacket && p ) override;
};

#endif
Loading