Page MenuHomec4science

srcmab.cc
No OneTemporary

File Metadata

Created
Fri, Jul 4, 15:16

srcmab.cc

#include <click/config.h>
#include "util.hh"
#include "srcmab.hh"
#ifdef CLICK_USERLEVEL
#include <math.h>
#endif
#define DEBUG_CHATTER(arg, ...) do { if (_debug) { click_chatter(arg, ## __VA_ARGS__);} } while (0)
#define VERB_DEBUG_CHATTER(arg, ...) do { if (_verb_debug && (_verb_debug_count%_verb_debug_freq==0 || _verb_debug_count <= 20)) { click_chatter(arg, ## __VA_ARGS__);} } while (0)
#define DEBUG_NACK_CHATTER(arg, ...) do { if ( (_verb_debug || _debug_nack) && (_verb_debug_count%_verb_debug_freq==0 || _verb_debug_count <= 20)) { click_chatter(arg, ## __VA_ARGS__);} } while (0)
CLICK_DECLS
SrcMab::SrcMab() : _q(0), _trial_timer(this), _active_timer(this), _defered_packet_timer(this), _dequeue_timer(this), _send_packets_timer(this)
{
}
SrcMab::~SrcMab()
{
}
int SrcMab::configure(Vector<String> &conf, ErrorHandler *errh){
_debug = false;
_verb_debug = false;
_verb_debug_freq = 100;
_verb_debug_count=0;
_debug_nack = false;
_debug_explore = false;
String routing_name;
_nr_routes = 2;
_nr_arms = 1;
_trial_duration = 1000;
_active = true;
String acne_src_name;
String rate_shaper_name;
_acne_source_elmt = 0;
_stop_explore = false;
_consecutive_killed=0;
_max_consecutive_killed=3;
#ifdef CLICK_USERLEVEL
_lambda_mab = 1;
#endif
_max_capacity = 200;
_timeout_bool = true;
_max_count_packets = 1;
_freq_check_meas=50;
_new_explore = false;
_mark_last_packet = false;
_last_packet_marked = false;
_stop_sending = false;
_killed_mark = 0;
_killed_stop = 0;
_killed_too_much = 0;
_killed_too_much_enq = 0;
_burst_ctrl = 1;
_silent_slot = 100;
_delta = 0;
_ewma = false;
_variable_min_proba = false;
_kill_if_not_empty = false;
_tb_capacity_factor = 50;
_tcp_retrans = false;
int ms_small_delta = 150;
_min_proba = 5;
_alpha=100;
_nacks=true;
_clear_after=3600;
_max_capacity_exploit = -1;
_deterministic_explore = false;
_rand_no_ucb = false;
_freq_computed_routes = 0;
_variable_min_proba_max = 10;
_variable_min_proba_min = 2;
String meas_name_wifi;
String meas_name_plc;
if(Args(this, errh).bind(conf)
.read_m("ROUTING", routing_name)
.read("DEBUG", _debug)
.read("DEBUG_NACK", _debug_nack)
.read("DEBUG_EXPLORE", _debug_explore)
.read("VERB_DEBUG", _verb_debug)
.read("VERB_DEBUG_FREQ", _verb_debug_freq)
.read("NR_ROUTES", _nr_routes)
.read("NR_ARMS", _nr_arms)
.read("TRIAL_DURATION", _trial_duration)
.read("ACNE_SOURCE", acne_src_name)
.read("RATE_SHAPER", rate_shaper_name)
.read("ACTIVE", _active)
#ifdef CLICK_USERLEVEL
.read("LAMBDA", _lambda_mab)
#endif
.read("TIMEOUT", _timeout_bool) // if clear things after 10 sec inactive
.read("BURST_CTRL", _burst_ctrl)
.read("MAX_COUNT_PACKETS", _max_count_packets)
.read("SILENT_SLOT", _silent_slot)
.read("QUEUE_CAPACITY", _max_capacity)
.read("QUEUE_CAPACITY_EXPLOIT", _max_capacity_exploit)
.read("DELTA", _delta) // in per 1000
.read("VARIABLE_MIN_PROBA", _variable_min_proba)
.read("VARIABLE_MIN_PROBA_MIN", _variable_min_proba_min) // in percent
.read("VARIABLE_MIN_PROBA_MAX", _variable_min_proba_max)
.read("MIN_PROBA_EXPLORE", _min_proba) // in per 100
.read("SMALL_DELTA", ms_small_delta)
.read("ALPHA", _alpha) // alpha for EWMA for rates in per 1000; if no EWMA, gives number of measurements to average on by 1000/_alpha
.read("EWMA", _ewma) // if rates averaged through EWMA; otherwise, average on 1000/_alpha last measurements
.read("NACKS", _nacks)
.read("TB_CAPACITY_FACTOR", _tb_capacity_factor)
.read("TCP_RETRANS_PRIORITY", _tcp_retrans)
.read("CLEAR_AFTER_SEC", _clear_after)
.read("RAND_NO_UCB", _rand_no_ucb)
.read("FREQ_COMPUTE_ROUTES", _freq_computed_routes) // frequency for recomputing routes (if 0, never)
.read("MAX_CONSECUTIVE_KILLED", _max_consecutive_killed) // max consecutive killed for TCP
.read("MEASUREMENTS_WIFI", meas_name_wifi)
.read("MEASUREMENTS_PLC", meas_name_plc)
.read("DETERMINISTIC_EXPLORE", _deterministic_explore) // explore is deterministic (every 1/prob trial), not random
.read("FREQ_CHECK_MEASUREMENT", _freq_check_meas)
.complete() < 0)
return -1;
String compound_name;
_small_delta = Timestamp(0,ms_small_delta*Timestamp::subsec_per_msec);
_min_proba = _min_proba*10000; // explore probability is given in per 1000000 in the code
_variable_min_proba_max = _variable_min_proba_max*10000;
_variable_min_proba_min = _variable_min_proba_min*10000;
_routing = (RoutingPaths*) router()->find(routing_name, compound_name, errh);
_capacity = _max_capacity;
_current_capacity = _max_capacity;
if(_max_capacity_exploit==-1)
_max_capacity_exploit = _max_capacity/10; // smaller capacity for exploit
if(_routing == 0 || _routing->cast("RoutingPaths") == 0) {
return errh->error("[SrcMab] %s%s is not a RoutingPaths element.",
compound_name.c_str(), routing_name.c_str());
}
if(_nr_routes > 2)
return errh->error("[SrcMab] Up to 2 routes only supported for now.");
if(_delta < 0 || _delta > 1000) {
return errh->error("[SrcMab] Delta should be between 0 and 1000");
}
_trial_timer.initialize(this);
_active_timer.initialize(this);
_defered_packet_timer.initialize(this);
_dequeue_timer.initialize(this);
_send_packets_timer.initialize(this);
_acne_source_elmt = (AcneSource*)router()->find(acne_src_name, "", errh);
if(_acne_source_elmt != 0 && _acne_source_elmt->cast("AcneSource") == 0) {
return errh->error("%s exists but is not an AcneSource.", acne_src_name.c_str());
}
_rate_shaper_elmt = (BandwidthRatedSplitter*)router()->find(rate_shaper_name, "", errh);
if(_rate_shaper_elmt != 0 && _rate_shaper_elmt->cast("BandwidthRatedSplitter") == 0) {
return errh->error("%s exists but is not an BandwidthRatedSplitter.", rate_shaper_name.c_str());
}
_now.assign_now();
if(_acne_source_elmt == 0)
click_chatter("[SrcMab %s]. Configured without AcneSource, small delta %s.", _now.unparse().c_str(), _small_delta.unparse().c_str());
else
click_chatter("[SrcMab %s]. Configured with AcneSource %s, small delta %s.", _now.unparse().c_str(), acne_src_name.c_str(), _small_delta.unparse().c_str());
if(_rate_shaper_elmt == 0)
click_chatter("[SrcMab %s]. Configured without rate shaper.", _now.unparse().c_str());
else
click_chatter("[SrcMab %s]. Configured with rate shaper %s.", _now.unparse().c_str(), rate_shaper_name.c_str());
if(_acne_source_elmt && _debug)
_acne_source_elmt->set_debug(true);
if(meas_name_wifi != "") {
_measurements_wifi = (MeasurementsWifi*) router()->find(meas_name_wifi, compound_name, errh);
if(_measurements_wifi == 0 || _measurements_wifi->cast("MeasurementsWifi") == 0) {
return errh->error("%s%s:%d is not a MeasurementsWifi element.",
compound_name.c_str(), meas_name_wifi.c_str(), _measurements_wifi);
}
}
else
_measurements_wifi = 0;
if(meas_name_plc != "") {
_measurements_plc = (MeasurementsPLC*) router()->find(meas_name_plc, compound_name, errh);
if(_measurements_plc == 0 || _measurements_plc->cast("MeasurementsPLC") == 0) {
return errh->error("%s%s:%d is not a MeasurementsPLC element.",
compound_name.c_str(), meas_name_plc.c_str(), _measurements_plc);
}
}
else
_measurements_plc = 0;
return 0;
}
int
SrcMab::initialize(ErrorHandler *errh) {
// check that it works
if(_rate_shaper_elmt) {
#ifdef CLICK_USERLEVEL
ErrorHandler::static_initialize(new FileErrorHandler(stderr, ""));
#else
ErrorHandler::static_initialize(errh);
#endif
uint32_t total_rate = 125000;
String rate = String(int_divide(ACNE_SOURCE_FACTOR*total_rate,10))+String("Bps");
reconfigure_keyword_handler(rate, _rate_shaper_elmt, (void *) "0 RATE", ErrorHandler::default_handler());
_rate_shaper_elmt->set_active(false);
}
_active_timer.schedule_after_sec(10);
assert(!_q && head() == 0 && tail() == 0);
_q = (Packet **) CLICK_LALLOC(sizeof(Packet *) * (_capacity + 1));
if (_q == 0)
return errh->error("[SrcMab] Out of memory");
return 0;
}
void
SrcMab::push(int port, Packet *p_in) {
if(!p_in)
return;
if(port == 0) { // messages from host: send them at desired rate
_verb_debug_count++;
const click_ip* ip4_header = p_in->ip_header();
const IPAddress ip_dst(ip4_header->ip_dst.s_addr);
if(ip_dst == IPAddress()) {
VERB_DEBUG_CHATTER("[SrcMab %s] No IP destination, kill packet", _now.unparse().c_str());
p_in->kill();
return;
}
_now.assign_now();
_last_active = _now;
if(is_exception_packet(p_in)) { // if packet must go through (e.g. NTP)
handle_traffic_packet(p_in,true, false, false);
return;
}
if(_active) {
_current_dst = ip_dst; //TODO: is that a good idea to modify without checking?
}
MabSituation mab_sit = MabSituation(_current_dst,_current_hash_flow);
if(_active && _arms[mab_sit].empty()) {
if(_multipaths[ip_dst].empty()) {
Vector<Vector<Route> > multipaths = _routing->get_multipaths(ip_dst, _nr_arms, _nr_routes);
if(multipaths.size() > 0) {
DEBUG_CHATTER("[SrcMab %s] Has computed %d multipaths", _now.unparse().c_str(), multipaths.size());
_multipaths[ip_dst] = multipaths;
_last_computed_routes.assign_now();
}
}
if (!_multipaths[ip_dst].empty()) {
if(ip4_header->ip_p == IP_PROTO_TCP)
_is_tcp[ip_dst] = 1;
else
_is_tcp[ip_dst] = 0;
DEBUG_CHATTER("[SrcMab %s] Got first message (length %d, protocol %d) to %s for hash flow %u, compute routes and begin exploration...",
_now.unparse().c_str(), p_in->length(), ip4_header->ip_p, ip_dst.unparse().c_str(), _current_hash_flow);
_begin.assign_now();
if(_tcp_retrans) {
_priority_queue.set(ip_dst,new SimpleQueue());
Vector<String> conf;
cp_argvec(String("CAPACITY ")+String(_max_capacity), conf);
ErrorHandler *errh = new ErrorHandler();
if(_priority_queue.get(ip_dst)->configure(conf, errh) < 0) {
click_chatter("[SrcMab %s] Error while configuring priority queue for %s", _now.unparse().c_str(), ip_dst.unparse().c_str());
_priority_queue.set(ip_dst,0);
}
if(!_priority_queue.get(ip_dst) || _priority_queue.get(ip_dst)->initialize(errh) < 0) {
click_chatter("[SrcMab %s] Error while initializing priority queue for %s", _now.unparse().c_str(), ip_dst.unparse().c_str());
_priority_queue.set(ip_dst,0);
}
}
if(_verb_debug) {
uint16_t sport, dport;
if(ip4_header->ip_p == IP_PROTO_UDP) {
const click_udp *udp_hdr = p_in->udp_header();
sport = udp_hdr->uh_sport;
dport = udp_hdr->uh_dport;
}
if(ip4_header->ip_p == IP_PROTO_TCP) {
click_tcp *tcp_hdr = (click_tcp *)(p_in->tcp_header());
sport = tcp_hdr->th_sport;
dport = tcp_hdr->th_dport;
}
VERB_DEBUG_CHATTER("\t\t Message from %s:%u to %s:%u", IPAddress(ip4_header->ip_src.s_addr).unparse().c_str(), sport,
ip_dst.unparse().c_str(), dport);
}
// initialize arms
_arms.set(mab_sit, Vector<MabArmInfo*>());
_scaling.set(mab_sit,0);
for(int i=0;i<_multipaths[ip_dst].size();i++) {
Vector<Route> routes = _multipaths[ip_dst][i];
if(_debug) {
for(int j=0;j<routes.size();j++)
click_chatter("[SrcMab] Adding to arm %d route %s", i, routes[j].unparse().c_str());
}
_arms[mab_sit].push_back(new MabArmInfo(ip_dst, routes, _alpha));
uint32_t rate_scaled = _arms[mab_sit][i]->_current_total_rate; // scaling is 1.5 max capa computed by routing
if(rate_scaled > _scaling[mab_sit]) {
_best_arm[mab_sit] = _arms[mab_sit][i];
_best_arm_id[mab_sit] = i;
_scaling[mab_sit] = rate_scaled;
}
}
for(int i=0;i<_arms[mab_sit].size();i++) {
_arms[mab_sit][i]->_scaling_rate = _scaling[mab_sit];
_arms[mab_sit][i]->_tb.assign(100000, 10000);
_arms[mab_sit][i]->_min_proba=_min_proba;
}
_last_explores.set(mab_sit,Ring<int>((1000/_alpha)*_arms[mab_sit].size(), -1));
_dummy_packet.set(ip_dst, p_in->clone()->uniqueify());
_current_nb_flows.set(ip_dst, 1);
DEBUG_CHATTER("[SrcMab] Scaling is %d", _scaling[mab_sit]);
_first_trial[ip_dst] = NUMBER_FIRST_TRIALS;
_dequeue_first[ip_dst] = false;
_trial_timer.schedule_now();
enq(p_in);
return;
}
else {
DEBUG_CHATTER("[SrcMab %s] Empty multipath for dst %s", _now.unparse().c_str(), ip_dst.unparse().c_str());
}
}
bool all_dequeued=true;
bool packet_handled = false;
if(_active) {
if(!_trial_timer.scheduled())
_trial_timer.schedule_now();
if(_first_trial[ip_dst] == NUMBER_FIRST_TRIALS) {
VERB_DEBUG_CHATTER("[SrcMab %s] Enqueuing first packet", _now.unparse().c_str());
if(size() >= _current_capacity || !enq(p_in)) {
VERB_DEBUG_CHATTER("[SrcMab %s] Queue is full, kill it", _now.unparse().c_str());
p_in->kill();
}
return;
}
if(_dequeue_first[ip_dst]) {
// dequeue packets enqueued before first trial
int s = size();
if(s>0) {
DEBUG_CHATTER("[SrcMab %s] Dequeuing first %d packets...", _now.unparse().c_str(), s);
for(int i=0;i<s;i++)
handle_traffic_packet(deq(), true);
}
}
_dequeue_first[ip_dst] = false;
bool must_pass = false;
if(!_stop_sending && !(_active && (_now < _can_send_at)) && !_last_packet_marked) {
if(_tcp_retrans && _priority_queue.get(ip_dst)) {
// first try priority packets (TCP retransmissions)
int s_prio = _priority_queue.get(ip_dst)->size();
while(_priority_queue.get(ip_dst)->size() > 0) {
VERB_DEBUG_CHATTER("[SrcMab %s] Trying to send TCP retransmission (%d/%d)...", Timestamp::now().unparse().c_str(),
s_prio-_priority_queue.get(ip_dst)->size(), s_prio);
if(can_send(ip_dst)) {
_retrans_tries++;
Packet *p = _priority_queue.get(ip_dst)->deq();
if(!handle_traffic_packet(p, false, false, true)) { // should not happen (size checked before)
assert(p);
DEBUG_CHATTER("[SrcMab %s] Cannot send, re-enqueue, break...", Timestamp::now().unparse().c_str());
_priority_queue.get(ip_dst)->enq(p);
all_dequeued = false;
break;
}
else
_retrans_sent++;
}
else {
VERB_DEBUG_CHATTER("[SrcMab %s] Cannot send, break...", Timestamp::now().unparse().c_str());
all_dequeued = false;
break;
}
}
}
}
else
must_pass=true;
if(ip4_header->ip_p == IP_PROTO_TCP) {
click_tcp *tcp_hdr = (click_tcp *)(p_in->tcp_header());
VERB_DEBUG_CHATTER("[SrcMab %s] Received packet %p TCP seq %u", _now.unparse().c_str(), p_in, htonl(tcp_hdr->th_seq));
if(_tcp_retrans) {
if(tcp_hdr) {
//int64_t tcp_seq = (int64_t) htonl(tcp_hdr->th_seq);
//int64_t diff = _current_tcp_seq[ip_dst] - tcp_seq;
// check if packet_seq has not already been seen (might happen with nack restransmissions), ie smaller than cur_seq
//if(_current_tcp_seq[ip_dst]!=0 && ((diff >= 0 && diff <= ((int64_t) UINT32_MAX/2)) || diff <= -1*((int64_t) UINT32_MAX/2)) ) {
if(tcp_hdr->th_flags & TH_URG) {
tcp_hdr->th_flags &= ~TH_URG; // reset flag to 0
VERB_DEBUG_CHATTER("[SrcMab %s] Received packet TCP seq %u, current seq is %u, this is a TCP retransmission, send it with high priority", _now.unparse().c_str(),
htonl(tcp_hdr->th_seq), (uint32_t) _current_tcp_seq[ip_dst]);
_retrans_tries++;
packet_handled = true;
if(must_pass || !all_dequeued || !handle_traffic_packet(p_in,false,false,true)) {
assert(p_in);
VERB_DEBUG_CHATTER("[SrcMab %s] Enqueue priority packet (bools %d %d)...", Timestamp::now().unparse().c_str(), must_pass, all_dequeued);
if(_priority_queue.get(ip_dst)) {
if(_priority_queue.get(ip_dst)->size() == _priority_queue.get(ip_dst)->capacity()) {
VERB_DEBUG_CHATTER("[SrcMab %s] Priority queue is full, normal queue...", Timestamp::now().unparse().c_str());
if(!enq(p_in)) {
VERB_DEBUG_CHATTER("[SrcMab %s] Queue is full, kill it...", Timestamp::now().unparse().c_str());
p_in->kill();
}
}
else
_priority_queue.get(ip_dst)->enq(p_in);
}
else {
VERB_DEBUG_CHATTER("[SrcMab %s] No priority queue, normal queue...", Timestamp::now().unparse().c_str());
if(!enq(p_in)) {
VERB_DEBUG_CHATTER("[SrcMab %s] Queue is full, kill it...", Timestamp::now().unparse().c_str());
p_in->kill();
}
}
return;
}
else {
_retrans_sent++;
}
}
}
}
if(!(tcp_hdr->th_flags & TH_URG))
_current_tcp_seq.set(ip_dst, (int64_t) htonl(tcp_hdr->th_seq));
}
if(!must_pass && all_dequeued) {
// try to unqueue packets
int s = size();
if(s>0) {
for(int i=0;i<s;i++) {
VERB_DEBUG_CHATTER("[SrcMab %s] Trying to send dequeued packet (%d/%d)...", Timestamp::now().unparse().c_str(), i, s);
//if((_current_arm[mab_sit]->_explore != 0 && ip4_header->ip_p == IP_PROTO_TCP && !can_send(ip_dst, p_in->length())) || !handle_traffic_packet(deq(), true)) {
if(!can_send(ip_dst) || !handle_traffic_packet(deq(), true)) {
VERB_DEBUG_CHATTER("[SrcMab %s] Cannot send, break...", Timestamp::now().unparse().c_str());
all_dequeued = false;
break;
}
}
}
}
}
if(!packet_handled) {
if(all_dequeued)
handle_traffic_packet(p_in);
else {
if(_kill_if_not_empty) {
VERB_DEBUG_CHATTER("[SrcMab %s] Kill packet (queued packets remaining)", _now.unparse().c_str());
_killed_too_much_enq += p_in->length();
p_in->kill();
return;
}
else {
VERB_DEBUG_CHATTER("[SrcMab %s] Enqueuing packet (queued packets remaining)", _now.unparse().c_str());
if(size() >= _current_capacity || !enq(p_in)) {
_killed_too_much_enq += p_in->length();
p_in->kill();
return;
}
}
}
}
}
else if (port == 1) { // acks for me
if(!_active) {
p_in->kill();
return;
}
acne_header *acne_hdr = (acne_header *) (p_in->data());
if(acne_hdr->_type == MAB_ACK)
handle_ack(p_in);
else if(acne_hdr->_type == NACK_ORDER)
handle_nack_order(p_in);
}
}
bool SrcMab::can_send(IPAddress ip_dst) {
MabSituation mab_sit = MabSituation(ip_dst,_current_hash_flow);
uint32_t len = 1440;
if(!_current_arm[mab_sit]) {
VERB_DEBUG_CHATTER("[SrcMab %s] Current arm is null for %s, kill packet", _now.unparse().c_str(), mab_sit.unparse().c_str());
return false;
}
_current_arm[mab_sit]->_tb.refill();
if(_current_arm[mab_sit]->_tb.contains(len)) {
VERB_DEBUG_CHATTER("[SrcMab %s] TokenBucket is of size %d, can send packet of size %d", _now.unparse().c_str(), _current_arm[mab_sit]->_tb.size(), len);
return true;
}
VERB_DEBUG_CHATTER("[SrcMab %s] TokenBucket is of size %d, less than size of packet %d", _now.unparse().c_str(), _current_arm[mab_sit]->_tb.size(), len);
return false;
}
bool
SrcMab::handle_traffic_packet(Packet *p_in) {
return handle_traffic_packet(p_in, false, false,false);
}
bool
SrcMab::handle_traffic_packet(Packet *p_in, bool dequeued) {
return handle_traffic_packet(p_in, false, dequeued,false);
}
bool
SrcMab::handle_traffic_packet(Packet *p_in, bool force_inactive, bool dequeued, bool keep_packet) {
VERB_DEBUG_CHATTER("[SrcMab] handle_traffic_packet");
if(!p_in)
return false;
click_ip* ip4_header = (click_ip *) p_in->data();
if(!ip4_header) {
VERB_DEBUG_CHATTER("[SrcMab %s] No IP header, kill packet", _now.unparse().c_str());
p_in->kill();
return false;
}
const IPAddress ip_dst(ip4_header->ip_dst.s_addr);
if(ip_dst == IPAddress()) {
VERB_DEBUG_CHATTER("[SrcMab %s] No IP destination, kill packet", _now.unparse().c_str());
p_in->kill();
return false;
}
_now.assign_now();
MabSituation mab_sit = MabSituation(ip_dst,_current_hash_flow);
VERB_DEBUG_CHATTER("[SrcMab] Received packet %p, mab sit %s, force %d, dequeued %d, keep %d, mark %d (marked %d), stop %d, can send %d", p_in, mab_sit.unparse().c_str(),
force_inactive, dequeued, keep_packet, _mark_last_packet, _last_packet_marked, _stop_sending, _now < _can_send_at);
uint32_t len = p_in->length();
if(_active && !force_inactive) {
if(!_current_arm[mab_sit]) {
VERB_DEBUG_CHATTER("[SrcMab %s] Current arm is null for %s, kill packet", _now.unparse().c_str(), mab_sit.unparse().c_str());
p_in->kill();
return false;
}
if(!_mark_last_packet) {
_current_arm[mab_sit]->_tb.refill();
if(!_current_arm[mab_sit]->_tb.remove_if(len)) {
//if(dequeued || ip4_header->ip_p != IP_PROTO_TCP || _current_arm[mab_sit]->_explore == 0) {
if(!keep_packet) {
if(dequeued) {
VERB_DEBUG_CHATTER("[SrcMab %s] TokenBucket is of size %d, less than size of packet %d, kill it", _now.unparse().c_str(), _current_arm[mab_sit]->_tb.size(), len);
_killed_too_much += len;
p_in->kill();
return false;
}
else {
VERB_DEBUG_CHATTER("[SrcMab %s] TokenBucket is of size %d, less than size of packet %d, enqueue it (size is %d, capacity %d)",
_now.unparse().c_str(), _current_arm[mab_sit]->_tb.size(), len, size(), _capacity);
if((size() >= _current_capacity && _consecutive_killed < _max_consecutive_killed) || !enq(p_in)) {
//if((size() >= _current_capacity) || !enq(p_in)) {
_consecutive_killed++;
_killed_too_much_enq += len;
p_in->kill();
//deq()->kill();
//enq(p_in);
}
else {
_consecutive_killed = 0;
}
}
}
else {
VERB_DEBUG_CHATTER("[SrcMab %s] TokenBucket is of size %d, less than size of packet %d, return (keep packet)", _now.unparse().c_str(), _current_arm[mab_sit]->_tb.size(), len);
}
return false;
}
}
VERB_DEBUG_CHATTER("[SrcMab %s] New TB size %d", _now.unparse().c_str(), _current_arm[mab_sit]->_tb.size());
}
if(_stop_sending) {
VERB_DEBUG_CHATTER("[SrcMab %s] Enqueuing packet (stop sending)", _now.unparse().c_str());
if(!keep_packet) {
if(size() >= _current_capacity || !enq(p_in)) {
_killed_stop+=p_in->length();
p_in->kill();
}
//deq()->kill();
//enq(p_in);
}
else
VERB_DEBUG_CHATTER("[SrcMab %s] Keep packet (stop sending)", _now.unparse().c_str());
return false;
}
if(_active && (_now < _can_send_at)) {
// between end of trial and beginning of next one, enqueue or kill packet
VERB_DEBUG_CHATTER("[SrcMab %s] Enqueuing packet (cannot send yet)", _now.unparse().c_str());
if(!keep_packet) {
if(size() >= _current_capacity || !enq(p_in)) {
_killed_stop+=p_in->length();
p_in->kill();
}
//deq()->kill();
//enq(p_in);
}
else
VERB_DEBUG_CHATTER("[SrcMab %s] Keep packet (cannot send yet)", _now.unparse().c_str());
return false;
}
if(_last_packet_marked) {
VERB_DEBUG_CHATTER("[SrcMab %s] Enqueuing packet (last packet marked)", _now.unparse().c_str());
if(!keep_packet) {
if(size() >= _current_capacity || !enq(p_in)) {
_killed_mark+=p_in->length();
p_in->kill();
}
//deq()->kill();
//enq(p_in);
}
else
VERB_DEBUG_CHATTER("[SrcMab %s] Keep packet (last packet marked)", _now.unparse().c_str());
return false;
}
// if(!_dequeue_first[ip_dst]) {
// // dequeue packets that could have been queued
// int s = size();
// if(s>0) {
// DEBUG_CHATTER("[SrcMab %s] Dequeuing %d packets...", _now.unparse().c_str(), s);
// for(int i=0;i<s;i++)
// send_packet(deq(), force_inactive, false);
// }
// }
if(dequeued)
_sent_dequeued+=p_in->length();
send_packet(p_in, force_inactive, false);
return true;
}
void
SrcMab::send_packet(Packet *p_in, bool force_inactive, bool dummy_packet) {
if(!p_in)
return;
click_ip* ip4_header = (click_ip *) p_in->data();
if(!ip4_header) {
VERB_DEBUG_CHATTER("[SrcMab %s] No IP header, kill packet", _now.unparse().c_str());
p_in->kill();
return;
}
IPAddress ip_dst(ip4_header->ip_dst.s_addr);
MabSituation mab_sit = MabSituation(ip_dst,_current_hash_flow);
if(ip_dst == IPAddress()) {
VERB_DEBUG_CHATTER("[SrcMab %s] No IP destination, kill packet", _now.unparse().c_str());
p_in->kill();
return;
}
_now.assign_now();
FormatedRoute froute_to_use;
FlowInfoLB *info_to_use = 0;
int i_to_use = 0;
VERB_DEBUG_CHATTER("[SrcMab %s] Got packet to send of length %d for %s", _now.unparse().c_str(), p_in->length(), ip_dst.unparse().c_str());
if(_active && !force_inactive) {
if(_multipaths[ip_dst].empty()) {
// means that there is no route to this destination
p_in->kill();
return;
}
if(!_current_arm[mab_sit]) {
VERB_DEBUG_CHATTER("[SrcMab %s] Current arm is null, kill packet", _now.unparse().c_str());
p_in->kill();
return;
}
if(_count_packets[ip_dst] >= _max_count_packets || _info_to_use[ip_dst] == 0) {
// choose the route with probability route_rate/total_rate
uint32_t randi = click_random(0, _current_arm[mab_sit]->_current_total_rate); // between 0 and _current_total_rate (inclusive)
uint32_t sum_loc = 0;
bool assigned = false;
for (int i=0; i<_current_arm[mab_sit]->_current_sending_rate.size(); i++){
sum_loc += _current_arm[mab_sit]->_current_sending_rate[i].rate;
if(randi<=sum_loc) {
i_to_use = i;
info_to_use = &_current_arm[mab_sit]->_current_sending_rate[i_to_use];
break;
}
// Timestamp diff = (_now-_current_arm[mab_sit]->_current_sending_rate[i].last_packet);
// if((!assigned && randi<=sum_loc) || (_current_arm[mab_sit]->_explore==0 && diff.msecval() > 50)){ // send a packet at least every 50ms when exploiting
// assigned=true;
// i_to_use = i;
// info_to_use = &_current_arm[mab_sit]->_current_sending_rate[i_to_use];
// if(_current_arm[mab_sit]->_explore != 0)
// break;
// if(diff.msecval() > 50) {
// _count_packets[ip_dst] = _max_count_packets; // do not repeat if sent because of timestamp
// break;
// }
// }
}
if(info_to_use != 0)
froute_to_use = _current_arm[mab_sit]->_froutes[i_to_use];
_info_to_use[ip_dst] = info_to_use;
_froute_to_use[ip_dst] = froute_to_use;
_i_to_use[ip_dst] = i_to_use;
_count_packets[ip_dst] = 0;
}
_count_packets[ip_dst]++;
}
else {
IPAddress ip_wifi = (ip_dst & IPAddress("255.0.0.255")) | IPAddress("0.10.10.0"); // if not active, might be different
IPAddress old_ip_dst = ip_dst;
if(_multipaths[ip_wifi].empty()) {
Vector<Vector<Route> > multipaths = _routing->get_multipaths(ip_wifi, _nr_arms, _nr_routes);
if(multipaths.size() > 0) {
DEBUG_CHATTER("[SrcMab %s] Has computed %d multipaths", _now.unparse().c_str(), multipaths.size());
_multipaths[ip_wifi] = multipaths;
}
}
// second byte: index of multipath; third byte: index of route (10.10.10.x equivalent to 10.0.0.x)
VERB_DEBUG_CHATTER("[SrcMab] Got multipaths, indexes are %d (%s), %d (%s)",
(ip_dst & IPAddress("0.255.0.0")).addr(), (ip_dst & IPAddress("0.255.0.0")).unparse().c_str(),
(ip_dst & IPAddress("0.0.255.0")).addr(), (ip_dst & IPAddress("0.0.255.0")).unparse().c_str());
uint32_t index_mp = ntohl((ip_dst & IPAddress("0.255.0.0")).addr()) >> 16; // addr() in network-byte-order
uint32_t index_rt = ntohl((ip_dst & IPAddress("0.0.255.0")).addr()) >> 8;
VERB_DEBUG_CHATTER("[SrcMab] Must get multipath %d, route %d", index_mp, index_rt);
bool change_ip = true;
if(index_mp == 10 && index_rt == 10)
change_ip = false;
if(index_mp == 10)
index_mp = 0;
if(index_rt == 10)
index_rt = 0;
ip_dst = ip_wifi;
if(index_mp < _multipaths[ip_dst].size() && index_rt < _multipaths[ip_dst][index_mp].size()) {
_info_to_use[ip_dst] = (FlowInfoLB *) 1;
_froute_to_use[ip_dst] = FormatedRoute(_multipaths[ip_dst][index_mp][index_rt]);
_last_multipath = _multipaths[ip_dst][index_mp];
if(_last_sent.size() <= index_rt) {
_last_sent = Vector<FlowInfoLB>(_last_multipath.size(),FlowInfoLB(_now));
}
_last_sent[index_rt].sent+=p_in->length();
}
else {
int size_mult = 0;
if(index_mp < _multipaths[ip_dst].size())
size_mult = _multipaths[ip_dst][index_mp].size();
_info_to_use[ip_dst] = 0;
}
VERB_DEBUG_CHATTER("[SrcMab] Got route, info_to_use=%d", _info_to_use[ip_dst]);
if(change_ip)
ip4_header->ip_dst = ip_dst.in_addr();
VERB_DEBUG_CHATTER("[SrcMab] Sending packet to %s (old %s) using multipath %d, route %d", IPAddress(ip4_header->ip_dst).unparse().c_str(),
old_ip_dst.unparse().c_str(), index_mp, index_rt);
if(change_ip) { // TODO: check that it works
uint16_t len = ip4_header->ip_len;
ip4_header->ip_len = 0;
#if HAVE_FAST_CHECKSUM
ip4_header->ip_sum = ip_fast_csum((unsigned char *) ip4_header, sizeof(click_ip) >> 2);
#else
ip4_header->ip_sum = click_in_cksum((unsigned char *) ip4_header, sizeof(click_ip));
#endif
ip4_header->ip_len = len;
}
}
if(_info_to_use[ip_dst] == 0) { // should not happen
VERB_DEBUG_CHATTER("[SrcMab] No route, killing packet");
p_in->kill();
return;
}
WritablePacket *out = p_in->push(sizeof(acne_header));
if (!out){
DEBUG_CHATTER("[SrcMab] Error : can't add space for acne header.");
p_in->kill();
return;
}
ip4_header = reinterpret_cast<click_ip *>(out->data() + sizeof(acne_header));
if(_info_to_use[ip_dst] != (FlowInfoLB *) 1)
VERB_DEBUG_CHATTER("[SrcMab %s] Send packet of length %d (mark_last_packet=%d)", _now.unparse().c_str(), out->length(), _mark_last_packet);
acne_header* acne_hdr = (acne_header*) out->data();
memset(acne_hdr, 0, sizeof(acne_header));
acne_hdr->_is_last = _mark_last_packet;
if(_active && !force_inactive) {
if(_i_to_use[ip_dst] < _current_arm[mab_sit]->_current_sent_rate.size())
_current_arm[mab_sit]->_current_sent_rate[_i_to_use[ip_dst]].sent += out->length();
else { // should not happen
DEBUG_CHATTER("[SrcMab %s] WARNING: size error for _current_sent_rate, i=%d, size=%d (size _current_sending_rate %d)",
_now.unparse().c_str(), _i_to_use[ip_dst], _current_arm[mab_sit]->_current_sent_rate.size(),
_current_arm[mab_sit]->_current_sending_rate.size());
}
_info_to_use[ip_dst]->sent += (out->length() + sizeof(click_ether)); // there will be Ethernet header
_info_to_use[ip_dst]->last_packet.assign_now();
}
else
_info_to_use[ip_dst] = 0;
acne_hdr->_route = _froute_to_use[ip_dst];
acne_hdr->_hop = 0;
if(force_inactive)
acne_hdr->_type = SPECIAL_TRAFFIC;
else if(ip4_header->ip_p == PROTO_IP_TEST)
acne_hdr->_type = PROTO_IP_TEST;
else if (dummy_packet)
acne_hdr->_type = DUMMY_TRAFFIC_MAB;
else
acne_hdr->_type = REGULAR_TRAFFIC;
acne_hdr->_ts.assign_now();
if(_info_to_use[ip_dst] && acne_hdr->_type == REGULAR_TRAFFIC)
acne_hdr->_rate_demand = _info_to_use[ip_dst]->rate;
if(force_inactive || dummy_packet) // pushed directly by Order or killed before
acne_hdr->_seq = 0;
else
acne_hdr->_seq = getCurrentSeqNum(ip_dst);
if(_active && !force_inactive && _current_arm[mab_sit]->_explore > 0) {
if(_current_arm[mab_sit]->_explore == 1) { // should not happen
DEBUG_CHATTER("[SrcMab %s] WARNING: trying to send packet during silent slot.", _now.unparse().c_str());
out->kill();
return;
}
acne_hdr->_id_rate_mab = _currentId[ip_dst];
}
else
acne_hdr->_id_rate_mab = 0;
//out->set_ip_header(ip4_header, sizeof(click_ip));
if(acne_hdr->_is_last) {
send_defered_packet(out);
return;
}
VERB_DEBUG_CHATTER("[SrcMab %s] Send packet with seq %u on route %s", _now.unparse().c_str(), acne_hdr->_seq, _froute_to_use[ip_dst].unparse().c_str());
if(_active && _nacks) {
if(_packets_sent[ip_dst] == 0)
_packets_sent[ip_dst] = new PacketTable();
if(acne_hdr->_type == REGULAR_TRAFFIC && _current_arm[mab_sit]->_explore == 0) { // save only when exploiting
VERB_DEBUG_CHATTER("[SrcMab] Save packet with id %u", acne_hdr->_seq);
if(_packets_sent[ip_dst]->get(acne_hdr->_seq))
_packets_sent[ip_dst]->get(acne_hdr->_seq)->kill();
_packets_sent[ip_dst]->set(acne_hdr->_seq, out->clone()->uniqueify());
}
}
output(0).push(out);
}
void
SrcMab::handle_ack(Packet *p_in) {
_now.assign_now();
acne_header* acne_hdr = (acne_header*) p_in->data();
if(acne_hdr->_type == MAB_ACK) {
mab_ack *ack = (mab_ack *) (p_in->data() + sizeof(acne_header));
IPAddress ip_dst = ack->_dst;
uint16_t id = acne_hdr->_id_rate_mab;
// change arm only if answering to FLOW_INFORMATION and currently exploiting
if(ack->_flow_hash > 0 && ack->_flow_hash != _current_hash_flow && _current_arm[MabSituation(_current_dst,_current_hash_flow)]->_explore == 0) {
MabSituation mab_sit_old = MabSituation(_current_dst,_current_hash_flow);
MabSituation mab_sit_new = MabSituation(_current_dst,ack->_flow_hash);
if(ack->_max_nb_flows > _current_nb_flows[ip_dst]) {
DEBUG_CHATTER("[SrcMab %s] Hash change (was %u, is %u), flow number increase: was %u, is %u", _now.unparse().c_str(),
_current_hash_flow, ack->_flow_hash, _current_nb_flows[ip_dst], ack->_max_nb_flows);
if(_arms[mab_sit_new].empty()) { // otherwise already exists, nothing to do
// copy information and scale current best rate by the adequate ratio
DEBUG_CHATTER("[SrcMab] Create arms for %s from %s", mab_sit_new.unparse().c_str(), mab_sit_old.unparse().c_str());
for(int i=0;i<_arms[mab_sit_old].size();i++) {
MabArmInfo *new_arm = new MabArmInfo(_arms[mab_sit_old][i]);
_arms[mab_sit_new].push_back(new_arm);
_last_explores.set(mab_sit_new,_last_explores[mab_sit_old]);
if(_current_arm[mab_sit_old] == _arms[mab_sit_old][i])
_current_arm[mab_sit_new] = new_arm;
}
if(_current_arm[mab_sit_new] == 0) {
// no known current arm, start at first trial
_first_trial[_current_dst] = NUMBER_FIRST_TRIALS;
}
_best_arm[mab_sit_new] = _best_arm[mab_sit_old];
_best_arm_id[mab_sit_new] = _best_arm_id[mab_sit_old];
_scaling[mab_sit_new] = (_current_nb_flows[ip_dst]*_scaling[mab_sit_old])/ack->_max_nb_flows;
for(int i=0;i<_arms[mab_sit_new].size();i++) {
_arms[mab_sit_new][i]->_current_best_rate = vector_scaling(_arms[mab_sit_new][i]->_current_best_rate, _current_nb_flows[ip_dst], ack->_max_nb_flows);
_arms[mab_sit_new][i]->_current_best_rate_total = (_arms[mab_sit_new][i]->_current_best_rate_total*_current_nb_flows[ip_dst])/ack->_max_nb_flows;
_arms[mab_sit_new][i]->scale_last_best_rates(_current_nb_flows[ip_dst],ack->_max_nb_flows);
}
}
}
else {
DEBUG_CHATTER("[SrcMab %s] Hash change (was %u, is %u), flow number: was %u, is %u", _now.unparse().c_str(),
_current_hash_flow, ack->_flow_hash, _current_nb_flows[ip_dst], ack->_max_nb_flows);
// nothing to do: at first packet, will create arms if hash flow does not exist, otherwise will use version for this hash
}
_current_hash_flow = ack->_flow_hash;
_current_nb_flows[ip_dst] = ack->_max_nb_flows;
_trial_timer.unschedule();
}
if(id>0) { // id 0 only for flow information
// get arm for this ID
if(_rateId2MabArmInfo[ip_dst][id] == 0)
DEBUG_CHATTER("[SrcMab] WARNING: no arm for this destination %s and this id %d", ip_dst.unparse().c_str(), id);
else {
MabArmInfo *arm_this_id = _rateId2MabArmInfo[ip_dst][id];
int ind_arm = find_in_vector(_arms[MabSituation(_current_dst,_current_hash_flow)], arm_this_id);
int ind_froute = find_in_vector(arm_this_id->_froutes, ack->_target_route);
if (ind_froute == -1)
DEBUG_CHATTER("[SrcMab %s] WARNING: unknown route for this ack from %s, id %d, received rate %d",
_now.unparse().c_str(), ip_dst.unparse().c_str(), id, ack->_route_rate);
else {
if(!arm_this_id->_routes_unacked[id].get(ack->_target_route)) {
// ack already received (it is sent several times to ensure that it is received)
VERB_DEBUG_CHATTER("[SrcMab %s] Got again ack from %s, id %d, route %s, killing it.", _now.unparse().c_str(),
ip_dst.unparse().c_str(), id, ack->_target_route.unparse().c_str());
p_in->kill();
return;
}
if(_debug) {
_now.assign_now();
click_chatter("[SrcMab %s] Got an ack from %s, id %d, route %s, received rate %d, %d flows, delay %s", _now.unparse().c_str(),
ip_dst.unparse().c_str(), id, ack->_target_route.unparse().c_str(), ack->_route_rate, ack->_max_nb_flows,
(Timestamp::now_steady() - ack->_ctrl_id).unparse().c_str());
}
arm_this_id->_routes_unacked[id].erase(ack->_target_route);
if(arm_this_id->_routes_unacked[id].size() == 0)
arm_this_id->_routes_unacked.erase(id);
Route target_route = arm_this_id->_routes[ind_froute];
DEBUG_CHATTER("[SrcMab] Got gamma for %d links", target_route.links.size());
bool pass_update = false;
bool all_small = true;
for (int i=0;i<target_route.links.size();i++) {
if(ack->_route_rate > 0 && ack->_gammas[i] > 100) // if rate received > 0, gamma should be at least 100 to be precise enough
all_small=false;
if (ack->_gammas[i] == -1 || (ack->_route_rate > 0 && ack->_gammas[i] == 0)) { // invalid gamma
DEBUG_CHATTER("[SrcMab] WARNING: got invalid gamma=%s for link %s with rate %d on arm %d? Already removed %d.",
String(ack->_gammas[i]).c_str(), target_route.links[i].unparse().c_str(), ack->_route_rate, ind_arm,
arm_this_id->_explore_ids[id].size() == 0);
pass_update = true;
}
else if (ack->_gammas[i] > SATURATED_THRES) { // means that the link is saturated
DEBUG_CHATTER("[SrcMab] WARNING: got gamma=%s for link %s with rate %d on arm %d, saturated link? Removed %d.",
String(ack->_gammas[i]).c_str(), target_route.links[i].unparse().c_str(), ack->_route_rate, ind_arm,
arm_this_id->_explore_ids[id].size() == 0);
// update, but reduce rate
_reduce_rate.set(arm_this_id, 1);
arm_this_id->_saturated.set(id,1);
arm_this_id->_gammas_table[target_route.links[i]].set(id, ack->_gammas[i]);
}
else {
DEBUG_CHATTER("[SrcMab] Got gamma=%s for link %s", String(ack->_gammas[i]).c_str(), target_route.links[i].unparse().c_str());
// if (!is_in_hash_table(arm_this_id->_gammas_table, target_route.links[i])) {
// arm_this_id->_gammas_table.set(target_route.links[i], HashTable<uint16_t, gamma_type>());
// }
arm_this_id->_gammas_table[target_route.links[i]].set(id, ack->_gammas[i]);
}
if(_debug && ack->_route_rate <= 1000 && ack->_gammas[i] >= int_divide(GAMMA_SCALE,10)) {
// DEBUG: check that airtimes are not anormally high, ie that interface is used in an other route
bool interface_used = false;
bool is_wifi = is_wifi_addr(target_route.links[i].src);
bool is_plc = is_plc_addr(target_route.links[i].src);
for(int ind = 0;ind < arm_this_id->_froutes.size();ind++) {
if(ind != ind_froute) {
for(int j=0;j<arm_this_id->_routes[ind].links.size();j++) {
if(is_wifi && is_wifi_addr(arm_this_id->_routes[ind].links[j].src)) {
interface_used = true;
break;
}
if(is_plc && is_plc_addr(arm_this_id->_routes[ind].links[j].src)) {
interface_used = true;
break;
}
}
}
}
if(!interface_used) {
_now.assign_now();
DEBUG_CHATTER("[SrcMab %s] WARNING: gamma on link %s is %s whereas it should not be used (rate on route %s is %d)", _now.unparse().c_str(),
target_route.links[i].unparse().c_str(), String(ack->_gammas[i]).c_str(), target_route.unparse().c_str(), ack->_route_rate);
arm_this_id->_gammas_table[target_route.links[i]].set(id, 0); // quick fix: should rather try to find why it happens
}
}
}
if(all_small) {
DEBUG_CHATTER("[SrcMab] WARNING: got invalid gammas (too small) for with rate %d on arm %d? Already removed %d.",
ack->_route_rate, ind_arm, arm_this_id->_explore_ids[id].size() == 0);
pass_update = true;
}
if(arm_this_id->_sent_rates_table[id].empty())
arm_this_id->_sent_rates_table.set(id, Vector<uint32_t>(arm_this_id->_froutes.size(), 0));
if(ind_froute < arm_this_id->_sent_rates_table[id].size())
arm_this_id->_sent_rates_table[id][ind_froute] = ack->_route_rate;
else // shoud not happen
DEBUG_CHATTER("[SrcMab %s] WARNING: ind_froute=%d, size _sent_rates_table for id %d is %d", _now.unparse().c_str(),
ind_froute, id, arm_this_id->_sent_rates_table[id].size());
// check if all acks received to update missed acks ring
bool all_ids_received = true;
for (int i=0;i<arm_this_id->_explore_ids_persistent[id].size();i++) {
if(!arm_this_id->_routes_unacked[arm_this_id->_explore_ids_persistent[id][i]].empty()) {
all_ids_received = false;
break;
}
}
if(all_ids_received) {
//arm_this_id->_missed_acks_explore.change(1);
Vector<uint16_t> ids_toremove = arm_this_id->_explore_ids_persistent[id];
for(int i=0;i<ids_toremove.size();i++) {
arm_this_id->_explore_ids_persistent.erase(ids_toremove[i]);
}
}
if(arm_this_id->_explore_ids[id].empty()) {
DEBUG_CHATTER("[SrcMab] Empty explored ids vector (removed because of saturated link)");
}
else if(!pass_update) {
DEBUG_CHATTER("[SrcMab] Received id %d, waiting for %s", id, print_vector(arm_this_id->_explore_ids[id]).c_str());
//check that all acks for exploring ids have been received
if (all_ids_received) {
if(_debug) {
_now.assign_now();
DEBUG_CHATTER("[SrcMab %s] All acks received for this trial, update best rate...", _now.unparse().c_str());
}
update_best_rate(arm_this_id, id);
}
}
else { // remove explore_ids
//arm_this_id->_missed_explore--;
Vector<uint16_t> ids_toremove = arm_this_id->_explore_ids[id];
for(int i=0;i<ids_toremove.size();i++) {
DEBUG_CHATTER("[SrcMab] Pass update and remove id %d", ids_toremove[i]);
arm_this_id->_explore_ids.erase(ids_toremove[i]);
}
if(all_ids_received) {
DEBUG_CHATTER("[SrcMab] Pass update: all ids received");
arm_this_id->_missed_acks_explore.change(1);
}
}
}
}
}
else {
if(_debug) {
_now.assign_now();
click_chatter("[SrcMab %s] Got a flow information ack from %s, route %s, received rate %d, %d flows (hash flow %u), delay %s", _now.unparse().c_str(),
ip_dst.unparse().c_str(), ack->_target_route.unparse().c_str(), ack->_route_rate, ack->_max_nb_flows, ack->_flow_hash,
(Timestamp::now_steady() - ack->_ctrl_id).unparse().c_str());
}
}
}
else
DEBUG_CHATTER("[SrcMab] WARNING: packet received is not an ack.");
p_in->kill();
}
void
SrcMab::handle_nack_order(Packet *p) {
if(!_nacks) {
p->kill();
return;
}
acne_header *acne_hdr = (acne_header *) p->data();
nack_order *nack = (nack_order *) (p->data()+sizeof(acne_header));
_now.assign_now();
MabSituation mab_sit = MabSituation(nack->flow_dst_addr,_current_hash_flow);
DEBUG_NACK_CHATTER("[SrcMab %s] Received NACK", _now.unparse().c_str());
bool send = true;
if(!_current_arm[mab_sit] || _current_arm[mab_sit]->_explore > 0) {
send = false;
if(nack->nb_nacks && _current_arm[mab_sit])
DEBUG_NACK_CHATTER("[SrcMab %s] Received a NACK during explore, do not send.", _now.unparse().c_str());
}
PacketTable *packets_table = _packets_sent[nack->flow_dst_addr];
if(packets_table) {
int count = 0;
int previous_size = packets_table->size();
for(int i=0;i<nack->nb_nacks;i++) {
if(packets_table->get(nack->nacks[i])!=0) {
if(send) {
count++;
WritablePacket *out = packets_table->get(nack->nacks[i])->clone()->uniqueify();
acne_header *acne_hdr_new = (acne_header *) (out->data());
//acne_hdr_new->_seq = getCurrentSeqNum(nack->flow_dst_addr);
//acne_hdr_new->_id_rate_mab = 0;
// send with high priority
acne_hdr_new->_type = NACK_RETRANSMISSION;
DEBUG_NACK_CHATTER("[SrcMab %s] Sending again packet with seq %u (nack %u)", _now.unparse().c_str(), acne_hdr_new->_seq, nack->nacks[i]);
packets_table->set(acne_hdr_new->_seq, out->clone()->uniqueify());
output(0).push(out);
}
//packets_table->get(nack->nacks[i])->kill();
//packets_table->erase(nack->nacks[i]);
}
else
DEBUG_NACK_CHATTER("[SrcMab %s] No packet saved for seq %u", _now.unparse().c_str(), nack->nacks[i]);
}
if(nack->last_seq != _last_seq_acked[nack->flow_dst_addr]) {
for(uint16_t i=nack->last_seq;i!=(_last_seq_acked[nack->flow_dst_addr]+1);i--) {
if(packets_table->get(i)!=0) {
VERB_DEBUG_CHATTER("[SrcMab %s] Erasing packet for seq %u", _now.unparse().c_str(), i);
packets_table->get(i)->kill();
packets_table->erase(i);
}
else
VERB_DEBUG_CHATTER("[SrcMab %s] No packet to erase for seq %u", _now.unparse().c_str(), i);
}
}
_now.assign_now();
DEBUG_NACK_CHATTER("[SrcMab %s] Received NACK message on route %s for flow to %s with %d nacks (seq %d, last seen %d). Have sent %d, size of packet table was %d, is %d",
_now.unparse().c_str(), acne_hdr->_route.unparse().c_str(), nack->flow_dst_addr.unparse().c_str(), nack->nb_nacks, nack->last_seq,
_last_seq_acked[nack->flow_dst_addr], count, previous_size, packets_table->size());
_retrans_nacks+=count;
}
else DEBUG_NACK_CHATTER("[SrcMab] No packet table for dst %s", nack->flow_dst_addr.unparse().c_str());
_last_seq_acked.set(nack->flow_dst_addr,nack->last_seq);
p->kill();
}
void
SrcMab::send_defered_packet(Packet *p) {
_defered_packet = p;
// if(!is_in_hash_table(_time_defered_packet, _current_dst))
// _time_defered_packet.set(_current_dst, HashTable<uint16_t,Timestamp>());
_time_defered_packet[_current_dst].set(_currentId[_current_dst], Timestamp::now());
_defered_packet_timer.schedule_after(Timestamp(0,20*Timestamp::subsec_per_msec));
if(_acne_source_elmt)
_acne_source_elmt->set_rate(0);
_stop_sending = true;
_last_packet_marked = true;
// note: we assume here that when exploring, packets are sent on only one route.
if(_debug) {
_now.assign_now();
click_chatter("[SrcMab %s] Mark last packet for id %d (have killed: stop=%s, mark=%s, too much=%s (enq %s)), will be sent at %s", _now.unparse().c_str(),
_currentId[_current_dst], String(_killed_stop).c_str(), String(_killed_mark).c_str(), String(_killed_too_much).c_str(),
String(_killed_too_much_enq).c_str(),_defered_packet_timer.expiry().unparse().c_str());
}
_send_packets_timer.unschedule();
}
void SrcMab::run_timer(Timer *timer){
VERB_DEBUG_CHATTER("[SrcMab %s] Run timer %p", Timestamp::now().unparse().c_str(), timer);
if(!_active)
return;
_now.assign_now();
MabSituation mab_sit = MabSituation(_current_dst,_current_hash_flow);
MabArmInfo *current_arm = _current_arm[mab_sit];
if(timer == &_defered_packet_timer) {
_trial_timer.unschedule(); // trial timer has been scheduled in order to fire if no packet received
_trial_timer.schedule_after(_small_delta/2); //wait small time so that packets are transmitted along route
// be notified by Measurements if channel empty
if(_measurements_plc && _measurements_wifi) {
_notified_plc=false;
_notified_wifi=false;
_measurements_plc->notify_empty_channel(_freq_check_meas, this); // check regularly
_measurements_wifi->notify_empty_channel(_freq_check_meas, this);
}
if(_defered_packet) {
if(_debug) {
click_chatter("[SrcMab %s] Send defered packet for id %d, trial timer scheduled at %s (queue %d, dummy %d)", _now.unparse().c_str(),
_currentId[_current_dst], _trial_timer.expiry().unparse().c_str(), size(), _dummy_packet_sent);
}
output(0).push(_defered_packet->clone()->uniqueify());
_defered_packet->kill();
_defered_packet = 0;
}
else {
DEBUG_CHATTER("[SrcMab %s] WARNING: defered packet is 0.", _now.unparse().c_str());
}
return;
}
if(timer == &_active_timer) {
if((_now - _last_active).sec() >= _clear_after && _timeout_bool && _multipaths.size() > 0) {
DEBUG_CHATTER("[SrcMab %s] %d seconds inactive, clearing", _now.unparse().c_str(), _clear_after);
clear();
}
_active_timer.schedule_after_sec(_clear_after);
return;
}
if(timer == &_dequeue_timer) {
// dequeue packets that could have been queued
if(!current_arm)
return;
if(_stop_sending)
return;
if(current_arm->_explore == 1) // silent slot
return;
bool all_dequeued = true;
if(_priority_queue.get(_current_dst)) {
// first try priority packets (TCP retransmissions)
int s_prio = _priority_queue.get(_current_dst)->size();
int count = 0;
while(_priority_queue.get(_current_dst)->size() > 0) {
count++;
VERB_DEBUG_CHATTER("[SrcMab %s] Trying to send TCP retransmission in timer (%d/%d)...", Timestamp::now().unparse().c_str(),
count, s_prio);
if(can_send(_current_dst)) {
_retrans_tries++;
Packet *p = _priority_queue.get(_current_dst)->deq();
if(!handle_traffic_packet(p, false, false, true)) { // should not happen (size checked before)
assert(p);
_priority_queue.get(_current_dst)->enq(p);
DEBUG_CHATTER("[SrcMab %s] Cannot send, re-enqueue, break...", Timestamp::now().unparse().c_str());
all_dequeued = false;
break;
}
else
_retrans_sent++;
}
else {
VERB_DEBUG_CHATTER("[SrcMab %s] Cannot send, break...", Timestamp::now().unparse().c_str());
all_dequeued = false;
break;
}
if(_mark_last_packet && count == 1) {
all_dequeued=false;
break;
}
}
}
int s = size();
if(all_dequeued) {
_now.assign_now();
if(s>0) {
int count = 0;
for(int i=0;i<s;i++) {
count++;
VERB_DEBUG_CHATTER("[SrcMab %s] Dequeueing packet %d/%d in dequeue timer", _now.unparse().c_str(), count,s);
if(_dequeue_first[_current_dst])
handle_traffic_packet(deq(), true);
else {
if(!can_send(_current_dst) || !handle_traffic_packet(deq(), true)) {
all_dequeued = false;
break;
}
}
if(_mark_last_packet && count == 1)
break;
}
VERB_DEBUG_CHATTER("[SrcMab %s] Dequeuing %d packets in timer (dequeue_first=%d), have sent %d...",
_now.unparse().c_str(), s, _dequeue_first[_current_dst], count);
}
}
if(!_mark_last_packet) {
if(!all_dequeued) {
// take size of dummy packet as reference to compute rate
_dequeue_timer.schedule_after_msec(mymax(1u,int_divide(1000*_dummy_packet[_current_dst]->length(), current_arm->_current_total_rate)));
}
else
_dequeue_timer.schedule_after_msec(50);
if(current_arm->_explore > 1 && s==0)
_send_packets_timer.schedule_now();
}
}
if(timer == &_send_packets_timer) {
// in exploration mode, send dummy packets if none are being sent (e.g. TCP)
if(!current_arm || current_arm->_explore <= 1)
return;
if(_stop_sending)
return;
if(size() > 0) {
_send_packets_timer.schedule_after_msec(5);
}
//uint32_t rate = mymin(current_arm->_current_total_rate, 3000000u); // send at most 3 MB/s of dummy traffic
if(current_arm->_current_total_rate > 0) {
current_arm->_tb.refill();
uint32_t packet_per_sec = current_arm->_current_total_rate/_dummy_packet[_current_dst]->length();
uint32_t freq_us = 1000000/packet_per_sec;
Timestamp ts = Timestamp(0,freq_us*Timestamp::subsec_per_usec);
if(current_arm->_tb.size() >= mymin(current_arm->_tb.capacity(), 5*_dummy_packet[_current_dst]->length()) || (_mark_last_packet && !_last_packet_marked)) {
// TokenBucket is not empty: no packet is sent, send one
VERB_DEBUG_CHATTER("[SrcMab %s] Send dummy packet of size %d", _now.unparse().c_str(), _dummy_packet[_current_dst]->length());
send_packet(_dummy_packet[_current_dst]->clone()->uniqueify(), false, true);
_dummy_packet_sent+=_dummy_packet[_current_dst]->length();
current_arm->_tb.remove(_dummy_packet[_current_dst]->length());
_send_packets_timer.schedule_after(ts);
}
else {
_send_packets_timer.schedule_after(5*ts);
}
}
}
if(timer == &_trial_timer) {
DEBUG_CHATTER("[SrcMab %s] Run trial timer (expiry %s, queue %d, dummy %d)...", _now.unparse().c_str(),
_trial_timer.expiry().unparse().c_str(), size(), _dummy_packet_sent);
if(_measurements_plc && _measurements_wifi) {
_measurements_plc->stop_notify_empty_channel();
_measurements_wifi->stop_notify_empty_channel();
}
if(_first_trial[_current_dst] != NUMBER_FIRST_TRIALS && current_arm == 0) {
DEBUG_CHATTER("[SrcMab %s] WARNING: %s is not in arms table.", _now.unparse().c_str(), mab_sit.unparse().c_str());
return;
}
uint16_t old_id = _currentId[_current_dst];
String old_rate_str;
if(_first_trial[_current_dst] != NUMBER_FIRST_TRIALS)
old_rate_str = print_vector(current_arm->_current_sending_rate);
if(_first_trial[_current_dst] != NUMBER_FIRST_TRIALS && _mark_last_packet == false && _end_packets_sent == false && current_arm->_explore > 1 && !_new_explore) {
_mark_last_packet = true;
DEBUG_CHATTER("[SrcMab %s] Trial with id %d done (rate was %s), mark last packet.",
_now.unparse().c_str(), old_id, old_rate_str.c_str());
_dequeue_timer.schedule_now();
_send_packets_timer.schedule_after_msec(_small_delta.msecval()/10);
_trial_timer.schedule_after_msec(_small_delta.msecval()); //schedule in case no packet received (should not happen: dummy packet should be sent)
return;
}
VERB_DEBUG_CHATTER("[SrcMab] After trial done");
Timestamp small_delta_ctrl = Timestamp(0,20*Timestamp::subsec_per_msec); // 20 ms
if(_new_explore) {
if(_measurements_plc && _measurements_wifi) {
_measurements_plc->stop_notify_empty_channel();
_measurements_wifi->stop_notify_empty_channel();
}
_new_explore = false;
// wait small time before transmitting regular traffic so that begin packets are transmitted along route before beginning transmission
_now.assign_now();
_can_send_at = (_now + small_delta_ctrl);
if(current_arm->_explore > 1)
_trial_timer.schedule_after_msec(_trial_duration/2 + small_delta_ctrl.msecval());
else // silent slot
_trial_timer.schedule_after_msec(_silent_slot + small_delta_ctrl.msecval());
send_control_packets(MAB_CTRL_BEGIN);
new_trial();
return;
}
VERB_DEBUG_CHATTER("[SrcMab] After BEGIN control");
if(_mark_last_packet) {
if(!_last_packet_marked)
DEBUG_CHATTER("[SrcMab %s] WARNING: last packet has not been sent.", _now.unparse().c_str());
// end of waiting time, should trigger computation of busy time, send high priority packets
_mark_last_packet = false;
_last_packet_marked = false;
_end_packets_sent = true;
// wait a bit to ensure END packets transmitted along route
_trial_timer.schedule_after(small_delta_ctrl);
send_control_packets(MAB_CTRL_END);
return;
}
VERB_DEBUG_CHATTER("[SrcMab] After mark last packet");
_end_packets_sent = false;
Vector<uint32_t> sent = Vector<uint32_t>();
uint32_t total_sent = 0;
Timestamp time_begin;
if(_first_trial[_current_dst] != NUMBER_FIRST_TRIALS && _debug) {
for(int i=0;i<current_arm->_current_sending_rate.size();i++) {
if(current_arm->_current_sending_rate[i].rate == 0) {
if(current_arm->_current_sending_rate[i].sent != 0)
DEBUG_CHATTER("[SrcMab %s] WARNING: have sent %d on route %s whereas should have sent 0",
_now.unparse().c_str(), current_arm->_current_sending_rate[i].sent, current_arm->_froutes[i].unparse().c_str());
sent.push_back(0);
}
else {
if(current_arm->_explore > 0) {
time_begin = _time_defered_packet[_current_dst][_currentId[_current_dst]];
}
if(time_begin == Timestamp())
time_begin = Timestamp::now();
Timestamp diff_ts = (time_begin-current_arm->_current_sending_rate[i].last);
int diff_msec = diff_ts.msecval();
if(diff_msec > 0) {
uint32_t sent_loc = int_divide(current_arm->_current_sending_rate[i].sent,diff_msec)*1000;
total_sent += sent_loc;
sent.push_back(sent_loc);
}
else {
DEBUG_CHATTER("[SrcMab %s] WARNING: negative timestamp difference %s (%s - %s)", _now.unparse().c_str(), diff_ts.unparse().c_str(),
time_begin.unparse().c_str(), current_arm->_current_sending_rate[i].last.unparse().c_str());
sent.push_back(0);
}
}
}
}
VERB_DEBUG_CHATTER("[SrcMab] After measure sent, current arm %p", current_arm);
// begin with traffic to initiate the flow (-> do not explore if _first_trial is positive)
if ((!_is_tcp[_current_dst] || _first_trial[_current_dst] == 0) && _first_trial[_current_dst] != NUMBER_FIRST_TRIALS && current_arm->_explore == 1) {
// silent slot done: send rate on one route
VERB_DEBUG_CHATTER("[SrcMab] Silent slot done");
if(_silent_slot) {
send_control_packets(MAB_CTRL_END);
}
VERB_DEBUG_CHATTER("[SrcMab] After END control");
current_arm->_explore = 2;
// it is a new rate vector: set new ID
_currentId[_current_dst] = (_currentId[_current_dst] % MAX_SEQ) + 1;
for(int i=0;i<current_arm->_froutes.size();i++)
current_arm->_routes_unacked[_currentId[_current_dst]].set(current_arm->_froutes[i], 1);
_rateId2MabArmInfo[_current_dst].set(_currentId[_current_dst], current_arm);
Vector<uint16_t> explore_ids = Vector<uint16_t> (1, _currentId[_current_dst]);
if(current_arm->_routes.size() > 1) {
uint16_t next_id = (_currentId[_current_dst] % MAX_SEQ) + 1;
explore_ids.push_back(next_id);
for(int i=0;i<current_arm->_froutes.size();i++)
current_arm->_routes_unacked[next_id].set(current_arm->_froutes[i], 1);
}
for(int i=0;i<explore_ids.size();i++) {
current_arm->_explore_ids.set(explore_ids[i], explore_ids);
current_arm->_explore_ids_persistent.set(explore_ids[i], explore_ids);
}
String rate_str;
// set first route's rate to 2/3 of best, second route's rate to 0 (if two routes)
if(current_arm->_current_best_rate.size() > 0)
rate_str = "best_rate_mab";
else
rate_str = "route_rate";
current_arm->_current_sending_rate[0] = FlowInfoLB(mymax((1000-_delta)*int_divide(current_arm->_current_best_rate_alone[0],1000), MIN_SENDING_RATE), _now);
if(current_arm->_froutes.size() == 2)
current_arm->_current_sending_rate[1].set_rate(0);
String reduced_rate;
if(_reduce_rate[current_arm] > 0) {
_reduce_rate[current_arm] = 2;
reduced_rate = " (reduced rate)";
current_arm->_current_sending_rate[0].set_rate(int_divide(current_arm->_current_sending_rate[0].rate,2));
}
VERB_DEBUG_CHATTER("[SrcMab] After rates for first trial");
StringAccum s;
s << "[SrcMab "<< _now.unparse() << "] Silent slot done for routes "<< print_vector(current_arm->_froutes) << " (arm "<< _ind_arm << ").";
s << " Rate was " << old_rate_str << " (have sent at rate " << print_vector(sent) << " B/s, total " << total_sent << ", " << String(_sent_dequeued) <<" B dequeued), ";
s << " Queue size is now " << size() << "; ";
s << "have killed: stop=" << _killed_stop << " B, mark=" << _killed_mark << " B, too much=" << _killed_too_much << " B (enq " << _killed_too_much_enq << "),";
s << "is now " << print_vector(current_arm->_current_sending_rate) << reduced_rate << ", new id " << _currentId[_current_dst] << ".";
DEBUG_CHATTER(s.take_string().c_str());
}
else if ((!_is_tcp[_current_dst] || _first_trial[_current_dst] == 0) && _first_trial[_current_dst] != NUMBER_FIRST_TRIALS && current_arm->_explore == 2) {
// half of exploration trial: if more than one route, change rate
VERB_DEBUG_CHATTER("[SrcMab] Half of exploration trial");
if(current_arm->_routes.size() == 1) {
// only one route, so only one probe during explore time: send on best route if it exists
VERB_DEBUG_CHATTER("[SrcMab] One route: exploit for second half");
FormatedRoute old_route = current_arm->_froutes[0];
_current_arm[mab_sit] = _best_arm[mab_sit];
current_arm = _current_arm[mab_sit];
current_arm->_explore = 0;
current_arm->_current_sending_rate = rate_vector_to_flow_info_vector(current_arm->_current_best_rate, 1000-_delta);
if(current_arm->_current_sending_rate.size() != current_arm->_current_sent_rate.size()) {
// should not happen
_now.assign_now();
DEBUG_CHATTER("[SrcMab %s] WARNING: size of _current_sending_rate is %d (size of _current_best_rate %d), size of _current_sent_rate %d.",
_now.unparse().c_str(), current_arm->_current_sending_rate.size(),
current_arm->_current_best_rate.size(), current_arm->_current_sent_rate.size());
current_arm->_current_sent_rate = Vector<FlowInfoLB>(current_arm->_current_sending_rate.size(),FlowInfoLB(_now));
}
VERB_DEBUG_CHATTER("[SrcMab] After rates for second half, exploit");
_last_explore.assign_now();
if(_debug) {
click_chatter("[SrcMab %s] Half of trial %d, id %s, route %s. Rate was %s, have sent at rates %s B/s, total %d, %s B sent dequeued; have killed stop=%s B, mark=%s B, too much=%s B (enq %s); have sent %d B dummy packets.",
_now.unparse().c_str(), _nr_trials[_current_dst], String(old_id).c_str(), old_route.unparse().c_str(), old_rate_str.c_str(),
print_vector(sent).c_str(), (int) total_sent, String(_sent_dequeued).c_str(), String(_killed_stop).c_str(), String(_killed_mark).c_str(), String(_killed_too_much).c_str(),
String(_killed_too_much_enq).c_str(), _dummy_packet_sent);
click_chatter("\t\tExploit arm %d (routes %s) at rate %s. Size is %d, TCP retransmissions %d/%d", _best_arm_id[mab_sit], print_vector(current_arm->_froutes).c_str(),
print_vector(current_arm->_current_sending_rate).c_str(), size(), _retrans_sent, _retrans_tries);
}
_trial_timer.schedule_after_msec(_trial_duration/2);
}
else if(current_arm->_routes.size() == 2) {
VERB_DEBUG_CHATTER("[SrcMab] Two routes");
current_arm->_explore = 3;
_currentId[_current_dst] = (_currentId[_current_dst] % MAX_SEQ) + 1;
_rateId2MabArmInfo[_current_dst].set(_currentId[_current_dst], current_arm);
// set first route's rate to 0, second route's rate tobest
current_arm->_current_sending_rate[1] = FlowInfoLB(mymax((1000-_delta)*int_divide(current_arm->_current_best_rate_alone[1],1000), MIN_SENDING_RATE), _now);
current_arm->_current_sending_rate[0].set_rate(0);
String reduced_rate;
if(_reduce_rate[current_arm] > 0) {
reduced_rate = " (reduced rate)";
current_arm->_current_sending_rate[1].set_rate(int_divide(current_arm->_current_sending_rate[1].rate,2));
}
VERB_DEBUG_CHATTER("[SrcMab] After rates for second half, explore");
StringAccum s;
s << "[SrcMab " << _now.unparse() << "] First half of exploration trial done for routes " << print_vector(current_arm->_froutes);
s << " (arm " << _ind_arm << "), id " << old_id << ". Rate was " << old_rate_str << "(have sent at rate " << print_vector(sent);
s << " B/s (total " << total_sent << "), have killed stop=" << String(_killed_stop) << "B, mark=" << String(_killed_mark);
s << "B, too much=" << String(_killed_too_much) << " B (enq " << _killed_too_much_enq << "); have sent " << String(_dummy_packet_sent) << " B dummy packets, " << String(_sent_dequeued) <<" B dequeued.";
s << " Is now " << print_vector(current_arm->_current_sending_rate) << reduced_rate << ", new id " << _currentId[_current_dst] << ".\n";
s << "Size is " << size() << ", TCP retransmissions ";
s << _retrans_sent << "/" << _retrans_tries;
DEBUG_CHATTER(s.take_string().c_str());
for(int i=0;i<current_arm->_froutes.size();i++)
current_arm->_routes_unacked[_currentId[_current_dst]].set(current_arm->_froutes[i], 1);
}
_reduce_rate[current_arm] = 0;
}
else {
// end of trial: new trial
VERB_DEBUG_CHATTER("[SrcMab] New trial");
if(current_arm && current_arm->_explore==3) // was exploring
_last_explore.assign_now();
// check if need to recompute routes
if(_freq_computed_routes > 0 && (_now-_last_computed_routes).sec() >= _freq_computed_routes) {
compute_new_routes(_current_dst);
}
VERB_DEBUG_CHATTER("[SrcMab] After compute routes");
if(_first_trial[_current_dst] != NUMBER_FIRST_TRIALS) {
if(current_arm->_explore == 0 && _acne_source_elmt)
_acne_source_elmt->set_rate(0);
}
//MabArmInfo *previous_arm = current_arm;
_stop_sending = true;
int prob_explore = -1;
VERB_DEBUG_CHATTER("[SrcMab] Check probas");
// if TCP, do not explore more than once per second to avoid stopping traffic too long, and begin with traffic to initiate the flow
if(_is_tcp[_current_dst] && (_first_trial[_current_dst] > 0 || (_now-_last_explore).sec()==0 ))
prob_explore = 0;
if(_stop_explore)
prob_explore = 0;
if(prob_explore == -1) {
// pick arm
VERB_DEBUG_CHATTER("[SrcMab] Pick arm");
_ind_arm = pick_arm_mab();
_current_arm.set(mab_sit, _arms[mab_sit][_ind_arm]);
current_arm = _current_arm[mab_sit];
VERB_DEBUG_CHATTER("[SrcMab] Choose explore/exploit");
// choose if send or explore
if(current_arm->_num_explore > 0) {
#ifdef CLICK_USERLEVEL
prob_explore = 500000/pow(current_arm->_num_explore, _lambda_mab);
#else
prob_explore = int_divide(500000,current_arm->_num_explore);
#endif
}
else
prob_explore = 1000000;
prob_explore = mymax(_min_proba,mymax(current_arm->_min_proba, prob_explore));
}
if(_first_trial[_current_dst] == NUMBER_FIRST_TRIALS) {
_dequeue_first[_current_dst] = true; // dequeue packets enqueued before first trial
prob_explore = 0; // do not explore first trial
}
int rand;
if(_deterministic_explore && current_arm) {
if(prob_explore==0 || prob_explore == 1000000) {
rand=1;
DEBUG_CHATTER("[SrcMab] Explore probability is %d, rand=%d",
prob_explore, rand);
}
else {
rand=1000000*(current_arm->_nb_cons_picked < 1000000/prob_explore);
DEBUG_CHATTER("[SrcMab] Explore probability is %d, should explore every %d, has been picked %d: rand=%d",
prob_explore, 1000000/prob_explore, current_arm->_nb_cons_picked, rand);
}
}
else {
rand = click_random(0,1000000);
DEBUG_CHATTER("[SrcMab] Random value is %d, explore probability is %d", rand, prob_explore);
}
if (rand <= prob_explore) {
current_arm->_nb_cons_picked=1;
//send_control_packets(TRIGGER_CLEAR, previous_arm);
current_arm->_missed_explore++;
current_arm->_missed_acks_explore.add(0);
_last_explores[mab_sit].add(_ind_arm);
// explore: begin with empty slot
current_arm->_explore = 1;
_current_capacity = _max_capacity;
for(int i=0;i<current_arm->_current_sending_rate.size();i++)
current_arm->_current_sending_rate[i] = FlowInfoLB(0,_now);
DEBUG_CHATTER("[SrcMab %s] End of trial %d, last seq id %d (rate was %s, have sent at rate %s B/s (total %d), have killed stop=%s B, mark=%s B, too much=%s B (enq %s); have sent %d B dummy packets, %s B dequeued, %d retransmissions)...",
_now.unparse().c_str(), _nr_trials[_current_dst], _seqNumTable[_current_dst], old_rate_str.c_str(), print_vector(sent).c_str(), (int) total_sent,
String(_killed_stop).c_str(), String(_killed_mark).c_str(), String(_killed_too_much).c_str(), String(_killed_too_much_enq).c_str(),
_dummy_packet_sent, String(_sent_dequeued).c_str(), _retrans_nacks);
int size_q = 0;
if(_priority_queue[_current_dst])
size_q = _priority_queue[_current_dst]->size();
DEBUG_CHATTER("\t\tExplore arm %d (routes %s), new id %d (explore %d, missed %d). Size is %d, TCP retransmissions %d/%d, current TCP seq %u, size TCP queue %d",
_ind_arm, print_vector(current_arm->_froutes).c_str(), _currentId[_current_dst], current_arm->_num_explore, current_arm->_missed_explore,
size(), _retrans_sent, _retrans_tries, (uint32_t)_current_tcp_seq[_current_dst], size_q);
if(!_silent_slot) {
_trial_timer.schedule_now();
return;
}
}
else {
// exploit
if(current_arm) // increase for picked arm
current_arm->_nb_cons_picked++;
_current_arm[mab_sit] = _best_arm[mab_sit];
current_arm = _current_arm[mab_sit];
current_arm->_explore = 0;
if(_first_trial[_current_dst] == NUMBER_FIRST_TRIALS) // very first trial: send at low rate in order to get number of flows
for(int i=0;i<current_arm->_current_sending_rate.size();i++)
current_arm->_current_sending_rate[i] = FlowInfoLB(100000,_now);
else
current_arm->_current_sending_rate = rate_vector_to_flow_info_vector(current_arm->_current_best_rate, 1000-_delta);
// do not send at rate less than 100kBps (does not improve much and harms reordering)
for(int i=0;i<current_arm->_current_sending_rate.size();i++) {
if(current_arm->_current_sending_rate[i].rate < 100)
current_arm->_current_sending_rate[i].rate = 0;
}
if(current_arm->_current_sending_rate.size() != current_arm->_current_sent_rate.size()) {
// should not happen
_now.assign_now();
DEBUG_CHATTER("[SrcMab %s] WARNING: size of _current_sending_rate is %d (size of _current_best_rate %d), size of _current_sent_rate %d.",
_now.unparse().c_str(), current_arm->_current_sending_rate.size(),
current_arm->_current_best_rate.size(), current_arm->_current_sent_rate.size());
current_arm->_current_sent_rate = Vector<FlowInfoLB>(current_arm->_current_sending_rate.size(),FlowInfoLB(_now));
}
if(_debug) {
if(_first_trial[_current_dst] == NUMBER_FIRST_TRIALS) {
click_chatter("[SrcMab %s] First trial: exploit arm %d (routes %s) at rate %s.", _now.unparse().c_str(),
_best_arm_id[mab_sit], print_vector(current_arm->_froutes).c_str(),
print_vector(current_arm->_current_sending_rate).c_str());
}
else {
click_chatter("[SrcMab %s] End of trial %d, last seq id %d. Rate was %s, have sent at rates %s B/s, total %d; have killed stop=%s B, mark=%s B, too much=%s B (enq %s); have sent %d B dummy packets, %s B sent dequeued, %d retransmissions.",
_now.unparse().c_str(), _nr_trials[_current_dst], _seqNumTable[_current_dst], old_rate_str.c_str(), print_vector(sent).c_str(), (int) total_sent,
String(_killed_stop).c_str(), String(_killed_mark).c_str(), String(_killed_too_much).c_str(), String(_killed_too_much_enq).c_str(),
_dummy_packet_sent, String(_sent_dequeued).c_str(), _retrans_nacks);
int size_q = 0;
if(_priority_queue[_current_dst])
size_q = _priority_queue[_current_dst]->size();
click_chatter("\t\tExploit arm %d (routes %s) at rate %s (explore proba %s). Size is %d, TCP retransmissions %d/%d, current TCP seq %u, size TCP queue %d",
_best_arm_id[mab_sit], print_vector(current_arm->_froutes).c_str(), print_vector(current_arm->_current_sending_rate).c_str(),
String(prob_explore).c_str(), size(), _retrans_sent, _retrans_tries, (uint32_t)_current_tcp_seq[_current_dst], size_q);
}
}
_trial_timer.schedule_after_msec(_trial_duration);
}
_nr_trials[_current_dst]++;
}
VERB_DEBUG_CHATTER("[SrcMab] Decrease _first_trial");
if(_first_trial[_current_dst] > 0)
_first_trial[_current_dst]--;
// if we explore, wait small delta before sending initial packet to trigger reset of measurements
if(current_arm->_explore > 0) {
_new_explore = true;
if(current_arm->_explore == 2 && (_silent_slot || _nr_trials[_current_dst] == 0))
_trial_timer.schedule_after(_small_delta/5); // if no message sent before (silent slot or first trial), does not need to wait as long (no packet sent)
else
_trial_timer.schedule_after(_small_delta);
// be notified by Measurements if channel empty
if(_measurements_plc && _measurements_wifi) {
_notified_plc=false;
_notified_wifi=false;
_measurements_plc->notify_empty_channel(20, this); // check every 10 ms
_measurements_wifi->notify_empty_channel(20, this);
}
return;
}
else {
// send flow information
send_control_packets(FLOW_INFORMATION);
}
VERB_DEBUG_CHATTER("[SrcMab] New trial");
_can_send_at.assign_now();
new_trial();
}
}
void
SrcMab::new_trial() {
_now.assign_now();
uint32_t total_rate = 0;
MabArmInfo *current_arm = _current_arm[MabSituation(_current_dst,_current_hash_flow)];
for (int i=0;i<current_arm->_current_sending_rate.size();i++) {
current_arm->_current_sending_rate[i].reinitialize(_can_send_at);
total_rate += current_arm->_current_sending_rate[i].rate;
}
if(current_arm->_explore == 1) // silent slot
_stop_sending = true;
else
_stop_sending = false;
if(_acne_source_elmt != 0)
_acne_source_elmt->set_rate(int_divide(ACNE_SOURCE_FACTOR*total_rate,10));
if(_rate_shaper_elmt) {
String rate = String(int_divide(ACNE_SOURCE_FACTOR*total_rate,10))+String("Bps");
reconfigure_keyword_handler(rate, _rate_shaper_elmt, (void *) "0 RATE", ErrorHandler::default_handler());
}
current_arm->_current_total_rate = total_rate;
if(total_rate == 0) // if silent slot: send at 100kB/s, enqueue packets instead of killing them (stop_sending is true)
current_arm->_tb.assign_adjust(100000, 10000);
else {
if(current_arm->_explore == 0) {
current_arm->_tb.assign(total_rate, mymax(total_rate/_tb_capacity_factor,10000u));
}
else
current_arm->_tb.assign_adjust(total_rate, mymax(total_rate/_tb_capacity_factor,10000u));
}
_killed_mark = 0;
_killed_stop = 0;
_killed_too_much = 0;
_killed_too_much_enq = 0;
_dummy_packet_sent = 0;
_sent_dequeued = 0;
_retrans_nacks = 0;
_retrans_sent = 0;
_retrans_tries = 0;
_info_to_use[_current_dst] = 0;
_count_packets[_current_dst] = 0;
if(current_arm->_explore != 1) { // do not dequeue for silent slot
if(_can_send_at > _now)
_dequeue_timer.schedule_at(_can_send_at);
else
_dequeue_timer.schedule_now();
}
DEBUG_CHATTER("[SrcMab %s] New trial with sending_rate of size %d, total %d. Can send at %s, expire at %s (queue %d, dummy %d)",
_now.unparse().c_str(), current_arm->_current_sending_rate.size(),
total_rate, _can_send_at.unparse().c_str(), _trial_timer.expiry().unparse().c_str(), size(), _dummy_packet_sent);
if(_debug_explore) {
if(current_arm->_explore > 0)
_verb_debug = true;
else
_verb_debug = false;
}
if(current_arm->_explore == 0)
_current_capacity = _max_capacity_exploit; // smaller capacity of queue
else
_current_capacity = _max_capacity;
}
void
SrcMab::send_control_packets(uint8_t type) {
send_control_packets(type, _current_arm[MabSituation(_current_dst,_current_hash_flow)]);
}
void
SrcMab::send_control_packets(uint8_t type, MabArmInfo *arm) {
if(arm != 0) {
for(int i=0;i<arm->_froutes.size();i++) {
_now.assign_now();
VERB_DEBUG_CHATTER("[SrcMab] Send control packets, i=%d/%d", i, arm->_froutes.size());
// check for missing acks
Vector<uint16_t> missing_acks;
if(type != FLOW_INFORMATION && type != TRIGGER_CLEAR) {
for(HashTable<uint16_t, HashTable<FormatedRoute,int> >::iterator it = arm->_routes_unacked.begin();
it!=arm->_routes_unacked.end();it++) {
if(it.key() < (_currentId[_current_dst]-2)) { //TODO: what if current has reached MAX_SEQ?
if(it.value().get(arm->_froutes[i])) {
if(missing_acks.size() < 50) {
VERB_DEBUG_CHATTER("[SrcMab] Missing id %d on route %s", it.key(), arm->_froutes[i].unparse().c_str());
missing_acks.push_back(it.key());
}
else
break;
}
}
}
}
WritablePacket *out = Packet::make(100, 0, sizeof(acne_header) + sizeof(mab_control) + sizeof(uint16_t)*missing_acks.size(), 20);
if(!out) {
DEBUG_CHATTER("[SrcMab] Error : can't add space for MAB control.");
return;
}
out->set_timestamp_anno(_now);
acne_header *acne_hdr = (acne_header *) out->data();
mab_control *mab_ctrl = (mab_control *) (out->data() + sizeof(acne_header));
memset(out->data(), 0, out->length());
acne_hdr->_route = arm->_froutes[i];
acne_hdr->_hop = 0;
acne_hdr->_type = MAB_CONTROL;
acne_hdr->_seq = 0;
if (type == FLOW_INFORMATION || type == TRIGGER_CLEAR || arm->_explore == 1)
acne_hdr->_id_rate_mab = 0; // to indicate empty slot or flow information
else
acne_hdr->_id_rate_mab = _currentId[_current_dst];
mab_ctrl->_max_nb_flows = 0u;
mab_ctrl->_ts.assign_now_steady();
/*
if(type == MAB_CTRL_BEGIN) {
mab_ctrl->_ts+=(_can_send_at-_now);
}
else if(type == MAB_CTRL_END) {
if(is_in_hash_table(_time_defered_packet, _current_dst) && is_in_hash_table(_time_defered_packet[_current_dst], _currentId[_current_dst]))
mab_ctrl->_ts = _time_defered_packet[_current_dst][_currentId[_current_dst]];
}
*/
mab_ctrl->_flow_src = _routing->my_ip();
mab_ctrl->_flow_dst = _current_dst;
mab_ctrl->_first_message = (_first_trial[_current_dst] == NUMBER_FIRST_TRIALS);
mab_ctrl->_type = type;
if(type != FLOW_INFORMATION && type != TRIGGER_CLEAR) {
mab_ctrl->_nb_missing_acks = (uint8_t) missing_acks.size();
for(int i=0;i<missing_acks.size();i++) {
mab_ctrl->_missing_acks[i] = missing_acks[i];
}
}
DEBUG_CHATTER("[SrcMab %s] Sending %d control packets on route %s of type %d for id %d (timestamp %s) for flow from %s to %s with %d missing acks... Timer scheduled at %s (queue %d, dummy %d)",
_now.unparse().c_str(), _burst_ctrl, arm->_froutes[i].unparse().c_str(), type, acne_hdr->_id_rate_mab, mab_ctrl->_ts.unparse().c_str(),
mab_ctrl->_flow_src.unparse().c_str(),mab_ctrl->_flow_dst.unparse().c_str(), missing_acks.size(), _trial_timer.expiry().unparse().c_str(), size(), _dummy_packet_sent);
for(int i=0;i<_burst_ctrl;i++) {
output(0).push(out->clone()->uniqueify());
if(i==0 && type == FLOW_INFORMATION)
break;
}
out->kill();
}
}
else // should not happen
DEBUG_CHATTER("[SrcMab] WARNING: arm is null");
}
/**
* Sequence number mechanics
*/
uint16_t
SrcMab::getCurrentSeqNum(IPAddress const& addr){
if(_seqNumTable[addr]==(MAX_SEQ-1)) {
_seqNumTable[addr] = 0;
}
else
_seqNumTable[addr]=_seqNumTable[addr]+1;
return _seqNumTable[addr];
}
int
SrcMab::pick_arm_mab() {
if(_debug)
_now.assign_now();
VERB_DEBUG_CHATTER("[SrcMab] Checking non explored arms");
Vector<int> non_explored_arms;
MabSituation mab_sit = MabSituation(_current_dst,_current_hash_flow);
int sum_explores = 0;
Vector<int> explores = Vector<int>(_arms[mab_sit].size(),0);
for(int i=0;i<_arms[mab_sit].size();i++) {
explores[i] = _last_explores[mab_sit].size_equal(i);
sum_explores+=explores[i];
if(explores[i] == 0) {
// do not select each time an arm that has missed explore, or might stay stuck in a wrong arm
if(_arms[mab_sit][i]->_missed_explore <= 2 || click_random(0,2*_arms[mab_sit][i]->_missed_explore) == 0) {
DEBUG_CHATTER("[SrcMab %s] Arm %d has not been explored (missed %d)",
_now.unparse().c_str(), i, _arms[mab_sit][i]->_missed_explore);
non_explored_arms.push_back(i);
}
}
}
VERB_DEBUG_CHATTER("[SrcMab] There are %d non explored arms", non_explored_arms.size());
DEBUG_CHATTER("[SrcMab] explore array %s, ring %s", print_vector(explores).c_str(), _last_explores[mab_sit].unparse().c_str());
if(non_explored_arms.empty()) {
// pick arm according to UCB
int best_i_ucb = 0;
int best_i_rate = -1;
uint32_t max_ucb = 0;
uint32_t max_rate = 0;
Vector<int> do_not_pick;
for(int i=0;i<_arms[mab_sit].size();i++) {
if(explores[i] == 0 ||
click_random(0,mymax(1,_arms[mab_sit][i]->_missed_explore)) > 2*(_arms[mab_sit][i]->_num_explore+1)) {
DEBUG_CHATTER("[SrcMab %s] Do not pick arm %d (explored %d, missed %d)",
_now.unparse().c_str(), i, _arms[mab_sit][i]->_num_explore, _arms[mab_sit][i]->_missed_explore);
do_not_pick.push_back(i);
continue;
}
if(_arms[mab_sit][i]->_current_best_rate_total > _arms[mab_sit][i]->_scaling_rate)
DEBUG_CHATTER("[SrcMab] WARNING: rate is %d, scaling %d", _arms[mab_sit][i]->_current_best_rate_total, _arms[mab_sit][i]->_scaling_rate);
// compute UCB as: scale*sqrt(2*log(trials)/exploration) = sqrt(scale^2*log(trials^2)/exploration) [use log(x^2) = 2log(x) to increase precision of integer computation]
uint32_t ucb_factor = (uint32_t) int_sqrt(int_divide(power(_arms[mab_sit][i]->_scaling_rate,2)*((uint64_t) logE(power(sum_explores,2))),explores[i]));
uint32_t ucb = _arms[mab_sit][i]->_current_best_rate_total + ucb_factor;
String missed;
int sum_ok = _arms[mab_sit][i]->_missed_acks_explore.sum();
if( sum_ok < 3)
missed=" (missed "+String(_arms[mab_sit][i]->_missed_acks_explore.size() - sum_ok)+" out of "+String(_arms[mab_sit][i]->_missed_acks_explore.size())+")";
DEBUG_CHATTER("[SrcMab %s] UCB for arm %d is %d (scale %d, rate %d, nr_trials %d, total nr_explore %d, use %d). Missed %d%s.",
_now.unparse().c_str(), i, ucb, _arms[mab_sit][i]->_scaling_rate, _arms[mab_sit][i]->_current_best_rate_total,
_nr_trials[_current_dst], _arms[mab_sit][i]->_num_explore, explores[i], _arms[mab_sit][i]->_missed_explore, missed.c_str());
if(_arms[mab_sit][i]->_missed_acks_explore.all_equal(0) && _arms[mab_sit][i]->_num_explore > 0) {
DEBUG_CHATTER("[SrcMab] Missed last three on arm %d, reset rates", i);
_arms[mab_sit][i]->reset_rates_missed();
ucb = 0;
}
if(ucb > max_ucb) {
best_i_ucb = i;
max_ucb = ucb;
}
if(_arms[mab_sit][i]->_current_best_rate_total > max_rate && sum_ok >= 3) { // if more than two acks missed in last 5, do not consider
max_rate = _arms[mab_sit][i]->_current_best_rate_total;
_best_arm[mab_sit] = _arms[mab_sit][i];
best_i_rate = i;
}
}
if(best_i_rate!=-1) {
if(_debug && _best_arm_id[mab_sit] != best_i_rate)
click_chatter("[SrcMab] New best arm, is now %d", best_i_rate);
_best_arm_id[mab_sit] = best_i_rate;
}
if(_rand_no_ucb) {
best_i_ucb = click_random(0,_arms[mab_sit].size()-1);
if(_arms[mab_sit].size()!=do_not_pick.size()) {
while(is_in_vector(do_not_pick, best_i_ucb))
best_i_ucb = click_random(0,_arms[mab_sit].size()-1);
}
DEBUG_CHATTER("[SrcMab] Arm %d is picked at random.", best_i_ucb);
}
else
DEBUG_CHATTER("[SrcMab] Arm %d is picked (UCB %d).", best_i_ucb, max_ucb);
return best_i_ucb;
}
else {
int randi = click_random(0,non_explored_arms.size()-1);
return non_explored_arms[randi];
}
return 0;
}
void
SrcMab::update_best_rate(MabArmInfo *arm, uint16_t id) {
Vector<uint16_t> ids = arm->_explore_ids[id];
Vector<uint32_t> best_rate;
uint64_t max_rate = 0;
if(ids.size() == 1) {
if(arm->_sent_rates_table[ids[0]][0] == 0)
return;
gamma_type max_gamma = 0;
for(HashTable<LinkRouting,HashTable<uint16_t, gamma_type> >::const_iterator it=arm->_gammas_table.begin(); it != arm->_gammas_table.end();++it) {
if(it.value()[ids[0]] > max_gamma)
max_gamma = it.value()[ids[0]];
}
#ifdef CLICK_USERLEVEL
max_gamma = mymax(((gamma_type) GAMMA_SCALE)/1000,max_gamma);
#else
max_gamma = mymax(int_divide((gamma_type) GAMMA_SCALE,1000),max_gamma);
#endif
uint32_t rate = (uint32_t) int_divide(arm->_sent_rates_table[ids[0]][0],max_gamma)*1000;
if(rate < MAX_POSSIBLE_RATE) {
if(arm->_current_best_rate_alone[0]==0)
arm->_current_best_rate_alone[0] = (uint32_t) rate;
else
arm->_current_best_rate_alone[0] = int_divide( 7*arm->_current_best_rate_alone[0] + 3*( (uint32_t) rate), 10); // ewma with 30%
DEBUG_CHATTER("[SrcMab] New best rate alone for route %s is %u", arm->_froutes[0].unparse().c_str(), (uint32_t) rate);
best_rate.push_back(rate);
}
else {
DEBUG_CHATTER("[SrcMab] Rate is too high (%d), must be an error", rate);
return;
}
}
else if(ids.size() == 2) {
if(arm->_sent_rates_table[ids[0]].size() != 2 || arm->_sent_rates_table[ids[1]].size() != 2) {
DEBUG_CHATTER("[SrcMab] Error: wrong size for rate vectors, are %d %d",
arm->_sent_rates_table[ids[0]].size(), arm->_sent_rates_table[ids[1]].size());
return;
}
Vector<Vector<uint64_t> > rates_mat; // rates should be in kB/s
rates_mat.push_back(vector_scaling64(arm->_sent_rates_table[ids[0]], 1, 1000)); // scale by 1000 to get in kB/s
rates_mat.push_back(vector_scaling64(arm->_sent_rates_table[ids[1]], 1, 1000));
HashTable<LinkRouting, Vector<uint64_t> > alpha_mat;
DEBUG_CHATTER("[SrcMab] Compute alphas...");
// gammas should be in pro-billion to get good alpha values: alpha in 10^12 sec/B
for(HashTable<LinkRouting,HashTable<uint16_t, gamma_type> >::const_iterator it=arm->_gammas_table.begin();it != arm->_gammas_table.end();++it) {
if(it.value()[ids[0]] > 0 || it.value()[ids[1]]) {
Vector<uint64_t> gammas;
gammas.push_back(((uint64_t) it.value()[ids[0]])*(int_divide(power(10,9),GAMMA_SCALE)));
gammas.push_back(((uint64_t) it.value()[ids[1]])*(int_divide(power(10,9),GAMMA_SCALE)));
VERB_DEBUG_CHATTER("[SrcMab] For link %s, id %d %d, gammas are %s, rates [%s;%s]", it.key().unparse().c_str(), ids[0], ids[1],
print_vector(gammas).c_str(), print_vector(rates_mat[0]).c_str(), print_vector(rates_mat[1]).c_str());
Vector<uint64_t> alphas = solve_22_lin_sys(rates_mat, gammas, _debug);
if(alphas.size() < 2) { // should not happen
_now.assign_now();
DEBUG_CHATTER("[SrcMab %s] Wrong size %d for alpha", _now.unparse().c_str(), alphas.size());
return;
}
insert_in_alpha_mat(&alpha_mat, it.key(), alphas);
}
}
DEBUG_CHATTER("[SrcMab] %d alphas computed, compute rates...", alpha_mat.size());
// we have alpha, solve system to get the optimal rate
uint64_t min_rate_route0 = ULLONG_MAX;
uint64_t min_rate_route1 = ULLONG_MAX;
uint64_t scale = power(10,12); // alpha in 10^12 sec/B: we will get rates in B/sec
Vector<uint64_t> one_vec = Vector<uint64_t>(2,scale);
for(HashTable<LinkRouting, Vector<uint64_t> >::const_iterator it=alpha_mat.begin();it!=alpha_mat.end();++it) {
if(it.value().size() == 2 && (it.value()[0] > 0 || it.value()[1] > 0)) {
uint64_t div0;
if(it.value()[0] > 0 && (div0 = uint64_divide(scale,it.value()[0])) < min_rate_route0) // rates in B/sec
min_rate_route0 = div0;
uint64_t div1;
if(it.value()[1] > 0 && (div1 = uint64_divide(scale,it.value()[1])) < min_rate_route1)
min_rate_route1 = div1;
for(HashTable<LinkRouting, Vector<uint64_t> >::const_iterator it2=alpha_mat.begin();it2!=alpha_mat.end();++it2) {
if(it.key() > it2.key() && it2.value().size() == 2 && (it2.value()[0] > 0 || it2.value()[1] > 0)) {
Vector<Vector<uint64_t> > A_mat;
A_mat.push_back(it.value());
A_mat.push_back(it2.value());
Vector<uint64_t> rate_vec = solve_22_lin_sys(A_mat, one_vec, _debug);
if(rate_vec.size() < 2) { // should not happen
DEBUG_CHATTER("[SrcMab] Wrong size %d for rate_vec", rate_vec.size());
return;
}
// check admissibility of rate vector
bool admissible = true;
for(HashTable<LinkRouting, Vector<uint64_t> >::const_iterator it_check=alpha_mat.begin();it_check!=alpha_mat.end();++it_check) {
uint64_t sum;
if((sum=it_check.value()[0]*rate_vec[0] + it_check.value()[1]*rate_vec[1]) > 102*(scale/100)) { // allow to be slightly higher (rounded)
DEBUG_CHATTER("[SrcMab] Rate vector %s is not admissible (sum for alpha %s is %s)", print_vector(rate_vec).c_str(),
print_vector(it_check.value()).c_str(), String(sum).c_str());
admissible = false;
break;
}
}
uint64_t rate = rate_vec[0]+rate_vec[1];
if(rate < MAX_POSSIBLE_RATE && rate > max_rate && admissible) { // check that rate less than MAX_POSSIBLE_RATE (should not happen)
max_rate = rate;
best_rate.clear();
best_rate.push_back((uint32_t) rate_vec[0]);
best_rate.push_back((uint32_t) rate_vec[1]);
}
}
else if(it2.value().size() != 2)
DEBUG_CHATTER("[SrcMab] Error when computing rate: wrong size of second alpha %d (should be 2)", it.value().size());
else if (it2.value()[0] == 0 && it2.value()[1] == 0)
DEBUG_CHATTER("[SrcMab] Error when computing rate: second alpha is zero");
}
}
else if (it.value().size() != 2)
DEBUG_CHATTER("[SrcMab] Error when computing rate: wrong size of first alpha %d (should be 2)", it.value().size());
else
DEBUG_CHATTER("[SrcMab] Error when computing rate: first alpha is zero");
}
bool pass = false;
if(min_rate_route0 < 5*125*arm->_routes[0].max_capa_alone) { // if greater than three times capacity from routing, it is likely to be a mistake (note: B/s vs. kb/s)
if(arm->_current_best_rate_alone[0]==0)
arm->_current_best_rate_alone[0] = (uint32_t) min_rate_route0;
else
arm->_current_best_rate_alone[0] = int_divide( 7*arm->_current_best_rate_alone[0] + 3*( (uint32_t) min_rate_route0), 10); // ewma with 30%
DEBUG_CHATTER("[SrcMab] New best rate alone for route %s is %u", arm->_froutes[0].unparse().c_str(), (uint32_t) min_rate_route0);
if(min_rate_route0 > max_rate) {
max_rate = min_rate_route0;
best_rate = Vector<uint32_t>(1,(uint32_t) max_rate);
best_rate.push_back(0u);
}
}
else {
DEBUG_CHATTER("[SrcMab] WARNING: computed capacity for route %s is %u, whereas capacity from routing is %u", arm->_froutes[0].unparse().c_str(),
(uint32_t) min_rate_route0,(uint32_t) (arm->_routes[0].max_capa_alone*125));
pass = true;
}
if(min_rate_route1 < 5*125*arm->_routes[1].max_capa_alone) {
if(arm->_current_best_rate_alone[1]==0)
arm->_current_best_rate_alone[1] = (uint32_t) min_rate_route1;
else
arm->_current_best_rate_alone[1] = int_divide( 7*arm->_current_best_rate_alone[1] + 3*( (uint32_t) min_rate_route1), 10); // ewma with 30%
DEBUG_CHATTER("[SrcMab] New best rate alone for route %s is %d", arm->_froutes[1].unparse().c_str(), (uint32_t) min_rate_route1);
if(min_rate_route1 > max_rate) {
max_rate = min_rate_route1;
best_rate = Vector<uint32_t>(1,0u);
best_rate.push_back((uint32_t) max_rate);
}
}
else {
DEBUG_CHATTER("[SrcMab] WARNING: computed capacity for route %s is %u, whereas capacity from routing is %u", arm->_froutes[1].unparse().c_str(),
(uint32_t) min_rate_route1,(uint32_t) (arm->_routes[1].max_capa_alone*125));
pass = true;
}
if(pass)
best_rate.clear();
if(best_rate.size() == ids.size())
DEBUG_CHATTER("[SrcMab] Rate for routes alone are %d %d.", (int) min_rate_route0, (int) min_rate_route1);
}
else {
DEBUG_CHATTER("[SrcMab] WARNING: unsupported number of ids");
return;
}
if(best_rate.size() != ids.size()) {
DEBUG_CHATTER("[SrcMab] Wrong size of vector rate (should be %d, is %d)", ids.size(), best_rate.size());
return;
}
Vector<uint32_t> old_best_rate = arm->_current_best_rate;
uint32_t old_rate_total = arm->_current_best_rate_total;
uint32_t total_rate_this_slot = vector_sum(best_rate);
if(_variable_min_proba)
DEBUG_CHATTER("[SrcMab] Current variable explore probability is %d", _min_proba);
if( ((uint32_t) myabs(((int) total_rate_this_slot)-((int) old_rate_total))) > 3*(old_rate_total/10)) { // error > 40%
if(_variable_min_proba) {
arm->_min_proba = _variable_min_proba_max;
_min_proba = _variable_min_proba_max;
DEBUG_CHATTER("[SrcMab] Increase explore probability, is now %d, %d for arm", _min_proba, arm->_min_proba);
}
if(arm->_num_explore > 5 ) {
arm->_consecutive_high_error++;
}
}
else if (((uint32_t) myabs(((int) total_rate_this_slot)-((int) old_rate_total))) < (old_rate_total/10)) { // within 20%
if(_variable_min_proba) {
arm->_min_proba = _variable_min_proba_min;
_min_proba = mymax(_variable_min_proba_min, (10*_min_proba)/13);
DEBUG_CHATTER("[SrcMab] Decrease explore probability, is now %d, %d for arm", _min_proba, arm->_min_proba);
}
arm->_consecutive_high_error=0;
}
else
arm->_consecutive_high_error=0;
MabSituation mab_sit = MabSituation(_current_dst, _current_hash_flow);
int arm_id;
if(_debug)
arm_id = find_in_vector(_arms[mab_sit], arm);
// if consecutive high level of error (capacity change): reset arm
if(arm->_consecutive_high_error >= 3) {
DEBUG_CHATTER("[SrcMab %s] Reset arm %d: %d high error on this arm", _now.unparse().c_str(),
arm_id, arm->_consecutive_high_error);
arm->reset_rates();
arm->_consecutive_high_error=0;
}
if(max_rate > 2*arm->_scaling_rate)
DEBUG_CHATTER("[SrcMab] WARNING: got rate %d, scaling is %s.", max_rate, String((int)arm->_scaling_rate).c_str());
arm->update_current_best_rate(best_rate, _ewma*_alpha); // 0 for no ewma
_last_estimated_rates.set(Multipath(arm->_routes), arm->_current_best_rate_total);
if(_debug) {
_now.assign_now();
DEBUG_CHATTER("[SrcMab %s] New best rate for arm %d, routes %s, explore %d (missed %d), is %s, total %u (was %s, total %u, estimation at this slot: %s, total %u)",
_now.unparse().c_str(), arm_id, print_vector(arm->_froutes).c_str(), arm->_num_explore, arm->_missed_explore,
print_vector(arm->_current_best_rate).c_str(), arm->_current_best_rate_total, print_vector(old_best_rate).c_str(),
old_rate_total, print_vector(best_rate).c_str(), total_rate_this_slot);
}
for(int i=0;i<ids.size();i++) {
arm->_explore_ids.erase(ids[i]);
}
}
void
SrcMab::compute_new_routes(IPAddress ip_dst) {
_last_computed_routes.assign_now();
DEBUG_CHATTER("[SrcMab %s] Compute new multipaths", _last_computed_routes.unparse().c_str());
// get more multipaths here, for diversity (link estimation hence routing protocol might be wrong), then select the _nr_arms best ones
Vector<Vector<Route> > routes = _routing->get_multipaths(ip_dst, 2*_nr_arms, _nr_routes);
Vector<Multipath> multipaths_mp;
for(int i=0;i<routes.size();i++) {
Multipath mp;
for(int j=0;j<routes[i].size();j++) {
mp.add_route(routes[i][j]);
}
if(is_in_hash_table(_last_estimated_rates,mp)) {
// use last estimated rate, if any
mp._total_capacity = _last_estimated_rates[mp]/125;
}
if(!is_in_vector(_multipaths[ip_dst], mp._routes)) {
// to avoid oscillations that can harm performance, penalize a bit new multipaths
mp._total_capacity = (10*mp._total_capacity)/11;
}
if(!is_in_vector(multipaths_mp,mp))
insert_in_sorted_descent_vector(multipaths_mp, mp);
}
for(int i=0;i<_multipaths[ip_dst].size();i++) {
// if current arms not in new set
Multipath mp;
for(int j=0;j<_multipaths[ip_dst][i].size();j++) {
mp.add_route(_multipaths[ip_dst][i][j]);
}
if(is_in_hash_table(_last_estimated_rates,mp)) {
// use last estimated rate, if any
mp._total_capacity = _last_estimated_rates[mp]/125;
}
if(!is_in_vector(multipaths_mp,mp))
insert_in_sorted_descent_vector(multipaths_mp, mp);
}
// if(_debug) {
// for(int i=0; i < multipaths_mp.size(); i++) {
// click_chatter("[SrcMab] Computed multipath is %s (estimated capa %u)", multipaths_mp[i].unparse().c_str(), _last_estimated_rates.get(multipaths_mp[i]));
// }
// click_chatter("");
// }
// keep _nr_arms best ones
Vector<Vector<Route> > multipaths;
for(int i=0; i < _nr_arms && i < multipaths_mp.size(); i++) {
multipaths.push_back(multipaths_mp[i]._routes);
}
Vector<int> to_add;
Vector<int> to_remove;
for(int i=0;i<multipaths.size();i++) {
if (!is_in_vector(_multipaths[ip_dst], multipaths[i])) {
DEBUG_CHATTER("[SrcMab] Need to add arm %d %s", i, print_vector(multipaths[i]).c_str());
to_add.push_back(i);
}
}
for(int i=0;i<_multipaths[ip_dst].size();i++) {
if (!is_in_vector(multipaths, _multipaths[ip_dst][i])) {
DEBUG_CHATTER("[SrcMab] Need to remove arm %d %s", i, print_vector(_multipaths[ip_dst][i]).c_str());
to_remove.push_back(i);
}
}
int old_size = _multipaths[ip_dst].size();
if(to_add.size()!= 0 || to_remove.size()!=0) {
for(int i=0; i < multipaths_mp.size(); i++) {
click_chatter("[SrcMab] Computed multipath %d is %s (estimated capa %u)", i,
multipaths_mp[i].unparse().c_str(), _last_estimated_rates.get(multipaths_mp[i]));
}
click_chatter("");
for(int i=0;i<multipaths.size();i++) {
click_chatter("[SrcMab] New multipath %d is %s", i, print_vector(multipaths[i]).c_str());
}
click_chatter("");
for(HashTable<MabSituation, int>::iterator it=_best_arm_id.begin();it.live();it++) {
MabSituation mab_sit = it.key();
if(mab_sit._dst == ip_dst) {
for(int i=0;i<_multipaths[ip_dst].size();i++) {
click_chatter("[SrcMab] Current multipath %d is %s", i, print_vector(_multipaths[ip_dst][i]).c_str());
}
click_chatter("");
break;
}
}
}
int count_mab_sit = 0;
for(HashTable<MabSituation, int>::iterator it=_best_arm_id.begin();it.live();it++) { // must do it for each MabSituation
MabSituation mab_sit = it.key();
if(mab_sit._dst == ip_dst) {
// if(count_mab_sit==0 && _debug) {
// for(int i=0;i<_multipaths[ip_dst].size();i++) {
// click_chatter("[SrcMab] Current multipath %d is %s", i, print_vector(_multipaths[ip_dst][i]).c_str());
// }
// }
if(is_in_vector(to_remove, _best_arm_id[mab_sit])) {
// current best arm is removed, replace it
if(to_remove.size() != _arms[mab_sit].size()) {
// choose best among remaining arms
uint32_t best_rate = 0;
for(int j=0;j<_arms[mab_sit].size();j++) {
if(!is_in_vector(to_remove, j)) {
if(_arms[mab_sit][j]->_current_best_rate_total > best_rate) {
best_rate = _arms[mab_sit][j]->_current_best_rate_total;
_best_arm[mab_sit] = _arms[mab_sit][j];
_best_arm_id[mab_sit] = j;
}
}
}
if (count_mab_sit == 0) // print only once
DEBUG_CHATTER("[SrcMab] New best arm is %d, %s", _best_arm_id[mab_sit],
print_vector(_best_arm[mab_sit]->_routes).c_str());
}
else {
_best_arm[mab_sit] = 0;
_best_arm_id[mab_sit] = -1;
}
}
if (to_add.size() == to_remove.size()) { // when _nr_arms multipaths, this should always be the case
for(int i=0;i<to_add.size();i++) {
if (count_mab_sit == 0)
DEBUG_CHATTER("[SrcMab] Replace arm %d with routes %s\n by arm with routes %s", to_remove[i],
print_vector(_arms[mab_sit][to_remove[i]]->_routes).c_str(), print_vector(multipaths[to_add[i]]).c_str());
_arms[mab_sit][to_remove[i]] = new MabArmInfo(ip_dst, multipaths[to_add[i]], _alpha);
_multipaths[ip_dst][to_remove[i]] = multipaths[to_add[i]];
if(_arms[mab_sit][to_remove[i]]->_current_total_rate > _scaling[mab_sit])
_scaling[mab_sit] = _arms[mab_sit][to_remove[i]]->_current_total_rate;
// remove last explore indices
_last_explores[mab_sit].erase(to_remove[i]);
}
}
else if (to_add.size() > to_remove.size()) { // means that we had less than _nr_arms before
for(int i=0;i<to_add.size();i++) {
if(i >= to_remove.size()) {
if (count_mab_sit == 0) {
DEBUG_CHATTER("[SrcMab] Add arm with routes %s", print_vector(multipaths[to_add[i]]).c_str());
_multipaths[ip_dst].push_back(multipaths[to_add[i]]); // add it only once
}
_arms[mab_sit].push_back(new MabArmInfo(ip_dst, multipaths[to_add[i]], _alpha));
if(_arms[mab_sit].back()->_current_total_rate > _scaling[mab_sit])
_scaling[mab_sit] = _arms[mab_sit].back()->_current_total_rate;
}
else {
if (count_mab_sit == 0)
DEBUG_CHATTER("[SrcMab] Replace arm %d with routes %s\n by arm with routes %s", to_remove[i],
print_vector(_arms[mab_sit][to_remove[i]]->_routes).c_str(), print_vector(multipaths[to_add[i]]).c_str());
_arms[mab_sit][to_remove[i]] = new MabArmInfo(ip_dst, multipaths[to_add[i]], _alpha);
_multipaths[ip_dst][to_remove[i]] = multipaths[to_add[i]];
if(_arms[mab_sit][to_remove[i]]->_current_total_rate > _scaling[mab_sit])
_scaling[mab_sit] = _arms[mab_sit][to_remove[i]]->_current_total_rate;
// remove last explore indices
_last_explores[mab_sit].erase(to_remove[i]);
}
}
// increase size of last_explores
_last_explores[mab_sit].vector.resize((1000/_alpha)*_arms[mab_sit].size(),-1);
}
else { // to_add.size() < to_remove.size() // means that we computed less than _nr_arms, do not remove all in previous
for(int i=0;i<to_add.size();i++) {
// to_remove ordered by decreasing size, remove last ones first
int i_remove = to_remove.size()-i-1;
if (count_mab_sit == 0)
DEBUG_CHATTER("[SrcMab] Replace arm %d with routes %s\n by arm with routes %s", to_remove[i_remove],
print_vector(_arms[mab_sit][to_remove[i_remove]]->_routes).c_str(), print_vector(multipaths[to_add[i]]).c_str());
_arms[mab_sit][to_remove[i_remove]] = new MabArmInfo(ip_dst, multipaths[to_add[i]], _alpha);
_multipaths[ip_dst][to_remove[i_remove]] = multipaths[to_add[i]];
if(_arms[mab_sit][to_remove[i_remove]]->_current_total_rate > _scaling[mab_sit])
_scaling[mab_sit] = _arms[mab_sit][to_remove[i_remove]]->_current_total_rate;
// remove last explore indices
_last_explores[mab_sit].erase(to_remove[i_remove]);
}
}
if(to_add.size()!=0 || to_remove.size()!=0) {
for(int i=0;i<_arms[mab_sit].size();i++) {
_arms[mab_sit][i]->_scaling_rate = _scaling[mab_sit];
}
}
count_mab_sit++;
}
}
if(to_add.size()!=0 || to_remove.size()!=0) {
DEBUG_CHATTER("[SrcMab %s] New multipaths for ip %s, %d multipaths computed, had %d, now %d (%d to add, %d to remove), computed in %s sec", _last_computed_routes.unparse().c_str(),
ip_dst.unparse().c_str(), multipaths.size(), old_size, _multipaths[ip_dst].size(), to_add.size(), to_remove.size(), (Timestamp::now()-_last_computed_routes).unparse().c_str());
}
}
void
SrcMab::notification_empty_channel_plc() {
_notified_plc=true;
if(_notified_wifi)
_trial_timer.schedule_now();
_measurements_plc->stop_notify_empty_channel();
}
void
SrcMab::notification_empty_channel_wifi() {
_notified_wifi=true;
if(_notified_plc)
_trial_timer.schedule_now();
_measurements_wifi->stop_notify_empty_channel();
}
inline bool
SrcMab::enq(Packet *p)
{
assert(p);
Storage::index_type h = head(), t = tail(), nt = next_i(t);
if (nt != h) {
_q[t] = p;
packet_memory_barrier(_q[t]);
set_tail(nt);
// if(size()!=_current_capacity && size()%mymax(5,_current_capacity/10)==0 || size()==1)
// DEBUG_CHATTER("[SrcMab %s] Size of queue goes up to %d/%d (killed %s enq %s, current TCP %s)", Timestamp::now().unparse().c_str(),
// size(), _current_capacity, String(_killed_too_much).c_str(), String(_killed_too_much_enq).c_str(),
// String(_current_tcp_seq[_current_dst]).c_str());
return true;
}
else {
return false;
}
}
inline Packet *
SrcMab::deq()
{
Storage::index_type h = head(), t = tail();
if (h != t) {
Packet *p = _q[h];
packet_memory_barrier(_q[h]);
set_head(next_i(h));
assert(p);
// if(size()%mymax(5,_current_capacity/10)==0)
// DEBUG_CHATTER("[SrcMab %s] Size of queue goes down to %d/%d (killed %s enq %s, current TCP %s)", Timestamp::now().unparse().c_str(),
// size(), _current_capacity, String(_killed_too_much).c_str(), String(_killed_too_much_enq).c_str(),
// String(_current_tcp_seq[_current_dst]).c_str());
return p;
}
else
return 0;
}
void
SrcMab::set_active(bool b) {
if (_active && !b) {
DEBUG_CHATTER("[SrcMab %s] Now inactive.", Timestamp::now().unparse().c_str());
// clear();
// _multipaths.clear();
}
if(!_active && b) {
DEBUG_CHATTER("[SrcMab %s] Now active.", Timestamp::now().unparse().c_str());
// clear();
}
_active = b;
}
void
SrcMab::clear() {
DEBUG_CHATTER("[SrcMab] Clearing...");
_arms.clear();
_current_arm.clear();
_best_arm.clear();
_best_arm_id.clear();
_multipaths.clear();
_time_defered_packet.clear();
if(_defered_packet)
_defered_packet->kill();
_defered_packet = 0;
_trial_timer.unschedule();
_reduce_rate.clear();
_currentId.clear();
_info_to_use.clear();
_i_to_use.clear();
_froute_to_use.clear();
_count_packets.clear();
_rateId2MabArmInfo.clear();
_current_dst = IPAddress();
_seqNumTable.clear();
_nr_trials.clear();
if(_rate_shaper_elmt)
_rate_shaper_elmt->set_active(false);
_new_explore = false;
_mark_last_packet = false;
_last_packet_marked = false;
_stop_sending = false;
_killed_mark = 0;
_killed_stop = 0;
_killed_too_much = 0;
_killed_too_much_enq = 0;
int s = size();
for (int i=0;i<s;i++)
deq()->kill();
for(HashTable<IPAddress, Packet *>::iterator it=_dummy_packet.begin();it.live();it++)
if(it.value())
it.value()->kill();
_dummy_packet.clear();
_current_hash_flow = 0;
for (HashTable<IPAddress, PacketTable*>::iterator iter = _packets_sent.begin(); iter.live(); iter++) {
PacketTable* _ptable = _packets_sent.get(iter.key());
if(!_ptable)
continue;
for(HashTable<uint16_t, Packet*>::iterator it = _ptable->begin();it.live();it++)
if(it.value())
it.value()->kill();
}
_packets_sent.clear();
_stop_explore = 0;
}
uint32_t
SrcMab::get_reset_current_rate(int i) {
_now.assign_now();
MabArmInfo *arm = _current_arm[MabSituation(_current_dst,_current_hash_flow)];
if(arm != 0) {
if(i < arm->_current_sent_rate.size()) {
Timestamp diff = _now - arm->_current_sent_rate[i].last;
uint32_t sent_rate = 0;
if(diff.msecval() > 0)
sent_rate = 1000*int_divide(arm->_current_sent_rate[i].sent,diff.msecval());
arm->_current_sent_rate[i].reinitialize(_now);
return sent_rate;
}
else
DEBUG_CHATTER("[SrcMab] WARNING: size error of _current_sent_rate in handler.");
}
else {
if(i < _last_sent.size() && i < _last_multipath.size()) {
Timestamp diff = _now - _last_sent[i].last;
uint32_t sent_rate = 0;
if(diff.msecval() > 0)
sent_rate = 1000*int_divide(_last_sent[i].sent,diff.msecval());
_last_sent[i].reinitialize(_now);
return sent_rate;
}
else
DEBUG_CHATTER("[SrcMab] WARNING: size error of _last_sent in handler.");
}
return 0;
}
uint32_t
SrcMab::get_total_rate() {
uint32_t total_rate = 0;
MabArmInfo *arm = _current_arm[MabSituation(_current_dst,_current_hash_flow)];
if(arm != 0) {
for(int i=0;i<arm->_froutes.size();i++)
total_rate += get_reset_current_rate(i);
}
return total_rate;
}
String
SrcMab::get_rate_route() {
String res = "";
MabSituation mab_sit = MabSituation(_current_dst,_current_hash_flow);
MabArmInfo *arm = _current_arm[mab_sit];
if(arm != 0) {
int arm_id = find_in_vector(_arms[mab_sit], arm);
for(int i=0;i<arm->_froutes.size();i++) {
if(_active || !_acne_source_elmt)
res += (arm->_froutes[i].unparse() + " --> " + String(get_reset_current_rate(i)/128) +
" ("+String(arm->_current_sending_rate[i].rate/128)+") arm "+String(arm_id)+" trial "+String(_nr_trials[_current_dst])+" ts "+Timestamp::now().unparse()+" ;\n");
else
res += (arm->_froutes[i].unparse() + " --> " + String(_acne_source_elmt->get_rate()/128) + " ;\n");
}
}
else {
for(int i=0;i<_last_multipath.size();i++) {
res += (FormatedRoute(_last_multipath[i]).unparse() + " --> " + String(get_reset_current_rate(i)/128) +" ts "+Timestamp::now().unparse()+" ;\n");
}
_last_sent.clear();
}
if(_debug)
click_chatter("%s", res.c_str());
return res;
}
bool
SrcMab::is_exception_packet(Packet *p_in) {
// list packets that must go through in any case
const click_ip* ip4_header = p_in->ip_header();
//VERB_DEBUG_CHATTER("[SrcMab] Received packet, IP protocol %d (UDP is %d)", ip4_header->ip_p, IP_PROTO_UDP);
if(ip4_header->ip_p == IP_PROTO_ICMP)
return true;
if(ip4_header->ip_p == IP_PROTO_UDP) {
const click_udp *udp_hdr = p_in->udp_header();
//VERB_DEBUG_CHATTER("[SrcMab] It's an UDP packet, from port %d (ntoh %d) to port %d (ntoh %d)", udp_hdr->uh_sport, ntohs(udp_hdr->uh_sport), udp_hdr->uh_dport, ntohs(udp_hdr->uh_dport));
if(udp_hdr->uh_sport == htons(123) || udp_hdr->uh_dport == htons(123)) { // NTP
VERB_DEBUG_CHATTER("[SrcMab] Received NTP packet, transmitting it");
return true;
}
if(udp_hdr->uh_sport == htons(6073)) {
VERB_DEBUG_CHATTER("[SrcMab] Received UDP iperf ack, transmitting it");
return true;
}
}
if(ip4_header->ip_p == IP_PROTO_TCP) {
click_tcp *tcp_hdr = (click_tcp *)(p_in->tcp_header());
if(tcp_hdr->th_sport == htons(6073)) {
VERB_DEBUG_CHATTER("[SrcMab] Received TCP iperf ack, transmitting it");
return true;
}
if(tcp_hdr->th_flags & TH_SYN) {
VERB_DEBUG_CHATTER("[SrcMab] Received SYN TCP packet, transmitting it");
return true;
}
if(tcp_hdr->th_flags & TH_RST) {
VERB_DEBUG_CHATTER("[SrcMab] Received RESET TCP packet, transmitting it");
return true;
}
if(tcp_hdr->th_flags & TH_FIN) {
VERB_DEBUG_CHATTER("[SrcMab] Received FIN TCP packet, transmitting it");
return true;
}
if(tcp_hdr->th_flags & TH_ACK) {
// use ClassifyAck
uint16_t a = htons(ip4_header->ip_len);
uint32_t b = tcp_hdr->th_off;
uint32_t c = a - b*4 - 20;
if(c==0) {
VERB_DEBUG_CHATTER("[SrcMab] Received ACK TCP packet, transmitting it");
return true;
}
}
}
if(p_in->length() < 500) { //small packets are not data packets
VERB_DEBUG_CHATTER("[SrcMab] Received small packet, transmitting it");
return true;
}
return false;
}
void
SrcMab::set_max_count_packets(int arg) {
DEBUG_CHATTER("[SrcMab %s] Max count is now %d", Timestamp::now().unparse().c_str(), arg);
_max_count_packets=arg;
}
void
SrcMab::set_delta(int delta) {
if(delta<0 || delta>1000) {
click_chatter("[SrcMab] Delta should be between 0 and 1000");
return;
}
_delta = delta;
}
int
SrcMab::bool_handler(const String &s, Element *e, void *a,
ErrorHandler *errh) {
SrcMab *elmt = (SrcMab *)e;
int b;
if(!cp_integer(s, &b))
return errh->error("Debug must be 0 or 1");
if (!(b == 0 || b == 1))
return errh->error("Debug must be 0 or 1");
if(((intptr_t) a) == 0)
elmt->set_debug(b==1);
if(((intptr_t) a) == 1)
elmt->set_verb_debug(b==1);
if(((intptr_t) a) == 2)
elmt->set_stop_explore(b==1);
if(((intptr_t) a) == 3)
elmt->clear();
if(((intptr_t) a) == 4)
elmt->set_debug_nack(b==1);
else if(((intptr_t) a) == 5)
elmt->set_active(b==1);
else if(((intptr_t) a) == 6)
elmt->set_nacks(b==1);
else if(((intptr_t) a) == 7)
elmt->set_variable_min_proba(b==1);
else if(((intptr_t) a) == 8)
elmt->set_debug_explore(b==1);
else if(((intptr_t) a) == 9)
elmt->set_tcp_retrans(b==1);
else if(((intptr_t) a) == 10)
elmt->set_rand_no_ucb(b==1);
else if(((intptr_t) a) == 11)
elmt->set_kill_not_empty(b==1);
else if(((intptr_t) a) == 12)
elmt->set_deterministic_explore(b==1);
return 0;
}
int
SrcMab::set_int_handler(const String &s, Element *e, void *a,
ErrorHandler *errh) {
SrcMab *elmt = (SrcMab *)e;
int arg;
if(!cp_integer(s, &arg))
return errh->error("Arg must be an integer");
if(((intptr_t) a) == 0)
elmt->set_max_count_packets(arg);
else if(((intptr_t) a) == 1)
elmt->set_burst_ctrl(arg);
else if(((intptr_t) a) == 2)
elmt->set_queue_capacity(arg);
else if(((intptr_t) a) == 3)
elmt->set_tb_capacity_factor(arg);
else if(((intptr_t) a) == 4)
elmt->set_min_proba(arg);
else if(((intptr_t) a) == 5)
elmt->set_alpha(arg);
else if(((intptr_t) a) == 6)
elmt->set_silent_slot(arg);
else if(((intptr_t) a) == 7)
elmt->set_delta(arg);
else if(((intptr_t) a) == 8)
elmt->set_small_delta(arg);
else if(((intptr_t) a) == 9)
elmt->set_nr_arms(arg);
else if(((intptr_t) a) == 10)
elmt->set_nr_routes(arg);
else if(((intptr_t) a) == 11)
elmt->set_clear_after(arg);
else if(((intptr_t) a) == 12)
elmt->set_freq_computed_routes(arg);
else if(((intptr_t) a) == 13)
elmt->set_queue_capacity_exploit(arg);
else if(((intptr_t) a) == 14)
elmt->set_verb_debug_freq(arg);
else if(((intptr_t) a) == 15)
elmt->set_max_consecutive_killed(arg);
else if(((intptr_t) a) == 16)
elmt->set_variable_min_proba_min(arg);
else if(((intptr_t) a) == 17)
elmt->set_variable_min_proba_max(arg);
return 0;
}
static String
total_rate_handler(Element *e, void *) {
SrcMab *elmt = (SrcMab *)e;
return String(elmt->get_total_rate());
}
static String
rate_route_handler(Element *e, void *) {
SrcMab *elmt = (SrcMab *)e;
return String(elmt->get_rate_route());
}
static String
nr_arms_handler(Element *e, void *) {
SrcMab *elmt = (SrcMab *)e;
return String(elmt->get_nr_arms());
}
void SrcMab::add_handlers() {
add_read_handler("get_reset_sent_rate", total_rate_handler, 0);
add_read_handler("rate_route", rate_route_handler, 0);
add_read_handler("nr_arms", nr_arms_handler, 0);
add_write_handler("debug", bool_handler, 0);
add_write_handler("verb_debug", bool_handler, 1);
add_write_handler("stop_explore", bool_handler, 2);
add_write_handler("reset", bool_handler, 3);
add_write_handler("debug_nack", bool_handler, 4);
add_write_handler("active", bool_handler, 5);
add_write_handler("nacks", bool_handler, 6);
add_write_handler("variable_min_proba", bool_handler, 7);
add_write_handler("debug_explore", bool_handler, 8);
add_write_handler("tcp_retrans", bool_handler, 9);
add_write_handler("rand_no_ucb", bool_handler, 10);
add_write_handler("kill_not_empty", bool_handler, 11);
add_write_handler("deterministic_explore", bool_handler, 12);
add_write_handler("set_max_count_packets", set_int_handler, 0);
add_write_handler("set_burst_ctrl", set_int_handler, 1);
add_write_handler("queue_capacity", set_int_handler, 2);
add_write_handler("tb_capacity_factor", set_int_handler, 3);
add_write_handler("min_proba_explore", set_int_handler, 4);
add_write_handler("alpha", set_int_handler, 5);
add_write_handler("silent_slot", set_int_handler, 6);
add_write_handler("delta", set_int_handler, 7);
add_write_handler("small_delta", set_int_handler, 8);
add_write_handler("set_nr_arms", set_int_handler, 9);
add_write_handler("set_nr_routes", set_int_handler, 10);
add_write_handler("set_clear_after", set_int_handler, 11);
add_write_handler("freq_computed_routes", set_int_handler, 12);
add_write_handler("queue_capacity_exploit", set_int_handler, 13);
add_write_handler("verb_debug_freq", set_int_handler, 14);
add_write_handler("max_consecutive_killed", set_int_handler, 15);
add_write_handler("variable_min_proba_min", set_int_handler, 16);
add_write_handler("variable_min_proba_max", set_int_handler, 17);
}
EXPORT_ELEMENT(SrcMab)
CLICK_ENDDECLS

Event Timeline