Page MenuHomec4science

order.cc
No OneTemporary

File Metadata

Created
Sat, Jul 5, 14:07

order.cc

#include <click/config.h>
#include "util.hh"
#include "order.hh"
CLICK_DECLS
#define DEBUG_CHATTER(arg, ...) do { if (_debug) { click_chatter(arg, ## __VA_ARGS__);} } while (0)
#define VERB_DEBUG_CHATTER(arg, ...) do { if (_verb_debug) { click_chatter(arg, ## __VA_ARGS__);} } while (0)
// Christina: order element without using timeouts for each packet arriving out of order
Order::Order() : _notifier(ActiveNotifier::SEARCH_CONTINUE_WAKE),_expire_timer_s(this), _packet_timeout_timer(this), _active_timer(this), _nack_timer(this) {}
Order::~Order(){}
void * Order::cast(const char *n) {
if (strcmp(n, ActiveNotifier::EMPTY_NOTIFIER) == 0){
return &_notifier;
}
else
return Element::cast(n);
}
int Order::configure(Vector<String> &conf, ErrorHandler *errh) {
click_chatter("[Order] enters configure.\n");
_notifier.initialize(ActiveNotifier::EMPTY_NOTIFIER, router());
_debug = false;
_verb_debug = false;
_debug_nack = false;
_debug_explore = false;
_active = true;
_packet_lifetime_ms = 0;
_route_lifetime_ms = 1000;
_strip_acne = true;
_max_num_routes = 2;
_dropped = 0;
_timeout_packets = false;
_delayed_reordering = false;
_with_nack_freq = 0;
_source_lifetime_s = 2;
_print_stats_freq = 0;
alpha_ts = 100;
if (Args(this, errh).bind(conf)
//.read("NODE_INDEX", _node_index)
.read("ACTIVE", _active)
.read("DEBUG", _debug)
.read("DEBUG_NACK", _debug_nack)
.read("DEBUG_EXPLORE", _debug_explore)
.read("VERB_DEBUG", _verb_debug)
.read("PACKET_TIMEOUT", _packet_lifetime_ms)
.read("ROUTE_TIMEOUT", _route_lifetime_ms)
.read("SOURCE_TIMEOUT", _source_lifetime_s)
.read("PRINT_STATS_MS", _print_stats_freq)
.read("STRIP_ACNE", _strip_acne)
.read("MAX_ROUTES", _max_num_routes)
.read("TIMEOUT_PACKETS", _timeout_packets)
.read("DELAYED_REORDERING", _delayed_reordering)
.read("WITH_NACK_FREQ_MS", _with_nack_freq) // send NACK messages at this frequency (0: no NACK)
.consume() < 0)
return -1;
if(_verb_debug) _debug=true;
click_chatter("[Order] configure done.\n");
return 0;
}
int Order::initialize(ErrorHandler *errh) {
if(_debug)
click_chatter("[Order] Initializing...\n");
if (!_rtl)
return errh->error("[Order] Needs a RoutingLinks element");
Element::initialize(errh);
_expire_timer_s.initialize(this);
if(_print_stats_freq > 0)
_expire_timer_s.schedule_after_msec(mymin(_print_stats_freq,1000*_source_lifetime_s));
else
_expire_timer_s.schedule_after_sec(_source_lifetime_s);
_packet_timeout_timer.initialize(this);
_active_timer.initialize(this);
_nack_timer.initialize(this);
return 0;
}
void Order::set_rtl(RoutingLinks *rtl) {
click_chatter("[Order] RoutingLinks element is set.");
_rtl = rtl;
}
void
Order::notify_invalid_route(FormatedRoute froute,IPAddress src) {
DEBUG_CHATTER("[Order] I was notified for an invalid route %s for src %s",froute.unparse().c_str(),src.unparse().c_str());
SeqFormRoute *_seqformroutetable = _SeqFormRouteTables[src];
if (_seqformroutetable) {
_seqformroutetable->erase(froute);
DEBUG_CHATTER("[Order] Removing invalid route %s ",froute.unparse().c_str());
}
}
void Order::push(int port, Packet* p){
//Receives layer 2.5 frames, not necessarily in order.
//Needs to wait long enough (but not too long) to deliver in order to upper layer.
//Drop after a timeout (e.g., 50-100 ms)
Timestamp now = Timestamp::now();
acne_header *hdr = (acne_header *)(p->data());
click_ip *ip_hdr = (click_ip *) (p->data() + sizeof(acne_header));
IPAddress src;
if(ip_hdr)
src = ip_hdr->ip_src;
if(hdr->_type == SPECIAL_TRAFFIC) {
VERB_DEBUG_CHATTER("[Order] Push special traffic packet.");
output(0).push(p);
return;
}
if (hdr->_type == MAB_CONTROL) {
if(_active) {
src = _rtl->get_ip(hdr->_route._src);
mab_control *ctrl_hdr = (mab_control *) (p->data() + sizeof(acne_header));
if(ctrl_hdr->_type == MAB_CTRL_BEGIN) {
if(_debug_explore)
_verb_debug=true;
_sent_explore = 0;
_sent_explore_timeout = 0;
_sent_explore_error = 0;
_received_explore = 0;
DEBUG_CHATTER("[Order %s] MAB control BEGIN message %d, id %d, remove routes from source %s...",
now.unparse().c_str(), ctrl_hdr->_type, hdr->_id_rate_mab, src.unparse().c_str());
remove_routes(src);
}
else if(ctrl_hdr->_type == MAB_CTRL_END) { // TODO: works bad if END is lost, or if explore traffic is received after
if(_debug_explore)
_verb_debug=false;
DEBUG_CHATTER("[Order %s] MAB control END message %d, id %d, remove routes from source %s, has received %d packets, sent %d (%d timeout, %d error)...",
now.unparse().c_str(), ctrl_hdr->_type, hdr->_id_rate_mab, src.unparse().c_str(), _received_explore, _sent_explore, _sent_explore_timeout, _sent_explore_error);
remove_routes(src);
}
}
p->kill();
return;
}
if (!src) {
src = _rtl->get_ip(hdr->_route._src);
}
if(!src) {
VERB_DEBUG_CHATTER("[Order] Could not get src at all, push packet.");
_sent_explore_error++;
output(0).push(p);
return;
}
if(!_active) {
_sent_explore_error++;
output(0).push(delete_routing_header(p));
return;
}
// keep packet delay (includes clock skew)
uint16_t packet_seq = hdr->_seq;
if(_verb_debug) {
bool traffic = true;
String tcp_seq;
if(ip_hdr->ip_p == IP_PROTO_TCP) {
click_tcp *tcp_hdr = (click_tcp *)(p->tcp_header());
if(tcp_hdr == 0)
tcp_hdr = (click_tcp *) (p->data()+sizeof(acne_header) + sizeof(click_ip));
if(tcp_hdr) {
if(tcp_hdr->th_flags & TH_SYN)
traffic = false;
else if(tcp_hdr->th_flags & TH_ACK) {
uint16_t a = htons(ip_hdr->ip_len);
uint32_t b = tcp_hdr->th_off;
uint32_t c = a - b*4 - 20;
if(c==0)
traffic = false;
}
if(traffic)
tcp_seq = " (TCP seq "+String(htonl(tcp_hdr->th_seq)+htons(ip_hdr->ip_len) - 52)+")";
}
}
click_chatter("[Order %s] Receiving packet from %s, seq %u%s", now.unparse().c_str(), src.unparse().c_str(), packet_seq, tcp_seq.c_str());
}
// if(!is_in_hash_table(_froute2delta,src)) {
// _froute2delta.set(src, HashTable<FormatedRoute, Timestamp>());
// _froute2rate.set(src, HashTable<FormatedRoute, uint32_t>());
// }
// if(!is_in_hash_table(_froute2rate[src],hdr->_route))
// _froute2rate[src].set(hdr->_route, hdr->_rate_demand);
// else
// _froute2rate[src][hdr->_route] = ((1000-alpha_ts)*_froute2rate[src][hdr->_route] + alpha_ts*hdr->_rate_demand)/1000;
// if(!is_in_hash_table(_froute2delta[src], hdr->_route))
// _froute2delta[src].set(hdr->_route, now-hdr->_ts);
// else
// _froute2delta[src][hdr->_route] = ((1000-alpha_ts)*_froute2delta[src][hdr->_route] + alpha_ts*(now-hdr->_ts))/1000;
_current_arrived_seq = packet_seq;
uint16_t cur_seq = getCurrentSeqNum(src, packet_seq);
_froute2ts.set(hdr->_route, now);
_src2ts.set(src, now);
hdr->_ts = now;
_received_explore++;
int cur_seq_int = (int) cur_seq;
int packet_seq_int = (int) packet_seq;
int diff = cur_seq_int - packet_seq_int;
// check if packet_seq has not already been seen (might happen with nack restransmissions), ie smaller than cur_seq
if( (diff >= 0 && diff <= ((int) MAX_SEQ/2)) || diff <= -1*((int) MAX_SEQ/2) ) {
_dropped++;
VERB_DEBUG_CHATTER("[Order %s] Received packet seq %u, smaller than current seq %u, kill it", now.unparse().c_str(), packet_seq, cur_seq);
p->kill();
return;
}
_received_packets++;
if(!_packet_timeout_timer.scheduled() && _route_lifetime_ms > 0)
_packet_timeout_timer.schedule_after_msec(_route_lifetime_ms);
if(_with_nack_freq == 0 && !_nack_timer.scheduled())
_nack_timer.schedule_after_msec(100); // needs to send empty acks so that source erases packets
bool ordered_packet = false;
if(packet_seq == cur_seq + 1 || (packet_seq==0 && cur_seq == MAX_SEQ-1)) {
// normal case: seq is (previous seq + 1)
ordered_packet = true;
}
// else if(packet_seq == ((((cur_seq + 1) % MAX_SEQ) + 1) % MAX_SEQ) ) {
// // seq is previous_seq + 2: OK if previous_seq+1 has been sent on this route (field hdr->_previous_seq_route), means missed
// if(hdr->_previous_seq_route == ((cur_seq + 1) % MAX_SEQ)) {
// VERB_DEBUG_CHATTER("[Order %s] Packet %u for src %s missed", now.unparse().c_str(), (cur_seq+1)%MAX_SEQ, src.unparse().c_str());
// _caught_missed_packet++;
// _missed_packets[src].set((cur_seq+1)%MAX_SEQ,1);
// ordered_packet = true;
// }
// }
if (ordered_packet) {
// if(_debug && (cur_seq==MAX_SEQ && packet_seq == 0))
// click_chatter("[Order %s] Has reached MAX_SEQ.", now.unparse().c_str());
// for each packet that arrives, store the last sequence seen from a specific route
// ASSUMPTION: packets from a specific route arrive in increasing order (i.e. they are not re-ordered)
update_route(hdr->_route, src, packet_seq, p);
// packet in the correct order, forward immediately
_sent_explore++;
output(0).push(delete_routing_header(p));
VERB_DEBUG_CHATTER("[Order %s] Forwarding packet with seq %u, for src %s from route %s\n", now.unparse().c_str(), packet_seq, src.unparse().c_str(), hdr->_route.unparse().c_str());
// forward stored packets that arrived earlier
_missed_packets[src].erase(packet_seq);
pushOldPackets(src, packet_seq, true);
}
// else if(packet_seq<=cur_seq && dist_mod_maxseq(packet_seq, cur_seq) <= MAX_SEQ/2) {
// // might happen for repeated sequence numbers (e.g. packets to maintain routes); just push
// DEBUG_CHATTER("[Order %s] Pushing repeated sequence number.", now.unparse().c_str());
// output(0).push(delete_routing_header(p));
// }
else if ( dist_mod_maxseq(cur_seq, packet_seq) > MAX_SEQ/2 && !_newsrcTable.get(src)) {
// Will drop packet (if the following algorithm works well, we should drop very few packets)
if((_dropped%500) == 0)
DEBUG_CHATTER("[Order %s] Dropping packet with seq %u, for src %s current seq %u; has dropped %d\n", now.unparse().c_str(),
packet_seq, src.unparse().c_str(), cur_seq, _dropped);
else
VERB_DEBUG_CHATTER("[Order %s] Dropping packet with seq %u, for src %s current seq %u; has dropped %d\n", now.unparse().c_str(),
packet_seq, src.unparse().c_str(), cur_seq, _dropped);
_dropped++;
p->kill();
}
else {
// for each packet that arrives, store the last sequence seen from a specific route
// ASSUMPTION: packets from a specific route arrive in increasing order (i.e. they are not re-ordered)
VERB_DEBUG_CHATTER("[Order %s] Saving packet with seq %u, for src %s, from route %s \n", now.unparse().c_str(),
packet_seq, src.unparse().c_str(), hdr->_route.unparse().c_str());
_missed_packets[src].erase(packet_seq);
update_route(hdr->_route, src, packet_seq, p);
PacketTable* _ptable = _pktTables.get(src);
assert(_ptable);
_ptable->set(packet_seq, p);
if(ip_hdr->ip_p == IP_PROTO_TCP && _delayed_reordering && _diff_route_delays.get(src)!=Timestamp()) {
_saved_packets++;
createPacketTimer(src,packet_seq,_saved_packets*_diff_route_delays[src]); // packet expire after difference between routes; TODO: check expiration time
// if(!is_in_hash_table(_expiration_time, src))
// _expiration_time.set(src,_saved_packets*_diff_route_delays[src]);
// else
// _expiration_time.set(src,((1000-alpha_ts)*_expiration_time[src] + alpha_ts*_saved_packets*_diff_route_delays[src])/1000);
}
if(_packet_lifetime_ms > 0) {
createPacketTimer(src,packet_seq,Timestamp(0,_packet_lifetime_ms*Timestamp::subsec_per_msec)); // packet expire after _packet_lifetime_ms
}
// instead of having timeouts for out-of-order packets we run the following procedure
bool should_forward = true;
if (_newsrcTable.get(src))
should_forward = false;
// check if some packets should be forwarded, by observing the last sequence seen "last_seq" from all routes.
// if last_seq > cur_seq +1 for all routes, it means we have lost all the packets with sequence from cur_seq +1 up to the minimum of "last_seq" from all routes.
SeqFormRoute *_seqformroutetable = _SeqFormRouteTables.get(src);
assert(_seqformroutetable);// for sure this table was created above (in update route)
// We have to find the minimum last seen sequence, because in case we decide to forward old packets, it means all packets up to this minimum are lost
// Because sequences wrap, we use the minimum of the (absolute) difference between the last seen sequence and the "cur_seq +1"
uint16_t minimum = MAX_SEQ;
uint16_t min_distance = MAX_SEQ;
uint16_t distance;
for (HashTable<FormatedRoute, uint16_t>::iterator iter = _seqformroutetable->begin(); iter.live(); iter++) {
if(!should_forward) {
VERB_DEBUG_CHATTER("[Order %s] New source table is true, do not forward",now.unparse().c_str());
break;
}
VERB_DEBUG_CHATTER("[Order %s] route %s (%d routes), last seen id %u (cur_seq %u)",
now.unparse().c_str(), iter.key().unparse().c_str(), _seqformroutetable->size(), iter.value(), cur_seq);
// Christina: the following should work, unless we miss more than MAX_SEQ/2 packets
if (((cur_seq + 1 > iter.value()) && ( (uint16_t) (cur_seq + 1 - iter.value()) < MAX_SEQ / 2)) || ((cur_seq + 1 < iter.value()) && ((uint16_t) (iter.value() - cur_seq - 1) > MAX_SEQ / 2))) {
should_forward = false;
break;
}
if (cur_seq + 1 > iter.value())
distance = (uint16_t) (MAX_SEQ - (cur_seq + 1) + iter.value());
else
distance = iter.value() - cur_seq - 1;
if (distance < min_distance) {
min_distance = distance;
minimum = iter.value();
}
}
// Attention: should_forward should be true only if some packets are lost
if (should_forward) {
if(_with_nack_freq>0 && _last_seq_checked[src]!=0)
cur_seq=_last_seq_checked[src];
cur_seq_int = (int) cur_seq;
int minimum_int = (int) minimum;
diff = cur_seq_int - minimum_int;
// check that cur_seq smaller than minimum
if( (diff >= 0 && diff <= ((int) MAX_SEQ/2)) || diff <= -1*((int) MAX_SEQ/2) ) {
VERB_DEBUG_CHATTER("[Order %s] Packets already checked (cur_seq %u, minimum %u)", now.unparse().c_str(), cur_seq, minimum);
}
else {
uint16_t new_minimum = minimum - 1;
if (cur_seq > minimum)
// it means we should wrap cur_seq soon
new_minimum = MAX_SEQ-1;
//DEBUG_CHATTER("[Order] cur_seq %u, minimum %u",cur_seq, new_minimum);
if(_with_nack_freq>0) {
if(_verb_debug || _debug_nack)
click_chatter("[Order %s] Will check old packets, up to seq %u, starting at seq %u (last checked %u)",
now.unparse().c_str(), minimum, cur_seq, _last_seq_checked[src]);
}
else
VERB_DEBUG_CHATTER("[Order %s] Will forward old packets, up to seq %u, starting at seq %u", now.unparse().c_str(), minimum, cur_seq);
if(_with_nack_freq > 0)
_last_seq_checked.set(src, minimum);
int count = 0;
while (cur_seq <= new_minimum) {
count ++;
if (count >= MAX_SEQ/2) {
DEBUG_CHATTER("Infinite loop? count=%d, cur_seq=%d, new_min=%d", count, cur_seq, new_minimum);
break;
}
int diff = ((int) new_minimum) - ((int) cur_seq);
if (diff > MAX_SEQ/2 && (new_minimum > MAX_SEQ/2)) {
VERB_DEBUG_CHATTER("[Order %s] Seq %u and %u very far apart", Timestamp::now().unparse().c_str(), cur_seq, new_minimum);
break;
}
// the following function searches for packets with sequences >= "cur_seq+1"
uint32_t i;
uint32_t seq32 = (uint32_t) cur_seq;
if((i=pushOldPackets(src, cur_seq, _with_nack_freq==0))<=1) {
VERB_DEBUG_CHATTER("[Order %s] Packet seq %u for src %s missed", now.unparse().c_str(), (cur_seq+1)%MAX_SEQ, src.unparse().c_str());
if(_with_nack_freq > 0) {
if(_missed_packets[src][(cur_seq+1)%MAX_SEQ] == Timestamp()) { // if not already in hash table
_missed_packets_int++;
_missed_packets[src].set((cur_seq+1)%MAX_SEQ,Timestamp());
_nack_timer.schedule_now();
VERB_DEBUG_CHATTER("[Order %s] Schedule timer", now.unparse().c_str());
if(_debug_nack && !_verb_debug)
click_chatter("[Order %s] Packet seq %u for src %s missed, schedule timer", now.unparse().c_str(), (cur_seq+1)%MAX_SEQ, src.unparse().c_str());
}
}
else
_missed_packets_int++;
}
uint16_t cur_seq_new = (uint16_t) ((seq32 + i - 1)%MAX_SEQ);
// move the index for sequences, have to search for new packet now
cur_seq = (cur_seq_new + 1)%MAX_SEQ;
if ((new_minimum == MAX_SEQ-1) && (cur_seq < minimum) && (minimum != 0) && minimum!=MAX_SEQ-1)
// we have wrapped the sequence "cur_seq", wrap also the minimum if it was changed before entering the loop
new_minimum = minimum - 1;
}
}
}
}
// check every _route_lifetime_ms if all routes have received at least one packet. If not, delete the route when the timer expires.
if(_route_lifetime_ms > 0) {
if (_timersTable.get(src)) {
if (!_timersTable.get(src)->scheduled())
updateTimer(src);
}
else
createTimer(src);
}
}
////////////////////////// PRIVATE FUNCTIONS ////////////////////////////////////
void Order::reset() {
DEBUG_CHATTER("[Order %s] Resetting Order", Timestamp::now().unparse().c_str());
_seqTable.clear();
_seqTableTimer.clear();
for (HashTable<IPAddress, PacketTable*>::iterator iter = _pktTables.begin(); iter.live(); iter++) {
PacketTable* _ptable = _pktTables.get(iter.key());
if(!_ptable)
continue;
for(HashTable<uint16_t, Packet*>::iterator it = _ptable->begin();it.live();it++)
it.value()->kill();
}
_pktTables.clear();
_SeqFormRouteTables.clear();
_SeqFormRouteTablesTemp.clear();
_timersTable.clear();
_newsrcTable.clear();
_froute2ts.clear();
// for (HashTable<IPAddress, HashTable<FormatedRoute, Timestamp> >::iterator iter = _froute2delta.begin(); iter.live(); iter++) {
// iter.value().clear();
// }
// _froute2delta.clear();
// for (HashTable<IPAddress, HashTable<FormatedRoute, uint32_t> >::iterator iter = _froute2rate.begin(); iter.live(); iter++) {
// iter.value().clear();
// }
// _froute2rate.clear();
// _waiting_time.clear();
// _expiration_time.clear();
_src2ts.clear();
_packet_1period_ago.clear();
_packet_2period_ago.clear();
_nack_timer.unschedule();
_missed_packets.clear();
_missed_packets_int=0;
}
void Order::run_timer(Timer *t) {
VERB_DEBUG_CHATTER("[Order %s] Run timer %p", Timestamp::now().unparse().c_str(), t);
if(t == &_active_timer) {
reset();
}
if(t == &_expire_timer_s) {
if(_active && _seqTable.size() > 0) {
if(_dropped > 0) {
DEBUG_CHATTER("[Order %s] Has dropped %d packets during the last %d ms", Timestamp::now().unparse().c_str(), _dropped, mymin(_print_stats_freq,1000*_source_lifetime_s));
_dropped = 0;
}
else
VERB_DEBUG_CHATTER("[Order %s] No dropped packet during the last %d ms", Timestamp::now().unparse().c_str(), mymin(_print_stats_freq,1000*_source_lifetime_s));
// this timer is used to remove sources that are inactive more than 5 seconds
VERB_DEBUG_CHATTER("[Order] timer expired");
for (HashTable<IPAddress, uint16_t>::iterator iter = _seqTableTimer.begin(); iter.live(); iter++) {
PacketTable* _ptable = _pktTables.get(iter.key());
if(!_ptable)
continue;
// String s;
int nb_routes = 0;
SeqFormRoute *routes = _SeqFormRouteTables[iter.key()];
if(routes)
nb_routes = routes->size();
// IPAddress src = iter.key();
//
// if(is_in_hash_table(_froute2delta,iter.key())) {
// nb_routes = _froute2delta[iter.key()].size();
// if (nb_routes == 2) {
// int i=0;
// uint32_t rate_max;
// uint32_t total_rate = 0;
// for (HashTable<FormatedRoute, Timestamp>::iterator it = _froute2delta[src].begin(); it.live(); it++) {
// if(i==0) {
// // keep timestamp of route 1
// _diff_route_delays.set(src,it.value());
// _route_max.set(src,it.key());
// rate_max = _froute2rate[iter.key()][it.key()];
// total_rate+=rate_max;
// }
// else {
// _diff_route_delays.set(src,_diff_route_delays[src]-it.value());
// _route_min.set(src,it.key());
// total_rate += _froute2rate[iter.key()][it.key()];
// if(_diff_route_delays[src] < 0) {
// // delay of route 1 smaller than delay of route 2; switch max and min
// _diff_route_delays[src] = -_diff_route_delays[src];
// _route_min[src] = _route_max[src];
// _route_max[src] = it.key();
// rate_max = _froute2rate[iter.key()][it.key()];
// }
// }
// i++;
// }
// _route_max_proportion.set(src,(1000*rate_max)/total_rate);
// s="Delay difference between "+_route_min[src].unparse()+" and "+_route_max[src].unparse()+" is "+
// _diff_route_delays[src].unparse()+" seconds. Rate proportion "+String(_route_max_proportion[src])+"/1000.\n";
// if(_delayed_reordering) {
// s+="\tPackets currently saved: "+String(_saved_packets)+"; sent "+String(_packets_sent_delayed)+" delayed packets ("+
// String(_expired_not_sent)+" expired before). Moving avg expiration time "+_expiration_time[src].unparse()+" seconds.";
// _packets_sent_delayed = 0;
// _expired_not_sent = 0;
// }
// }
// }
if(_debug) {
click_chatter("[Order %s] Size of _pktTable for src %s with %d routes: %u (%d/%d packets missed, %d caught, size of missed packet table %d, dropped %d). %d timeouts, %d nacks. Current seq is %d",
Timestamp::now().unparse().c_str(), iter.key().unparse().c_str(), nb_routes, _ptable->size(), _missed_packets_int, _received_packets,
_caught_missed_packet, _missed_packets[iter.key()].size(), _dropped, _sent_timeout, _nacks_sent, _seqTable[iter.key()]);
for(SeqFormRoute::iterator it_route=routes->begin();it_route.live();it_route++) {
click_chatter("\t Last seen id on route %s is %u", it_route.key().unparse().c_str(), it_route.value());
}
// if(s.length() > 0)
// click_chatter("\t%s", s.c_str());
}
_missed_packets_int = 0;
_caught_missed_packet = 0;
_received_packets = 0;
_sent_timeout = 0;
_nacks_sent = 0;
if (getCurrentSeqNum(iter.key()) == iter.value() &&
_src2ts.get(iter.key()) != Timestamp() && (Timestamp::now()-_src2ts[iter.key()]).sec() >= _source_lifetime_s) {
if(_debug)
click_chatter("[Order %s] Removing src %s, since it is inactive for %u sec", Timestamp::now().unparse().c_str(), iter.key().unparse().c_str(), _source_lifetime_s);
for(HashTable<uint16_t, Packet*>::iterator it = _ptable->begin();it.live();it++)
it.value()->kill();
_ptable->clear();
_pktTables.erase(iter.key());
_seqTable.erase(iter.key());
_SeqFormRouteTables.erase(iter.key());
SeqFormRoute *_seqformroutetabletemp = _SeqFormRouteTablesTemp.get(iter.key());
if (_seqformroutetabletemp)
_seqformroutetabletemp->clear();
_SeqFormRouteTablesTemp.erase(iter.key());
_timersTable.erase(iter.key());
_newsrcTable.erase(iter.key());
VERB_DEBUG_CHATTER("[Order] Src removed");
}
}
_seqTableTimer = HashTable<IPAddress,uint16_t>(_seqTable);
}
if(_print_stats_freq > 0)
_expire_timer_s.schedule_after_msec(mymin(_print_stats_freq,1000*_source_lifetime_s));
else
_expire_timer_s.schedule_after_sec(_source_lifetime_s);
}
else if(t==&_packet_timeout_timer) {
_active_timer.schedule_after_sec(30);
if(_timeout_packets) {
Timestamp begin;
if(_debug) begin.assign_now();
uint32_t timeout_sent = 0;
for (HashTable<IPAddress, uint16_t>::iterator it = _seqTableTimer.begin(); it.live(); it++) {
if(_packet_1period_ago.get(it.key()) > 0 && _packet_2period_ago.get(it.key()) > 0) {
PacketTable* _ptable = _pktTables.get(it.key());
if(_packet_1period_ago[it.key()] > _packet_2period_ago[it.key()]) { // normal case
for (uint16_t i=_packet_2period_ago[it.key()]; i < _packet_1period_ago[it.key()]; i++) {
Packet *packet = _ptable->get(i);
if(packet) {
_sent_explore_timeout++;
output(0).push(delete_routing_header(packet));
_ptable->erase(i);
timeout_sent++;
}
}
}
else if (_packet_1period_ago[it.key()] < _packet_2period_ago[it.key()]) { // sequence numbers might have wrapped
if(dist_mod_maxseq(_packet_1period_ago[it.key()],_packet_2period_ago[it.key()]) >= MAX_SEQ/2) { // check that they are indeed far away
for (uint16_t i=_packet_2period_ago[it.key()]; i < MAX_SEQ; i++) {
Packet *packet = _ptable->get(i);
if(packet) {
_sent_explore_timeout++;
output(0).push(delete_routing_header(packet));
_ptable->erase(i);
timeout_sent++;
}
}
for (uint16_t i=0; i <= _packet_1period_ago[it.key()]; i++) {
Packet *packet = _ptable->get(i);
if(packet) {
_sent_explore_timeout++;
output(0).push(delete_routing_header(packet));
_ptable->erase(i);
timeout_sent++;
}
}
}
}
}
if(_packet_1period_ago.get(it.key()) > 0)
_packet_2period_ago.set(it.key(),_packet_1period_ago[it.key()]);
_packet_1period_ago.set(it.key(),_current_arrived_seq);
}
if(timeout_sent > 0 && _debug) {
Timestamp now = Timestamp::now();
click_chatter("[Order %s] Have sent %d timeout packets (took %s sec).", now.unparse().c_str(), timeout_sent, (now-begin).unparse().c_str());
}
}
}
else if (t==&_nack_timer) {
if(noutputs() > 1) {
for (HashTable<IPAddress, uint16_t>::iterator iter = _seqTableTimer.begin(); iter.live(); iter++) {
int nack_sent = 0;
HashTable<uint16_t,Timestamp> not_sent = _missed_packets[iter.key()];
if(_verb_debug || _debug_nack)
click_chatter("[Order %s] Has %d nacks to send", Timestamp::now().unparse().c_str(), not_sent.size());
while(!not_sent.empty() || nack_sent == 0) {
nack_sent++;
HashTable<uint16_t,Timestamp> remaining_nacks;
int nb_nacks = mymin((int) not_sent.size(), (int) (1400/sizeof(uint16_t)));
WritablePacket *nack_packet = Packet::make(100,0,sizeof(acne_header) + sizeof(nack_order) + nb_nacks*sizeof(uint16_t), 0);
memset(nack_packet->data(), 0, nack_packet->length());
acne_header *acne_hdr = (acne_header *) nack_packet->data();
nack_order *nack = (nack_order *) (nack_packet->data()+sizeof(acne_header));
// set NACKS
nack->flow_src_addr = iter.key();
nack->flow_dst_addr = _rtl->my_ip();
int count = 0;
Timestamp now = Timestamp::now();
for(HashTable<uint16_t, Timestamp>::iterator it=not_sent.begin();it.live();it++) {
if(it.value() != Timestamp() && (now-it.value()).msecval()<_with_nack_freq)
continue;
if(count < nb_nacks) {
nack->nacks[count] = it.key();
count++;
_missed_packets[iter.key()].set(it.key(), now);
if(_verb_debug || _debug_nack)
click_chatter("[Order %s] Add seq %u for source %s to nacks (last sent was %s, now %s)", now.unparse().c_str(),
it.key(), iter.key().unparse().c_str(), it.value().unparse().c_str(), _missed_packets[iter.key()][it.key()].unparse().c_str());
}
else {
if(_verb_debug || _debug_nack)
click_chatter("[Order] No more space: adding seq %u to remaining nacks", it.key());
remaining_nacks.set(it.key(),it.value());
}
}
nb_nacks = count;
nack->nb_nacks = nb_nacks;
not_sent = remaining_nacks;
nack->last_seq = getCurrentSeqNum(iter.key());
// set acne_header
acne_hdr->_type = NACK_ORDER;
acne_hdr->_hop = 0;
acne_hdr->_seq = 0;
// send on one route (id to inform source)
int count_routes = 0;
for (HashTable<FormatedRoute, uint16_t>::iterator iter2 = _SeqFormRouteTables.get(iter.key())->begin(); iter2.live(); iter2++) {
if(count_routes > 0)
break;
WritablePacket *p = nack_packet->clone()->uniqueify();
acne_header *acne_hdr_p = (acne_header *) p->data();
if(_reversed_routes[iter2.key()] == FormatedRoute()) {
Route route = _rtl->get_route_from_froute(iter2.key()).reverse_route();
_reversed_routes[iter2.key()] = FormatedRoute(route);
}
acne_hdr_p->_route = _reversed_routes[iter2.key()];
if(_verb_debug || _debug_nack)
click_chatter("[Order %s] Sending NACK for %d missed packets (last seq %u) on route %s", Timestamp::now().unparse().c_str(),
nack->nb_nacks, nack->last_seq, acne_hdr_p->_route.unparse().c_str());
_nacks_sent+=nb_nacks;
output(1).push(p);
count_routes++;
}
nack_packet->kill();
}
}
if(_with_nack_freq > 0)
_nack_timer.schedule_after_msec(_with_nack_freq);
}
}
}
// remove all routes
void Order::remove_routes(IPAddress src) {
PacketTable* _ptable = _pktTables.get(src);
if(_ptable) {
// kill packets
// int nb_packets = 0;
// for(HashTable<uint16_t, Packet*>::iterator it = _ptable->begin();it.live();it++) {
// if(it.value()) {
// it.value()->kill();
// nb_packets++;
// }
// }
// send packets in Order (with missing packets)
int count = 0;
Timestamp before = Timestamp::now();
uint16_t cur_seq = getCurrentSeqNum(src);
int nb_packets = _ptable->size();
while (_ptable->size() > 0) {
count ++;
if (count >= MAX_SEQ) {
DEBUG_CHATTER("Infinite loop in remove_routes? count=%d, cur_seq=%d", count, cur_seq);
break;
}
// the following function searches for packets with sequences >= "cur_seq+1"
if(pushOldPackets(src, cur_seq, true)<=1) {
VERB_DEBUG_CHATTER("[Order %s] Packet seq %u for src %s missed", Timestamp::now().unparse().c_str(), cur_seq+1, src.unparse().c_str());
}
uint16_t cur_seq_new = getCurrentSeqNum(src);
// move the index for sequences, have to search for new packet now
cur_seq = (cur_seq_new + 1)%MAX_SEQ;
}
Timestamp now = Timestamp::now();
DEBUG_CHATTER("[Order %s] Remove routes: %d packets sent in %s sec", now.unparse().c_str(), nb_packets, (now-before).unparse().c_str());
_ptable->clear();
}
_pktTables.erase(src);
_seqTable.erase(src);
_SeqFormRouteTables.erase(src);
SeqFormRoute *_seqformroutetabletemp = _SeqFormRouteTablesTemp.get(src);
if (_seqformroutetabletemp)
_seqformroutetabletemp->clear();
_SeqFormRouteTablesTemp.erase(src);
_timersTable.erase(src);
_newsrcTable.erase(src);
_seqTableTimer = HashTable<IPAddress,uint16_t>(_seqTable);
_missed_packets[src].clear();
_missed_packets.erase(src);
//_expire_timer_s.reschedule_after_sec(_source_lifetime_s);
}
// forward stored packets arrived earlier starting from sequence number seq+1
// update the seq. number of the last forwarded packet
uint32_t Order::pushOldPackets(IPAddress const& src, uint16_t const seq, bool send) {
uint16_t newseq = seq;
Timestamp now;
if(_debug)
now.assign_now();
PacketTable* _ptable = _pktTables.get(src);
assert(_ptable);
// if(send)
// VERB_DEBUG_CHATTER("[Order] Received seq %u, forward packets", seq);
// else
// VERB_DEBUG_CHATTER("[Order] Received seq %u, check packets", seq);
uint32_t i = 1;
uint32_t seq32 = (uint32_t) seq;
Packet* packet = _ptable->get((uint16_t) ((seq32 + i)%MAX_SEQ));
while (packet) {
if(send) {
VERB_DEBUG_CHATTER("[Order %s] Forwarding an old packet of size %d from ptable %p with seq %u for src %s \n", now.unparse().c_str(),
packet->length(), _ptable, (seq32 + i)%MAX_SEQ, src.unparse().c_str());
_sent_explore++;
output(0).push(delete_routing_header(packet));
_ptable->erase((uint16_t) ((seq32 + i)%MAX_SEQ));
}
i++;
packet = _ptable->get((uint16_t) ((seq32 + i)%MAX_SEQ));
}
if(i>1 && send)
VERB_DEBUG_CHATTER("[Order %s] Pushed %d old packets", now.unparse().c_str(), i-1);
if(send) {
newseq = (uint16_t) ((seq32 + i - 1)%MAX_SEQ);
_seqTable.set(src, newseq);
_last_seq_checked.set(src, newseq);
VERB_DEBUG_CHATTER("[Order] New seq is %u", newseq);
}
_saved_packets = mymax(0,_saved_packets-(((int) i)-1));
return i;
}
Packet *
Order::delete_routing_header(Packet *p_in) {
click_ip *ip;
if(_strip_acne) {
p_in->pull(sizeof(acne_header));
ip=(click_ip *)(p_in->data());
}
else
ip=(click_ip *)(p_in->data()+sizeof(acne_header));
p_in->set_ip_header(ip, sizeof(click_ip));
return p_in;
}
void
Order::update_route(FormatedRoute froute, IPAddress const& src, uint16_t seq, Packet *p) {
//VERB_DEBUG_CHATTER("[Order] Updating route %s for %s, id %d", froute.unparse().c_str(), src.unparse().c_str(), seq);
SeqFormRoute *_seqformroutetable = _SeqFormRouteTables.get(src);
acne_header *hdr = (acne_header *)(p->data());
if (!_seqformroutetable) {
DEBUG_CHATTER("[Order %s] Adding new src %s, route %s with packet of type %d, id %d (seq %u)", Timestamp::now().unparse().c_str(), src.unparse().c_str(),
froute.unparse().c_str(), hdr->_type, hdr->_id_rate_mab, seq);
_seqformroutetable = new SeqFormRoute();
_SeqFormRouteTables.set(src, _seqformroutetable);
_seqformroutetable->set(froute, seq);
if(!_nack_timer.scheduled() && _with_nack_freq > 0)
_nack_timer.schedule_after_msec(_with_nack_freq);
}
else {
if(_seqformroutetable->set(froute, seq)) {
DEBUG_CHATTER("[Order %s] New route %s, source %s with packet of type %d, id %d (seq %u)", Timestamp::now().unparse().c_str(), froute.unparse().c_str(), src.unparse().c_str(),
hdr->_type, hdr->_id_rate_mab, seq);
}
}
// remove old routes if they exceed the number of max. routes
if (_seqformroutetable->size() > _max_num_routes) {
uint16_t cur_seq = getCurrentSeqNum(src, seq);
uint16_t minimum = MAX_SEQ;
uint16_t min_distance = MAX_SEQ;
uint16_t distance;
FormatedRoute toremove;
DEBUG_CHATTER("[Order] Source %s has %u routes, should remove 1 route (current seq %u)...", src.unparse().c_str(), _seqformroutetable->size(), cur_seq);
for (HashTable<FormatedRoute, uint16_t>::iterator iter = _seqformroutetable->begin(); iter.live(); iter++) {
// Christina: the following should work, unless we miss more than MAX_SEQ/2 packets, noticed it can happen if we use infinite source (acne-offline script) with non-blocking queues
if (cur_seq > iter.value())
distance = (uint16_t) (MAX_SEQ - cur_seq + iter.value());
else
distance = iter.value() - cur_seq;
DEBUG_CHATTER("[Order] route %s last seen id %u, distance %u", iter.key().unparse().c_str(), iter.value(), distance);
if(distance > MAX_SEQ/2) {
// very far seq number, remove this route
minimum = iter.value();
toremove = iter.key();
break;
}
if (distance < min_distance) {
min_distance = distance;
minimum = iter.value();
toremove = iter.key();
}
}
_seqformroutetable->erase(toremove);
DEBUG_CHATTER("[Order] Removing route %s", toremove.unparse().c_str());
SeqFormRoute *_seqformroutetabletemp = _SeqFormRouteTablesTemp.get(src);
if (_seqformroutetabletemp)
_seqformroutetabletemp->erase(toremove);
}
//VERB_DEBUG_CHATTER("[Order] Route updated.");
}
uint16_t Order::getCurrentSeqNum(IPAddress const& src, uint16_t seq) {
if (!_seqTable.get_pointer(src)) {
VERB_DEBUG_CHATTER("[Order %s] New source table", Timestamp::now().unparse().c_str());
_seqTable.set(src, (uint16_t) (seq - 1));
PacketTable *ptable = new PacketTable();
_pktTables.set(src, ptable);
_newsrcTable.set(src, true);
if(_route_lifetime_ms > 0)
_packet_timeout_timer.schedule_after_msec(_route_lifetime_ms);
}
return _seqTable.get(src);
}
/////////////////////// TIMEOUT staff /////////////////////
// handle expiry for timers of the data packets
void Order::handleExpiry(Timer*, void * data) {
TimerData *tdata = (TimerData*) data;
assert(tdata);
tdata->order->expire(*tdata->src, tdata);
}
void Order::expire(const IPAddress & src, TimerData * tdata) {
VERB_DEBUG_CHATTER("[Order] Timer expired for src %s", src.unparse().c_str());
Timestamp now = Timestamp::now();
_newsrcTable.set(src, false);
SeqFormRoute *_seqformroutetable = _SeqFormRouteTables.get(src);
if (_seqformroutetable) {
SeqFormRoute *_seqformroutetabletemp = _SeqFormRouteTablesTemp.get(src);
if (_seqformroutetabletemp) {
for (HashTable<FormatedRoute, uint16_t>::iterator iter = _seqformroutetabletemp->begin(); iter.live(); iter++) {
//DEBUG_CHATTER("[Order] %s %u",iter.key().unparse().c_str(),iter.value());
if (_seqformroutetable->get(iter.key())) {
if (_route_lifetime_ms > 0 && _seqformroutetable->get(iter.key()) == iter.value() &&
_froute2ts.get(iter.key()) != Timestamp() && (now-_froute2ts[iter.key()]).msecval() >= _route_lifetime_ms) {
_seqformroutetable->erase(iter.key());
DEBUG_CHATTER("[Order %s] Removing formatted route %s, since it is inactive for %d msec", now.unparse().c_str(), iter.key().unparse().c_str(), _route_lifetime_ms);
}
}
}
*_seqformroutetabletemp = HashTable<FormatedRoute,uint16_t>(*_seqformroutetable);
_SeqFormRouteTablesTemp.set(src, _seqformroutetabletemp);
}
else {
SeqFormRoute *_seqformroutetabletemp = new SeqFormRoute(*_seqformroutetable);
_SeqFormRouteTablesTemp.set(src, _seqformroutetabletemp);
DEBUG_CHATTER("[Order %s] creating table", now.unparse().c_str());
}
}
}
void Order::createTimer(IPAddress const& src) {
// create a timer for the source and use it to have timeout on the routes
DEBUG_CHATTER("[Order] Creating timer for src %s",src.unparse().c_str());
TimerData* timerdata = new TimerData();
timerdata->order = this;
timerdata->src = new IPAddress(src);
Timer* timer = new Timer(&Order::handleExpiry, timerdata);
timer->initialize(this);
timer->schedule_after_msec(100);
_timersTable.set(src, timer);
}
void Order::updateTimer(IPAddress const& src) {
// set timer for specific src
Timer *t = _timersTable.get(src);
if(_route_lifetime_ms > 0)
t->schedule_after_msec(_route_lifetime_ms);
}
void Order::handleExpiryPacket(Timer*, void * data) {
PacketTimerData *tdata = (PacketTimerData*) data;
assert(tdata);
tdata->elmt->expire_packet(tdata);
}
void Order::expire_packet(PacketTimerData *tdata) {
// push expired packet
IPAddress src = *tdata->src;
VERB_DEBUG_CHATTER("[Order %s] Packet timer expired for packet %d", Timestamp::now().unparse().c_str(), tdata->seq);
PacketTable *_ptable = _pktTables[src];
if(!_ptable)
return;
Packet* packet = _ptable->get(tdata->seq);
if(packet) {
// packet has not been sent: push all packets up to this sequence number
VERB_DEBUG_CHATTER("[Order %s] Timeout: will forward old packets, up to seq %u", Timestamp::now().unparse().c_str(), tdata->seq);
uint16_t new_seq = tdata->seq - 1;
uint16_t cur_seq = getCurrentSeqNum(src);
if (cur_seq > tdata->seq)
// it means we should wrap cur_seq soon
new_seq = MAX_SEQ-1;
int count = 0;
while (cur_seq <= new_seq) {
count ++;
if (count >= MAX_SEQ/2) {
DEBUG_CHATTER("Infinite loop? count=%d, cur_seq=%d, new_seq=%d", count, cur_seq, new_seq);
break;
}
int diff = ((int) new_seq) - ((int) cur_seq);
if (diff > MAX_SEQ/2 && (new_seq > MAX_SEQ/2)) {
VERB_DEBUG_CHATTER("[Order %s] Seq %u and %u very far apart", Timestamp::now().unparse().c_str(), cur_seq, new_seq);
break;
}
// the following function searches for packets with sequences >= "cur_seq+1"
uint32_t i=pushOldPackets(src, cur_seq, true);
_sent_timeout+=(i-1);
uint16_t cur_seq_new = getCurrentSeqNum(src);
VERB_DEBUG_CHATTER("[Order %s] Packet seq %u for src %s missed: erase it from missed",
Timestamp::now().unparse().c_str(), (cur_seq_new+1)%MAX_SEQ, src.unparse().c_str());
_missed_packets[src].erase((cur_seq_new+1)%MAX_SEQ);
// move the index for sequences, have to search for new packet now
cur_seq = (cur_seq_new + 1)%MAX_SEQ;
if ((new_seq == MAX_SEQ-1) && (cur_seq < tdata->seq) && (tdata->seq != 0))
// we have wrapped the sequence "cur_seq", wrap also the minimum if it was changed before entering the loop
new_seq = tdata->seq - 1;
}
}
else
_expired_not_sent++;
// Packet* packet = _ptable->get(tdata->seq);
// if(packet) {
// VERB_DEBUG_CHATTER("[Order] Pushing packet %d", tdata->seq);
// _packets_sent_delayed++;
// output(0).push(packet);
// _ptable->erase(tdata->seq);
// }
// else
// _expired_not_sent++;
}
void Order::createPacketTimer(IPAddress const& src, uint16_t seq, Timestamp expire_time) {
// create a timer for the source and use it to have timeout on the packets
PacketTimerData* timerdata = new PacketTimerData();
timerdata->elmt = this;
timerdata->src = new IPAddress(src);
timerdata->seq = seq;
Timer* timer = new Timer(&Order::handleExpiryPacket, timerdata);
timer->initialize(this);
//Timestamp expire_time = (_route_max_proportion[src]*_diff_route_delays[src])/1000;
timer->schedule_after(expire_time);
VERB_DEBUG_CHATTER("[Order %s] Creating timer for packer from %s with seq %u. Expires in %s", Timestamp::now().unparse().c_str(),
src.unparse().c_str(), seq, expire_time.unparse().c_str());
}
int
Order::active_handler(const String &s, Element *e, void *,
ErrorHandler *errh) {
Order *elmt = (Order *)e;
int active;
if(!cp_integer(s, &active))
return errh->error("Active must be 0 or 1");
if (!(active == 0 || active == 1))
return errh->error("Active must be 0 or 1");
elmt->set_active(active==1);
return 0;
}
int
Order::reset_handler(const String &s, Element *e, void *,
ErrorHandler *errh) {
Order *elmt = (Order *)e;
elmt->reset();
return 0;
}
int
Order::bool_handler(const String &s, Element *e, void *a,
ErrorHandler *errh) {
Order *elmt = (Order *)e;
int b;
if(!cp_integer(s, &b))
return errh->error("Debug must be 0 or 1");
if (!(b == 0 || b == 1))
return errh->error("Debug must be 0 or 1");
if(((intptr_t) a) == 0)
elmt->set_debug(b==1);
else if(((intptr_t) a) == 1)
elmt->set_verb_debug(b==1);
else if(((intptr_t) a) == 2)
elmt->set_debug_nack(b==1);
else if(((intptr_t) a) == 3)
elmt->set_debug_explore(b==1);
return 0;
}
int
Order::int_handler(const String &s, Element *e, void *a,
ErrorHandler *errh) {
Order *elmt = (Order *)e;
int arg;
if(!cp_integer(s, &arg))
return errh->error("Arg must be an integer");
if(((intptr_t) a) == 0) {
elmt->set_packet_timeout(arg);
}
else if(((intptr_t) a) == 1) {
elmt->set_route_timeout(arg);
}
else if(((intptr_t) a) == 2) {
elmt->set_nack_freq(arg);
}
return 0;
}
void Order::add_handlers() {
add_write_handler("debug", bool_handler, 0);
add_write_handler("verb_debug", bool_handler, 1);
add_write_handler("debug_nack", bool_handler, 2);
add_write_handler("debug_explore", bool_handler, 3);
add_write_handler("active", active_handler, 0);
add_write_handler("reset", reset_handler, 0);
add_write_handler("packet_timeout", int_handler, 0);
add_write_handler("route_timeout", int_handler, 1);
add_write_handler("nack_freq", int_handler, 2);
}
CLICK_ENDDECLS
EXPORT_ELEMENT(Order)
ELEMENT_PROVIDES(Order)

Event Timeline