Page MenuHomec4science

countbytes.cc
No OneTemporary

File Metadata

Created
Sat, Jul 5, 12:35

countbytes.cc

#include <click/config.h>
#include "util.hh"
#include "countbytes.hh"
// Forward the arguments to click_chatter() only when the `_debug` variable
// visible at the expansion site is true. The `, ## __VA_ARGS__` form is the
// GNU extension that drops the trailing comma when no variadic arguments are
// supplied; the do/while(0) wrapper makes the macro behave as one statement.
#define DEBUG_CHATTER(arg, ...) do { if (_debug) { click_chatter(arg, ## __VA_ARGS__);} } while (0)
CLICK_DECLS
/// Construct the element; the periodic-report timer is bound to this element.
CountBytes::CountBytes()
    : _timer(this)
{
}
/// Destructor: nothing to release explicitly.
CountBytes::~CountBytes()
{
}
/**
 * Parse the element configuration.
 *
 * Keywords: ACTIVE, DEBUG, IP (mandatory), MBYTES_OBJECTIVE, FREQ_PRINT,
 * SEND_STOP_ACK and ROUTING_PATHS. MBYTES_OBJECTIVE is given in megabytes and
 * stored in bytes; a non-positive value stores an objective of 0.
 *
 * @param conf  configuration strings supplied by the router
 * @param errh  handler used to report parse and lookup errors
 * @return 0 on success, -1 if parsing fails or SEND_STOP_ACK is requested
 *         without a resolvable ROUTING_PATHS element
 */
int CountBytes::configure(Vector<String> &conf, ErrorHandler *errh){
    // Defaults for the optional keywords.
    _active = true;
    _debug = false;
    double MB_objective = 0;
    _freq_print = 0;
    String rtp_name;
    _send_stop_ack = false;

    if(Args(this, errh).bind(conf)
       .read("ACTIVE", _active)
       .read("DEBUG", _debug)
       .read_m("IP", _my_ip)
       .read("MBYTES_OBJECTIVE", MB_objective)
       .read("FREQ_PRINT", _freq_print)
       .read("SEND_STOP_ACK", _send_stop_ack)
       .read("ROUTING_PATHS", rtp_name)
       .complete() < 0)
        return -1;

    if(MB_objective > 0)
        _bytes_objective = (uint64_t) (MB_objective*1024.0*1024.0);
    else
        _bytes_objective = 0;

    // ROUTING_PATHS is optional: only look the element up when a name was
    // actually given, so that omitting it does not make Router::find() report
    // a spurious "no element" error for the empty name.
    _rtp = 0;
    if(rtp_name.length() > 0) {
        // NOTE(review): the result is cast without a type check; the named
        // element is assumed to really be a RoutingPaths - confirm.
        _rtp = (RoutingPaths*)router()->find(rtp_name, errh);
    }

    if(_send_stop_ack && !_rtp){
        click_chatter("[CountBytes] Error: invalid parameter name for element RoutingPaths (must be set to send final ack).\n");
        return -1;
    }
    return 0;
}
/**
 * Bind the report timer to this element and arm it when a positive
 * FREQ_PRINT interval (in seconds) is configured.
 *
 * @return always 0
 */
int
CountBytes::initialize(ErrorHandler *errh)
{
    _timer.initialize(this);

    // A non-positive interval leaves the timer unscheduled.
    if (_freq_print > 0) {
        _timer.schedule_after_sec(_freq_print);
    }

    return 0;
}
/// Push path: account for the packet, then forward it unchanged on output 0.
void
CountBytes::push(int, Packet *p_in)
{
    count_packet(p_in);
    output(0).push(p_in);
}
/// Pull path: fetch a packet from input 0 and account for it when one is
/// available. Returns the pulled packet, or null when upstream had none.
Packet *
CountBytes::pull(int)
{
    Packet *pkt = input(0).pull();
    if (!pkt)
        return 0;

    count_packet(pkt);
    return pkt;
}
/**
 * Account for one packet.
 *
 * Packets are counted only when a byte objective is set, the packet has an
 * IP header, and its destination is this element's configured IP. Bytes are
 * accumulated per source address. Once a source exceeds the objective, the
 * elapsed time between its first and last packet is logged and stored in the
 * time table, the flow's counters are reset, and - if stop acks are enabled
 * and the element has exactly two outputs - an ack with the stop flag set is
 * pushed on output 1.
 *
 * @param p_in  packet to inspect; it is neither modified nor consumed here
 */
void
CountBytes::count_packet(Packet *p_in) {
    // No objective configured: counting is disabled.
    if(_bytes_objective == 0)
        return;
    const click_ip *iph = p_in->ip_header();
    if(iph == 0)
        return;
    IPAddress ip_src = IPAddress(iph->ip_src.s_addr);
    IPAddress ip_dst = IPAddress(iph->ip_dst.s_addr);
    // Only traffic addressed to this node is counted.
    if(ip_dst!=_my_ip)
        return;

    _last_active[ip_src].assign_now();
    // A default-constructed timestamp marks a flow whose start is not yet recorded.
    if(_first_packet[ip_src] == Timestamp())
        _first_packet[ip_src] = _last_active[ip_src];
    _bytes_flow[ip_src]+=p_in->length();
    if(_bytes_flow[ip_src] <= _bytes_objective)
        return;

    Timestamp delay = _last_active[ip_src] - _first_packet[ip_src];
    // Cast explicitly: uint64_t is not `unsigned long long` on every platform,
    // and %llu requires exactly that type.
    click_chatter("[CountBytes %s] Flow from source %s reached its objective of %llu in %s seconds", _last_active[ip_src].unparse().c_str(),
                  ip_src.unparse().c_str(), static_cast<unsigned long long>(_bytes_objective), delay.unparse().c_str());
    // reset_flow() also erases the time-table entry, so record the delay afterwards.
    reset_flow(ip_src);
    _time_table.set(ip_src, delay);

    if(!_send_stop_ack || noutputs() != 2)
        return;
    // Stop acks can be switched on at runtime through the write handler even
    // when no RoutingPaths element was configured; never dereference a null
    // pointer in that case.
    if(!_rtp) {
        click_chatter("[CountBytes] Error: cannot send stop ack, no RoutingPaths element configured.");
        return;
    }
    Packet* out = Packet::make(sizeof(acne_header)+sizeof(acne_ack));
    if(!out) {
        click_chatter("[CountBytes] Error: cannot allocate packet for stop ack.");
        return;
    }
    acne_header* out_h = (acne_header*)out->data();
    Route route = _rtp->get_best_path_ack(ip_src);
    out_h->_route = FormatedRoute(route);
    out_h->_type = ACK_RTE_PRICES;
    out_h->_hop = 0;
    // The ack payload directly follows the header in the packet data.
    acne_ack* out_payload = (acne_ack*)(out->data()+sizeof(acne_header));
    out_payload->_stop_flow = true;
    out_payload->_flow = FlowTuple(ip_src,ip_dst);
    if(_debug)
        click_chatter("[CountBytes] Sending ack to stop traffic on route %s", out_h->_route.unparse().c_str());
    output(1).push(out);
}
/**
 * Periodic report: log the megabytes received and elapsed time for every
 * tracked flow, drop flows that have been idle for at least 30 seconds, and
 * re-arm the timer after _freq_print seconds.
 *
 * @param timer  the timer that fired; anything other than this element's own
 *               timer is ignored
 */
void CountBytes::run_timer(Timer *timer) {
    if(timer != &_timer)
        return;

    Timestamp now = Timestamp::now();
    // Flows to drop are collected first and erased after the loop:
    // reset_flow() erases from _bytes_flow, and erasing the entry the
    // iterator currently points at would leave the iterator dangling.
    Vector<IPAddress> stale;
    for(HashTable<IPAddress,uint64_t>::iterator it = _bytes_flow.begin();it.live();it++) {
        click_chatter("[CountBytes %s] For flow from source %s, has received %d Mbytes in %s seconds.", now.unparse().c_str(),
                      it.key().unparse().c_str(), (int) (it.value()/(1024*1024)), (now - _first_packet[it.key()]).unparse().c_str());
        if((now - _last_active[it.key()]).sec() >= 30)
            stale.push_back(it.key());
    }
    for(int i = 0; i < stale.size(); i++)
        reset_flow(stale[i]);

    _timer.schedule_after_sec(_freq_print);
}
void CountBytes::reset() {
if(_debug)
click_chatter("[CountBytes %s] Reset all flows.", Timestamp::now().unparse().c_str());
_bytes_flow.clear();
_first_packet.clear();
_last_active.clear();
_time_table.clear();
}
/// Forget a single flow, identified by its source address: its byte counter,
/// first/last packet timestamps and recorded completion time are erased.
/// Logs the reset in debug mode.
void CountBytes::reset_flow(IPAddress ip)
{
    if (_debug) {
        click_chatter("[CountBytes %s] Reset for flow from source %s.", Timestamp::now().unparse().c_str(), ip.unparse().c_str());
    }

    _time_table.erase(ip);
    _last_active.erase(ip);
    _first_packet.erase(ip);
    _bytes_flow.erase(ip);
}
int
CountBytes::bool_handler(const String &s, Element *e, void *a,
ErrorHandler *errh) {
CountBytes *elmt = (CountBytes *)e;
int arg;
if(!cp_integer(s, &arg))
return errh->error("Arg must be 0 or 1");
if (!(arg == 0 || arg == 1))
return errh->error("Arg must be 0 or 1");
if(((intptr_t) a) == 0)
elmt->set_active(arg==1);
else if(((intptr_t) a) == 1 && arg == 1)
elmt->reset();
if(((intptr_t) a) == 2)
elmt->set_send_stop_ack(arg==1);
return 0;
}
int
CountBytes::integer_handler(const String &s, Element *e, void *a,
ErrorHandler *errh) {
CountBytes *elmt = (CountBytes *)e;
int arg;
if(!cp_integer(s, &arg))
return errh->error("Arg must be an integer");
if(((intptr_t) a) == 0)
elmt->set_freq_print(arg);
return 0;
}
int
CountBytes::reset_flow_handler(const String &s, Element *e, void *a,
ErrorHandler *errh) {
CountBytes *elmt = (CountBytes *)e;
IPAddress ip;
if(!cp_ip_address(s, &ip))
return errh->error("Arg must be an IP address");
if(((intptr_t) a) == 0)
elmt->reset_flow(ip);
return 0;
}
int
CountBytes::objective_handler(const String &s, Element *e, void *a,
ErrorHandler *errh) {
CountBytes *elmt = (CountBytes *)e;
double arg;
if(!cp_double(s, &arg))
return errh->error("Arg must be double");
if(((intptr_t) a) == 0)
elmt->set_objective((uint64_t) (arg*1024.0*1024.0));
return 0;
}
/**
 * Read handler that reports the element's time table as a string of
 * space-terminated "<source-ip>:<time>" entries, one per table entry.
 *
 * @param e      the CountBytes element the handler belongs to
 * @param param  unused
 * @return the formatted table (empty when the table has no entries)
 */
static String
get_time_handler(Element *e, void *param) {
    (void) param;
    CountBytes *elmt = (CountBytes *)e;
    StringAccum s;
    // Bind by const reference so the table is not deep-copied just to be
    // iterated; this is valid whether the accessor returns a reference or a
    // value (a temporary's lifetime is extended by the reference).
    const HashTable<IPAddress, Timestamp> &table = elmt->get_time_table();
    for(HashTable<IPAddress, Timestamp>::const_iterator it = table.begin();it.live();it++) {
        s << it.key().unparse() << ":" << it.value().unparse() << " ";
    }
    return s.take_string();
}
/// Register the element's handlers. The integer passed with each write
/// handler is the selector its callback compares its user data against.
void CountBytes::add_handlers()
{
    // Selectors understood by bool_handler.
    const int bool_set_active = 0;
    const int bool_reset_all = 1;
    const int bool_set_send_stop_ack = 2;
    // The remaining write callbacks only act on selector 0.
    const int single_action = 0;

    add_write_handler("active", bool_handler, bool_set_active);
    add_write_handler("reset_all", bool_handler, bool_reset_all);
    add_write_handler("send_stop_ack", bool_handler, bool_set_send_stop_ack);
    add_write_handler("freq_print", integer_handler, single_action);
    add_write_handler("reset_flow", reset_flow_handler, single_action);
    add_write_handler("Mbytes_objective", objective_handler, single_action);

    add_read_handler("get_time", get_time_handler, 0);
}
EXPORT_ELEMENT(CountBytes)
CLICK_ENDDECLS

Event Timeline