Page MenuHomec4science

cc.cc
No OneTemporary

File Metadata

Created
Mon, Jul 7, 04:02
#include <click/config.h>
#include "cc.hh"
CLICK_DECLS
Cc::Cc() : _notifier(ActiveNotifier::SEARCH_CONTINUE_WAKE), _update_timer(this), _rates_timer(this), _active_timer(this), _packet_timer(this) {}
Cc::~Cc(){}
void * Cc::cast(const char *n)
{
if (strcmp(n, ActiveNotifier::EMPTY_NOTIFIER) == 0){
return &_notifier;
}
else
return Element::cast(n);
}
int Cc::configure(Vector<String> &conf, ErrorHandler *errh){
click_chatter("[Cc] enters configure.\n");
_notifier.initialize(ActiveNotifier::EMPTY_NOTIFIER, router());
_debug = false;
_experiment = false;
_update_period_ms = 1000; //1000 msec default
_rates_period_ms = 100; //100 msec default
_nr_routes = 1; //single path by default
int node_index = -1;
_send_wifi_route = false;
_send_plc_route = false;
_init_nr_aggressive_slots = 20;
_max_num_hop_routes = 1;
_delete_route_no_traffic = false;
_burst_size = 1;
_count_packets = 0;
_delta = 0;
_saturate_routes = false;
#ifdef CLICK_USERLEVEL
_alpha_rate = 0.1;
_alpha_rate_aggressive = 0.2;
_alpha_momentum = 0.1;
_beta = 0.1;
_scaling = 1.0;
_alpha_div = 1.0;
#else
_alpha_rate = 100;
_alpha_rate_aggressive = 200;
_alpha_momentum = 100;
#endif
if (Args(conf, this, errh)
.read_m("ROUTING", _routingPaths_name)
.read_m("LEAKYBUCKET_INT1", _lb_int1_name)
.read("LEAKYBUCKET_INT2", _lb_int2_name)
.read("NR_ROUTES", _nr_routes)
.read("ALPHA_RATE", _alpha_rate) //in 1/1000 in kernel mode
.read("ALPHA_MOMENTUM", _alpha_momentum) //in 1/1000 in kernel mode
.read("ALPHA_PRICE_AGGRESSIVE", _alpha_rate_aggressive)
#ifdef CLICK_USERLEVEL
.read("A", _scaling) //step size for derivative of utility (constant a)
.read("BETA", _beta)
#endif
.read("PERIOD", _rates_period_ms) //period at which rates are updated
.read("DEBUG", _debug)
.read("EXPERIMENT", _experiment)
.read("NODE_INDEX", node_index)
.read("DELETE_ROUTE_NO_TRAFFIC", _delete_route_no_traffic)
.read("BURST_SIZE", _burst_size)
#ifdef CLICK_USERLEVEL
.read("DELTA", _delta) // optional, just used for initial rate
#endif
.read("NR_SLOTS_AGGRESSIVE", _init_nr_aggressive_slots)
.complete() < 0)
return -1;
#ifndef CLICK_USERLEVEL
_alpha_s = int_divide( (_alpha_rate * SCALING_FACTOR), (uint32_t)1000);
_s_minus_alpha_s = SCALING_FACTOR - _alpha_s;
_alpha_momentum_s = int_divide( (_alpha_momentum * SCALING_FACTOR), (uint32_t)1000);
_s_minus_alpha_momentum_s = SCALING_FACTOR - _alpha_momentum_s;
#endif
_nr_aggressive_slots = _init_nr_aggressive_slots;
if(node_index != -1){
//we are part of a compound element, we need to adapt the names of the elements...
String index_str = String(node_index);
String tempstr = String("node")+index_str.c_str();
tempstr = tempstr + "/";
_routingPaths_name = tempstr + _routingPaths_name.c_str();
_lb_int1_name = tempstr + _lb_int1_name.c_str();
if (_lb_int2_name.length() > 0)
_lb_int2_name = tempstr + _lb_int2_name.c_str();
}
if(_debug){
click_chatter("[Cc] configure done.\n");
click_chatter("Size of ACNE header = %d", sizeof(empower_header));
}
return 0;
}
int Cc::initialize(ErrorHandler *errh){
click_chatter("[Cc %s] Initializing...\n", Timestamp::now().unparse().c_str());
Element::initialize(errh);
add_read_handler("FIRST_RATE", rate_callback);
add_read_handler("RATES", rates_callback);
//routingPaths
_routingPaths_element = (RoutingPaths*)router()->find(_routingPaths_name, errh);
_mfl_int1 = (MFLeakyBucketSA*)router()->find(_lb_int1_name, errh);
if (_lb_int2_name.length() > 0)
_mfl_int2 = (MFLeakyBucketSA*)router()->find(_lb_int2_name, errh);
else
_mfl_int2 = 0;
if(!_routingPaths_element || !_mfl_int1 || (_lb_int2_name.length() > 0 && !_mfl_int2)){
if(_debug)
click_chatter("[Cc] Error: invalid parameter for Element RoutingPaths or MF-leaky bucket.\n");
return -1;
}
_mfl_int1->set_cc_elmt(this);
if(_mfl_int2)
_mfl_int2->set_cc_elmt(this);
//init timer for table updates
_update_timer.initialize(this);
_rates_timer.initialize(this);
_active_timer.initialize(this);
_packet_timer.initialize(this);
_time=Timestamp(0);
return 0;
}
void Cc::push(int port, Packet* p){
//Receive layer 2 frames
//Input 0: traffic coming from host
if(!_rates_timer.scheduled())
_rates_timer.schedule_after_msec(_rates_period_ms);
_now.assign_now();
if(!_update_timer.scheduled()) {
_ts_packets_sent=_now;
_packets_sent_table.clear();
_update_timer.schedule_after_msec(_update_period_ms);
}
_packet_timer.schedule_after_msec(2*_rates_period_ms);
if (port == 0){
//check valid packet from host
if (!p->has_network_header()){
click_chatter("[Cc] Invalid packet : no dstIP\n");
p->kill();
return;
}
//read routing table
const click_ip* ip4_header = p->ip_header();
const IPAddress ip_src(ip4_header->ip_src.s_addr);
const IPAddress ip_dst(ip4_header->ip_dst.s_addr);
// check if in stop_flow table
if(_stop_flow_table[FlowTuple(ip_src, ip_dst)] > 0) {
p->kill();
return;
}
bool tcp_ack = false;
bool route0 = false;
if(ip4_header->ip_p == IP_PROTO_TCP) {
click_tcp *tcp_hdr = (click_tcp *)(p->tcp_header());
if(tcp_hdr->th_flags & TH_SYN) {
route0 = true;
tcp_ack = true;
}
else if(tcp_hdr->th_flags & TH_ACK) {
// use ClassifyAck
uint16_t a = htons(ip4_header->ip_len);
uint32_t b = tcp_hdr->th_off;
uint32_t c = a - b*4 - 20;
if(c==0) {
route0 = true;
tcp_ack = true;
}
}
}
FormatedRoute froute;
if(_count_packets >= _burst_size || _route_to_send==FormatedRoute()) { // send packets in burst: use last computed route
_count_packets = 0;
//get multi routes (for now : compute N=1 sets of _nr_routes paths).
Vector<Route> routes;
if(_send_wifi_route)
routes = _routingPaths_element->get_multipath_wifi(ip_dst, _nr_routes);
else if(_send_plc_route)
routes = _routingPaths_element->get_multipath_plc(ip_dst, _nr_routes);
else
routes = _routingPaths_element->get_multipath(ip_dst, _nr_routes);
//Test if there is a route:
if(routes.empty() || routes.front().links.empty()){
if (_debug && _drops == 0)
click_chatter("[Cc] Error: no route known for destination with IP %s. Dropping packet (proto %d).",
ip_dst.unparse().c_str(), (int) ip4_header->ip_p);
_drops++;
p->kill();
return;
}
// for TCP syn, acks, always use route 0
if (route0)
routes = Vector<Route>(1,routes[0]);
//0. Multiplex : Chose route(s)
_force_packet = FormatedRoute();
uint32_t ind_route = initMultiplexedRoute(routes);
if(_saturate_routes) // if saturating route, send with equal probability
ind_route = click_random(0,routes.size()-1);
if(route0) {
ind_route=0; //should already be 0
_count_packets = _burst_size;
}
Route route = routes[ind_route];
froute = FormatedRoute(route);
_route_to_send=froute;
}
else {
_count_packets++;
froute = _route_to_send;
}
//Julien: we assume that the routing element returns correctly formated routes:
/*if(route.links.size() > HDR_ROUTE_SIZE){
if(_debug) click_chatter("[Cc] Error: route is too long (size %d). Dropping packet.", route.links.size());
p->kill();
return;
}*/
/*if(_debug) {
now.assign_now();
click_chatter("[Cc %s] Chose route from %s to %s: %s", now.unparse().c_str(),
route.links.front().src.unparse().c_str(),route.links.back().dst.unparse().c_str(), route.unparse().c_str());
}*/
WritablePacket* out;
//1.build header
out = p->push(sizeof(empower_header));
memset(out->data(), 0, sizeof(empower_header));
if (out==NULL){
click_chatter("[Cc] Error : can't add space for empower header.");
p->kill();
return;
}
// if(!is_in_hash_table(_packets_sent_table, froute))
// _packets_sent_table.set(froute,0);
_packets_sent_table[froute]+=out->length();
empower_header* header = (empower_header*)out->data();
header->_route = froute;
header->_route_price = 0;
header->_hop=0;
header->_seq=0; // will be set by MFL
header->_ts.assign_now();
rateTuple_entry *ratetuple = _rateTable.get_pointer(froute);
if(ratetuple != 0 && ratetuple->_incr_timeout) {
ratetuple->_incr_timeout = false;
ratetuple->_timeout+=1;
}
// demand is min(3*real_rate, actual_demand);
// if only real rate, CC rate can go to infinity if link capacity over-estimated; only demand does not work for low throughput flows
if(ratetuple) {
if(_now <= (_first_packet + Timestamp(0,Timestamp::subsec_per_sec/20)) && !tcp_ack ) { // 50 msec
header->_rate_demand = ratetuple->_rate;
}
else {
if(ip4_header->ip_p == IP_PROTO_TCP) {
// rate_demand for normal traffic, real rate for acks
if(tcp_ack) {
// acks
Timestamp ts_diff = (_now-_ts_packets_sent);
uint32_t sent_kbps = 100;
// when first packet received (_now==_ts_packets_sent), set small rate
// TODO: implement rate computation as moving average
if(ts_diff.msecval() > 0)
sent_kbps = (8*_packets_sent_table[froute])/ts_diff.msecval();
header->_rate_demand = sent_kbps;
}
else {
// normal traffic
header->_rate_demand = ratetuple->_rate;
}
}
else {
// rate demand in kb/s, _packets_sent_table in B
Timestamp ts_diff = (_now-_ts_packets_sent);
uint32_t sent_kbps = 100;
// when first packet received (_now==_ts_packets_sent), set small rate
// TODO: implement rate computation as moving average
if(ts_diff.msecval() > 0)
sent_kbps = (8*_packets_sent_table[froute])/ts_diff.msecval();
uint32_t real_rate = 30*(sent_kbps/10);
header->_rate_demand = mymin(ratetuple->_rate, real_rate);
}
}
}
else
if(_debug) click_chatter("[Cc] WARNING: no rate for route %s", froute.unparse().c_str());
if(ip4_header->ip_p == PROTO_IP_TEST)
header->_type=PROTO_IP_TEST;
else
header->_type=REGULAR_TRAFFIC;
output(0).push(out);
if(_force_packet!=FormatedRoute()) { // send a packet on this route (with no seq, just to get price)
if(_debug) click_chatter("[Cc %s] Send a forced packet on route %s", Timestamp::now().unparse().c_str(), _force_packet.unparse().c_str());
WritablePacket *p_forced = Packet::make(sizeof(empower_header));
memset(p_forced->data(),0,p_forced->length());
empower_header *header_forced = (empower_header *) p_forced->data();
header_forced->_type=TRAFFIC_FORCED;
header_forced->_route = _force_packet;
header_forced->_route_price = 0;
header_forced->_hop=0;
header_forced->_seq=0; // will be set by MFL
header_forced->_ts.assign_now();
output(0).push(p_forced);
}
}
//Input 1: control traffic from network (ACKs)
//Assumes Classifier ensures that input is ACK for this interface
else if (port ==1) {
empower_header* ackHeader = (empower_header*)p->data();
empower_ack* ackPayload = (empower_ack*)(p->data() + sizeof(empower_header));
if(ackPayload->_stop_flow) {
click_chatter("[Cc %s] Stop traffic for flow %s", Timestamp::now().unparse().c_str(),
ackPayload->_flow.unparse().c_str());
_stop_flow_table.set(ackPayload->_flow,1);
}
else {
#ifdef CLICK_USERLEVEL
if(_debug)
click_chatter("[Cc-userlevel] Incoming ACK for route %s. It contains price q_r = %f, seq = %d, hop =%d\n",
ackPayload->_target_route.unparse().c_str(), ackPayload->_route_price, ackHeader->_seq, ackHeader->_hop);
#else
if(_debug)
click_chatter("[Cc] Incoming ACK for route %s. It contains price q_r = %d, seq = %d, hop =%d\n",
ackPayload->_target_route.unparse().c_str(), ackPayload->_route_price, ackHeader->_seq, ackHeader->_hop);
#endif
_now.assign_now();
if(!is_in_hash_table(_removed_routes, ackPayload->_target_route) || (_now-_removed_routes[ackPayload->_target_route]).sec() > 100) {
rateTuple_entry* ratetuple = _rateTable.get_pointer(ackPayload->_target_route);
if(ratetuple == 0){
if(_debug)
click_chatter("[Cc] ACK for unknown route: [%s] ; from [%s]. Perhaps the route has been discarded in the meanwhile.\n", ackPayload->_target_route.unparse().c_str(), ackHeader->_route.unparse().c_str());
p->kill();
return;
}
ratetuple->_timeout = 0; //reset timeout
_priceTable.set(ackPayload->_target_route, ackPayload->_route_price); //store price. It will be read by timer to set new rate
_removed_routes.erase(ackPayload->_target_route);
}
}
p->kill();
}
}
void Cc::updateAllRates(){
#ifdef CLICK_USERLEVEL
for(HashTable<FormatedRoute,float>::iterator it = _priceTable.begin(); it; ++it){
#else
for(HashTable<FormatedRoute,uint32_t>::iterator it = _priceTable.begin(); it; ++it){
#endif
uint32_t new_rate = dtMultipathUpdateController(it.key(), it.value()); //get new rate
// set the corresponding rate accordingly (B/s)
#ifdef CLICK_USERLEVEL
if(_debug) click_chatter("[Cc-userlevel] setting new rate %f Mbps", (double)new_rate / (double)1000.0);
#else
if(_debug) click_chatter("[Cc] setting new rate: %d kbps = %d B/s", new_rate, 128*new_rate);
#endif
if(!_mfl_int1->set_rate(it.key(), 128*new_rate))
if (_mfl_int2 != 0)
_mfl_int2->set_rate(it.key(), 128*new_rate);
}
}
/**
* A timer is bound to the Cc element, in charge of maintaining the _rateTable of the Cc element.
* Updates every _period_ms the _rateTable by incrementing the timer fields
* removes the elements whose timer have reached timeout
*/
void Cc::run_timer(Timer *timer){
if(_drops > 1000)
_drops = 0;
if(_debug) click_chatter("[Cc] Run timer...");
// check if routes have become invalid
Vector<Route> invalid_paths = _routingPaths_element->get_and_reset_invalid_routes();
for(int i=0;i<invalid_paths.size();i++) {
Route route = invalid_paths[i];
FormatedRoute froute = FormatedRoute(route);
int ind = find_in_vector(_dst2routes[route.ip], froute);
if (ind != -1)
_dst2routes[route.ip].erase(&_dst2routes[route.ip][ind]);
_rateTable.erase(froute);
_priceTable.erase(froute);
if(!_mfl_int1->remove_route(froute))
if (_mfl_int2 != 0)
_mfl_int2->remove_route(froute);
}
if(timer == &_packet_timer) {
_first_packet = Timestamp();
}
else if(timer == &_rates_timer){ // scheduled in push
updateAllRates();
}
else if(timer == &_update_timer){
_active_timer.schedule_after_sec(30);
Vector<FormatedRoute> toremove = Vector<FormatedRoute>();
for(HashTable<FormatedRoute,rateTuple_entry>::iterator it = _rateTable.begin(); it; ++it){
rateTuple_entry *ratetuple = _rateTable.get_pointer(it.key());
if(_debug) click_chatter("[Cc %s] Checking route %s. Current timeout = %d", Timestamp::now().unparse().c_str(), it.key().unparse().c_str(), it.value()._timeout);
if(!is_in_hash_table(_removed_routes, it.key()) && (it.value()._timeout >= int_divide(ROUTE_TIMEOUT * 1000,_update_period_ms) ||
(_delete_route_no_traffic && it.value()._no_traffic >= int_divide(ROUTE_TIMEOUT * 1000,_update_period_ms)) )){
toremove.push_back(it.key());
if(ratetuple!=0)
ratetuple->_rate = 100;
}
if(ratetuple != 0)
ratetuple->_incr_timeout = true; // increment timeout at next packet received
if(it.value()._rate == 100)
it.value()._no_traffic++;
else
it.value()._no_traffic = 0;
}
if(toremove.size()>0)
_now.assign_now();
for(int i=0; i<toremove.size(); ++i){
click_chatter("[Cc %s] Removing route %s because didn't receive ACK for too long (possibly because there is no traffic)", _now.unparse().c_str(),
toremove[i].unparse().c_str());
//IPAddress dst_ip = _routingPaths_element->get_ip(toremove[i]._dst);
//int ind = find_in_vector(_dst2routes[dst_ip], toremove[i]);
//if (ind != -1)
// _dst2routes[dst_ip].erase(&_dst2routes[dst_ip][ind]);
//_rateTable.erase(toremove[i]);
//_priceTable.erase(toremove[i]);
_removed_routes.set(toremove[i], Timestamp::now());
}
}
else if (timer == &_active_timer) {
if(_debug) click_chatter("[Cc] Resetting element...");
reset_rates();
}
else {
//shouldn't happen
assert(false);
}
}
/**
* Utility function specification
*/
#ifdef CLICK_USERLEVEL
double Cc::derUtilityFunction(double x){
return _scaling/x;
}
double Cc::invDerUtilityFunction(double x){
return _scaling/x;
}
#else
uint64_t Cc::derUtilityFunction(uint32_t x, uint32_t scaling_factor){
//scale the scaling factor:
//multiply by 10*1024*1024 because rates are expressed in kbps, and
//too slow otherwise (working in kbps yield too small increments)
return int_divide( ((uint64_t)scaling_factor * (uint64_t)10485760), x);
}
#endif
/**
* Discrete time, update controller for the multi path case
* note : timeout is set to 0 at ACK receiving event
* @return the new rate at which the route is transmitting packets
*/
#ifdef CLICK_USERLEVEL
uint32_t Cc::dtMultipathUpdateController(FormatedRoute const& route, float received_price){
#else
uint32_t Cc::dtMultipathUpdateController(FormatedRoute const& route, uint32_t received_price){
#endif
if(_debug) click_chatter("[Cc %s] Entering dtMultipathUpdateController.", Timestamp::now().unparse().c_str());
//sum all rates that have the same dst IP
uint32_t sum = 0;
bool found_route = false;
for(HashTable<IPAddress, Vector<FormatedRoute> >::iterator it = _dst2routes.begin(); it; ++it){
if(is_in_vector(it.value(), route)){
//we found the destination IP. compute sum over all routes for this IP:
Vector<FormatedRoute> froutes_for_dst = it.value();
for(int i=0; i<froutes_for_dst.size(); i++){
if(!is_in_hash_table(_removed_routes, froutes_for_dst[i])) {
rateTuple_entry *rt = _rateTable.get_pointer(froutes_for_dst[i]);
if(rt != 0){
sum += rt->_rate;
found_route = true;
}
}
}
break;
}
}
if(!found_route){
click_chatter("[Cc] Error: did not find any route corresponding to the ACK!");
return 0;
}
if(sum == 0){
click_chatter("[Cc] Warning: sum of previous rates is zero. setting to 100.");
sum = 100;
}
_now.assign_now();
rateTuple_entry *rt = _rateTable.get_pointer(route);
if(rt != 0){
//compute the new rate
#ifdef CLICK_USERLEVEL
//float version, works internally using Mbps
/*if(_nr_routes == 1){
double new_rate;
if(received_price < 0.1){
//do not divide by 0, just let 100 Mbps go through
new_rate = 100.0;
} else{
new_rate = invDerUtilityFunction((double)received_price);
if(new_rate < 0.1) new_rate = 0.1;
}
if(_debug) click_chatter("[Cc-userlevel] one route. new_rate = max(100Mbps, 1/q_r) = %f", new_rate);
rt->_rate = (uint32_t)(new_rate * 1000.0);
} else {*/
double current_rate = (double)(rt->_rate)/1000.0;
double current_moment = (double)(rt->_moment)/1000.0;
double sum_d = (double)sum/1000.0;
//"legacy" rate update:
//double new_rate = (1.0 - _alpha_rate)*current_rate + _alpha_rate*(current_moment + derUtilityFunction(sum_d) - (double)received_price);
//if(_debug) click_chatter("[Cc-userlevel] current_rate = %f, current_moment = %f, sum = %f, derUtilityFunction(sum) = %f, new_rate = %f",
// current_rate, current_moment, (double)sum_d, derUtilityFunction(sum_d), new_rate);
// if short paths, alpha should be greater
int alpha_mult = 1;
if (_max_num_hop_routes == 1)
alpha_mult=4;
if (_max_num_hop_routes == 2)
alpha_mult=2;
// if only one route, alpha should be greater
if(_rateTable.size() == 1)
alpha_mult*=2;
double alpha_loc = _alpha_rate;
double beta_loc = _beta;
if(_nr_aggressive_slots > 0){
alpha_loc = _alpha_rate_aggressive;
beta_loc = _alpha_rate_aggressive;
_nr_aggressive_slots --;
}
alpha_loc = alpha_mult*alpha_loc/_alpha_div;
beta_loc = alpha_mult*beta_loc/_alpha_div;
alpha_loc = mymin(alpha_loc,0.5);
beta_loc = mymin(beta_loc,0.5);
//rate update for prop. fairness:
double new_rate = (1.0 - alpha_loc)*current_rate +alpha_loc*current_moment + beta_loc*(_scaling - (double)received_price*sum_d);
if(_debug) click_chatter("[Cc] Update rate for route %s; current %f, alpha %f, moment %f, beta %f, scaling %f, price %f, sum_d %f: new rate %f",
route.unparse().c_str(), current_rate, alpha_mult*_alpha_rate, current_moment, alpha_mult*_beta, _scaling, received_price, sum_d, new_rate);
//rt->_rate = (1.0-_alpha_rate)*rt->_rate + _alpha_rate*(rt->_moment + derUtilityFunction(sum) - received_price);
if(new_rate < 0.1)
new_rate = 0.1; //min 100 kbps
if(new_rate > rt->_rate) {
if(!_is_rate_increasing[route]) {
// we've reached the min
if(!is_in_hash_table(_min_rates, route))
_min_rates.set(route, Vector<uint32_t>());
_min_rates[route].push_back(rt->_rate);
_is_rate_increasing[route] = true;
}
}
else if (new_rate < rt->_rate) {
if(_is_rate_increasing[route]) {
// we've reached the max
if(!is_in_hash_table(_max_rates, route))
_max_rates.set(route, Vector<uint32_t>());
_max_rates[route].push_back(rt->_rate);
_is_rate_increasing[route] = false;
}
}
// if 6 max, 6 mins and amplitude not decreasing (i.e. one in second half greater than 95% of all in first half), decrease alpha
if((_now-_last_alpha_update).sec() >= 25 && _max_rates[route].size()==_min_rates[route].size() && _max_rates[route].size() >= 6 ) {
int size = _max_rates[route].size();
uint32_t max_ampl = 0;
for(int i=size/2;i<size;i++) {
uint32_t ampl = _max_rates[route][i] - _min_rates[route][i];
if(ampl > max_ampl)
max_ampl = ampl;
}
bool reduce_alpha = false;
for(int i=0;i<size/2;i++) {
uint32_t ampl = _max_rates[route][i] - _min_rates[route][i];
if(max_ampl > (95*ampl)/100) {
reduce_alpha = true;
break;
}
}
if(reduce_alpha) {
_last_alpha_update = _now;
_alpha_div*=2;
click_chatter("[Cc %s] Alpha decreased by a factor %f", _now.unparse().c_str(), _alpha_div);
}
}
rt->_rate = (uint32_t)(new_rate * 1000.0);
rt->_moment = (uint32_t)( ( (1-_alpha_momentum)*current_moment + _alpha_momentum*new_rate) * 1000.0);
//}
#else
//integer version
//TODO: see what can be stored as uint64 instead of always converting...
uint64_t temp1 = (uint64_t)_s_minus_alpha_s * (uint64_t)rt->_rate + (uint64_t)_alpha_s * (uint64_t)rt->_moment + derUtilityFunction(sum, _alpha_s);
if(_debug) click_chatter("[Cc] temp1 = %lu (derUtility(sum, _alpha_s) = %lu", temp1, derUtilityFunction(sum, _alpha_s));
uint64_t temp2 = (uint64_t)_alpha_s * (uint64_t)received_price;
if(temp2 > temp1){
rt->_rate = 100; //min 100 kbps
} else {
uint64_t temp_rate = int_divide( (temp1 - temp2), SCALING_FACTOR);
rt->_rate = (uint32_t)temp_rate;
if(_debug) click_chatter("[Cc] updated rate = %lu => final rate = %d", temp_rate, rt->_rate);
}
uint64_t temp_momentum = (uint64_t)_s_minus_alpha_momentum_s * (uint64_t)rt->_moment + (uint64_t)_alpha_momentum_s * (uint64_t)rt->_rate;
temp_momentum = int_divide(temp_momentum, SCALING_FACTOR);
rt->_moment = (uint32_t)(temp_momentum);
#endif
//rt->_timeout = 0;
return rt->_rate;
} else {
click_chatter("[Cc] Error: did not find route when attempting to update rate (multipath)!");
return 0;
}
}
/**
* Route multiplexing : For a set of routes to a dst, return the route to use according to
* probability p = w_i/SUM(i){w_i}. i.e : p is proportional to weight of route
* note : timeout is set to 0 at first packet sending event
*
* returns the index of the route to use
*/
uint32_t Cc::initMultiplexedRoute(Vector<Route> const& routes){
uint32_t sum = 0;
Vector<uint32_t> intervals = Vector<uint32_t>(routes.size(),0u);
IPAddress dst = routes[0].ip; //read the dst IP from here, (so it's always a local address)
_now.assign_now();
FormatedRoute force_route;
//init rate for unknown routes
Vector<FormatedRoute> froutes;
for (int i=0 ; i< routes.size(); ++i){
FormatedRoute froute = FormatedRoute(routes[i]);
froutes.push_back(froute);
rateTuple_entry* r = _rateTable.get_pointer(froute);
if(is_in_hash_table(_removed_routes, froute)) {
// do not consider routes that have been removed (did not receive an ack); but every second, retry to send one packet
if(r==0 || (_now-r->_last_sent).sec() >= 1) {
force_route = froute;
continue;
}
}
if(r==0) {
//set default route x_r rate
rateTuple_entry r;
if(_delta > 0) {
r._rate = (uint32_t) (routes[i].max_capa*(1-_delta));
r._moment=r._rate = (uint32_t) (routes[i].max_capa*(1-_delta));
}
else {
r._rate=int_divide(9*routes[i].max_capa, 10);
r._moment=int_divide(9*routes[i].max_capa, 10);
}
r._timeout=0;
r._incr_timeout = false;
if(_debug) click_chatter("[Cc %s] No rate tuple for route %s, adding it with rate %u kb/s (max_capa %u kb/s).",
Timestamp::now().unparse().c_str(), froute.unparse().c_str(), r._rate, routes[i].max_capa);
_rateTable.set(froute,r);
//reset seq number
int num_hop_route = froute.number_hops();
if(num_hop_route > _max_num_hop_routes)
_max_num_hop_routes = num_hop_route;
//Julien: also add this route to the dst2routes table (needed to compute sum of rates per destination when refreshing rates)
Vector<FormatedRoute> *routes_for_dest = _dst2routes.get_pointer(dst);
if(routes_for_dest == 0){
Vector<FormatedRoute> *routes_for_dest = new Vector<FormatedRoute>();
routes_for_dest->push_back(froute);
_dst2routes[dst] = *routes_for_dest;
} else if(!is_in_vector(*routes_for_dest, froute)){
routes_for_dest->push_back(froute);
}
sum+=INITIAL_ROUTE_RATE;
intervals[i]=sum;
if(_first_packet == Timestamp()) {
_first_packet.assign_now();
_update_timer.schedule_after_sec(1);
_rates_timer.schedule_after_sec(1);
}
}
else {
if((_now-r->_last_sent).msecval() >= 100) {
if(_debug) click_chatter("[Cc %s] No traffic sent on route %s since 100ms, send one", _now.unparse().c_str(),froute.unparse().c_str());
force_route = froute;
r->_last_sent = _now;
}
sum+=r->_rate;
intervals[i]=sum;
}
}
assert(routes.size() == froutes.size());
if(routes.size() == 1) {
if(!is_in_hash_table(_removed_routes, froutes[0])) {
rateTuple_entry *r=_rateTable.get_pointer(froutes[0]);
if (r!=0)
r->_last_sent = _now;
else
click_chatter("[Cc %s] WARNING: r is null for unique route 0 (%s)!", _now.unparse().c_str(), froutes[0].unparse().c_str());
}
return 0;
}
//chose randomly a route according to the weights
//not allowed to use floats in kernel space!
//double w = ((double)rand() / RAND_MAX)*sum; //0 to sum
//double w = ((double)click_random())/_rand_max_d; //click_random version (still using double)
uint32_t randi = click_random(0, sum); //between 0 and sum (inclusive)
//uint32_t index = 0;
for (uint32_t i=0; i<routes.size(); ++i){
if(randi<intervals[i]){
if(froutes.size() > i) {
rateTuple_entry *r=_rateTable.get_pointer(froutes[i]);
if (r!=0)
r->_last_sent = _now;
else
click_chatter("[Cc %s] WARNING: r is null for route %d/%d (%s); has been removed: %d; intervals %s, sum %d!", _now.unparse().c_str(),
i, routes.size(), froutes[i].unparse().c_str(), is_in_hash_table(_removed_routes, froutes[i]), print_vector(intervals).c_str(), sum);
}
else
click_chatter("[Cc %s] WARNING: no froutes!", _now.unparse().c_str());
if(force_route != FormatedRoute() && force_route != froutes[i])
_force_packet = force_route;
return i;
}
}
int i = (routes.size() - 1);
if(froutes.size() > i) {
rateTuple_entry *r=_rateTable.get_pointer(froutes[i]);
if (r!=0)
r->_last_sent = _now;
else
click_chatter("[Cc %s] WARNING: r is null for end route %d (%s)!", _now.unparse().c_str(), i, froutes[i].unparse().c_str());
if(force_route != FormatedRoute() && force_route != froutes[i]) {
_force_packet = force_route;
}
}
else
click_chatter("[Cc %s] WARNING: no froutes!", _now.unparse().c_str());
return (uint32_t) i;
//return routes[index];
}
/**
* Sequence number mechanics
*/
uint16_t Cc::getCurrentSeqNum(IPAddress const& addr, bool increase){
// if(!is_in_hash_table(_seqNumTable, addr)) {
// _seqNumTable.set(addr,0);
// return 0;
// }
if(increase) {
if(_seqNumTable[addr]==(MAX_SEQ-1)) {
if(_debug)
click_chatter("[Cc %s] Has reached MAX_SEQ.", Timestamp::now().unparse().c_str());
_seqNumTable[addr] = 0;
}
else
_seqNumTable[addr]=_seqNumTable[addr]+1;
}
return _seqNumTable[addr];
}
uint16_t Cc::getCurrentSeqNum(IPAddress const& addr){
return getCurrentSeqNum(addr, true);
}
//////////////////////////////////// HANDLERS METHODS///////////////////////////////////////
/**
* @return the first Route current rate x_r in Mbps
*/
String Cc::getFirstRate(){
if(_rateTable.size() > 0)
return _rateTable.begin().key().unparse()+" : "+String(_rateTable.begin().value()._rate);
else{
String str;
return str;
}
}
String Cc::getAllRates(){
String str;
for(HashTable<FormatedRoute,rateTuple_entry>::iterator it = _rateTable.begin(); it; ++it){
str = str + it.key().unparse()+" --> "+String(it.value()._rate)+" ; "+String(it.value()._timeout)+" ; ";
#ifdef CLICK_USERLEVEL
float *priceptr = _priceTable.get_pointer(it.key());
if(priceptr != 0){
str += String((double)(*priceptr)) + " | ";
} else {
str += "unknown price | ";
}
#else
uint32_t *priceptr = _priceTable.get_pointer(it.key());
if(priceptr != 0){
str += String(*priceptr) + " | ";
} else {
str += "unknown price | ";
}
#endif
}
return str;
}
String Cc::rate_callback(Element *e, void *thunk){
Cc *c = (Cc *)e;
return c->getFirstRate();
}
String Cc::rates_callback(Element *e, void *thunk){
Cc *c = (Cc *)e;
return c->getAllRates();
}
void
Cc::reset_rates() {
click_chatter("[Cc %s] Resetting Cc element", Timestamp::now().unparse().c_str());
_rateTable.clear();
_priceTable.clear();
_drops=0;
_removed_routes.clear();
_packets_sent_table.clear();
_seqNumTable.clear();
_max_num_hop_routes=1;
_first_packet = Timestamp();
_count_packets = 0;
_route_to_send = Route();
#ifdef CLICK_USERLEVEL
_alpha_div = 1.0;
#endif
_stop_flow_table.clear();
_nr_aggressive_slots = _init_nr_aggressive_slots;
}
void Cc::reset_flow(IPAddress ip) {
_stop_flow_table.erase(FlowTuple(_routingPaths_element->my_ip(),ip));
}
int
Cc::reset_flow_handler(const String &s, Element *e, void *a,
ErrorHandler *errh) {
Cc *elmt = (Cc *)e;
IPAddress ip;
if(!cp_ip_address(s, &ip))
return errh->error("Arg must be an IP address");
if(((intptr_t) a) == 0)
elmt->reset_flow(ip);
return 0;
}
int
Cc::nr_routes_handler(const String &s, Element *e, void *, ErrorHandler *errh) {
Cc *cc_elmt = (Cc *) e;
int new_nr_routes;
if(!cp_integer(s, &new_nr_routes))
return errh->error("Invalid number of routes");
cc_elmt->set_nr_routes(new_nr_routes);
return 0;
}
int
Cc::alpha_rate_handler(const String &s, Element *e, void *, ErrorHandler *errh) {
Cc *cc_elmt = (Cc *) e;
#ifdef CLICK_USERLEVEL
double new_alpha_rate;
if(!cp_double(s, &new_alpha_rate))
return errh->error("Invalid alpha: must be double");
cc_elmt->set_alpha_rate(new_alpha_rate);
#else
uint32_t new_alpha_rate;
if(!cp_integer(s, &new_alpha_rate))
return errh->error("Invalid alpha: must be integer");
cc_elmt->set_alpha_rate(new_alpha_rate);
#endif
return 0;
}
int
Cc::beta_handler(const String &s, Element *e, void *, ErrorHandler *errh) {
Cc *cc_elmt = (Cc *) e;
#ifdef CLICK_USERLEVEL
double new_beta;
if(!cp_double(s, &new_beta))
return errh->error("Invalid beta: must be double");
cc_elmt->set_beta(new_beta);
#endif
return 0;
}
int
Cc::reset_rates_handler(const String &, Element *e, void *,
ErrorHandler *) {
Cc *cc = (Cc *)e;
cc->reset_rates();
return 0;
}
static String
read_parameters_handler(Element *e, void *param) {
Cc *cc_elmt = (Cc *)e;
StringAccum s;
#ifdef CLICK_USERLEVEL
s << "alpha_rate=" << cc_elmt->get_alpha_rate() << "; ";
s << "beta=" << cc_elmt->get_beta();
#endif
return s.take_string();
}
int
Cc::bool_handler(const String &s, Element *e, void *a,
ErrorHandler *errh) {
Cc *elmt = (Cc *)e;
int arg;
if(!cp_integer(s, &arg))
return errh->error("Arg must be 0 or 1");
if (!(arg == 0 || arg == 1))
return errh->error("Arg must be 0 or 1");
if(((intptr_t) a) == 0)
elmt->set_debug(arg==1);
if(((intptr_t) a) == 1)
elmt->set_saturate(arg==1);
return 0;
}
int
Cc::wifi_plc_route_handler(const String &s, Element *e, void *a,
ErrorHandler *errh) {
Cc *elmt = (Cc *)e;
int arg;
if(!cp_integer(s, &arg))
return errh->error("Arg must be 0 or 1");
if (!(arg == 0 || arg == 1))
return errh->error("Arg must be 0 or 1");
if(((intptr_t) a) == 0)
elmt->set_wifi_route(arg==1);
else if(((intptr_t) a) == 1)
elmt->set_plc_route(arg==1);
return 0;
}
int
Cc::burst_size_handler(const String &s, Element *e, void *,
ErrorHandler *errh) {
Cc *elmt = (Cc *)e;
int arg;
if(!cp_integer(s, &arg) || arg <= 0)
return errh->error("Arg must be a strictly positive integer");
elmt->set_burst_size(arg);
return 0;
}
int
Cc::delta_handler(const String &s, Element *e, void *, ErrorHandler *errh) {
Cc *elmt = (Cc *)e;
#ifdef CLICK_USERLEVEL
double new_delta;
if(!cp_double(s, &new_delta))
return errh->error("Invalid delta: must be double");
elmt->set_delta(new_delta);
#endif
return 0;
}
void Cc::add_handlers() {
add_write_handler("nr_routes", nr_routes_handler, 0);
add_write_handler("alpha", alpha_rate_handler, 0);
add_write_handler("beta", beta_handler, 0);
add_write_handler("reset_rates", reset_rates_handler,0);
add_write_handler("debug", bool_handler, 0);
add_write_handler("saturate_routes", bool_handler, 1);
add_write_handler("send_wifi_route", wifi_plc_route_handler, 0);
add_write_handler("send_plc_route", wifi_plc_route_handler, 1);
add_write_handler("burst", burst_size_handler, 0);
add_read_handler("parameters", read_parameters_handler,0);
add_write_handler("delta", delta_handler, 0);
add_write_handler("reset_flow", reset_flow_handler, 0);
}
CLICK_ENDDECLS
EXPORT_ELEMENT(Cc)
ELEMENT_PROVIDES(Cc)

Event Timeline