Page MenuHomec4science

delaytcp.cc
No OneTemporary

File Metadata

Created
Sun, Jul 6, 08:26

delaytcp.cc

#include <click/config.h>
#include "util.hh"
#include "delaytcp.hh"
CLICK_DECLS
#define DEBUG_CHATTER(arg, ...) do { if (_debug) { click_chatter(arg, ## __VA_ARGS__);} } while (0)
#define VERB_DEBUG_CHATTER(arg, ...) do { if (_verb_debug) { click_chatter(arg, ## __VA_ARGS__);} } while (0)
// Christina: DelayTcp element without using timeouts for each packet arriving out of order
// Constructor: both member timers are constructed with this element as their argument.
DelayTcp::DelayTcp()
    : _active_timer(this),
      _print_timer(this)
{
}
// Destructor: empty body.
DelayTcp::~DelayTcp()
{
}
// Parse the element configuration.
//
// Resets the flags and statistics counters to their defaults, then reads the
// keywords ACTIVE, DEBUG, VERB_DEBUG and FIXED_DELAY from conf.
// VERB_DEBUG implies DEBUG.
//
// Returns 0 on success, -1 when argument parsing fails.
int DelayTcp::configure(Vector<String> &conf, ErrorHandler *errh) {
    click_chatter("[DelayTcp] enters configure.\n");

    _debug = false;
    _verb_debug = false;
    _active = true;
    _print_error = false;
    alpha_ts = 50;
    _fixed_delay = 0;

    _packets_received = 0;
    _packets_sent_directly = 0;
    _packets_sent_delayed = 0;
    // This counter is incremented in pushPackets() and printed in run_timer(),
    // so it must start from a defined value like the other three.
    _packets_delayed_ooo = 0;

    if (Args(this, errh).bind(conf)
            //.read("NODE_INDEX", _node_index)
            .read("ACTIVE", _active)
            .read("DEBUG", _debug)
            .read("VERB_DEBUG", _verb_debug)
            .read("FIXED_DELAY", _fixed_delay) // just add fixed delay to all packets
            .consume() < 0)
        return -1;

    // Verbose debugging turns on regular debugging as well.
    if (_verb_debug)
        _debug = true;

    click_chatter("[DelayTcp] configure done.\n");
    return 0;
}
// Element initialization: runs the base-class initialize() and binds the
// print and activity timers to this element. Always returns 0.
int DelayTcp::initialize(ErrorHandler *errh) {
    DEBUG_CHATTER("[DelayTcp] Initializing...\n");

    Element::initialize(errh);

    _print_timer.initialize(this);
    _active_timer.initialize(this);
    return 0;
}
void DelayTcp::push(int port, Packet* p){
if(!_active) {
output(0).push(p);
return;
}
click_ip *ip_hdr = (click_ip *)(p->ip_header());
if(ip_hdr == 0) {
VERB_DEBUG_CHATTER("[DelayTcp] Could not get IP header from anno");
ip_hdr = (click_ip *)(p->data()+sizeof(acne_header));
}
// only for TCP
if(!ip_hdr || ip_hdr->ip_p != IP_PROTO_TCP) {
String err_b;
if(!ip_hdr)
err_b = " (non ip)";
else
err_b = " (proto is "+String::make_numeric(static_cast<String::uintmax_t>(ip_hdr->ip_p, 16, true))+")";
VERB_DEBUG_CHATTER("[DelayTcp %s] Non-IP or non-TCP packet%s.", Timestamp::now().unparse().c_str(), err_b.c_str());
output(0).push(p);
return;
}
click_tcp *tcp_hdr = (click_tcp *)(p->tcp_header());
if(tcp_hdr == 0)
tcp_hdr = (click_tcp *) (p->data()+sizeof(acne_header) + sizeof(click_ip));
// only for traffic
if(!tcp_hdr || tcp_hdr->th_flags & TH_SYN) {
VERB_DEBUG_CHATTER("[DelayTcp %s] Syn packet", Timestamp::now().unparse().c_str());
output(0).push(p);
return;
}
if(tcp_hdr->th_flags & TH_ACK) {
uint16_t a = htons(ip_hdr->ip_len);
uint32_t b = tcp_hdr->th_off;
uint32_t c = a - b*4 - 20;
if(c==0) {
VERB_DEBUG_CHATTER("[DelayTcp %s] Ack packet", Timestamp::now().unparse().c_str());
output(0).push(p);
return;
}
}
VERB_DEBUG_CHATTER("[DelayTcp %s] Received a packet", Timestamp::now().unparse().c_str());
_packets_received++;
acne_header *hdr = (acne_header *)(p->data());
if(hdr->_type != REGULAR_TRAFFIC) {
VERB_DEBUG_CHATTER("[DelayTcp %s] Non regular traffic packet", Timestamp::now().unparse().c_str());
output(0).push(p);
return;
}
// recompute delay difference every 500ms (100 ms first time)
IPAddress src = ip_hdr->ip_src;
if(!src) {
if(!_print_error)
click_chatter("[DelayTcp] Could not get src from IP header");
_print_error = true;
output(0).push(p);
return;
}
if(_fixed_delay > 0) {
// just add fixed delay
if(!is_in_hash_table(_pktTables,src) || !_pktTables.get(src)) {
VERB_DEBUG_CHATTER("[DelayTcp] Adding packet table for source %s", src.unparse().c_str());
_pktTables.set(src, new PacketTable());
}
if(!is_in_hash_table(_routeTables,src) || !_routeTables.get(src)) {
VERB_DEBUG_CHATTER("[DelayTcp] Adding route table for source %s", src.unparse().c_str());
_routeTables.set(src, new RouteTable());
}
PacketTable *_ptable = _pktTables.get(src);
RouteTable *_rtable = _routeTables.get(src);
assert(_ptable);
assert(_rtable);
_ptable->set(hdr->_seq, p);
_rtable->set(hdr->_seq, hdr->_route);
createPacketTimer(src,hdr->_seq, Timestamp(_fixed_delay));
return;
}
uint16_t packet_seq = hdr->_seq;
_now.assign_now();
_active_timer.schedule_after_sec(30);
if(_debug && !_print_timer.scheduled())
_print_timer.schedule_after_sec(1);
if(click_random(0,10) == 0) {
// once every 10 packet, keep packet delay (includes clock skew)
_froute2rate[src][hdr->_route] = ((1000-alpha_ts)*_froute2rate[src][hdr->_route] + alpha_ts*hdr->_rate_demand)/1000;
_froute2delta[src][hdr->_route] = ((1000-alpha_ts)*_froute2delta[src][hdr->_route] + alpha_ts*(_now-hdr->_ts))/1000;
}
//once every 100 packets, recompute delay diff
if(click_random(0,100) == 0) {
if(_froute2delta[src].size() == 2) {
if(!_print_timer.scheduled()) {
if(!is_in_hash_table(_route_max,src))
_print_timer.schedule_after_sec(1);
else
_print_timer.schedule_after_sec(2);
}
int i=0;
uint32_t rate_max;
uint32_t total_rate = 0;
for (HashTable<FormatedRoute, Timestamp>::iterator it = _froute2delta[src].begin(); it.live(); it++) {
if(i==0) {
// keep timestamp of route 1
_diff_route_delays.set(src,it.value());
_route_max.set(src,it.key());
rate_max = _froute2rate[src][it.key()];
total_rate+=rate_max;
}
else {
_diff_route_delays.set(src,_diff_route_delays[src]-it.value());
_route_min.set(src,it.key());
total_rate += _froute2rate[src][it.key()];
if(_diff_route_delays[src] < 0) {
// delay of route 1 smaller than delay of route 2; switch max and min
_diff_route_delays[src] = -_diff_route_delays[src];
_route_min[src] = _route_max[src];
_route_max[src] = it.key();
rate_max = _froute2rate[src][it.key()];
}
}
i++;
}
if(total_rate > 0)
_route_max_proportion.set(src,(1000*rate_max)/total_rate);
}
}
Timestamp time_to_wait;
if(_route_max_proportion[src] > 50 && _route_max_proportion[src] < 950) {
// delay unless it is already late
Timestamp diff = _froute2delta[src][_route_max[src]] - (_now-hdr->_ts);
if(diff.msecval() > 0) {
time_to_wait = diff;
if(_avg_waiting_time==Timestamp())
_avg_waiting_time = diff;
else
_avg_waiting_time = ((1000-alpha_ts)*_avg_waiting_time+alpha_ts*diff)/1000;
}
}
VERB_DEBUG_CHATTER("[DelayTcp %s] Received packet %d on route %s (ts %s)", _now.unparse().c_str(),
packet_seq, hdr->_route.unparse().c_str(), time_to_wait.unparse().c_str());
if(time_to_wait == Timestamp()) {
// if route is the route with max delay, just push packet
_packets_sent_directly++;
pushPackets(src,hdr->_seq,hdr->_route);
_last_seq.set(hdr->_route,hdr->_seq);
VERB_DEBUG_CHATTER("[DelayTcp %s] Pushing directly packet %d", Timestamp::now().unparse().c_str(), packet_seq);
output(0).push(p);
return;
}
// route is route with min delay; enqueue packet, timer expire after delay_diff
if(!_pktTables.get(src)) {
VERB_DEBUG_CHATTER("[DelayTcp] Adding packet table for source %s", src.unparse().c_str());
_pktTables.set(src, new PacketTable());
}
if(!_routeTables.get(src)) {
VERB_DEBUG_CHATTER("[DelayTcp] Adding route table for source %s", src.unparse().c_str());
_routeTables.set(src, new RouteTable());
}
PacketTable *_ptable = _pktTables.get(src);
RouteTable *_rtable = _routeTables.get(src);
assert(_ptable);
assert(_rtable);
_ptable->set(packet_seq, p);
_rtable->set(packet_seq, hdr->_route);
createPacketTimer(src,packet_seq,time_to_wait);
}
////////////////////////// PRIVATE FUNCTIONS ////////////////////////////////////
void DelayTcp::reset() {
click_chatter("[DelayTcp %s] Resetting DelayTcp", Timestamp::now().unparse().c_str());
for (HashTable<IPAddress, HashTable<FormatedRoute, Timestamp> >::iterator iter = _froute2delta.begin(); iter.live(); iter++) {
iter.value().clear();
}
_froute2delta.clear();
for (HashTable<IPAddress, HashTable<FormatedRoute, uint32_t> >::iterator iter = _froute2rate.begin(); iter.live(); iter++) {
iter.value().clear();
}
_froute2rate.clear();
_print_error = false;
for(HashTable<IPAddress, PacketTable*>::iterator iter = _pktTables.begin(); iter.live(); iter++) {
if(iter.value()) {
for(PacketTable::iterator it = iter.value()->begin();it.live();it++) {
if(it.value())
it.value()->kill();
}
}
}
_pktTables.clear();
_routeTables.clear();
_last_seq.clear();
_route_max.clear();
_route_min.clear();
_route_max_proportion.clear();
_diff_route_delays.clear();
}
// Timer callback for the two member timers.
//   _active_timer: calls reset().
//   _print_timer : when the element is active, prints one statistics report
//                  per source that has both delay and rate estimates, then
//                  zeroes the packet counters.
void DelayTcp::run_timer(Timer *t) {
    VERB_DEBUG_CHATTER("[DelayTcp %s] Run timer.", Timestamp::now().unparse().c_str());

    if (t == &_active_timer)
        reset();

    if (t != &_print_timer || !_active)
        return;

    bool reported = false;
    for (HashTable<IPAddress, HashTable<FormatedRoute, Timestamp> >::iterator iter = _froute2delta.begin(); iter.live(); iter++) {
        IPAddress src = iter.key();
        if (!is_in_hash_table(_froute2rate, src))
            continue;

        String s;
        int nb_routes = iter.value().size();
        // All lookups below use get(): operator[] would insert default entries,
        // and an extra entry in _froute2delta[src] would break the
        // "exactly two routes" test in push().
        if (nb_routes == 2 && is_in_hash_table(_route_min, src) && is_in_hash_table(_route_max, src)) {
            FormatedRoute route_min = _route_min.get(src);
            FormatedRoute route_max = _route_max.get(src);
            s = "Delay difference between " + route_min.unparse() + " and " + route_max.unparse() + " is " +
                _diff_route_delays.get(src).unparse() + " seconds (" + iter.value().get(route_min).unparse() + " ; " +
                iter.value().get(route_max).unparse() + "). Rate proportion " + String(_route_max_proportion.get(src)) + "/1000. Avg waiting time " +
                _avg_waiting_time.unparse() + " seconds. ";
            _avg_waiting_time = Timestamp();
        }

        int waiting_packets = 0;
        PacketTable *ptable = _pktTables.get(src);
        if (ptable)
            waiting_packets = ptable->size();

        DEBUG_CHATTER("[DelayTcp %s] %s", Timestamp::now().unparse().c_str(), s.c_str());
        DEBUG_CHATTER("\t %d packets waiting for %d routes (%d received, %d sent directly, %d sent delayed, %d ooo waiting %s sec)", waiting_packets,
                      nb_routes, _packets_received, _packets_sent_directly, _packets_sent_delayed, _packets_delayed_ooo, _avg_waiting_time_ooo.unparse().c_str());
        reported = true;
    }

    // The counters are not per source, so zero them once after every source
    // has been reported instead of after the first one.
    if (reported) {
        _packets_received = 0;
        _packets_sent_directly = 0;
        _packets_sent_delayed = 0;
        _packets_delayed_ooo = 0;
    }
}
// Flush stored packets of route_this_packet for source src whose sequence
// numbers lie between the last sequence sent on that route (exclusive) and
// seq (exclusive), walking forward modulo MAX_SEQ.
//
// Each flushed packet is pushed on output 0, removed from the packet/route
// tables, counted as delayed/out-of-order, and its remaining time until the
// recorded expiration is folded into _avg_waiting_time_ooo.
// The walk stops after 1000 steps as a safety net.
// Does nothing when src has no tables or the route has no recorded last
// sequence number.
void DelayTcp::pushPackets(IPAddress src, uint16_t seq, FormatedRoute route_this_packet) {
    // get() instead of operator[]: a lookup must not insert a null table
    // entry for a source that has none.
    PacketTable *_ptable = _pktTables.get(src);
    RouteTable *_rtable = _routeTables.get(src);
    if (!_ptable || !_rtable || !is_in_hash_table(_last_seq, route_this_packet))
        return;

    uint32_t i = 1;
    uint32_t first_seq = _last_seq[route_this_packet];
    uint16_t current_seq = (uint16_t) ((first_seq+i)%MAX_SEQ);
    _now.assign_now();

    while (current_seq != (seq%MAX_SEQ)) {
        if (is_in_hash_table(*_rtable, current_seq) && _rtable->get(current_seq) == route_this_packet) {
            Packet* packet = _ptable->get(current_seq);
            if (packet) {
                VERB_DEBUG_CHATTER("[DelayTcp] Pushing previous packet %d from route %s", current_seq, route_this_packet.unparse().c_str());
                _packets_sent_delayed++;
                _packets_delayed_ooo++;
                _last_seq.set(route_this_packet, current_seq);
                output(0).push(packet);
                _ptable->erase(current_seq);
                _rtable->erase(current_seq);

                if (is_in_hash_table(_expiration_time, src)) {
                    if (is_in_hash_table(_expiration_time[src], current_seq)) {
                        // Time the packet would still have waited for its timer.
                        Timestamp remaining = _expiration_time[src][current_seq] - _now;
                        if (_avg_waiting_time_ooo == Timestamp())
                            _avg_waiting_time_ooo = remaining;
                        else
                            _avg_waiting_time_ooo = ((1000-alpha_ts)*_avg_waiting_time_ooo + alpha_ts*remaining)/1000;
                        _expiration_time[src].erase(current_seq);
                    }
                }
            }
        }

        i++;
        current_seq = (uint16_t) ((first_seq+i)%MAX_SEQ);
        if (i == 1000) {
            click_chatter("[DelayTcp %s] Count to infinity? i=1000, first_seq=%u, seq=%u for route %s. New last seq %u", Timestamp::now().unparse().c_str(),
                          first_seq, seq, route_this_packet.unparse().c_str(), _last_seq[route_this_packet]);
            break;
        }
    }
}
/////////////////////// TIMEOUT stuff /////////////////////
// Timer hook for the per-packet timers: recovers the PacketTimerData passed as
// the opaque argument and forwards it to expire_packet() on the element stored
// inside it.
// NOTE(review): nothing visible in this file frees the PacketTimerData, its
// src address or the Timer allocated in createPacketTimer() - confirm who owns them.
void DelayTcp::handleExpiryPacket(Timer*, void * data) {
    PacketTimerData *timer_data = static_cast<PacketTimerData*>(data);
    assert(timer_data);

    DelayTcp *owner = timer_data->elmt;
    owner->expire_packet(timer_data);
}
// Called when the timer of a stored packet fires.
//
// First flushes earlier stored packets of the same route via pushPackets(),
// then pushes the packet identified by tdata (if it is still stored) on
// output 0 and removes it from the packet, route and expiration tables.
// Does nothing when the source no longer has tables.
void DelayTcp::expire_packet(PacketTimerData *tdata) {
    // push expired packet
    VERB_DEBUG_CHATTER("[DelayTcp %s] Packet timer expired for packet %d", Timestamp::now().unparse().c_str(), tdata->seq);

    // A source key can be present with a null table pointer, so test the
    // pointers themselves, not just key presence, before dereferencing.
    PacketTable *_ptable = _pktTables.get(*tdata->src);
    RouteTable *_rtable = _routeTables.get(*tdata->src);
    if (!_ptable || !_rtable)
        return;

    // pushPackets() stops before tdata->seq, so this packet's route entry
    // is still valid afterwards.
    FormatedRoute route = _rtable->get(tdata->seq);

    // send previous packets of same route
    pushPackets(*tdata->src, tdata->seq, route);

    Packet* packet = _ptable->get(tdata->seq);
    if (packet) {
        VERB_DEBUG_CHATTER("[DelayTcp] Pushing packet %d from route %s", tdata->seq, route.unparse().c_str());
        _packets_sent_delayed++;
        _last_seq.set(route, tdata->seq);
        output(0).push(packet);
        _ptable->erase(tdata->seq);
        _rtable->erase(tdata->seq);
        if (is_in_hash_table(_expiration_time, *tdata->src))
            _expiration_time[*tdata->src].erase(tdata->seq);
    }
}
// Arm a one-shot timer that releases the stored packet (src, seq) after
// expire_time, and record the expected expiration instant in _expiration_time.
//
// src         source address the packet is stored under
// seq         sequence number the packet is stored under
// expire_time delay, relative to now, after which handleExpiryPacket() runs
//
// NOTE(review): the Timer, the PacketTimerData and the IPAddress copy are
// allocated here and not freed anywhere in this file - confirm ownership.
void DelayTcp::createPacketTimer(IPAddress const& src, uint16_t seq, Timestamp expire_time) {
    // create a timer for the source and use it to have a timeout on the waiting of out-of-order packets
    VERB_DEBUG_CHATTER("[DelayTcp %s] Creating timer for packet from %s with seq %u. Expires in %s", Timestamp::now().unparse().c_str(),
                       src.unparse().c_str(), seq, expire_time.unparse().c_str());

    PacketTimerData* timerdata = new PacketTimerData();
    timerdata->elmt = this;
    timerdata->src = new IPAddress(src);
    timerdata->seq = seq;

    Timer* timer = new Timer(&DelayTcp::handleExpiryPacket, timerdata);
    timer->initialize(this);

    if (!is_in_hash_table(_expiration_time, src))
        _expiration_time.set(src, HashTable<uint16_t,Timestamp>());
    // Use the current time, not the member _now: push() does not refresh _now
    // on the fixed-delay path, so _now may be stale or never assigned there.
    _expiration_time[src].set(seq, Timestamp::now() + expire_time);

    timer->schedule_after(expire_time);
}
int
DelayTcp::active_handler(const String &s, Element *e, void *,
ErrorHandler *errh) {
DelayTcp *elmt = (DelayTcp *)e;
int active;
if(!cp_integer(s, &active))
return errh->error("Active must be 0 or 1");
if (!(active == 0 || active == 1))
return errh->error("Active must be 0 or 1");
elmt->set_active(active==1);
return 0;
}
int
DelayTcp::fixed_delay_handler(const String &s, Element *e, void *,
ErrorHandler *errh) {
DelayTcp *elmt = (DelayTcp *)e;
double arg;
if(!cp_double(s, &arg))
return errh->error("Active must be double");
elmt->set_fixed_delay(arg);
return 0;
}
int
DelayTcp::debug_handler(const String &s, Element *e, void *a,
ErrorHandler *errh) {
DelayTcp *elmt = (DelayTcp *)e;
int debug;
if(!cp_integer(s, &debug))
return errh->error("Debug must be 0 or 1");
if (!(debug == 0 || debug == 1))
return errh->error("Debug must be 0 or 1");
if(((intptr_t) a) == 0)
elmt->set_debug(debug==1);
else if(((intptr_t) a) == 1)
elmt->set_verb_debug(debug==1);
return 0;
}
// Register the element's write handlers.
void DelayTcp::add_handlers() {
    // "debug" and "verb_debug" share one callback; the last argument tells it
    // which flag to set.
    add_write_handler("debug", debug_handler, 0);
    add_write_handler("verb_debug", debug_handler, 1);

    add_write_handler("active", active_handler, 0);
    add_write_handler("fixed_delay", fixed_delay_handler);
}
CLICK_ENDDECLS
EXPORT_ELEMENT(DelayTcp)
ELEMENT_PROVIDES(DelayTcp)

Event Timeline