Page MenuHomec4science

makeburst.cc
No OneTemporary

File Metadata

Created
Sat, Jul 5, 13:35

makeburst.cc

#include <click/config.h>
#include "makeburst.hh"
#define DEBUG_CHATTER(arg, ...) do { if (_debug) { click_chatter(arg, ## __VA_ARGS__);} } while (0)
#define VERB_DEBUG_CHATTER(arg, ...) do { if (_verb_debug) { click_chatter(arg, ## __VA_ARGS__);} } while (0)
CLICK_DECLS
MakeBurst::MakeBurst() : _q(0), _timeout_timer(this), _print_stats_timer(this)
{
}
MakeBurst::~MakeBurst()
{
}
int MakeBurst::configure(Vector<String> &conf, ErrorHandler *errh){
_debug = false;
_capacity = 2000;
_burst = 1;
_print_dropped = false;
_timeout = 0;
_timeout_tcp = 20;
_packets_sent = 0;
_count_bursts = 0;
_check_mab_id = false;
_print_stats = 0;
if(Args(this, errh).bind(conf)
.read("MAX_BURST", _capacity)
.read("BURST", _burst)
.read("TIMEOUT", _timeout)
.read("TIMEOUT_TCP", _timeout_tcp)
.read("DEBUG", _debug)
.read("VERB_DEBUG", _verb_debug)
.read("CHECK_MAB_ID", _check_mab_id)
.read("PRINT_STATS", _print_stats)
.complete() < 0)
return -1;
if(_capacity < _burst)
_capacity = _burst;
if(_check_mab_id) {
_next_queue = (FullNoteQueue *) output(0).element();
if (!_next_queue || !_next_queue->cast("Queue")) {
return errh->error("[MakeBurst] Could not get queue %p", _next_queue);
}
}
return 0;
}
int
MakeBurst::initialize(ErrorHandler *errh) {
assert(!_q && head() == 0 && tail() == 0);
_q = (Packet **) CLICK_LALLOC(sizeof(Packet *) * (_capacity + 1));
if (_q == 0)
return errh->error("[MakeBurst] Out of memory");
_drops = 0;
_timeout_timer.initialize(this);
_print_stats_timer.initialize(this);
_print_stats_timer.schedule_after_sec(_print_stats);
if(_check_mab_id && noutputs() != 3)
return errh->error("[MakeBurst] If check MAB id, should have 3 outputs.");
click_chatter("[MakeBurst %s] MakeBurst with %d outputs", name().c_str(), noutputs());
return 0;
}
void
MakeBurst::push(int port, Packet *p_in) {
const click_ip* ip4_header = p_in->ip_header();
// send TCP acks in burst (if no burst, higher airtime)
bool force_burst_tcp = false;
if(ip4_header && ip4_header->ip_p == IP_PROTO_TCP) {
VERB_DEBUG_CHATTER("[MakeBurst %s:%s] TCP packet", name().c_str(), Timestamp::now().unparse().c_str());
click_tcp *tcp_hdr = (click_tcp *)(p_in->tcp_header());
if(tcp_hdr->th_flags & TH_ACK) {
// use ClassifyAck
uint16_t a = htons(ip4_header->ip_len);
uint32_t b = tcp_hdr->th_off;
uint32_t c = a - b*4 - 20;
if(c==0) {
VERB_DEBUG_CHATTER("[MakeBurst %s:%s] TCP ack", name().c_str(), Timestamp::now().unparse().c_str());
force_burst_tcp = true;
}
}
}
if(_check_mab_id && !force_burst_tcp) {
click_ether *eth_hdr = (click_ether *) (p_in->data());
acne_header * acne_hdr = (acne_header *) (p_in->data() + sizeof(click_ether)
+ (ntohs(eth_hdr->ether_type) == ETHERTYPE_IP)*sizeof(click_ip));
// MAB_CONTROL, high priority
if((ip4_header && ip4_header->ip_p == IP_PROTO_IPIP) || ntohs(eth_hdr->ether_type) == ETHERTYPE_8021Q || acne_hdr->_type == MAB_CONTROL ||
acne_hdr->_type == MAB_ACK) { // IP_PROTO_IPIP or ETHERTYPE_8021Q means encapsulated MAB control
VERB_DEBUG_CHATTER("[MakeBurst %s:%s] MAB_CONTROL, sending to output 2", name().c_str(), Timestamp::now().unparse().c_str());
output(2).push(p_in);
return;
}
if(acne_hdr->_is_last) { // send last packet with high priority
bool kill = false;
if(_next_queue->size() == _next_queue->capacity()) {
// queue is full
_next_queue->deq()->kill();
kill = true;
}
DEBUG_CHATTER("[MakeBurst %s:%s] Last packet, sending to output 0 (kill=%d)",
name().c_str(), Timestamp::now().unparse().c_str(), kill);
output(0).push(p_in);
return;
}
// if MAB id 0, do not make bursts
if(acne_hdr->_id_rate_mab == 0) {
VERB_DEBUG_CHATTER("[MakeBurst %s:%s] MAB id 0, sending to output 1", name().c_str(), Timestamp::now().unparse().c_str());
output(1).push(p_in);
return;
}
}
if(_burst == 1 && !force_burst_tcp) {
VERB_DEBUG_CHATTER("[MakeBurst %s:%s] Burst is 1, sending to output %d", name().c_str(), Timestamp::now().unparse().c_str(), noutputs() > 1);
if(noutputs() > 1)
output(1).push(p_in);
else
output(0).push(p_in);
return;
}
if(_timeout > 0 && size() == 0 && !_timeout_timer.scheduled())
_timeout_timer.schedule_after_msec(_timeout);
int s = size();
int burst = _burst;
if(force_burst_tcp) {
if(!_timeout_timer.scheduled())
_timeout_timer.schedule_after_msec(_timeout_tcp);
burst = 50;
}
if(s < (burst-1)) {
VERB_DEBUG_CHATTER("[MakeBurst %s:%s] Enqueuing packet (burst %d, size %d)", name().c_str(), Timestamp::now().unparse().c_str(), burst, size());
enq(p_in);
}
else { // dequeue packets
Timestamp begin;
if(_debug) begin.assign_now();
if(_timeout > 0)
_timeout_timer.unschedule();
for (int i=0;i<s;i++) {
output(0).push(deq());
}
Timestamp now;
if(size() != 0) {
now.assign_now();
click_chatter("[MakeBurst %s] WARNING: size is not 0 after dequeue, is %d", now.unparse().c_str(), size());
}
output(0).push(p_in);
if((_debug && !force_burst_tcp) || _verb_debug) {
now.assign_now();
click_chatter("[MakeBurst %s:%s] Sending %d packets (done in %s s).", name().c_str(), now.unparse().c_str(), s+1, (now-begin).unparse().c_str());
}
_packets_sent+=(s+1);
_count_bursts++;
}
}
void
MakeBurst::run_timer(Timer *timer){
VERB_DEBUG_CHATTER("[MakeBurst %s] Run timer %p", Timestamp::now().unparse().c_str(), timer);
if (timer==&_timeout_timer) {
int s = size();
if(_debug) {
Timestamp now;
now.assign_now();
click_chatter("[MakeBurst %s:%s] Dequeuing %d packets in Timer", name().c_str(), now.unparse().c_str(), s);
}
_packets_sent+=s;
_count_bursts++;
for (int i=0;i<s;i++) {
output(0).push(deq());
}
}
else if(timer==&_print_stats_timer) {
if(_print_stats && _count_bursts > 0) {
Timestamp now;
now.assign_now();
int pack_per_burst = _packets_sent/_count_bursts;
click_chatter("[MakeBurst %s:%s] Burst is %d, timeout %d: have sent %d packets in %d bursts (%d packets per burst).",
name().c_str(), now.unparse().c_str(), _burst, _timeout, _packets_sent, _count_bursts, pack_per_burst);
_packets_sent = 0;
_count_bursts = 0;
}
_print_stats_timer.schedule_after_sec(_print_stats);
}
}
inline bool
MakeBurst::enq(Packet *p)
{
assert(p);
Storage::index_type h = head(), t = tail(), nt = next_i(t);
if (nt != h) {
_q[t] = p;
packet_memory_barrier(_q[t]);
set_tail(nt);
return true;
}
else {
if(!_print_dropped) {
_print_dropped = true;
click_chatter("[MakeBurst] WARNING: packet dropped. Head=%d, tail=%d, next=%d, capacity=%d", h, t, nt, _capacity);
}
p->kill();
_drops++;
return false;
}
}
inline Packet *
MakeBurst::deq()
{
Storage::index_type h = head(), t = tail();
if (h != t) {
Packet *p = _q[h];
packet_memory_barrier(_q[h]);
set_head(next_i(h));
assert(p);
return p;
}
else
return 0;
}
bool
MakeBurst::set_burst(int burst) {
if(burst==_burst)
return true;
if(burst>_capacity) {
click_chatter("[MakeBurst] Maximum capacitiy is %d, cannot set burst to %d", _capacity, burst);
return false;
}
int s = 0;
if(burst < _burst) {
// push packets
s = size();
for (int i=0;i<s;i++) {
output(0).push(deq());
}
_packets_sent+=s;
_count_bursts++;
}
if(_debug && _count_bursts > 0) {
Timestamp now;
now.assign_now();
int pack_per_burst = _packets_sent/_count_bursts;
DEBUG_CHATTER("[MakeBurst %s-%s] Sending %d packets. Burst was %d: have sent %d packets in %d bursts (%d packets per burst). Is now %d",
name().c_str(), now.unparse().c_str(), s, _burst, _packets_sent, _count_bursts, pack_per_burst, burst);
}
_packets_sent = 0;
_count_bursts = 0;
_burst = burst;
return true;
}
int
MakeBurst::bool_handler(const String &s, Element *e, void *arg,
ErrorHandler *errh) {
MakeBurst *elmt = (MakeBurst *)e;
int b;
if(!cp_integer(s, &b))
return errh->error("Debug must be 0 or 1");
if (!(b == 0 || b == 1))
return errh->error("Debug must be 0 or 1");
if(((intptr_t) arg) == 0)
elmt->set_debug(b==1);
if(((intptr_t) arg) == 1)
elmt->set_verb_debug(b==1);
return 0;
}
int
MakeBurst::set_int_handler(const String &s, Element *e, void *a,
ErrorHandler *errh) {
MakeBurst *elmt = (MakeBurst *)e;
int arg;
if(!cp_integer(s, &arg))
return errh->error("Burst must be an integer");
if(((intptr_t) a) == 0)
elmt->set_burst(arg);
if(((intptr_t) a) == 1)
elmt->set_timeout(arg);
if(((intptr_t) a) == 2)
elmt->set_timeout_tcp(arg);
if(((intptr_t) a) == 3)
elmt->set_print_stats(arg);
return 0;
}
static String
drops_handler(Element *e, void *) {
MakeBurst *elmt = (MakeBurst *)e;
return String(elmt->get_drops());
}
static String
burst_handler(Element *e, void *) {
MakeBurst *elmt = (MakeBurst *)e;
return String(elmt->get_burst());
}
void MakeBurst::add_handlers() {
add_read_handler("drops", drops_handler,0);
add_read_handler("burst", burst_handler,0);
add_write_handler("debug", bool_handler, 0);
add_write_handler("verb_debug", bool_handler, 1);
add_write_handler("set_burst", set_int_handler, 0);
add_write_handler("set_timeout", set_int_handler,1);
add_write_handler("set_timeout_tcp", set_int_handler, 2);
add_write_handler("print_stats", set_int_handler, 3);
}
EXPORT_ELEMENT(MakeBurst)
CLICK_ENDDECLS

Event Timeline