Page Menu
Home
c4science
Search
Configure Global Search
Log In
Files
F120690277
delaytcp.cc
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Subscribers
None
File Metadata
Details
File Info
Storage
Attached
Created
Sun, Jul 6, 08:26
Size
17 KB
Mime Type
text/x-c
Expires
Tue, Jul 8, 08:26 (1 d, 23 h)
Engine
blob
Format
Raw Data
Handle
27221589
Attached To
R6591 HyMAB
delaytcp.cc
View Options
#include <click/config.h>
#include "util.hh"
#include "delaytcp.hh"
CLICK_DECLS
#define DEBUG_CHATTER(arg, ...) do { if (_debug) { click_chatter(arg, ## __VA_ARGS__);} } while (0)
#define VERB_DEBUG_CHATTER(arg, ...) do { if (_verb_debug) { click_chatter(arg, ## __VA_ARGS__);} } while (0)
// Christina: DelayTcp element without using timeouts for each packet arriving out of DelayTcp
DelayTcp::DelayTcp() : _active_timer(this), _print_timer(this) {}
DelayTcp::~DelayTcp(){}
int DelayTcp::configure(Vector<String> &conf, ErrorHandler *errh) {
click_chatter("[DelayTcp] enters configure.\n");
_debug = false;
_verb_debug = false;
_active = true;
_print_error = false;
alpha_ts = 50;
_fixed_delay = 0;
_packets_received = 0;
_packets_sent_directly = 0;
_packets_sent_delayed = 0;
if (Args(this, errh).bind(conf)
//.read("NODE_INDEX", _node_index)
.read("ACTIVE", _active)
.read("DEBUG", _debug)
.read("VERB_DEBUG", _verb_debug)
.read("FIXED_DELAY", _fixed_delay) // just add fixed delay to all packets
.consume() < 0)
return -1;
if(_verb_debug) _debug=true;
click_chatter("[DelayTcp] configure done.\n");
return 0;
}
int DelayTcp::initialize(ErrorHandler *errh) {
if(_debug)
click_chatter("[DelayTcp] Initializing...\n");
Element::initialize(errh);
_print_timer.initialize(this);
_active_timer.initialize(this);
return 0;
}
void DelayTcp::push(int port, Packet* p){
if(!_active) {
output(0).push(p);
return;
}
click_ip *ip_hdr = (click_ip *)(p->ip_header());
if(ip_hdr == 0) {
VERB_DEBUG_CHATTER("[DelayTcp] Could not get IP header from anno");
ip_hdr = (click_ip *)(p->data()+sizeof(acne_header));
}
// only for TCP
if(!ip_hdr || ip_hdr->ip_p != IP_PROTO_TCP) {
String err_b;
if(!ip_hdr)
err_b = " (non ip)";
else
err_b = " (proto is "+String::make_numeric(static_cast<String::uintmax_t>(ip_hdr->ip_p, 16, true))+")";
VERB_DEBUG_CHATTER("[DelayTcp %s] Non-IP or non-TCP packet%s.", Timestamp::now().unparse().c_str(), err_b.c_str());
output(0).push(p);
return;
}
click_tcp *tcp_hdr = (click_tcp *)(p->tcp_header());
if(tcp_hdr == 0)
tcp_hdr = (click_tcp *) (p->data()+sizeof(acne_header) + sizeof(click_ip));
// only for traffic
if(!tcp_hdr || tcp_hdr->th_flags & TH_SYN) {
VERB_DEBUG_CHATTER("[DelayTcp %s] Syn packet", Timestamp::now().unparse().c_str());
output(0).push(p);
return;
}
if(tcp_hdr->th_flags & TH_ACK) {
uint16_t a = htons(ip_hdr->ip_len);
uint32_t b = tcp_hdr->th_off;
uint32_t c = a - b*4 - 20;
if(c==0) {
VERB_DEBUG_CHATTER("[DelayTcp %s] Ack packet", Timestamp::now().unparse().c_str());
output(0).push(p);
return;
}
}
VERB_DEBUG_CHATTER("[DelayTcp %s] Received a packet", Timestamp::now().unparse().c_str());
_packets_received++;
acne_header *hdr = (acne_header *)(p->data());
if(hdr->_type != REGULAR_TRAFFIC) {
VERB_DEBUG_CHATTER("[DelayTcp %s] Non regular traffic packet", Timestamp::now().unparse().c_str());
output(0).push(p);
return;
}
// recompute delay difference every 500ms (100 ms first time)
IPAddress src = ip_hdr->ip_src;
if(!src) {
if(!_print_error)
click_chatter("[DelayTcp] Could not get src from IP header");
_print_error = true;
output(0).push(p);
return;
}
if(_fixed_delay > 0) {
// just add fixed delay
if(!is_in_hash_table(_pktTables,src) || !_pktTables.get(src)) {
VERB_DEBUG_CHATTER("[DelayTcp] Adding packet table for source %s", src.unparse().c_str());
_pktTables.set(src, new PacketTable());
}
if(!is_in_hash_table(_routeTables,src) || !_routeTables.get(src)) {
VERB_DEBUG_CHATTER("[DelayTcp] Adding route table for source %s", src.unparse().c_str());
_routeTables.set(src, new RouteTable());
}
PacketTable *_ptable = _pktTables.get(src);
RouteTable *_rtable = _routeTables.get(src);
assert(_ptable);
assert(_rtable);
_ptable->set(hdr->_seq, p);
_rtable->set(hdr->_seq, hdr->_route);
createPacketTimer(src,hdr->_seq, Timestamp(_fixed_delay));
return;
}
uint16_t packet_seq = hdr->_seq;
_now.assign_now();
_active_timer.schedule_after_sec(30);
if(_debug && !_print_timer.scheduled())
_print_timer.schedule_after_sec(1);
if(click_random(0,10) == 0) {
// once every 10 packet, keep packet delay (includes clock skew)
_froute2rate[src][hdr->_route] = ((1000-alpha_ts)*_froute2rate[src][hdr->_route] + alpha_ts*hdr->_rate_demand)/1000;
_froute2delta[src][hdr->_route] = ((1000-alpha_ts)*_froute2delta[src][hdr->_route] + alpha_ts*(_now-hdr->_ts))/1000;
}
//once every 100 packets, recompute delay diff
if(click_random(0,100) == 0) {
if(_froute2delta[src].size() == 2) {
if(!_print_timer.scheduled()) {
if(!is_in_hash_table(_route_max,src))
_print_timer.schedule_after_sec(1);
else
_print_timer.schedule_after_sec(2);
}
int i=0;
uint32_t rate_max;
uint32_t total_rate = 0;
for (HashTable<FormatedRoute, Timestamp>::iterator it = _froute2delta[src].begin(); it.live(); it++) {
if(i==0) {
// keep timestamp of route 1
_diff_route_delays.set(src,it.value());
_route_max.set(src,it.key());
rate_max = _froute2rate[src][it.key()];
total_rate+=rate_max;
}
else {
_diff_route_delays.set(src,_diff_route_delays[src]-it.value());
_route_min.set(src,it.key());
total_rate += _froute2rate[src][it.key()];
if(_diff_route_delays[src] < 0) {
// delay of route 1 smaller than delay of route 2; switch max and min
_diff_route_delays[src] = -_diff_route_delays[src];
_route_min[src] = _route_max[src];
_route_max[src] = it.key();
rate_max = _froute2rate[src][it.key()];
}
}
i++;
}
if(total_rate > 0)
_route_max_proportion.set(src,(1000*rate_max)/total_rate);
}
}
Timestamp time_to_wait;
if(_route_max_proportion[src] > 50 && _route_max_proportion[src] < 950) {
// delay unless it is already late
Timestamp diff = _froute2delta[src][_route_max[src]] - (_now-hdr->_ts);
if(diff.msecval() > 0) {
time_to_wait = diff;
if(_avg_waiting_time==Timestamp())
_avg_waiting_time = diff;
else
_avg_waiting_time = ((1000-alpha_ts)*_avg_waiting_time+alpha_ts*diff)/1000;
}
}
VERB_DEBUG_CHATTER("[DelayTcp %s] Received packet %d on route %s (ts %s)", _now.unparse().c_str(),
packet_seq, hdr->_route.unparse().c_str(), time_to_wait.unparse().c_str());
if(time_to_wait == Timestamp()) {
// if route is the route with max delay, just push packet
_packets_sent_directly++;
pushPackets(src,hdr->_seq,hdr->_route);
_last_seq.set(hdr->_route,hdr->_seq);
VERB_DEBUG_CHATTER("[DelayTcp %s] Pushing directly packet %d", Timestamp::now().unparse().c_str(), packet_seq);
output(0).push(p);
return;
}
// route is route with min delay; enqueue packet, timer expire after delay_diff
if(!_pktTables.get(src)) {
VERB_DEBUG_CHATTER("[DelayTcp] Adding packet table for source %s", src.unparse().c_str());
_pktTables.set(src, new PacketTable());
}
if(!_routeTables.get(src)) {
VERB_DEBUG_CHATTER("[DelayTcp] Adding route table for source %s", src.unparse().c_str());
_routeTables.set(src, new RouteTable());
}
PacketTable *_ptable = _pktTables.get(src);
RouteTable *_rtable = _routeTables.get(src);
assert(_ptable);
assert(_rtable);
_ptable->set(packet_seq, p);
_rtable->set(packet_seq, hdr->_route);
createPacketTimer(src,packet_seq,time_to_wait);
}
////////////////////////// PRIVATE FUNCTIONS ////////////////////////////////////
void DelayTcp::reset() {
click_chatter("[DelayTcp %s] Resetting DelayTcp", Timestamp::now().unparse().c_str());
for (HashTable<IPAddress, HashTable<FormatedRoute, Timestamp> >::iterator iter = _froute2delta.begin(); iter.live(); iter++) {
iter.value().clear();
}
_froute2delta.clear();
for (HashTable<IPAddress, HashTable<FormatedRoute, uint32_t> >::iterator iter = _froute2rate.begin(); iter.live(); iter++) {
iter.value().clear();
}
_froute2rate.clear();
_print_error = false;
for(HashTable<IPAddress, PacketTable*>::iterator iter = _pktTables.begin(); iter.live(); iter++) {
if(iter.value()) {
for(PacketTable::iterator it = iter.value()->begin();it.live();it++) {
if(it.value())
it.value()->kill();
}
}
}
_pktTables.clear();
_routeTables.clear();
_last_seq.clear();
_route_max.clear();
_route_min.clear();
_route_max_proportion.clear();
_diff_route_delays.clear();
}
void DelayTcp::run_timer(Timer *t) {
VERB_DEBUG_CHATTER("[DelayTcp %s] Run timer.", Timestamp::now().unparse().c_str());
if(t == &_active_timer) {
reset();
}
if(t == &_print_timer) {
if(_active) {
for (HashTable<IPAddress, HashTable<FormatedRoute, Timestamp> >::iterator iter = _froute2delta.begin(); iter.live(); iter++) {
IPAddress src = iter.key();
if(!is_in_hash_table(_froute2rate,src))
continue;
String s;
int nb_routes = iter.value().size();
if (nb_routes == 2) {
s="Delay difference between "+_route_min[src].unparse()+" and "+_route_max[src].unparse()+" is "+
_diff_route_delays[src].unparse()+" seconds ("+_froute2delta[src][_route_min[src]].unparse()+" ; "+
_froute2delta[src][_route_max[src]].unparse()+"). Rate proportion "+String(_route_max_proportion[src])+"/1000. Avg waiting time "+
_avg_waiting_time.unparse()+" seconds. ";
_avg_waiting_time = Timestamp();
}
int waiting_packets=0;
if(is_in_hash_table(_pktTables,src) && _pktTables[src])
waiting_packets=_pktTables[src]->size();
DEBUG_CHATTER("[DelayTcp %s] %s",Timestamp::now().unparse().c_str(),s.c_str());
DEBUG_CHATTER("\t %d packets waiting for %d routes (%d received, %d sent directly, %d sent delayed, %d ooo waiting %s sec)", waiting_packets,
nb_routes, _packets_received, _packets_sent_directly, _packets_sent_delayed, _packets_delayed_ooo, _avg_waiting_time_ooo.unparse().c_str());
_packets_received = 0;
_packets_sent_directly = 0;
_packets_sent_delayed = 0;
_packets_delayed_ooo = 0;
}
}
}
}
void DelayTcp::pushPackets(IPAddress src, uint16_t seq, FormatedRoute route_this_packet) {
PacketTable *_ptable = _pktTables[src];
RouteTable *_rtable = _routeTables[src];
if(_ptable && _rtable && is_in_hash_table(_last_seq,route_this_packet)) {
uint32_t i=1;
uint32_t first_seq = _last_seq[route_this_packet];
uint16_t current_seq = (uint16_t) ((first_seq+i)%MAX_SEQ);
_now.assign_now();
while( current_seq != (seq%MAX_SEQ) ) {
if(is_in_hash_table(*_rtable, current_seq) && _rtable->get(current_seq) == route_this_packet) {
Packet* packet = _ptable->get(current_seq);
if(packet) {
VERB_DEBUG_CHATTER("[DelayTcp] Pushing previous packet %d from route %s", current_seq, route_this_packet.unparse().c_str());
_packets_sent_delayed++;
_packets_delayed_ooo++;
_last_seq.set(route_this_packet,current_seq);
output(0).push(packet);
_ptable->erase(current_seq);
_rtable->erase(current_seq);
if(is_in_hash_table(_expiration_time,src)) {
if(is_in_hash_table(_expiration_time[src],current_seq)) {
if(_avg_waiting_time_ooo==Timestamp())
_avg_waiting_time_ooo = _expiration_time[src][current_seq]-_now;
else
_avg_waiting_time_ooo = ((1000-alpha_ts)*_avg_waiting_time_ooo + alpha_ts*(_expiration_time[src][current_seq]-_now))/1000;
_expiration_time[src].erase(current_seq);
}
}
}
}
i++;
current_seq = (uint16_t) ((first_seq+i)%MAX_SEQ);
if(i==1000) {
click_chatter("[DelayTcp %s] Count to infinity? i=1000, first_seq=%u, seq=%u for route %s. New last seq %u", Timestamp::now().unparse().c_str(),
first_seq, seq, route_this_packet.unparse().c_str(), _last_seq[route_this_packet]);
break;
}
}
}
}
/////////////////////// TIMEOUT staff /////////////////////
void DelayTcp::handleExpiryPacket(Timer*, void * data) {
PacketTimerData *tdata = (PacketTimerData*) data;
assert(tdata);
tdata->elmt->expire_packet(tdata);
}
void DelayTcp::expire_packet(PacketTimerData *tdata) {
// push expired packet
VERB_DEBUG_CHATTER("[DelayTcp %s] Packet timer expired for packet %d", Timestamp::now().unparse().c_str(), tdata->seq);
if(!is_in_hash_table(_pktTables,*tdata->src))
return;
PacketTable *_ptable = _pktTables[*tdata->src];
RouteTable *_rtable = _routeTables[*tdata->src];
// send previous packets of same route
pushPackets(*tdata->src, tdata->seq, _rtable->get(tdata->seq));
Packet* packet = _ptable->get(tdata->seq);
if(packet) {
VERB_DEBUG_CHATTER("[DelayTcp] Pushing packet %d from route %s", tdata->seq, _rtable->get(tdata->seq).unparse().c_str());
_packets_sent_delayed++;
_last_seq.set(_rtable->get(tdata->seq),tdata->seq);
output(0).push(packet);
_ptable->erase(tdata->seq);
_rtable->erase(tdata->seq);
if(is_in_hash_table(_expiration_time,*tdata->src))
_expiration_time[*tdata->src].erase(tdata->seq);
}
}
void DelayTcp::createPacketTimer(IPAddress const& src, uint16_t seq, Timestamp expire_time) {
// create a timer for the source and use it to have timeout on the waiting of out of DelayTcp packets
VERB_DEBUG_CHATTER("[DelayTcp %s] Creating timer for packet from %s with seq %u. Expires in %s", Timestamp::now().unparse().c_str(),
src.unparse().c_str(), seq, expire_time.unparse().c_str());
PacketTimerData* timerdata = new PacketTimerData();
timerdata->elmt = this;
timerdata->src = new IPAddress(src);
timerdata->seq = seq;
Timer* timer = new Timer(&DelayTcp::handleExpiryPacket, timerdata);
timer->initialize(this);
if(!is_in_hash_table(_expiration_time,src))
_expiration_time.set(src,HashTable<uint16_t,Timestamp>());
_expiration_time[src].set(seq,_now+expire_time);
timer->schedule_after(expire_time);
}
int
DelayTcp::active_handler(const String &s, Element *e, void *,
ErrorHandler *errh) {
DelayTcp *elmt = (DelayTcp *)e;
int active;
if(!cp_integer(s, &active))
return errh->error("Active must be 0 or 1");
if (!(active == 0 || active == 1))
return errh->error("Active must be 0 or 1");
elmt->set_active(active==1);
return 0;
}
int
DelayTcp::fixed_delay_handler(const String &s, Element *e, void *,
ErrorHandler *errh) {
DelayTcp *elmt = (DelayTcp *)e;
double arg;
if(!cp_double(s, &arg))
return errh->error("Active must be double");
elmt->set_fixed_delay(arg);
return 0;
}
int
DelayTcp::debug_handler(const String &s, Element *e, void *a,
ErrorHandler *errh) {
DelayTcp *elmt = (DelayTcp *)e;
int debug;
if(!cp_integer(s, &debug))
return errh->error("Debug must be 0 or 1");
if (!(debug == 0 || debug == 1))
return errh->error("Debug must be 0 or 1");
if(((intptr_t) a) == 0)
elmt->set_debug(debug==1);
else if(((intptr_t) a) == 1)
elmt->set_verb_debug(debug==1);
return 0;
}
void DelayTcp::add_handlers() {
add_write_handler("debug", debug_handler, 0);
add_write_handler("verb_debug", debug_handler, 1);
add_write_handler("active", active_handler, 0);
add_write_handler("fixed_delay", fixed_delay_handler);
}
CLICK_ENDDECLS
EXPORT_ELEMENT(DelayTcp)
ELEMENT_PROVIDES(DelayTcp)
Event Timeline
Log In to Comment