Page MenuHomec4science

updateackmab.cc
No OneTemporary

File Metadata

Created
Sat, Jul 5, 04:15

updateackmab.cc

#include <click/config.h>
#include "util.hh"
#include "updateackmab.hh"
#define DEBUG_CHATTER(arg, ...) do { if (_debug) { click_chatter(arg, ## __VA_ARGS__);} } while (0)
#define VERB_DEBUG_CHATTER(arg, ...) do { if (_verb_debug) { click_chatter(arg, ## __VA_ARGS__);} } while (0)
CLICK_DECLS
// Constructor: only binds the two timers to this element. No other member is
// given a value here.
UpdateAckMab::UpdateAckMab()
    : _timer(this),
      _nb_flows_timer(this)
{
    // intentionally empty
}
// Destructor: performs no explicit cleanup.
UpdateAckMab::~UpdateAckMab()
{
    // intentionally empty
}
/**
 * Parse the configuration string and resolve the elements it names.
 *
 * Mandatory keywords: ADDR_WIFI, MEASUREMENTS_WIFI, NEIGHBORS. Every other
 * keyword is optional; the element pointer belonging to an optional keyword
 * is left null when the keyword is absent.
 *
 * Before returning successfully, both timers are initialized and the flow
 * broadcast timer is scheduled _flow_bdcst_freq seconds from now.
 *
 * @param conf  configuration arguments of the element
 * @param errh  handler that receives parse and lookup errors
 * @return 0 on success, a negative value on error
 */
int UpdateAckMab::configure(Vector<String> &conf, ErrorHandler *errh){
    // ---- defaults, possibly overridden by the configuration below ----
    _debug = false;
    _verb_debug = false;
    String meas_name_wifi;
    String meas_name_plc = "";
    String neighbors_name;
    String make_burst_name = "";
    String make_burst_wifi_name = "";
    String make_burst_plc_name = "";
#ifdef CLICK_USERLEVEL
    String td_wifi_name = "";
    String td_plc_name = "";
    String from_device_moni_name = "";
    String from_device_plc_name = "";
    // Optional device elements: start from null so that the "configured with
    // ..." checks further down never read an indeterminate pointer.
    _td_wifi = 0;
    _td_plc = 0;
    _fd_moni = 0;
    _fd_plc = 0;
#endif
    _min_burst = 50;
    _min_burst_wifi = 100;
    _min_burst_plc = 200;
    _flow_bdcst_freq = 5;
    _link_is_src = true;
    _burst_one = true;
    _nb_flows_int1 = 0;
    _nb_flows_int2 = 0;
    // Both are read by push() before the first run of the flow timer assigns them.
    _nb_flows_correction = 0;
    _current_hash_flow = 0;
    _new_flow_id = 0u;
    // Optional elements: null unless the matching keyword is given.
    _make_burst_elmt = 0;
    _make_burst_wifi_elmt = 0;
    _make_burst_plc_elmt = 0;
    _queue_plc = 0;
    _queue_wifi = 0;
    _queue_plc_burst = 0;
    _queue_wifi_burst = 0;
    String queue_plc_burst_name, queue_wifi_burst_name, queue_plc_name, queue_wifi_name;

    // ---- argument parsing ----
    if(Args(this, errh).bind(conf)
       .read_m("ADDR_WIFI", _addr_wifi)
       .read_m("MEASUREMENTS_WIFI", meas_name_wifi)
       .read_m("NEIGHBORS", neighbors_name)
       .read("ADDR_PLC", _addr_plc)
       .read("MEASUREMENTS_PLC", meas_name_plc)
       .read("DEBUG", _debug)
       .read("VERB_DEBUG", _verb_debug)
       .read("MAKE_BURST", make_burst_name)
       .read("MIN_BURST", _min_burst)
       .read("MAKE_BURST_WIFI", make_burst_wifi_name)
       .read("MIN_BURST_WIFI", _min_burst_wifi)
       .read("MAKE_BURST_PLC", make_burst_plc_name)
       .read("MIN_BURST_PLC", _min_burst_plc)
#ifdef CLICK_USERLEVEL
       .read("TODEVICE_WIFI", td_wifi_name)
       .read("TODEVICE_PLC", td_plc_name)
       .read("FROMDEVICE_MONI", from_device_moni_name)
       .read("FROMDEVICE_PLC", from_device_plc_name)
#endif
       .read("LINK_IS_SRC", _link_is_src) // indicate if source of a link computes airtime for the link (else, destination)
       .read("FLOW_BROADCAST_FREQUENCY", _flow_bdcst_freq)
       .read("QUEUE_TRAFFIC_BURST_PLC", queue_plc_burst_name)
       .read("QUEUE_TRAFFIC_BURST_WIFI", queue_wifi_burst_name)
       .read("QUEUE_TRAFFIC_PLC", queue_plc_name)
       .read("QUEUE_TRAFFIC_WIFI", queue_wifi_name)
       .complete() < 0)
        return -1;

    // ---- mandatory and optional measurement / neighbor elements ----
    // The element pointer is printed with %p: handing a pointer to %d is a
    // varargs type mismatch.
    String compound_name;
    _measurements_wifi = (MeasurementsWifi*) router()->find(meas_name_wifi, compound_name, errh);
    if(_measurements_wifi == 0 || _measurements_wifi->cast("MeasurementsWifi") == 0) {
        return errh->error("%s%s:%p is not a MeasurementsWifi element.",
                           compound_name.c_str(), meas_name_wifi.c_str(), (void *) _measurements_wifi);
    }
    if(meas_name_plc != "") {
        _measurements_plc = (MeasurementsPLC*) router()->find(meas_name_plc, compound_name, errh);
        if(_measurements_plc == 0 || _measurements_plc->cast("MeasurementsPLC") == 0) {
            return errh->error("%s%s:%p is not a MeasurementsPLC element.",
                               compound_name.c_str(), meas_name_plc.c_str(), (void *) _measurements_plc);
        }
    }
    else
        _measurements_plc = 0;
    _neighbors = (Neighbors*) router()->find(neighbors_name, compound_name, errh);
    if(_neighbors == 0 || _neighbors->cast("Neighbors") == 0) {
        return errh->error("%s%s:%p is not a Neighbors element.",
                           compound_name.c_str(), neighbors_name.c_str(), (void *) _neighbors);
    }

    // ---- optional MakeBurst elements (a failed lookup leaves the pointer null) ----
    if(make_burst_name != "") {
        _make_burst_elmt = (MakeBurst*)router()->find(make_burst_name, "", errh);
        if(_make_burst_elmt != 0 && _make_burst_elmt->cast("MakeBurst") == 0) {
            return errh->error("%s exists but is not an MakeBurst.", make_burst_name.c_str());
        }
    }
    if(make_burst_wifi_name != "") {
        _make_burst_wifi_elmt = (MakeBurst*)router()->find(make_burst_wifi_name, "", errh);
        if(_make_burst_wifi_elmt != 0 && _make_burst_wifi_elmt->cast("MakeBurst") == 0) {
            return errh->error("%s exists but is not an MakeBurst.", make_burst_wifi_name.c_str());
        }
    }
    if(make_burst_plc_name != "") {
        _make_burst_plc_elmt = (MakeBurst*)router()->find(make_burst_plc_name, "", errh);
        if(_make_burst_plc_elmt != 0 && _make_burst_plc_elmt->cast("MakeBurst") == 0) {
            return errh->error("%s exists but is not an MakeBurst.", make_burst_plc_name.c_str());
        }
    }

#ifdef CLICK_USERLEVEL
    // ---- optional ToDevice / FromDevice elements (user level only) ----
    if(td_wifi_name != "") {
        _td_wifi = (ToDevice *)router()->find(td_wifi_name, "", errh);
        if(_td_wifi != 0 && _td_wifi->cast("ToDevice") == 0) {
            return errh->error("%s exists but is not a ToDevice.", td_wifi_name.c_str());
        }
    }
    if(td_plc_name != "") {
        _td_plc = (ToDevice*)router()->find(td_plc_name, "", errh);
        if(_td_plc != 0 && _td_plc->cast("ToDevice") == 0) {
            return errh->error("%s exists but is not a ToDevice.", td_plc_name.c_str());
        }
    }
    if(from_device_moni_name != "") {
        _fd_moni = (FromDevice*)router()->find(from_device_moni_name, "", errh);
        if(_fd_moni != 0 && _fd_moni->cast("FromDevice") == 0) {
            return errh->error("%s exists but is not a FromDevice.", from_device_moni_name.c_str());
        }
    }
    if(from_device_plc_name != "") {
        _fd_plc = (FromDevice*)router()->find(from_device_plc_name, "", errh);
        if(_fd_plc != 0 && _fd_plc->cast("FromDevice") == 0) {
            return errh->error("%s exists but is not a FromDevice.", from_device_plc_name.c_str());
        }
    }
    if(_td_wifi)
        click_chatter("UpdateAckMab configured with ToDevice for Wifi %s", td_wifi_name.c_str());
    else
        click_chatter("UpdateAckMab configured with no ToDevice for Wifi");
    if(_td_plc)
        click_chatter("UpdateAckMab configured with ToDevice for PLC %s", td_plc_name.c_str());
    else
        click_chatter("UpdateAckMab configured with no ToDevice for PLC");
    if(_fd_moni)
        click_chatter("UpdateAckMab configured with FromDevice for moni %s", from_device_moni_name.c_str());
    else
        click_chatter("UpdateAckMab configured with no FromDevice for moni");
    if(_fd_plc)
        click_chatter("UpdateAckMab configured with FromDevice for PLC %s", from_device_plc_name.c_str());
    else
        click_chatter("UpdateAckMab configured with no FromDevice for PLC");
#endif

    // ---- optional queues that reset_queues() flushes ----
    if(queue_plc_name != "") {
        _queue_plc = (FullNoteQueue*) router()->find(queue_plc_name, compound_name, errh);
        if(_queue_plc == 0 || _queue_plc->cast("FullNoteQueue") == 0) {
            return errh->error("%s%s:%p is not a FullNoteQueue element.",
                               compound_name.c_str(), queue_plc_name.c_str(), (void *) _queue_plc);
        }
        click_chatter("UpdateAckMab configured with Queue PLC %s", _queue_plc->name().c_str());
    }
    if(queue_wifi_name != "") {
        _queue_wifi = (FullNoteQueue*) router()->find(queue_wifi_name, compound_name, errh);
        if(_queue_wifi == 0 || _queue_wifi->cast("FullNoteQueue") == 0) {
            return errh->error("%s%s:%p is not a FullNoteQueue element.",
                               compound_name.c_str(), queue_wifi_name.c_str(), (void *) _queue_wifi);
        }
        click_chatter("UpdateAckMab configured with Queue Wifi %s", _queue_wifi->name().c_str());
    }
    if(queue_plc_burst_name != "") {
        _queue_plc_burst = (FullNoteQueue*) router()->find(queue_plc_burst_name, compound_name, errh);
        if(_queue_plc_burst == 0 || _queue_plc_burst->cast("FullNoteQueue") == 0) {
            return errh->error("%s%s:%p is not a FullNoteQueue element.",
                               compound_name.c_str(), queue_plc_burst_name.c_str(), (void *) _queue_plc_burst);
        }
        click_chatter("UpdateAckMab configured with Queue PLC burst %s", _queue_plc_burst->name().c_str());
    }
    if(queue_wifi_burst_name != "") {
        _queue_wifi_burst = (FullNoteQueue*) router()->find(queue_wifi_burst_name, compound_name, errh);
        if(_queue_wifi_burst == 0 || _queue_wifi_burst->cast("FullNoteQueue") == 0) {
            return errh->error("%s%s:%p is not a FullNoteQueue element.",
                               compound_name.c_str(), queue_wifi_burst_name.c_str(), (void *) _queue_wifi_burst);
        }
        click_chatter("UpdateAckMab configured with Queue Wifi burst %s", _queue_wifi_burst->name().c_str());
    }

    // ---- timers ----
    _timer.initialize(this);
    _nb_flows_timer.initialize(this);
    _nb_flows_timer.schedule_after_sec(_flow_bdcst_freq);
    return 0;
}
// Initialization hook: nothing to do, always reports success (0).
// The error handler argument is unused.
int UpdateAckMab::initialize(ErrorHandler *)
{
    return 0;
}
void
UpdateAckMab::push(int port, Packet *p_in) {
_now.assign_now();
if(port == 2) { // flow information broadcast message
flow_broadcast *flow_info = (flow_broadcast *) (p_in->data()+sizeof(acne_header));
Vector<uint32_t> flow_ids_vect_debug;
int nb_flows = (int) flow_info->_nb_flow_ids;
for(int i=0;i<nb_flows;i++) {
uint32_t flow_id = flow_info->_flow_ids[i];
if(flow_info->_interface == INTERFACE1) {
_flow_table_int1.set(flow_id, 1);
}
else if(flow_info->_interface == INTERFACE2) {
_flow_table_int2.set(flow_id, 1);
}
flow_ids_vect_debug.push_back(flow_id);
}
if(flow_info->_new_flow > 0) {
if(!_flow_last_update.get(flow_info->_new_flow)) // means that there is a new flow around that has not been counted yet
_nb_flows_correction = 1;
}
if(_verb_debug) {
String int_str;
if(flow_info->_interface == INTERFACE1)
int_str = "WIFI";
else
int_str = "PLC";
click_chatter("[UpdateAckMab %s] Got flow broadcast message from %s (int %s) with %d flows (%s)", _now.unparse().c_str(),
flow_info->_src.unparse().c_str(), int_str.c_str(), nb_flows, print_vector(flow_ids_vect_debug).c_str());
}
}
else {
acne_header *acne_hdr = (acne_header *) p_in->data();
_last_active = _now;
if(!_timer.scheduled())
_timer.schedule_after_sec(10);
if (acne_hdr->_type == REGULAR_TRAFFIC || acne_hdr->_type == DUMMY_TRAFFIC_MAB || acne_hdr->_type == PROTO_IP_TEST) {
if(acne_hdr->_id_rate_mab == 0) {
VERB_DEBUG_CHATTER("[UpdateAckMab] Transmitting packet of type %s, id 0, on port %d", String(acne_hdr->_type).c_str(),
port);
output(port).push(p_in);
return;
}
// acne_hdr->_id_rate_mab != 0
if(acne_hdr->_id_rate_mab < _current_id[acne_hdr->_route])
DEBUG_CHATTER("[UpdateAckMab %s] WARNING: receiving out-of-order packets (current id %d, receiving %d)",
Timestamp::now().unparse().c_str(), _current_id[acne_hdr->_route], acne_hdr->_id_rate_mab);
// regular traffic
bool is_last = acne_hdr->_is_last;
if (!(_control_message_received[acne_hdr->_id_rate_mab].get(acne_hdr->_route) > 1)) {
_route_to_rate[acne_hdr->_route][acne_hdr->_id_rate_mab]+=p_in->length();
}
if(is_last) {
LastAddrBytes me_last_bytes_wifi = LastAddrBytes(_addr_wifi);
LastAddrBytes me_last_bytes_plc = LastAddrBytes(_addr_plc);
LastAddrBytes dst_last_bytes = LastAddrBytes(acne_hdr->_route._dst);
if(!(dst_last_bytes == me_last_bytes_wifi || (_addr_plc != EtherAddress() && dst_last_bytes == me_last_bytes_plc))) {
DEBUG_CHATTER("[UpdateAckMab %s] Received last message for id %d on route %s, set burst to 1", _now.unparse().c_str(), acne_hdr->_id_rate_mab, acne_hdr->_route.unparse().c_str());
set_bursts(true); // set burst to 1
}
// reset queues (at source only to avoid killing packets counted in airtime)
//if(acne_hdr->_hop==0)
//reset_queues();
}
output(port).push(p_in);
/*
if(acne_hdr->_id_rate_mab == 0) {
if(!_burst_one) {
_now.assign_now();
click_chatter("[UpdateAckMab %s] WARNING: burst is not 1 whereas exploiting", _now.unparse().c_str());
set_bursts(true);
}
if(_fd_moni && _fd_moni->get_active()) {
DEBUG_CHATTER("[UpdateAckMab] WARNING: Moni FromDevice was still active, now inactive");
_fd_moni->set_active(false,false);
}
send_faifa_request(HPAV_SC_DISABLE);
}
*/
return;
}
if(acne_hdr->_type == MAB_ACK) {
// compute airtime and update gamma's field
mab_ack *ack = (mab_ack *) (p_in->data()+sizeof(acne_header));
if(_link_is_src) {
LastAddrBytes me_last_bytes_wifi = LastAddrBytes(_addr_wifi);
LastAddrBytes me_last_bytes_plc = LastAddrBytes(_addr_plc);
LastAddrBytes dst_last_bytes = LastAddrBytes(ack->_target_route._dst);
if(dst_last_bytes == me_last_bytes_wifi || (_addr_plc != EtherAddress() && dst_last_bytes == me_last_bytes_plc)) {
// destination, just transmit ack
output(port).push(p_in);
return;
}
}
else {
if(ack->_current_index_in_route < 0) { // means that we are the source of the route: push to output 2
LastAddrBytes src_last_bytes = LastAddrBytes(ack->_target_route._src);
LastAddrBytes me_last_bytes_wifi = LastAddrBytes(_addr_wifi);
LastAddrBytes me_last_bytes_plc = LastAddrBytes(_addr_plc);
if(src_last_bytes == me_last_bytes_wifi || (_addr_plc != EtherAddress() && src_last_bytes == me_last_bytes_plc)) {
DEBUG_CHATTER("[UpdateAckMab] Push ack with id %d to SrcMab.", acne_hdr->_id_rate_mab);
output(2).push(p_in);
}
else { // should not happen
_now.assign_now();
if(_addr_plc != EtherAddress())
DEBUG_CHATTER("[UpdateAckMab %s] WARNING: index of ack is 0 whereas node is not source of the route; I'm %s or %s, src is %s",
_now.unparse().c_str(), me_last_bytes_wifi.unparse().c_str(), me_last_bytes_plc.unparse().c_str(),
src_last_bytes.unparse().c_str());
else
DEBUG_CHATTER("[UpdateAckMab %s] WARNING: index of ack is 0 whereas node is not source of the route; I'm %s, src is %s",
_now.unparse().c_str(), me_last_bytes_wifi.unparse().c_str(), src_last_bytes.unparse().c_str());
p_in->kill();
}
return;
}
}
if(acne_hdr->_id_rate_mab != 0) {
String interface_str;
Interface interface;
EtherAddress next_hop_eth = _neighbors->get_eth_addr_from_last_bytes(&(ack->_target_route._route[NB_BYTES_HOP*ack->_current_index_in_route]));
Measurements *measurements = get_measurements(next_hop_eth, interface_str, interface);
if(measurements == 0) {
LastAddrBytes bytes = LastAddrBytes(&acne_hdr->_route._route[NB_BYTES_HOP*acne_hdr->_hop]);
DEBUG_CHATTER("[UpdateAckMab] WARNING: Could not get measurement (MAB_ACK) for address %s (%s, hop %d in route %s).",
bytes.unparse().c_str(), next_hop_eth.unparse().c_str(), ack->_current_index_in_route,
ack->_target_route.unparse().c_str());
p_in->kill();
return;
}
IntIdCouple int_id = IntIdCouple(acne_hdr->_id_rate_mab, interface, FlowTuple(ack->_flow_src, ack->_flow_dst));
gamma_type silent_slot_airtime;
if(_is_diff_ts_ok[ack->_target_route].get(acne_hdr->_id_rate_mab)) {
int diff_us = usec_timestamp(_route_to_diff_ts[ack->_target_route][acne_hdr->_id_rate_mab]);
if(diff_us > 0) {
/*
uint32_t route_rate = 1000*(_route_to_rate[ack->_target_route][acne_hdr->_id_rate_mab]/(diff_us/1000));
if(route_rate > ack->_route_rate) {
DEBUG_CHATTER("[UpdateAckMab %s] New route rate on ack for route %s, id %d, is now %d", _now.unparse().c_str(),
ack->_target_route.unparse().c_str(), acne_hdr->_id_rate_mab, route_rate);
ack->_route_rate = route_rate;
}
*/
if(_busytime_computed.get(int_id))
ack->_gammas[ack->_current_index_in_route] = mymin((gamma_type) 2*SATURATED_THRES,_current_busytime[int_id]);
else {
ack->_gammas[ack->_current_index_in_route] = -1; // invalid gamma
DEBUG_CHATTER("[UpdateAckMab] WARNING: no airtime for id %d, route %s", acne_hdr->_id_rate_mab, ack->_target_route.unparse().c_str());
}
if(_silent_time_computed.get(int_id))
silent_slot_airtime = _silent_busytime[int_id];
else {
silent_slot_airtime = -1; // invalid gamma
DEBUG_CHATTER("[UpdateAckMab] WARNING: no silent slot airtime for id %d, route %s", acne_hdr->_id_rate_mab, ack->_target_route.unparse().c_str());
}
}
else {
ack->_gammas[ack->_current_index_in_route] = -1; // invalid gamma
DEBUG_CHATTER("[UpdateAckMab] WARNING: diff timestamp is 0");
}
}
else {
ack->_gammas[ack->_current_index_in_route] = -1; // invalid gamma
DEBUG_CHATTER("[UpdateAckMab] WARNING: Time difference has not been computed for id %d, route %s (hash %d).",
acne_hdr->_id_rate_mab, ack->_target_route.unparse().c_str(), ack->_target_route.hashcode());
}
if(_debug) {
if(!_ack_printed[acne_hdr->_id_rate_mab].get(ack->_target_route)) {
_ack_printed[acne_hdr->_id_rate_mab][ack->_target_route] = 1;
_now.assign_now();
DEBUG_CHATTER("[UpdateAckMab %s] Update ack for route %s, id %d, airtime on %s (next hop %s) is %s (silent slot airtime %s), index %d", _now.unparse().c_str(),
ack->_target_route.unparse().c_str(), acne_hdr->_id_rate_mab, interface_str.c_str(), next_hop_eth.unparse().c_str(),
String(ack->_gammas[ack->_current_index_in_route]).c_str(), String(silent_slot_airtime).c_str(), (int) ack->_current_index_in_route);
}
}
}
ack->_current_index_in_route = (ack->_current_index_in_route-1);
if(_link_is_src && ack->_current_index_in_route < 0) { // means that we are the source of the route: push to output 2
LastAddrBytes src_last_bytes = LastAddrBytes(ack->_target_route._src);
LastAddrBytes me_last_bytes_wifi = LastAddrBytes(_addr_wifi);
LastAddrBytes me_last_bytes_plc = LastAddrBytes(_addr_plc);
if(src_last_bytes == me_last_bytes_wifi || (_addr_plc != EtherAddress() && src_last_bytes == me_last_bytes_plc)) {
DEBUG_CHATTER("[UpdateAckMab] Push ack to SrcMab.");
output(2).push(p_in);
}
else { // should not happen
_now.assign_now();
if(_addr_plc != EtherAddress())
DEBUG_CHATTER("[UpdateAckMab %s] WARNING: index of ack is 0 whereas node is not source of the route; I'm %s or %s, src is %s",
_now.unparse().c_str(), me_last_bytes_wifi.unparse().c_str(), me_last_bytes_plc.unparse().c_str(),
src_last_bytes.unparse().c_str());
else
DEBUG_CHATTER("[UpdateAckMab %s] WARNING: index of ack is 0 whereas node is not source of the route; I'm %s, src is %s",
_now.unparse().c_str(), me_last_bytes_wifi.unparse().c_str(), src_last_bytes.unparse().c_str());
p_in->kill();
}
return;
}
output(port).push(p_in);
}
else if(acne_hdr->_type == NACK_ORDER) {
if(acne_hdr->_hop == acne_hdr->_route.number_hops()) {
// reached destination, transmit
output(2).push(p_in);
}
else {
output(port).push(p_in);
}
}
else if (acne_hdr->_type == MAB_CONTROL) {
mab_control *mab_ctrl = (mab_control *) (p_in->data() + sizeof(acne_header));
_now.assign_now();
if(acne_hdr->_id_rate_mab > 0)
_current_id.set(acne_hdr->_route, acne_hdr->_id_rate_mab);
uint8_t previous_max = mab_ctrl->_max_nb_flows;
FlowTuple flow = FlowTuple(mab_ctrl->_flow_src, mab_ctrl->_flow_dst);
uint32_t flow_id = flow.hashcode();
if(mab_ctrl->_first_message && !_flow_last_update.get(flow_id)) {
// if first message and this flow has not been counted, add one until timer fires
_nb_flows_correction = 1;
// schedule timer to send broadcast with new flow
_new_flow_id = flow_id;
_old_expiry = _nb_flows_timer.expiry();
_nb_flows_timer.schedule_now();
}
// get local number of flow
mab_ctrl->_max_nb_flows = mymax(mab_ctrl->_max_nb_flows, (uint8_t) (mymax(_nb_flows_int1, _nb_flows_int2) + _nb_flows_correction));
if(previous_max!=mab_ctrl->_max_nb_flows) {
DEBUG_CHATTER("[UpdateAckMab %s] Update max flow: was %u, is now %u (INT1: %d, INT2: %d). Update hash, was %u, is %u", _now.unparse().c_str(),
previous_max, mab_ctrl->_max_nb_flows, _nb_flows_int1, _nb_flows_int2, mab_ctrl->_flow_hash, _current_hash_flow);
mab_ctrl->_flow_hash = _current_hash_flow;
}
String interface_str;
Interface interface;
EtherAddress addr_meas;
LastAddrBytes bytes;
bool measure = true;
bool is_dest = false;
LastAddrBytes me_last_bytes_wifi = LastAddrBytes(_addr_wifi);
LastAddrBytes me_last_bytes_plc = LastAddrBytes(_addr_plc);
LastAddrBytes dst_last_bytes = LastAddrBytes(acne_hdr->_route._dst);
if(dst_last_bytes == me_last_bytes_wifi || (_addr_plc != EtherAddress() && dst_last_bytes == me_last_bytes_plc))
is_dest = true;
if(_link_is_src) {
if(is_dest) {
// destination, do nothing
DEBUG_CHATTER("[UpdateAckMab %s] This control message is for me, just transmit it.", Timestamp::now().unparse().c_str());
output(port).push(p_in);
return;
}
else {
bytes = LastAddrBytes(&acne_hdr->_route._route[NB_BYTES_HOP*acne_hdr->_hop]);
addr_meas = _neighbors->get_eth_addr_from_last_bytes(&(acne_hdr->_route._route[NB_BYTES_HOP*acne_hdr->_hop]));
}
}
else {
if(acne_hdr->_hop == 0) { // we are at the source of the flow, do not measure
measure = false;
}
else {
bytes = LastAddrBytes(&acne_hdr->_route._route[NB_BYTES_HOP*(acne_hdr->_hop-1)]);
addr_meas = _neighbors->get_eth_addr_from_last_bytes(&(acne_hdr->_route._route[NB_BYTES_HOP*(acne_hdr->_hop-1)]));
}
}
Measurements *measurements;
if(measure) {
measurements = get_measurements(addr_meas, interface_str, interface);
if(measurements == 0) {
DEBUG_CHATTER("[UpdateAckMab] WARNING: Could not get measurement (MAB_CTRL) for address %s (%s, hop %d in route %s).",
bytes.unparse().c_str(), addr_meas.unparse().c_str(), _link_is_src ? acne_hdr->_hop : acne_hdr->_hop-1,
acne_hdr->_route.unparse().c_str());
p_in->kill();
return;
}
}
// save flow information
if(measure) {
_flow_table_loc[flow_id].set_addresses(mab_ctrl->_flow_src, mab_ctrl->_flow_dst);
if(measurements == _measurements_wifi) {
_flow_table_int1.set(flow_id, 1);
_flow_table_loc[flow_id].set_int1_use(true);
}
else if(measurements == _measurements_plc) {
_flow_table_int2.set(flow_id, 1);
_flow_table_loc[flow_id].set_int2_use(true);
}
}
IntIdCouple int_id = IntIdCouple(acne_hdr->_id_rate_mab, interface, flow);
if(mab_ctrl->_type == MAB_CTRL_BEGIN) { // begin, save timestamp: set _ts to _route_to_diff_ts
if(acne_hdr->_id_rate_mab>0) {
if (_control_message_received[acne_hdr->_id_rate_mab].get(acne_hdr->_route) > 0) {
DEBUG_CHATTER("[UpdateAckMab %s] Already received BEGIN control message for id %d, route %s, just transmit it",
_now.unparse().c_str(), acne_hdr->_id_rate_mab, acne_hdr->_route.unparse().c_str());
output(port).push(p_in);
return;
}
}
_is_exploring.set(flow,1);
DEBUG_CHATTER("[UpdateAckMab %s] Received BEGIN control packet for id %d, route %s, flow %s, hop %u, measure=%d, addr_meas %s (INT %d) (timestamp %s)...",
_now.unparse().c_str(), acne_hdr->_id_rate_mab, acne_hdr->_route.unparse().c_str(), flow.unparse().c_str(),
acne_hdr->_hop, measure, addr_meas.unparse().c_str(), interface, mab_ctrl->_ts.unparse().c_str());
_control_message_received[acne_hdr->_id_rate_mab][acne_hdr->_route] = 1;
// reset queues (they should be empty if SrcMab's small_delta large enough)
reset_queues();
if(measure) {
DEBUG_CHATTER("[UpdateAckMab] Setting timestamp %s for route %s, id %d...", mab_ctrl->_ts.unparse().c_str(), acne_hdr->_route.unparse().c_str(), acne_hdr->_id_rate_mab);
_route_to_diff_ts[acne_hdr->_route].set(acne_hdr->_id_rate_mab, mab_ctrl->_ts);
// initialize airtime (check if not already initialized)
if(!_initialized.get(int_id) || acne_hdr->_id_rate_mab == 0) {
_initialized[int_id] = 1;
if(_debug) {
_now.assign_now();
click_chatter("[UpdateAckMab %s] Initializing measurements...", _now.unparse().c_str());
}
// initialize all Measurements
#ifdef CLICK_USERLEVEL
if(measurements == _measurements_wifi && _fd_moni) {
DEBUG_CHATTER("[UpdateAckMab] FromDevice moni now active");
_fd_moni->set_active(true,true);
}
#endif
if(measurements == _measurements_plc) {
DEBUG_CHATTER("[UpdateAckMab] Enabling Sniffer mode");
set_faifa(true);
#ifdef CLICK_USERLEVEL
if(_fd_plc) {
DEBUG_CHATTER("[UpdateAckMab] FromDevice PLC now active");
_fd_plc->set_active(true,true);
}
#endif
}
measurements->init_airtime(0,(int)int_id.hashcode());
}
}
output(port).push(p_in);
if(!is_dest)
set_bursts(false); // burst must be set to _min_burst
}
else if(mab_ctrl->_type == MAB_CTRL_END) { // end
if(acne_hdr->_id_rate_mab>0) {
if (_control_message_received[acne_hdr->_id_rate_mab].get(acne_hdr->_route) > 1) {
DEBUG_CHATTER("[UpdateAckMab %s] Already received END control message for id %d, route %s, just transmit it",
_now.unparse().c_str(), acne_hdr->_id_rate_mab, acne_hdr->_route.unparse().c_str());
output(port).push(p_in);
return;
}
}
_is_exploring.set(flow,0);
DEBUG_CHATTER("[UpdateAckMab %s] Received END control packet for id %d, route %s, flow %s, hop %u, measure=%d, addr_meas %s (INT %d) (timestamp %s). Has received %d...",
_now.unparse().c_str(), acne_hdr->_id_rate_mab, acne_hdr->_route.unparse().c_str(), flow.unparse().c_str(),
acne_hdr->_hop, measure, addr_meas.unparse().c_str(), interface, mab_ctrl->_ts.unparse().c_str(),
_route_to_rate[acne_hdr->_route][acne_hdr->_id_rate_mab]);
_control_message_received[acne_hdr->_id_rate_mab][acne_hdr->_route] = 2;
_now.assign_now();
// burst should already been set to 1, check that
if(!_burst_one && !is_dest && acne_hdr->_id_rate_mab > 0) {
if(_route_to_rate[acne_hdr->_route][acne_hdr->_id_rate_mab] > 0 && _debug)
DEBUG_CHATTER("[UpdateAckMab %s] WARNING: burst is not one, last packet lost?", _now.unparse().c_str());
set_bursts(true);
}
if(measure) {
// save timestamp: set diff between begin and end to _route_to_diff_ts
if(_route_to_diff_ts[acne_hdr->_route].get(acne_hdr->_id_rate_mab) > Timestamp()) {
_route_to_diff_ts[acne_hdr->_route].set(acne_hdr->_id_rate_mab, (mab_ctrl->_ts - _route_to_diff_ts[acne_hdr->_route][acne_hdr->_id_rate_mab]));
_is_diff_ts_ok[acne_hdr->_route][acne_hdr->_id_rate_mab] = 1;
DEBUG_CHATTER("[UpdateAckMab] Setting time difference for route %s (hash %d), id %d, is %s (rate received %d)...", acne_hdr->_route.unparse().c_str(),
acne_hdr->_route.hashcode(), acne_hdr->_id_rate_mab, _route_to_diff_ts[acne_hdr->_route].get(acne_hdr->_id_rate_mab).unparse().c_str(),
(1000*_route_to_rate[acne_hdr->_route][acne_hdr->_id_rate_mab])/_route_to_diff_ts[acne_hdr->_route].get(acne_hdr->_id_rate_mab).msecval());
}
else {
DEBUG_CHATTER("[UpdateAckMab] WARNING: could not compute time difference for id %d, route %s (hash %d)",
acne_hdr->_id_rate_mab, acne_hdr->_route.unparse().c_str(), acne_hdr->_route.hashcode());
}
// compute busy time
if(!_busytime_computed.get(int_id) || acne_hdr->_id_rate_mab == 0) {
String meas_str = "";
int nb_flows;
if(measurements == _measurements_wifi) {
meas_str = "WiFi";
nb_flows = _nb_flows_int1;
}
else if(measurements == _measurements_plc) {
meas_str = "PLC";
nb_flows = _nb_flows_int2;
}
nb_flows = mymax(nb_flows,1);
Timestamp delay_first_packet = measurements->get_delay_first_packet((int)int_id.hashcode());
measurements->compute_airtime((int)int_id.hashcode());
gamma_type busy_time = measurements->get_busytime_us((int)int_id.hashcode());
int diff_us = usec_timestamp(_route_to_diff_ts[acne_hdr->_route][acne_hdr->_id_rate_mab]);
// check if another flow is exploring
// TODO: a better solution would be to check on MAB_BEGIN and send a control message to delay exploration if another flow explores
for(HashTable<FlowTuple, int>::iterator it=_is_exploring.begin();it.live();it++) {
if(it.key()==flow)
continue;
if(it.value() > 0) {
// means that it.key() is exploring
diff_us = 0; // will make results invalid
_invalid_exploring.set(it.key(),1);
DEBUG_CHATTER("[UpdateAckMab %s] Two flows exploring (current %s and %s), set busy time to -1", _now.unparse().c_str(),
flow.unparse().c_str(), it.key().unparse().c_str());
}
}
// check if current explore is valid (might not be if another flow was exploring)
if(_invalid_exploring[flow] > 0) {
diff_us = 0;
_invalid_exploring.set(flow,0);
DEBUG_CHATTER("[UpdateAckMab %s] Another flow than current %s was exploring, set busy time to -1", _now.unparse().c_str(),
flow.unparse().c_str());
}
if(busy_time >= 0) {
if(acne_hdr->_id_rate_mab > 0) {
_silent_busytime.set(int_id, _current_busytime[IntIdCouple(0,interface, flow)]);
_silent_time_computed.set(int_id,1);
gamma_type busy_silent_diff;
if(diff_us > 0 || _silent_busytime[int_id]==-1) {
// if traffic has been received with more than one burst (200*1400=280000 -> 300000), check that first packet has been measured soon enough (otherwise, likely a bug)
if(_route_to_rate[acne_hdr->_route][acne_hdr->_id_rate_mab] < 300000 || delay_first_packet.usecval() < 3*(diff_us/4)) {
if (busy_time/diff_us > _silent_busytime[int_id]) {
if(nb_flows == 1) { // remove external load and compute based on remaining resources
if(_silent_busytime[int_id] > 50 && _silent_busytime[int_id] < (8*GAMMA_SCALE)/10 && busy_time/diff_us < (9*GAMMA_SCALE)/10) // silent slot is always non zero, count only if significant enough (and not saturated)
busy_silent_diff = mymin((gamma_type) GAMMA_SCALE,((busy_time/diff_us - _silent_busytime[int_id])*GAMMA_SCALE)/(GAMMA_SCALE-_silent_busytime[int_id]));
else // no external load
busy_silent_diff = busy_time/diff_us;
}
else // when other flows are present, fair share among the flows (how to also consider external load? should count differently the busy-times...)
if (_silent_busytime[int_id] > (8*GAMMA_SCALE)/10) // saturated: do not subtract
busy_silent_diff = busy_time/diff_us;
else
busy_silent_diff = (busy_time/diff_us - _silent_busytime[int_id])*nb_flows;
}
else {
if (_silent_busytime[int_id] > (8*GAMMA_SCALE)/10) // saturated: do not subtract
busy_silent_diff = busy_time/diff_us;
else
busy_silent_diff = 0;
}
_current_busytime.set(int_id, busy_silent_diff);
}
}
else
_current_busytime.set(int_id, -1);
}
else { // silent slot
if(diff_us > 0)
_current_busytime.set(int_id, busy_time/diff_us);
else {
DEBUG_CHATTER("[UpdateAckMab] Could not compute silent slot airtime.");
_current_busytime.set(int_id,0);
}
}
}
_busytime_computed.set(int_id,1);
DEBUG_CHATTER("[UpdateAckMab %s] Compute airtime on %s (addr_meas %s, %s) for route %s, id %d (silent slot airtime %s): is %s (%d flows)...", _now.unparse().c_str(),
interface_str.c_str(), addr_meas.unparse().c_str(), meas_str.c_str(), acne_hdr->_route.unparse().c_str(), acne_hdr->_id_rate_mab,
acne_hdr->_id_rate_mab > 0 ? String(_silent_busytime[int_id]).c_str() : String(_current_busytime[int_id]).c_str(),
String(_current_busytime[int_id]).c_str(), nb_flows);
#ifdef CLICK_USERLEVEL
if(measurements == _measurements_wifi && _fd_moni) {
DEBUG_CHATTER("[UpdateAckMab] FromDevice moni now inactive");
_fd_moni->set_active(false,false);
}
#endif
if(measurements == _measurements_plc) {
//DEBUG_CHATTER("[UpdateAckMab] Disabling Sniffer mode");
//send_faifa_request(HPAV_SC_DISABLE);
#ifdef CLICK_USERLEVEL
if(_fd_plc) {
DEBUG_CHATTER("[UpdateAckMab] FromDevice PLC now inactive");
_fd_plc->set_active(true,false);
}
#endif
}
}
}
output(port).push(p_in);
}
else if (mab_ctrl->_type == FLOW_INFORMATION) {
_current_busytime.erase(IntIdCouple(0,interface, flow));
output(port).push(p_in);
}
else if(mab_ctrl->_type == TRIGGER_CLEAR) {
reset_queues();
output(port).push(p_in);
}
}
else if(acne_hdr->_type == SPECIAL_TRAFFIC || acne_hdr->_type == NACK_RETRANSMISSION) {
output(0).push(p_in); // send to DstMab to be sent with high priority
}
else
output(port).push(p_in);
}
}
/**
 * Select the measurement element matching the interface of a next hop.
 *
 * @param next_hop_eth   Ethernet address of the next hop
 * @param interface_str  set to "WiFi" or "PLC" when the address is recognized,
 *                       left untouched otherwise
 * @param interface      set to WIFI or PLC when the address is recognized,
 *                       left untouched otherwise
 * @return _measurements_wifi or _measurements_plc (the latter may itself be
 *         null), or 0 when the address is the default-constructed
 *         EtherAddress or matches neither interface
 */
Measurements *
UpdateAckMab::get_measurements(EtherAddress next_hop_eth, String &interface_str, Interface &interface) {
    Measurements *selected = 0;
    // a default-constructed address is never classified
    const bool has_address = (next_hop_eth != EtherAddress());
    if(has_address && is_wifi_addr(next_hop_eth)) {
        interface_str = "WiFi";
        interface = WIFI;
        selected = _measurements_wifi;
    }
    else if(has_address && is_plc_addr(next_hop_eth)) {
        interface_str = "PLC";
        interface = PLC;
        selected = _measurements_plc;
    }
    return selected;
}
/**
 * Call reset() on each of the four configured queue elements, skipping those
 * whose pointer is null. Only the Click queues are touched (ideally the
 * driver's queues would be reset as well).
 */
void UpdateAckMab::reset_queues()
{
    DEBUG_CHATTER("[UpdateAckMab %s] Resetting queues.", Timestamp::now().unparse().c_str());

    // regular traffic queues
    if(_queue_plc != 0) {
        _queue_plc->reset();
    }
    if(_queue_wifi != 0) {
        _queue_wifi->reset();
    }

    // burst traffic queues
    if(_queue_plc_burst != 0) {
        _queue_plc_burst->reset();
    }
    if(_queue_wifi_burst != 0) {
        _queue_wifi_burst->reset();
    }
}
/**
 * Timer callback shared by the two timers owned by this element.
 *
 * - _nb_flows_timer: when no new flow is pending (_new_flow_id == 0), snapshots
 *   the per-interface flow counts, rebuilds _flow_last_update and
 *   _current_hash_flow from the two flow tables, then clears those tables.
 *   In every case it then broadcasts, per interface, the ids of the locally
 *   known flows on output 4, clears _flow_table_loc and reschedules itself.
 * - _timer: clears all state after one hour without activity; otherwise
 *   reschedules itself in 10 s and drops per-id state that lags more than
 *   255 ids behind the current id of its route.
 *
 * @param timer the timer that fired; compared against the two member timers.
 */
void UpdateAckMab::run_timer(Timer *timer){
    _now.assign_now();
    VERB_DEBUG_CHATTER("[UpdateAckMab %s] Run timer %p", _now.unparse().c_str(), timer);
    if (timer == &_nb_flows_timer) {
        if(_new_flow_id == 0u) { // if new_flow non zero, then only send broadcast
            // first: count number of flows, and clear tables (so that only active flows are considered)
            int previous_int1 = _nb_flows_int1;
            int previous_int2 = _nb_flows_int2;
            _nb_flows_int1 = _flow_table_int1.size();
            _nb_flows_int2 = _flow_table_int2.size();
            _nb_flows_correction = 0;
            // _flow_last_update becomes the union of the keys of both interface tables
            _flow_last_update = HashTable<uint32_t, int>(_flow_table_int1);
            for(HashTable<uint32_t, int>::iterator it = _flow_table_int2.begin();it.live();it++)
                _flow_last_update[it.key()] = 1;
            // the "hash" of the flow set is the plain sum of the flow ids
            _current_hash_flow = 0;
            for(HashTable<uint32_t, int>::iterator it = _flow_last_update.begin();it.live();it++) {
                _current_hash_flow+=it.key();
            }
            if(previous_int1!=_nb_flows_int1 || previous_int2!=_nb_flows_int2)
                DEBUG_CHATTER("[UpdateAckMab %s] New number of flows: %d (int1, was %d), %d (int2, was %d). Hash is %u",
                              _now.unparse().c_str(), _nb_flows_int1, previous_int1, _nb_flows_int2, previous_int2, _current_hash_flow);
            _flow_table_int1.clear();
            _flow_table_int2.clear();
        }
        // second: send broadcast flow informations
        Vector<uint32_t> own_flows_int1;
        Vector<uint32_t> own_flows_int2;
        for(HashTable<uint32_t, FlowTuple>::iterator it=_flow_table_loc.begin();it.live();it++) {
            if(it.value()._uses_int1_loc)
                own_flows_int1.push_back(it.key());
            if(it.value()._uses_int2_loc)
                own_flows_int2.push_back(it.key());
        }
        // one broadcast per interface: i == 0 -> interface 1 (wifi address), i == 1 -> interface 2 (plc address)
        for(int i=0;i<=1;i++) {
            Vector<uint32_t> own_flows;
            uint8_t interface;
            EtherAddress addr;
            uint8_t type;
            if(i==0) {
                own_flows = own_flows_int1;
                interface = INTERFACE1;
                addr = _addr_wifi;
                type = FLOW_BROADCAST_INT1;
            }
            else {
                own_flows = own_flows_int2;
                interface = INTERFACE2;
                addr = _addr_plc;
                type = FLOW_BROADCAST_INT2;
            }
            if(!own_flows.empty()) {
                WritablePacket *out = Packet::make(64, 0, sizeof(acne_header)+sizeof(flow_broadcast) + own_flows.size()*sizeof(uint32_t), 10);
                // Allocation can fail; skip this interface rather than dereferencing a null packet.
                if (!out) {
                    click_chatter("[UpdateAckMab] Cannot make packet for flow broadcast.");
                    continue;
                }
                out = out->push_mac_header(sizeof(click_ether));
                if (!out) {
                    click_chatter("[UpdateAckMab] Cannot push MAC header for flow broadcast.");
                    continue;
                }
                memset(out->data(), 0, out->length());
                click_ether *eth_hdr = (click_ether *) (out->data());
                acne_header *acne_hdr = (acne_header *) (out->data()+sizeof(click_ether));
                flow_broadcast *flow_bdcst = (flow_broadcast *) (out->data()+sizeof(click_ether)+sizeof(acne_header));
                // Ethernet header
                eth_hdr->ether_type = htons(ETHERTYPE_ACNE);
                memcpy(eth_hdr->ether_shost, addr.data(), 6);
                memcpy(eth_hdr->ether_dhost, EtherAddress::make_broadcast().data(), 6);
                // acne header (only type is useful)
                acne_hdr->_type = type;
                // flow broadcast message
                // NOTE(review): the count is truncated to 8 bits while all ids are still
                // written below; confirm that more than 255 local flows cannot occur.
                flow_bdcst->_nb_flow_ids = (uint8_t) own_flows.size();
                flow_bdcst->_interface = interface;
                flow_bdcst->_new_flow = _new_flow_id;
                if(_neighbors)
                    flow_bdcst->_src = _neighbors->my_ip();
                for(int f=0;f<own_flows.size();f++) {
                    flow_bdcst->_flow_ids[f] = own_flows[f];
                }
                VERB_DEBUG_CHATTER("[UpdateAckMab] Sending Flow information with %d flow ids on interface %d", own_flows.size(), i+1);
                out->set_timestamp_anno(_now);
                output(4).push(out);
            }
        }
        _flow_table_loc.clear();
        if(_new_flow_id == 0u)
            _nb_flows_timer.schedule_after_sec(_flow_bdcst_freq);
        else {
            // one-off broadcast for a new flow: go back to the previously saved expiry
            _new_flow_id = 0u;
            _nb_flows_timer.schedule_at(_old_expiry);
            _old_expiry = Timestamp();
        }
    }
    else if (timer == &_timer) {
        if((_now-_last_active).sec() >= 3600) {
            DEBUG_CHATTER("[UpdateAckMab %s] 1 hour inactive, clearing", _now.unparse().c_str());
            clear();
        }
        else {
            _timer.schedule_after_sec(10);
            for(HashTable<FormatedRoute, HashTable<uint16_t, Timestamp> >::iterator it = _route_to_diff_ts.begin();it!=_route_to_diff_ts.end();it++) {
                // Collect the stale ids first: erasing from the inner table while an
                // iterator still points at the erased element would invalidate it.
                Vector<uint16_t> stale_ids;
                for(HashTable<uint16_t, Timestamp>::iterator it2 = _route_to_diff_ts[it.key()].begin();it2!=_route_to_diff_ts[it.key()].end();it2++) {
                    if(_current_id[it.key()] > 255 && it2.key() < (_current_id[it.key()] - 255))
                        stale_ids.push_back(it2.key());
                }
                for(int s=0;s<stale_ids.size();s++) {
                    const uint16_t stale_id = stale_ids[s];
                    FormatedRoute froute = it.key();
                    FlowTuple flow = FlowTuple(_neighbors->get_ip(froute._src), _neighbors->get_ip(froute._dst));
                    DEBUG_CHATTER("[UpdateAckMab] Clearing for id %d (current is %d), route %s.", stale_id, _current_id[it.key()], it.key().unparse().c_str());
                    _current_busytime.erase(IntIdCouple(stale_id, WIFI, flow));
                    _current_busytime.erase(IntIdCouple(stale_id, PLC, flow));
                    _busytime_computed.erase(IntIdCouple(stale_id, WIFI, flow));
                    _busytime_computed.erase(IntIdCouple(stale_id, PLC, flow));
                    _silent_busytime.erase(IntIdCouple(stale_id, WIFI, flow));
                    _silent_busytime.erase(IntIdCouple(stale_id, PLC, flow));
                    _silent_time_computed.erase(IntIdCouple(stale_id, WIFI, flow));
                    _silent_time_computed.erase(IntIdCouple(stale_id, PLC, flow));
                    _initialized.erase(IntIdCouple(stale_id, WIFI, flow));
                    _initialized.erase(IntIdCouple(stale_id, PLC, flow));
                    _route_to_diff_ts[it.key()].erase(stale_id);
                    _route_to_rate[it.key()].erase(stale_id);
                    _is_diff_ts_ok[it.key()].erase(stale_id);
                    _control_message_received.erase(stale_id);
                    _ack_printed.erase(stale_id);
                }
            }
        }
    }
}
/**
 * Reset the element's measurement state.
 *
 * Empties the busy-time, silent-time, route and id tables, clears the
 * last-activity timestamp, re-activates the FromDevice elements (user-level
 * builds only), sets both flow counts back to 1 and requests single-packet
 * bursts through set_bursts(true).
 */
void
UpdateAckMab::clear() {
    // Busy-time / silent-time bookkeeping.
    _current_busytime.clear();
    _busytime_computed.clear();
    _silent_busytime.clear();
    _silent_time_computed.clear();

    // Per-route state.
    // NOTE(review): _route_to_rate is erased together with these tables in
    // run_timer() but is not cleared here -- confirm whether that is intended.
    _route_to_diff_ts.clear();
    _is_diff_ts_ok.clear();
    _current_id.clear();
    _last_active.clear();

#ifdef CLICK_USERLEVEL
    if (_fd_moni) {
        _fd_moni->set_active(true,true);
    }
    if (_fd_plc) {
        _fd_plc->set_active(true,true);
    }
#endif

    //send_faifa_request(HPAV_SC_DISABLE);

    // Per-id flags.
    _control_message_received.clear();
    _ack_printed.clear();
    _initialized.clear();

    _nb_flows_int1 = 1;
    _nb_flows_int2 = 1;

    set_bursts(true);
}
/**
 * Build a HomePlug AV sniffer request and push it on output 3.
 *
 * Does nothing when the element has fewer than four outputs. The frame is
 * addressed to 00:B0:52:00:00:01 with an all-zero source address.
 *
 * @param type value written into the request's control field
 *             (callers in this file pass HPAV_SC_ENABLE / HPAV_SC_DISABLE).
 */
void
UpdateAckMab::send_faifa_request(uint8_t type) {
    if(noutputs() < 4)
        return;
    const unsigned char dst[6] = {0x00, 0xB0, 0x52, 0x00, 0x00, 0x01};
    //static_assert(Packet::default_headroom >= sizeof(click_ether));
    WritablePacket *q = Packet::make(100, NULL, sizeof(click_ether) + sizeof(click_hp_av_header) + sizeof(click_sniffer_request), 0);
    if (!q) {
        click_chatter("[UpdateAckMab] Cannot make packet for MM.");
        return;
    }
    // The packet was made without source data and only a few fields are set
    // below, so zero the whole buffer to avoid sending uninitialised bytes.
    memset(q->data(), 0, q->length());
    click_ether *e = (click_ether *) q->data();
    q->set_ether_header(e);
    memset(e->ether_shost, 0x00, 6);
    memcpy(e->ether_dhost, dst,6);
    e->ether_type = htons(ETHERTYPE_HP_AV);
    // NOTE(review): the request is placed right after the Ethernet header although
    // the packet length also accounts for a click_hp_av_header -- confirm layout.
    click_sniffer_request *snif_req = (click_sniffer_request *) (q->data() + sizeof(click_ether));
    snif_req->version = 0;
    snif_req->MMType = htons(HPAV_MMTYPE_SNIFFER_REQ);
    snif_req->control = type;
    q->timestamp_anno().assign_now();
    output(3).push(q);
}
/**
 * Switch every configured burst-capable element between single-packet
 * bursts and its configured minimum burst size.
 *
 * No-op when the requested mode equals the current one (_burst_one).
 *
 * @param burst_one true to set a burst of 1 everywhere, false to apply
 *                  _min_burst / _min_burst_wifi / _min_burst_plc.
 */
void
UpdateAckMab::set_bursts(bool burst_one) {
    if (_burst_one == burst_one) {
        return;
    }

    _now.assign_now();
    _burst_one = burst_one;

    // Names of the elements that were actually updated, for the debug line.
    String who_str;

    if (_make_burst_elmt) {
        who_str += "mb ";
        _make_burst_elmt->set_burst(burst_one ? 1 : _min_burst);
    }
    if (_make_burst_wifi_elmt) {
        who_str += "mb_w ";
        _make_burst_wifi_elmt->set_burst(burst_one ? 1 : _min_burst_wifi);
    }
    if (_make_burst_plc_elmt) {
        who_str += "mb_p ";
        _make_burst_plc_elmt->set_burst(burst_one ? 1 : _min_burst_plc);
    }
#ifdef CLICK_USERLEVEL
    if (_td_wifi) {
        who_str += "td_w ";
        _td_wifi->set_burst(burst_one ? 1 : _min_burst_wifi);
    }
    if (_td_plc) {
        who_str += "td_p ";
        _td_plc->set_burst(burst_one ? 1 : _min_burst_plc);
    }
#endif

    // The value logged is the PLC one, whichever elements were touched.
    DEBUG_CHATTER("[UpdateAckMab %s] Setting bursts to %d for %s", _now.unparse().c_str(), burst_one ? 1 : _min_burst_plc, who_str.c_str());
}
/**
 * Send a sniffer request with HPAV_SC_ENABLE or HPAV_SC_DISABLE.
 *
 * @param b true to send the enable request, false to send the disable one.
 */
void
UpdateAckMab::set_faifa(bool b) {
    if (!b) {
        send_faifa_request(HPAV_SC_DISABLE);
        return;
    }
    send_faifa_request(HPAV_SC_ENABLE);
}
int
UpdateAckMab::bool_handler(const String &s, Element *e, void *a,
ErrorHandler *errh) {
UpdateAckMab *elmt = (UpdateAckMab *)e;
int arg;
if(!cp_integer(s, &arg))
return errh->error("Debug must be 0 or 1");
if (!(arg == 0 || arg == 1))
return errh->error("Debug must be 0 or 1");
if(((intptr_t) a) == 0) {
elmt->set_debug(arg==1);
}
else if(((intptr_t) a) == 1) {
if(arg==1)
elmt->clear();
}
else if(((intptr_t) a) == 2)
elmt->set_verb_debug(arg==1);
return 0;
}
int
UpdateAckMab::faifa_handler(const String &s, Element *e, void *a,
ErrorHandler *errh) {
UpdateAckMab *elmt = (UpdateAckMab *)e;
int arg;
if(!cp_integer(s, &arg))
return errh->error("Arg must be 0 or 1");
if (!(arg == 0 || arg == 1))
return errh->error("Arg must be 0 or 1");
elmt->set_faifa(arg==1);
return 0;
}
int
UpdateAckMab::set_min_burst_handler(const String &s, Element *e, void *, ErrorHandler *errh) {
UpdateAckMab *elmt = (UpdateAckMab *)e;
uint32_t arg;
if(!cp_integer(s, &arg))
return errh->error("Min_burst must be an integer");
elmt->set_min_burst(arg);
return 0;
}
/**
 * Register the element's write handlers.
 *
 * The integer passed with each bool_handler registration is the selector
 * that bool_handler dispatches on (0: debug, 1: reset, 2: verb_debug).
 */
void UpdateAckMab::add_handlers() {
    // Boolean-style handlers sharing bool_handler.
    add_write_handler("debug", bool_handler, 0);
    add_write_handler("reset", bool_handler, 1);
    add_write_handler("verb_debug", bool_handler, 2);

    // Handlers with their own callbacks; the user data is not read by them.
    add_write_handler("enable_faifa", faifa_handler, 0);
    add_write_handler("set_min_burst", set_min_burst_handler, 0);
}
EXPORT_ELEMENT(UpdateAckMab)
CLICK_ENDDECLS

Event Timeline