Page MenuHomec4science

srcmab.hh
No OneTemporary

File Metadata

Created
Fri, Jul 4, 16:11

srcmab.hh

#ifndef CLICK_SRCMAB_HH
#define CLICK_SRCMAB_HH
#include <clicknet/wifi.h>
#include <clicknet/ether.h>
#include <clicknet/udp.h>
#include <click/packet_anno.hh>
#include "hybridMAC.h"
#include <click/element.hh>
#include <click/notifier.hh>
#include <click/router.hh>
#include <click/confparse.hh>
#include <click/timer.hh>
#include <click/error.hh>
#include <click/etheraddress.hh>
#include <click/hashtable.hh>
#include <click/vector.hh>
#include <click/config.h>
#include <click/args.hh>
#include <click/string.hh>
#include <click/straccum.hh>
#include <click/standard/storage.hh>
#include <elements/standard/simplequeue.hh>
#include "util.hh"
#include "routingpaths.hh"
#include "acnesource.hh"
#include "../elements/standard/bwratedsplitter.hh"
#include "measurementswifi.hh"
#include "measurementsplc.hh"
#define ACNE_SOURCE_FACTOR 10 // if > 10, AcneSource sends a bit more (factor ACNE_SOURCE_FACTOR/10)
#define NUMBER_FIRST_TRIALS 5
CLICK_DECLS
class RoutingPaths;
class BandwidthRatedSplitter;
class AcneSource;
class SimpleQueue;
class MeasurementsWifi;
class MeasurementsPLC;
/*
* class SrcMab:
* Sends traffic at rates based on the info it has (RoutingPaths and feedback)
*/
/*
 * struct MabSituation:
 * (destination, flow hash) pair used to keep information depending on the
 * active flows. It provides hashcode() and equality operators and is used as
 * a HashTable key by SrcMab.
 */
struct MabSituation {
    IPAddress _dst;       // destination address
    uint32_t _flow_hash;  // hash value describing the active flow(s)

    // Builds the situation for destination `ip` and flow hash `n`.
    MabSituation(IPAddress ip, uint32_t n) : _dst(ip), _flow_hash(n) {}

    // Hash value: destination hash plus the flow hash.
    inline uint32_t hashcode() const {
        return _dst.hashcode() + _flow_hash;
    }

    // Two situations are equal when both destination and flow hash match.
    inline bool operator==(const MabSituation a) const {
        return _dst == a._dst && _flow_hash == a._flow_hash;
    }

    // Exact negation of operator==.
    inline bool operator!=(const MabSituation a) const {
        return !(*this == a);
    }

    // Text form "<destination>-<flow hash>".
    String unparse() const {
        String text = _dst.unparse();
        text += "-";
        text += String(_flow_hash);
        return text;
    }
};
/*
 * class SrcMab:
 * Click push element with two inputs and one output (see port_count()).
 * It also derives from Storage and owns a packet array (_q) accessed through
 * enq()/deq(). Most of its behaviour is implemented outside this header; the
 * static helpers at the end of the class are the only logic defined here.
 * NOTE(review): "Mab" presumably stands for multi-armed bandit (arms, explore,
 * best arm) -- confirm against the implementation file.
 */
class SrcMab : public Element, public Storage {
public:
    SrcMab();
    ~SrcMab();
    const char *class_name() const { return "SrcMab"; }
    const char *port_count() const { return "2/1"; }
    // push input 0: messages from host
    // push input 1: acks for flows it is source of
    // push output 0: regular traffic
    const char *processing() const { return PUSH; }
    int configure(Vector<String> &, ErrorHandler *);
    int initialize(ErrorHandler *);
    void push(int, Packet*);
    void run_timer(Timer *timer);
    inline bool enq(Packet*);
    inline Packet* deq();
    //#ifdef __linux__
    inline void packet_memory_barrier(Packet* volatile&);
    //#endif
    // Handlers
    void add_handlers();
    static int bool_handler(const String &, Element *, void *,ErrorHandler *);
    static int set_int_handler(const String &, Element *, void *,ErrorHandler *);
    String get_rate_route();
    uint32_t get_reset_current_rate(int);
    uint32_t get_total_rate();
    int get_nr_arms() {return _nr_arms;}
    // Enabling debug also enables it on the AcneSource element, when one is set;
    // disabling debug here does not disable it there.
    void set_debug(bool b) {_debug = b;if(_acne_source_elmt && _debug) _acne_source_elmt->set_debug(true);}
    // Verbose debug implies regular debug.
    void set_verb_debug(bool debug) {_verb_debug=debug; if(_verb_debug) set_debug(true);}
    void set_debug_nack(bool b) {_debug_nack = b;}
    void set_stop_explore(bool b) {_stop_explore=b;}
    void set_active(bool);
    void set_nacks(bool b) {_nacks=b;}
    void set_variable_min_proba(bool b) {_variable_min_proba=b;}
    void set_debug_explore(bool b) {_debug_explore=b;}
    void set_tcp_retrans(bool b) {_tcp_retrans=b;}
    void set_rand_no_ucb(bool b) {_rand_no_ucb=b;}
    void set_kill_not_empty(bool b) {_kill_if_not_empty=b;}
    void set_deterministic_explore(bool b) {_deterministic_explore=b;}
    void set_silent_slot(int arg) {_silent_slot = arg; }
    void set_nr_arms(int n) {_nr_arms = n;}
    void set_nr_routes(int n) {_nr_routes = n;}
    void set_max_count_packets(int);
    void set_delta(int);
    void set_burst_ctrl(int arg) {_burst_ctrl=arg;}
    void set_queue_capacity(int arg) {_max_capacity=arg;}
    void set_queue_capacity_exploit(int arg) {_max_capacity_exploit=arg;}
    void set_tb_capacity_factor(int arg) {_tb_capacity_factor=arg;}
    void set_min_proba(int arg) {_min_proba = arg*10000;} // arg in percent
    void set_alpha(int arg) {_alpha=arg;}
    // arg is a number of milliseconds, stored as the sub-second part of a Timestamp.
    void set_small_delta(int arg){_small_delta = Timestamp(0,arg*Timestamp::subsec_per_msec);}
    void set_clear_after(int arg) {_clear_after=arg;}
    void set_freq_computed_routes(int arg) {_freq_computed_routes=arg;}
    void set_verb_debug_freq(int arg) {_verb_debug_freq=arg;}
    void set_max_consecutive_killed(int arg) {_max_consecutive_killed=arg;}
    // Both bounds are stored multiplied by 10000, like set_min_proba().
    void set_variable_min_proba_min(int arg) {_variable_min_proba_min=arg*10000;}
    void set_variable_min_proba_max(int arg) {_variable_min_proba_max=arg*10000;}
    void clear();
    // methods to be notified by Measurements elements that channel is empty
    void notification_empty_channel_wifi();
    void notification_empty_channel_plc();
private:
    ////////////// variables /////////////
    Packet* volatile * _q;
    // for notification of empty channel
    MeasurementsWifi *_measurements_wifi;
    MeasurementsPLC *_measurements_plc;
    bool _notified_wifi;
    bool _notified_plc;
    bool _deterministic_explore;
    Timer _trial_timer;
    Timer _active_timer;
    Timer _defered_packet_timer;
    Timer _dequeue_timer;
    Timer _send_packets_timer;
    int _alpha;
    bool _ewma;
    int _trial_duration;
    Timestamp _now;
    int _nr_routes;
    int _nr_arms;
    int _ind_arm;
    HashTable<IPAddress, int> _nr_trials;
    bool _timeout_bool;
    int _silent_slot;
    int _current_capacity;
    int _max_capacity;
    int _max_capacity_exploit;
    HashTable<IPAddress,int> _first_trial;
    HashTable<IPAddress,bool> _dequeue_first;
    int _min_proba; // set_min_proba() stores percent * 10000
    int _consecutive_high_error;
    int _delta;
    int _freq_check_meas;
    bool _is_tcp_ack;
    Timestamp _last_active;
    Timestamp _begin;
    HashTable<IPAddress,SimpleQueue *> _priority_queue;
    bool _tcp_retrans;
    HashTable<IPAddress,int64_t> _current_tcp_seq;
    int _retrans_tries;
    int _retrans_sent;
    HashTable<IPAddress, int> _is_tcp;
    uint64_t _killed_too_much;
    uint64_t _killed_too_much_enq;
    uint64_t _killed_stop;
    uint64_t _killed_mark;
    uint64_t _sent_dequeued;
#ifdef CLICK_USERLEVEL
    double _lambda_mab;
#endif
    bool _active;
    bool _mark_last_packet;
    bool _last_packet_marked;
    bool _new_explore;
    bool _stop_sending;
    bool _stop_explore; // can be set for debug purpose
    bool _end_packets_sent;
    int _burst_ctrl;
    int _tb_capacity_factor;
    Timestamp _can_send_at;
    Timestamp _last_explore;
    int _consecutive_killed;
    int _max_consecutive_killed;
    Timestamp _last_computed_routes;
    int _freq_computed_routes;
    AcneSource *_acne_source_elmt;
    BandwidthRatedSplitter *_rate_shaper_elmt;
    RoutingPaths *_routing;
    HashTable<IPAddress, Vector<Vector<Route> > > _multipaths;
    // per-situation (destination + flow hash) state
    HashTable<MabSituation, Vector<MabArmInfo*> > _arms;
    HashTable<MabSituation, Ring<int> > _last_explores;
    HashTable<MabSituation, MabArmInfo*> _current_arm;
    HashTable<MabSituation, MabArmInfo*> _best_arm;
    HashTable<MabSituation, int> _best_arm_id;
    HashTable<MabSituation, uint32_t> _scaling;
    HashTable<MabArmInfo*, int> _reduce_rate;
    HashTable<IPAddress, Packet* > _dummy_packet;
    HashTable<IPAddress, uint8_t> _current_nb_flows;
    HashTable<Multipath, uint32_t> _last_estimated_rates;
    Vector<Route> _last_multipath;
    Vector<FlowInfoLB> _last_sent;
    IPAddress _current_dst; // for now, only one destination at a time
    uint32_t _current_hash_flow;
    HashTable<IPAddress,uint16_t> _seqNumTable;
    HashTable<IPAddress,uint16_t> _currentId;
    HashTable<IPAddress,HashTable<uint16_t, MabArmInfo*> > _rateId2MabArmInfo;
    HashTable<IPAddress, int> _i_to_use;
    HashTable<IPAddress, FormatedRoute> _froute_to_use;
    HashTable<IPAddress, FlowInfoLB*> _info_to_use;
    HashTable<IPAddress, int> _count_packets;
    bool _nacks;
    HashTable<IPAddress, PacketTable*> _packets_sent;
    HashTable<IPAddress, uint16_t> _last_seq_acked;
    Timestamp _small_delta; // set_small_delta() takes milliseconds
    bool _debug;
    bool _debug_nack;
    bool _verb_debug;
    int _verb_debug_freq;
    int _verb_debug_count;
    bool _debug_explore;
    bool _kill_if_not_empty;
    bool _variable_min_proba;
    int _variable_min_proba_min;
    int _variable_min_proba_max;
    int _max_count_packets;
    int _dummy_packet_sent;
    int _retrans_nacks;
    int _clear_after;
    Packet *_defered_packet;
    HashTable<IPAddress, HashTable<uint16_t,Timestamp> > _time_defered_packet;
    bool _rand_no_ucb;
    ////////////// functions /////////////////
    uint16_t getCurrentSeqNum(IPAddress const& addr);
    int pick_arm_mab();
    void update_best_rate(MabArmInfo *, uint16_t);
    bool is_exception_packet(Packet *);
    void new_trial();
    void send_defered_packet(Packet *);
    void send_packet(Packet *, bool, bool);
    void send_control_packets(uint8_t, MabArmInfo *);
    void send_control_packets(uint8_t);
    bool handle_traffic_packet(Packet *);
    bool handle_traffic_packet(Packet *, bool);
    bool handle_traffic_packet(Packet *, bool, bool,bool);
    void handle_ack(Packet *);
    void handle_nack_order(Packet *);
    bool can_send(IPAddress);
    void compute_new_routes(IPAddress);
    /////////////// static functions /////////////

    /*
     * Solves the 2*2 linear system A*x = b with Cramer's rule.
     * Returns an empty vector when A is not 2*2, when b does not have 2
     * entries, or when |det(A)| <= 1. Otherwise returns the two solution
     * components, each replaced by 0 when it is not strictly positive.
     * When debug is true, errors and the result are reported via click_chatter.
     */
    static Vector<uint64_t> solve_22_lin_sys(Vector<Vector<uint64_t> > A, Vector<uint64_t> b, bool debug) {
        Vector<uint64_t> res;
        // this solves a 2*2 system: check sizes
        if(A.size() != 2 || A[0].size() != 2 || A[1].size() != 2) {
            if(debug) {
                if (A.size() != 2)
                    click_chatter("[SrcMab] Error while solving 2*2 system: wrong matrix size (size of A is %d)", A.size());
                else
                    click_chatter("[SrcMab] Error while solving 2*2 system: wrong matrix size (sizes of A members are %d %d)",
                                  A[0].size(), A[1].size());
            }
            return res;
        }
        if(b.size() != 2) {
            if(debug)
                click_chatter("[SrcMab] Error while solving 2*2 system: wrong vector size (size of b is %d)", b.size());
            return res;
        }
        // use Cramer's rule to solve the system
        int64_t detA = ((int64_t) A[0][0]*A[1][1]) - ((int64_t) A[0][1]*A[1][0]);
        if(myabs(detA) <= 1) { // determinant is too small
            // The determinant is a signed 64-bit value: print it through String
            // with %s (as the final debug message below does). Passing it to %d
            // mismatches the argument width and can corrupt the %s arguments
            // that follow.
            if(debug)
                click_chatter("[SrcMab:solve_22] Determinant is too small (%s) for matrix %s;%s", String(detA).c_str(),
                              print_vector(A[0]).c_str(), print_vector(A[1]).c_str());
            return res;
        }
        // int64 by int64 division not supported, need to transform in int64 by uint32 division
        int64_t res0 = int64_divide(((int64_t) b[0]*A[1][1]) - ((int64_t) b[1]*A[0][1]),detA);
        if (res0 <= 0)
            res.push_back(0);
        else
            res.push_back((uint64_t) res0);
        int64_t res1 = int64_divide(((int64_t) b[1]*A[0][0]) - ((int64_t) b[0]*A[1][0]),detA);
        if(res1 <= 0)
            res.push_back(0);
        else
            res.push_back((uint64_t) res1);
        if(debug)
            click_chatter("[SrcMab:solve_22] A=[%s;%s], b=[%s], determinant %s, res of size %d [%s]", print_vector(A[0]).c_str(), print_vector(A[1]).c_str(),
                          print_vector(b).c_str(), String(detA).c_str(), res.size(), print_vector(res).c_str());
        return res;
    }

    // Extracts the `rate` field of every FlowInfoLB, preserving order.
    static Vector<uint32_t> flow_info_vector_to_rate_vector(Vector<FlowInfoLB> fi_vec) {
        Vector<uint32_t> rate_vec;
        for(int i=0;i<fi_vec.size();i++)
            rate_vec.push_back(fi_vec[i].rate);
        return rate_vec;
    }

    // Builds one FlowInfoLB per rate, constructed from
    // factor*int_divide(rate,1000) and a single Timestamp taken once at entry.
    static Vector<FlowInfoLB> rate_vector_to_flow_info_vector(Vector<uint32_t> rate_vec, int factor) {
        Vector<FlowInfoLB> fi_vec;
        Timestamp now=Timestamp::now();
        for(int i=0;i<rate_vec.size();i++)
            fi_vec.push_back(FlowInfoLB(factor*int_divide(rate_vec[i],1000), now));
        return fi_vec;
    }

    // Returns a copy of rate_vec with every element multiplied by mult_scale
    // and divided by div_scale (via int_divide); both scales are cast to T first.
    template<typename T>
    static Vector<T> vector_scaling(Vector<T> rate_vec, int mult_scale, int div_scale) {
        Vector<T> res;
        for(int i=0;i<rate_vec.size();i++) {
            res.push_back(int_divide((rate_vec[i])*((T) mult_scale),(T) div_scale));
        }
        return res;
    }

    // Same as vector_scaling, but the product is computed in uint64_t, the
    // divisor is cast to uint32_t, and the result vector holds uint64_t.
    template<typename T>
    static Vector<uint64_t> vector_scaling64(Vector<T> rate_vec, int mult_scale, int div_scale) {
        Vector<uint64_t> res;
        for(int i=0;i<rate_vec.size();i++) {
            res.push_back(int_divide(((uint64_t) rate_vec[i])*((uint64_t) mult_scale),(uint32_t) div_scale));
        }
        return res;
    }

    /*
     * Inserts (link, alphas) into *alpha_mat unless is_vector_min(alphas, v)
     * holds for some stored vector v, in which case nothing is changed.
     * Stored entries v for which is_vector_min(v, alphas) holds are erased
     * before the insertion.
     * NOTE(review): exact ordering semantics depend on is_vector_min (declared
     * elsewhere) -- confirm there.
     */
    static void insert_in_alpha_mat(HashTable<LinkRouting, Vector<uint64_t> > *alpha_mat, LinkRouting link, Vector<uint64_t> alphas) {
        // keys are collected first and erased after the scan, so the table is
        // not modified while it is being iterated
        Vector<LinkRouting> to_remove;
        for(HashTable<LinkRouting, Vector<uint64_t> >::iterator it=alpha_mat->begin();it;++it) {
            if(is_vector_min(alphas, it.value())) // alphas is smaller than one alpha already there, do not add it
                return;
            else if(is_vector_min(it.value(), alphas)) // alphas is greater: remove old one
                to_remove.push_back(it.key());
        }
        for(int i=0;i<to_remove.size();i++)
            alpha_mat->erase(to_remove[i]);
        // if we have not returned, it means that alphas should be added
        alpha_mat->set(link, alphas);
    }
};
//#ifdef __linux__
// Compiler-level barrier around a queue slot.
// The asm template is empty, so no instruction is emitted; `packet`, `_head`
// and `_tail` are only listed as memory input operands of a volatile asm.
// NOTE(review): presumably this forces the packet pointer and the queue
// indices to be written to memory before the caller proceeds; it is not a
// hardware memory fence -- confirm this is sufficient for the intended use.
// NOTE(review): _head and _tail are not declared in SrcMab; presumably they
// are inherited from Storage -- confirm.
inline void
SrcMab::packet_memory_barrier(Packet * volatile &packet)
{
__asm__ volatile("" : : "m" (packet), "m" (_head), "m" (_tail));
}
//#endif
CLICK_ENDDECLS
#endif

Event Timeline