00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037 #include "pushback-queue.h"
00038
00039 #include "ip.h"
00040 #include "pushback.h"
00041 #include "rate-limit.h"
00042
00043 static class PushbackQueueClass : public TclClass {
00044
00045 public:
00046 PushbackQueueClass() : TclClass("Queue/RED/Pushback") {}
00047
00048 TclObject * create(int argc, const char*const* argv) {
00049 if (argc==4) {
00050 printf("Missing Argument for Pushback Queue Constructor\n");
00051 exit(-1);
00052 }
00053 return (new PushbackQueue(argv[4]));
00054 }
00055
00056 } class_pushback_queue;
00057
00058
00059 PushbackQueue::PushbackQueue(const char* const pba): pushbackID_(-1), src_(-1), dst_(-1),
00060 qmon_(NULL), RLDropTrace_(NULL) {
00061
00062 pushback_ = (PushbackAgent *)TclObject::lookup(pba);
00063 if (pushback_ == NULL) {
00064 printf("Wrong Argument for Pushback Queue Constructor\n");
00065 exit(-1);
00066 }
00067 bind("pushbackID_", &pushbackID_);
00068 bind_bool("rate_limiting_", &rate_limiting_);
00069 verbose_ = pushback_->verbose_;
00070
00071 timer_ = new PushbackQueueTimer(this);
00072 timer_->resched(SUSTAINED_CONGESTION_PERIOD);
00073
00074 rateEstimator_=new RateEstimator();
00075 rlsList_ = new RateLimitSessionList();
00076 if (verbose_) printf("pushback queue instantiated %d\n",pushback_->last_index_);
00077
00078 }
00079
00080 void
00081 PushbackQueue::reportDrop(Packet *p) {
00082 if (debug_)
00083 printf("PBQ:(%d:%d) rate limiting = %d\n", src_, dst_, rate_limiting_);
00084
00085 if (rate_limiting_)
00086 pushback_->reportDrop(pushbackID_, p);
00087
00088 }
00089
00090 int
00091 PushbackQueue::command(int argc, const char*const* argv)
00092 {
00093 Tcl& tcl = Tcl::instance();
00094 if (argc==2) {
00095 if (strcmp(argv[1], "rldrop-trace") == 0) {
00096 if (RLDropTrace_ != NULL) {
00097 tcl.resultf("%s", RLDropTrace_->name());
00098 }
00099 else {
00100 tcl.resultf("0");
00101 }
00102 return (TCL_OK);
00103 }
00104
00105 }
00106 else if (argc == 3) {
00107 if (strcmp(argv[1], "set-monitor") == 0) {
00108 qmon_ = (EDQueueMonitor *)TclObject::lookup(argv[2]);
00109 if (qmon_ == NULL) {
00110 tcl.resultf("Got Invalid Queue Monitor\n");
00111 return TCL_ERROR;
00112 }
00113 return TCL_OK;
00114 }
00115 else if (strcmp(argv[1], "rldrop-trace") == 0) {
00116
00117 RLDropTrace_ = (NsObject *) TclObject::lookup(argv[2]);
00118 if (RLDropTrace_ == NULL) {
00119 if (debug_) printf("Error Attaching Trace\n");
00120 return (TCL_ERROR);
00121 }
00122 if (debug_)
00123 printf("PBQ: RLDropTrace Set to %s\n", RLDropTrace_->name());
00124 return (TCL_OK);
00125 }
00126 } else if (argc == 4) {
00127 if (strcmp(argv[1], "set-src-dst") == 0) {
00128 src_ = atoi(argv[2]);
00129 dst_ = atoi(argv[3]);
00130 if (src_ < 0 || dst_ < 0) {
00131 tcl.resultf("Got Invalid Source or Destination\n");
00132 return TCL_ERROR;
00133 }
00134 return TCL_OK;
00135 }
00136 }
00137
00138 return REDQueue::command(argc, argv);
00139 }
00140
00141 void
00142 PushbackQueue::timeout(int from) {
00143
00144 int barrivals = qmon_->barrivals() - qmon_->mon_ebdrops();
00145 int bdrops = qmon_->bdrops() - qmon_->mon_ebdrops();
00146 int bdeps = qmon_->bdepartures();
00147
00148
00149
00150
00151 double dropRate1= getDropRate();
00152 double dropRate2= ((double)bdrops/barrivals);
00153
00154 if (dropRate1 > 0 || dropRate2 > 0) {
00155 if (verbose_)
00156 printf("PBQ:(%d:%d) (%g) arrs %d drops %d deps %d mdrops %d dr %g %g\n",
00157 src_, dst_, Scheduler::instance().clock(),
00158 barrivals*8, bdrops*8, bdeps*8, qmon_->mon_ebdrops()*8, dropRate1, dropRate2);
00159 fflush(stdout);
00160 }
00161 Tcl& tcl = Tcl::instance();
00162 tcl.evalf("%s reset",qmon_->name());
00163
00164
00165 if (rate_limiting_ &&
00166 dropRate1 >= SUSTAINED_CONGESTION_DROPRATE &&
00167 dropRate2 >= SUSTAINED_CONGESTION_DROPRATE/2) {
00168 if (verbose_) {
00169 printf("PBQ:(%d:%d) (%g) Arr: %d (%g) Drops: %d (%g %g) BW: %g\n",
00170 src_, dst_, Scheduler::instance().clock(),
00171 barrivals, rateEstimator_->estRate_,
00172 bdrops, dropRate1, dropRate2, link_->bandwidth());
00173 fflush(stdout);
00174 }
00175
00176
00177
00178
00179
00180 pushback_->identifyAggregate(pushbackID_, rateEstimator_->estRate_, link_->bandwidth());
00181 }
00182 else if (rlsList_->noMySessions(pushback_->node_->nodeid()) && LOWER_BOUND_MODE == 1) {
00183 pushback_->calculateLowerBound(pushbackID_, rateEstimator_->estRate_);
00184 }
00185
00186
00187 pushback_->resetDropLog(pushbackID_);
00188
00189 timer_->resched(SUSTAINED_CONGESTION_PERIOD);
00190 }
00191
00192 void
00193 PushbackQueue::enque(Packet *p) {
00194
00195 hdr_cmn * hdr = HDR_CMN(p);
00196
00197 if (debug_)
00198 printf("In queue enque with ptype %d %d\n", hdr->ptype(), PT_PUSHBACK);
00199
00200 if (hdr->ptype_ == PT_PUSHBACK) {
00201 if (verbose_) printf("PBQ:(%d:%d). Got a pushback packet.\n",src_, dst_);
00202 q_->enqueHead(p);
00203 return;
00204 }
00205
00206 int dropped = 0;
00207
00208 int qlen = qib_ ? q_->byteLength() : q_->length();
00209 int lowDemand = (edv_.v_ave < edp_.th_min || qlen < 1 || getDropRate() < 0.1*TARGET_DROPRATE );
00210
00211
00212
00213
00214
00215
00216
00217 if (rlsList_->noSessions_)
00218 dropped = rlsList_->filter(p, lowDemand);
00219
00220 if (dropped) {
00221
00222 if (RLDropTrace_!= NULL)
00223 ((Trace *)RLDropTrace_)->recvOnly(p);
00224
00225 qmon_->mon_edrop(p);
00226
00227
00228
00229 Packet::free(p);
00230 return;
00231 }
00232
00233
00234
00235 rateEstimator_->estimateRate(p);
00236 REDQueue::enque(p);
00237
00238 }
00239
00240
00241 double
00242 PushbackQueue::getRate() {
00243 return rateEstimator_->estRate_;
00244 }
00245
00246 double
00247 PushbackQueue::getBW() {
00248 return link_->bandwidth();
00249 }
00250
00251 double
00252 PushbackQueue::getDropRate() {
00253 if (rateEstimator_->estRate_ < getBW()) {
00254 return 0;
00255 } else {
00256 return 1 - getBW()/rateEstimator_->estRate_;
00257 }
00258 }
00259