00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037 #include "pushback.h"
00038
00039 #include "ident-tree.h"
00040 #include "pushback-queue.h"
00041 #include "rate-limit.h"
00042 #include "pushback-message.h"
00043
00044
00045
00046 int hdr_pushback::offset_;
00047
00048 static class PushbackHeaderClass : public PacketHeaderClass {
00049 public:
00050 PushbackHeaderClass() : PacketHeaderClass("PacketHeader/Pushback",
00051 sizeof(hdr_pushback)) {
00052 bind_offset(&hdr_pushback::offset_);
00053 }
00054 } class_Pushback_hdr;
00055
00056
00057 static class PushbackClass : public TclClass {
00058 public:
00059 PushbackClass() : TclClass("Agent/Pushback") {}
00060 TclObject* create(int, const char*const*) {
00061 return (new PushbackAgent());
00062 }
00063 } class_Pushback;
00064
00065 PushbackAgent::PushbackAgent() : Agent(PT_PUSHBACK), last_index_(0), intResult_(-1) {
00066
00067 bind("last_index_", &last_index_);
00068 bind("intResult_", &intResult_);
00069 bind_bool("enable_pushback_", &enable_pushback_);
00070 bind_bool("verbose_", &verbose_);
00071 timer_ = new PushbackTimer(this);
00072 debugLevel = 3;
00073
00074 }
00075
00076 int
00077 PushbackAgent::command(int argc, const char*const* argv) {
00078
00079 Tcl& tcl = Tcl::instance();
00080 if (argc == 4 ) {
00081 if (strcmp(argv[1], "initialize") == 0) {
00082
00083 node_ = (Node *)TclObject::lookup(argv[2]);
00084 rtLogic_ = (RouteLogic *)TclObject::lookup(argv[3]);
00085
00086 if (node_ == NULL || rtLogic_ == NULL) {
00087 if (verbose_) printf("Improper Initialization for Pushback Agent\n");
00088 return(TCL_ERROR);
00089 }
00090
00091 sprintf(prnMsg, "node=%s rtLogic=%s id=%d address=%d\n", node_->name(),
00092 rtLogic_->name(), node_->nodeid(), node_->address());
00093 printMsg(prnMsg,0);
00094
00095 return(TCL_OK);
00096 }
00097 }
00098 else if (argc == 3) {
00099
00100 if (strcmp(argv[1], "add-queue") == 0) {
00101 if (last_index_==MAX_QUEUES) {
00102 printf("queue list size exhausted - recompile with a bigger MAX_QUEUES\n");
00103 exit(-1);
00104 }
00105 PushbackQueue * queue = (PushbackQueue *) TclObject::lookup(argv[2]);
00106 if (queue == NULL) {
00107 printf("NULL queue passed \n");
00108 exit(-1);
00109 }
00110
00111 int index = last_index_++;
00112 queue_list_[index].pbq_ = queue;
00113 queue_list_[index].idTree_ = new IdentStruct();
00114
00115 tcl.resultf("%d", index);
00116 return (TCL_OK);
00117 }
00118 }
00119 return (Agent::command(argc, argv));
00120 }
00121
00122 void
00123 PushbackAgent::reportDrop(int qid, Packet * p) {
00124
00125 if (!checkQID(qid)) {
00126 sprintf(prnMsg,"Got invalid qid %d\n", qid);
00127 printMsg(prnMsg,0);
00128 exit(-1);
00129 }
00130
00131 hdr_ip * iph = hdr_ip::access(p);
00132 ns_addr_t src = iph->src();
00133 ns_addr_t dst = iph->dst();
00134 int fid = iph->flowid();
00135
00136 sprintf(prnMsg,"DropDetails from queue %d: %d.%d -> %d.%d (%d)\n", qid,
00137 src.addr_, src.port_, dst.addr_, dst.port_, fid);
00138 printMsg(prnMsg, 5);
00139
00140 queue_list_[qid].idTree_->registerDrop(p);
00141 }
00142
00143 void
00144 PushbackAgent::calculateLowerBound(int qid, double arrRate) {
00145
00146 if (!checkQID(qid)) {
00147 sprintf(prnMsg, "Got invalid id from queue in identifyAggregate\n");
00148 printMsg(prnMsg,0);
00149 exit(-1);
00150 }
00151
00152 AggReturn * aggReturn = queue_list_[qid].idTree_->calculateLowerBound();
00153 if (aggReturn == NULL) {
00154
00155
00156
00157 return;
00158 }
00159
00160 double lowerBound = 0;
00161 int i = 0;
00162 for (; i <= aggReturn->finalIndex_; i++) {
00163 cluster currCluster = aggReturn->clusterList_[i];
00164 AggSpec * aggSpec = new AggSpec(1, currCluster.prefix_, currCluster.bits_);
00165 RateLimitSession * rls1 =
00166 queue_list_[qid].pbq_->rlsList_->containsLocalAggSpec(aggSpec, node_->nodeid());
00167 if (rls1 !=NULL) continue;
00168 lowerBound = (currCluster.count_)*(arrRate/aggReturn->totalCount_);
00169 sprintf(prnMsg, "LB: count: %d totalCount_: %d arrRate: %g\n", currCluster.count_, aggReturn->totalCount_, arrRate);
00170 printMsg(prnMsg,0);
00171 break;
00172 }
00173
00174 if (i == aggReturn->finalIndex_+1) {
00175 sprintf(prnMsg, "Warning: All clusters being rate limited\n");
00176 printMsg(prnMsg,0);
00177
00178 }
00179
00180 queue_list_[qid].idTree_->setLowerBound(lowerBound, 1);
00181
00182 delete(aggReturn);
00183 }
00184
00185 void
00186 PushbackAgent::identifyAggregate(int qid, double arrRate, double linkBW) {
00187
00188
00189 if (!timer_->containsRefresh(qid)) {
00190 PushbackEvent * event = new PushbackEvent(PUSHBACK_CYCLE_TIME, PUSHBACK_REFRESH_EVENT, qid);
00191 timer_->insert(event);
00192 }
00193
00194
00195 sprintf(prnMsg, "identifyAggregate for %d\n", qid);
00196 printMsg(prnMsg,0);
00197
00198 if (!checkQID(qid)) {
00199 sprintf(prnMsg, "Got invalid id from queue in identifyAggregate\n");
00200 printMsg(prnMsg,0);
00201 exit(-1);
00202 }
00203 if (verbose_) queue_list_[qid].idTree_->traverse();
00204
00205
00206
00207 int noSessions = queue_list_[qid].pbq_->rlsList_->noMySessions(node_->nodeid());
00208
00209
00210
00211
00212
00213
00214 AggReturn * aggReturn = queue_list_[qid].idTree_->identifyAggregate(arrRate, linkBW);
00215 if (aggReturn == NULL) return;
00216
00217 for (int i=0; i<=aggReturn->finalIndex_; i++) {
00218 cluster currCluster = aggReturn->clusterList_[i];
00219 AggSpec * aggSpec = new AggSpec(1, currCluster.prefix_, currCluster.bits_);
00220
00221
00222 RateLimitSession * rls1 =
00223 queue_list_[qid].pbq_->rlsList_->containsLocalAggSpec(aggSpec, node_->nodeid());
00224 if (rls1 != NULL) {
00225 sprintf(prnMsg, "got subset aggregate. Lowerbound = %g. agg = ", aggReturn->limit_);
00226 printMsg(prnMsg,0);
00227 aggSpec->print(); fflush(stdout);
00228 delete(aggSpec);
00229
00230
00231 if (aggReturn->limit_ < rls1->lowerBound_) {
00232 rls1->lowerBound_ = aggReturn->limit_;
00233 }
00234
00235 rls1->refreshed();
00236 continue;
00237 }
00238
00239 double estimate = (currCluster.count_)*(arrRate/aggReturn->totalCount_);
00240
00241 if (noSessions >= MAX_SESSIONS) {
00242 int rank = queue_list_[qid].pbq_->rlsList_->rankRate(node_->nodeid(), estimate);
00243 if (rank >= MAX_SESSIONS) {
00244 sprintf(prnMsg, "got rate <= minRate. agg = ");
00245 printMsg(prnMsg,0);aggSpec->print(); fflush(stdout);
00246 delete(aggSpec);
00247 continue;
00248 }
00249 }
00250
00251 sprintf(prnMsg, "starting rate-limiting lower=%g estimate=%g agg ",
00252 aggReturn->limit_, estimate);
00253 printMsg(prnMsg,0);
00254 aggSpec->print(); fflush(stdout);
00255
00256 double initialLimit = estimate;
00257 RateLimitSession * rls = new RateLimitSession(aggSpec, estimate, 1, initialLimit,
00258 node_->nodeid(), qid,
00259 RATE_LIMIT_TIME_DEFAULT, aggReturn->limit_,
00260 node_, rtLogic_);
00261 queue_list_[qid].pbq_->rlsList_->insert(rls);
00262
00263 PushbackEvent * event = new PushbackEvent(INITIAL_UPDATE_TIME, INITIAL_UPDATE_EVENT, rls);
00264 timer_->insert(event);
00265
00266
00267 noSessions++;
00268 }
00269
00270 queue_list_[qid].idTree_->setLowerBound(aggReturn->limit_, 0);
00271 delete(aggReturn);
00272 }
00273
00274 void
00275 PushbackAgent::resetDropLog(int qid) {
00276
00277 sprintf(prnMsg, " drop log reset for qid %d\n", qid);
00278 printMsg(prnMsg,5);
00279
00280 if (!checkQID(qid)) {
00281 printf("Got invalid id from queue in resetDropLog\n");
00282 exit(-1);
00283 }
00284
00285 queue_list_[qid].idTree_->reset();
00286 }
00287
00288 void
00289 PushbackAgent::timeout(PushbackEvent * event) {
00290
00291 sprintf(prnMsg, " %s event for qid %d\n", PushbackEvent::type(event), event->qid_);
00292 printMsg(prnMsg,0);
00293 switch (event->eventID_) {
00294 case PUSHBACK_CHECK_EVENT: pushbackCheck(event->rls_);
00295 break;
00296 case PUSHBACK_REFRESH_EVENT: pushbackRefresh(event->qid_);
00297 break;
00298 case PUSHBACK_STATUS_EVENT: pushbackStatus(event->rls_);
00299 break;
00300 case INITIAL_UPDATE_EVENT: initialUpdate(event->rls_);
00301 break;
00302 default: sprintf(prnMsg, " Unrecognized event %d\n", event->eventID_);
00303 printMsg(prnMsg,0);
00304 break;
00305 }
00306
00307 }
00308
00309 void
00310 PushbackAgent::initialUpdate(RateLimitSession * rls) {
00311
00312 if ( !rls->initialPhase_ ) {
00313 sprintf(prnMsg, " Error: Update when not in initialphase\n");
00314 printMsg(prnMsg,0);
00315 exit(-1);
00316 }
00317
00318 double qdrop = queue_list_[rls->localQID_].pbq_->getDropRate();
00319 double dropRate = rls->getDropRate();
00320 double arrRate = rls->getArrivalRateForStatus();
00321 double newLimit = arrRate*(1 - 2*(dropRate+qdrop));
00322
00323 sprintf(prnMsg,"Initial-Update: qdrop=%g dr=%g newL=%g oldTarget=%g lowerBound=%g arr=%g\n",
00324 qdrop, dropRate, newLimit, rls->rlStrategy_->target_rate_, rls->lowerBound_, arrRate);
00325 printMsg(prnMsg,0);
00326
00327
00328 if (arrRate < 0.75*rls->lowerBound_) {
00329 #ifdef DEBUG
00330 double now = Scheduler::instance().clock();
00331 printf("Cancel pushback A time: %5.3f\n", now);
00332 #endif
00333 pushbackCancel(rls);
00334 return;
00335 }
00336
00337 if (newLimit > rls->lowerBound_) {
00338 rls->setLimit(newLimit);
00339
00340 PushbackEvent * event = new PushbackEvent(INITIAL_UPDATE_TIME, INITIAL_UPDATE_EVENT, rls);
00341 timer_->insert(event);
00342 }
00343 else {
00344 rls->setLimit(rls->lowerBound_);
00345 rls->initialPhase_ = 0;
00346
00347 if (rls->logData_->count_!=0 && enable_pushback_) {
00348 PushbackEvent * event = new PushbackEvent(PUSHBACK_CHECK_TIME, PUSHBACK_CHECK_EVENT, rls);
00349 timer_->insert(event);
00350 }
00351 }
00352 }
00353
00354
00355 void
00356 PushbackAgent::pushbackCheck(RateLimitSession * rls) {
00357
00358 double dropRate = rls->getDropRate();
00359
00360 if (dropRate >= DROP_RATE_FOR_PUSHBACK) {
00361 rls->pushbackOn();
00362 rls->heightInPTree_++;
00363
00364 double totalRate = rls->rlStrategy_->target_rate_;
00365 int count = rls->logData_->count_;
00366 double fairShare = totalRate/count;
00367 int done = count;
00368
00369
00370 while (done != 0) {
00371 LoggingDataStructNode * lgdsNode = rls->logData_->first_;
00372 int countThisRound=0;
00373 while (lgdsNode != NULL) {
00374 double rate = lgdsNode->rateEstimator_->estRate_;
00375 if (rate <= fairShare && !lgdsNode->pushbackSent_) {
00376 AggSpec * aggSpec = rls->aggSpec_->clone();
00377 PushbackMessage * msg;
00378 if (rate < fairShare/2.0) {
00379 msg = new PushbackRequestMessage(node_->nodeid(), lgdsNode->nid_, rls->localQID_,
00380 rls->localID_, aggSpec, INFINITE_LIMIT,
00381 rls->depthInPTree_);
00382 lgdsNode->pushbackSent(INFINITE_LIMIT, rate);
00383 }
00384 else {
00385 msg = new PushbackRequestMessage(node_->nodeid(), lgdsNode->nid_, rls->localQID_,
00386 rls->localID_, aggSpec, rate, rls->depthInPTree_);
00387 lgdsNode->pushbackSent(rate, rate);
00388 }
00389 sendMsg(msg);
00390 countThisRound++;
00391 done--;
00392 totalRate -= rate;
00393 }
00394 lgdsNode = lgdsNode->next_;
00395 }
00396 if (done == 0) break;
00397 if (countThisRound==0) {
00398
00399 LoggingDataStructNode * lgdsNode = rls->logData_->first_;
00400 while (lgdsNode != NULL) {
00401 if (!lgdsNode->pushbackSent_) {
00402 AggSpec * aggSpec = rls->aggSpec_->clone();
00403 PushbackMessage * msg = new PushbackRequestMessage(node_->nodeid(), lgdsNode->nid_,
00404 rls->localQID_, rls->localID_,
00405 aggSpec, fairShare,
00406 rls->depthInPTree_);
00407 lgdsNode->pushbackSent(fairShare,lgdsNode->rateEstimator_->estRate_);
00408 sendMsg(msg);
00409 done--;
00410 totalRate-=fairShare;
00411 }
00412 lgdsNode = lgdsNode->next_;
00413 }
00414 }
00415 else {
00416 fairShare= totalRate/done;
00417 }
00418 }
00419
00420 }
00421 else {
00422
00423 PushbackEvent * event = new PushbackEvent(PUSHBACK_CHECK_TIME, PUSHBACK_CHECK_EVENT, rls);
00424 timer_->insert(event);
00425 }
00426 }
00427
00428 void
00429 PushbackAgent::pushbackStatus(RateLimitSession * rls) {
00430
00431 if (rls->pushbackON_) {
00432 sprintf(prnMsg, " Warning: status timer expired for non-leaf node\n");
00433 printMsg(prnMsg,0);
00434
00435 }
00436 double rate = rls->getArrivalRateForStatus();
00437 rls->logData_->resetStatus();
00438
00439 PushbackMessage * msg = new PushbackStatusMessage(node_->nodeid(), rls->origin_,
00440 rls->remoteQID_, rls->remoteID_,
00441 rate, rls->heightInPTree_);
00442 sendMsg(msg);
00443 }
00444
00445 void
00446 PushbackAgent::pushbackRefresh(int qid) {
00447
00448 PushbackQueue * pbq = queue_list_[qid].pbq_;
00449 int oldSessions = pbq->rlsList_->noMySessions(node_->nodeid());
00450 if (!oldSessions) {
00451
00452
00453
00454 return;
00455 }
00456
00457 int noSessions = oldSessions;
00458
00459 if (MERGER_MODE == 1) {
00460 pbq->rlsList_->mergeSessions(node_->nodeid());
00461 noSessions = pbq->rlsList_->noMySessions(node_->nodeid());
00462
00463 if (noSessions!=oldSessions) {
00464 sprintf(prnMsg, " Some sessions merged. old = %d new = %d\n", oldSessions, noSessions);
00465 printMsg(prnMsg,0);
00466
00467
00468 RateLimitSession * listItem = pbq->rlsList_->first_;
00469 while (listItem != NULL) {
00470 if (listItem->origin_ == node_->nodeid() && listItem->merged_) {
00471 pushbackCancel(listItem);
00472 listItem = listItem->next_;
00473
00474 }
00475 }
00476 } else {
00477 sprintf(prnMsg, " No sessions merged. number = %d\n", noSessions);
00478 printMsg(prnMsg,0);
00479 }
00480 } else {
00481 sprintf(prnMsg, "Number of sessions = %d\n", noSessions);
00482 printMsg(prnMsg,0);
00483 }
00484
00485 double now = Scheduler::instance().clock();
00486
00487
00488 RateLimitSession * listItem1 = pbq->rlsList_->first_;
00489 while (noSessions > MAX_SESSIONS && listItem1 != NULL) {
00490 int rank = pbq->rlsList_->rankRate(node_->nodeid(), listItem1->getArrivalRateForStatus());
00491 if (listItem1->origin_ == node_->nodeid() &&
00492 rank >= MAX_SESSIONS && (now - listItem1->startTime_) >= EARLIEST_TIME_TO_FREE) {
00493 sprintf(prnMsg,"Releasing because of too many being rate-limited\n");
00494 printMsg(prnMsg,0);
00495 if (LOWER_BOUND_MODE == 1 &&
00496 queue_list_[qid].idTree_->lowerBound_ < listItem1->getArrivalRateForStatus()) {
00497 queue_list_[qid].idTree_->lowerBound_ = listItem1->getArrivalRateForStatus();
00498 }
00499 pushbackCancel(listItem1);
00500 noSessions--;
00501 }
00502 listItem1 = listItem1->next_;
00503 }
00504
00505 double linkBW = pbq->getBW();
00506 double arrRate = pbq->getRate();
00507 double targetRate = linkBW/(1 - TARGET_DROPRATE);
00508
00509 double totalRateLimitedArrivalRate = 0;
00510 double totalLimit=0;
00511 double lowerBound=-1;
00512 RateLimitSession * listItem = pbq->rlsList_->first_;
00513 while (listItem != NULL) {
00514 if (listItem->origin_ == node_->nodeid() && !listItem->merged_) {
00515 double sessionArrRate = listItem->getArrivalRateForStatus();
00516 double sessionLimit = listItem->rlStrategy_->target_rate_;
00517 totalRateLimitedArrivalRate+= sessionArrRate;
00518 totalLimit+= (sessionArrRate > sessionLimit)? sessionLimit: sessionArrRate;
00519 if (listItem->lowerBound_ < lowerBound || lowerBound == -1) {
00520 lowerBound = listItem->lowerBound_;
00521 }
00522 }
00523 listItem = listItem->next_;
00524 }
00525
00526 if (LOWER_BOUND_MODE == 1) {
00527 lowerBound = queue_list_[qid].idTree_->lowerBound_;
00528 }
00529
00530 double excessRate = (arrRate - totalLimit + totalRateLimitedArrivalRate) - targetRate;
00531
00532 sprintf(prnMsg,"arr=%g totalLimit=%g totalRateLimit=%g excess=%g\n", arrRate, totalLimit,
00533 totalRateLimitedArrivalRate, excessRate);
00534 printMsg(prnMsg,0);
00535
00536 if (excessRate < 0) {
00537 sprintf(prnMsg, "Negative Excess Rate. Things maybe fine now.\n");
00538 printMsg(prnMsg,0);
00539
00540 #ifdef DEBUG
00541 printf("Negative Excess Rate - time: %5.3f\n", now);
00542 #endif
00543 requiredLimit_ = 2*totalRateLimitedArrivalRate;
00544 } else {
00545
00546
00547
00548
00549 requiredLimit_ = (totalRateLimitedArrivalRate - excessRate)/noSessions;
00550 if (requiredLimit_ < lowerBound) {
00551 requiredLimit_ = lowerBound;
00552 }
00553 #ifdef DEBUG
00554 printf("New requiredLimit - time: %5.3f limit: %5.3f lowerBound:%5.3f \n", now, requiredLimit_, lowerBound);
00555 #endif
00556 }
00557
00558 sprintf(prnMsg,"Refresh. target=%g limit=%g floor=%g\n", targetRate, requiredLimit_,
00559 lowerBound);
00560 printMsg(prnMsg,0);
00561
00562
00563 for (int i=0; i<noSessions; i++) {
00564 listItem = pbq->rlsList_->first_;
00565 while (listItem != NULL ) {
00566 if (listItem->origin_ == node_->nodeid() &&
00567 pbq->rlsList_->rankSession(node_->nodeid(),listItem) == i)
00568 break;
00569 listItem = listItem->next_;
00570 }
00571 if (listItem == NULL) {
00572 printf("Error: Rank %d not found\n", i);
00573 exit(0);
00574 }
00575
00576 double oldLimit = listItem->rlStrategy_->target_rate_;
00577 double sendRate = listItem->getArrivalRateForStatus();
00578 #ifdef DEBUG
00579 printf("time: %5.3f ID: %d sendRate %5.3f oldLimit %5.3f requiredLimit %5.3f\n", now,
00580 listItem->localID_, sendRate, oldLimit, requiredLimit_);
00581 #endif
00582
00583 if (sendRate < requiredLimit_) {
00584
00585 if (now - listItem->refreshTime_ >= MIN_TIME_TO_FREE) {
00586 #ifdef DEBUG
00587 printf("time: %5.3f ID: %d refreshTime %5.3f MIN %d Cancel pushback B \n",
00588 now, listItem->localID_, listItem->refreshTime_, MIN_TIME_TO_FREE);
00589 #endif
00590 pushbackCancel(listItem);
00591 requiredLimit_+= (requiredLimit_ - sendRate)/(noSessions - i - 1);
00592 i--; noSessions--;
00593 }
00594 else {
00595
00596
00597
00598
00599 #ifdef DEBUG
00600 printf("time: %5.3f ID: %d double limit\n", now, listItem->localID_);
00601 #endif
00602 double maxR = sendRate>oldLimit? sendRate: oldLimit;
00603 if (now - listItem->refreshTime_ <= PRIMARY_WAITING_ZONE) {
00604 sprintf(prnMsg,"Waiting Zone 1: sendRate=%g oldLimit=%g\n", sendRate, oldLimit);
00605 printMsg(prnMsg,0);
00606 }
00607 else {
00608 sprintf(prnMsg,"Waiting Zone 2: sendRate=%g oldLimit=%g\n", sendRate, oldLimit);
00609 printMsg(prnMsg,0);
00610 maxR *= 1.5;
00611 }
00612 if (maxR < requiredLimit_) {
00613 listItem->setLimit(maxR);
00614 requiredLimit_ += (requiredLimit_ - maxR)/(noSessions - i - 1);
00615 }
00616 else {
00617 listItem->setLimit(requiredLimit_);
00618 }
00619
00620 if (listItem->pushbackON_)
00621 refreshUpstreamLimits(listItem);
00622 }
00623 }
00624 else {
00625
00626 double newLimit;
00627 if (oldLimit > 1.25 * requiredLimit_ || oldLimit ==0)
00628 newLimit = requiredLimit_;
00629 else
00630 newLimit = 0.5*requiredLimit_ + 0.5*oldLimit;
00631
00632 if (newLimit < lowerBound)
00633 newLimit = lowerBound;
00634
00635 listItem->refreshed();
00636 listItem->setLimit(newLimit);
00637 if (listItem->pushbackON_)
00638 refreshUpstreamLimits(listItem);
00639 #ifdef DEBUG
00640 printf("time: %5.3f ID: %d newLimit %5.3f oldLimit %5.3f requiredLimit %5.3f\n",
00641 now, listItem->localID_, newLimit, oldLimit, requiredLimit_);
00642 #endif
00643 }
00644 }
00645
00646
00647 noSessions = pbq->rlsList_->noMySessions(node_->nodeid());
00648 if (noSessions) {
00649 PushbackEvent * event = new PushbackEvent(PUSHBACK_CYCLE_TIME, PUSHBACK_REFRESH_EVENT, qid);
00650 timer_->insert(event);
00651 }
00652 }
00653
00654
00655 void
00656 PushbackAgent::pushbackCancel(RateLimitSession * rls) {
00657
00658 sprintf(prnMsg,"Stopping rate-limiting for aggregate: ");
00659 printMsg(prnMsg,0);
00660 rls->aggSpec_->print();
00661 fflush(stdout);
00662
00663 #ifdef DEBUG
00664 double now = Scheduler::instance().clock();
00665 printf("time: %5.3f ID: %d Cancel pushback C\n", now, rls->localID_);
00666 #endif
00667
00668 if (rls->pushbackON_) {
00669 LoggingDataStructNode * lgdsNode = rls->logData_->first_;
00670 while (lgdsNode != NULL) {
00671 PushbackMessage * msg = new PushbackCancelMessage(node_->nodeid(), lgdsNode->nid_,
00672 rls->localQID_, rls->localID_);
00673 sendMsg(msg);
00674 lgdsNode = lgdsNode->next_;
00675 }
00676 }
00677
00678
00679 timer_->removeEvents(rls);
00680
00681 queue_list_[rls->localQID_].pbq_->rlsList_->endSession(rls);
00682
00683 }
00684
00685
00686
00687 void
00688 PushbackAgent::recv(Packet * pkt, Handler * h) {
00689
00690 hdr_pushback * hdr_push = ((hdr_pushback*)pkt)->access(pkt);
00691 PushbackMessage * msg = hdr_push->msg_;
00692
00693 sprintf(prnMsg, " %s msg from %d\n", PushbackMessage::type(msg), msg->senderID_);
00694 printMsg(prnMsg,0);
00695
00696 switch (msg->msgID_) {
00697 case PUSHBACK_REQUEST_MSG : processPushbackRequest((PushbackRequestMessage *)msg);
00698 break;
00699 case PUSHBACK_STATUS_MSG : processPushbackStatus((PushbackStatusMessage *) msg);
00700 break;
00701 case PUSHBACK_REFRESH_MSG : processPushbackRefresh((PushbackRefreshMessage *) msg);
00702 break;
00703 case PUSHBACK_CANCEL_MSG : processPushbackCancel((PushbackCancelMessage *) msg);
00704 break;
00705 default: fprintf(stderr,"PBA: %s Undefined Message ID %d\n", name(),msg->msgID_);
00706 }
00707
00708 delete(msg);
00709 }
00710
00711 void
00712 PushbackAgent::processPushbackRequest(PushbackRequestMessage * msg) {
00713
00714 int qid = getQID(msg->senderID_);
00715 sprintf(prnMsg, " pushback request from %d for qid=%d limit=%g\n", msg->senderID_,
00716 qid, msg->limit_);
00717 printMsg(prnMsg,0);
00718
00719 AggSpec * aggSpec = msg->aggSpec_;
00720 if (queue_list_[qid].pbq_->rlsList_->containsAggSpec(aggSpec)) {
00721 fprintf(stdout,"PBA: %s got a pushback req for agg I already rate-limit. \
00722 Feature not yet Implemented\n",name());
00723 exit(-1);
00724 }
00725
00726 RateLimitSession * rls = new RateLimitSession(aggSpec, msg->limit_, msg->senderID_, qid,
00727 msg->qid_, msg->rlsID_, msg->depth_+1,
00728 RATE_LIMIT_TIME_DEFAULT, -1, node_, rtLogic_);
00729 queue_list_[qid].pbq_->rlsList_->insert(rls);
00730
00731
00732 if (rls->logData_->count_ && enable_pushback_) {
00733 PushbackEvent * event = new PushbackEvent(PUSHBACK_CHECK_TIME, PUSHBACK_CHECK_EVENT, rls);
00734 timer_->insert(event);
00735 }
00736 }
00737
00738
00739 void
00740 PushbackAgent::processPushbackStatus(PushbackStatusMessage * msg) {
00741
00742 int qid = msg->qid_;
00743
00744 if (!checkQID(qid)) {
00745 sprintf(prnMsg, " Got invalid qid from %d in status message\n", msg->senderID_);
00746 printMsg(prnMsg,0);
00747 exit(-1);
00748 }
00749
00750 RateLimitSession * rls = queue_list_[qid].pbq_->rlsList_->getSessionByLocalID(msg->rlsID_);
00751
00752 if (rls == NULL) {
00753 sprintf(prnMsg, " session %d not found\n", msg->rlsID_);
00754 printMsg(prnMsg,0);
00755 exit(-1);
00756 }
00757
00758
00759 if (msg->height_ + 1 > rls->heightInPTree_) {
00760 rls->heightInPTree_ = msg->height_ + 1;
00761 sprintf(prnMsg, " height increased to %d\n", rls->heightInPTree_);
00762 printMsg(prnMsg,0);
00763 }
00764
00765 rls->logData_->registerStatus(msg->senderID_, msg->arrivalRate_);
00766 sprintf(prnMsg, " got rate %g\n", msg->arrivalRate_);
00767 printMsg(prnMsg,0);
00768
00769
00770 if (rls->origin_!= node_->nodeid()) {
00771
00772
00773 int gotAll = rls->logData_->consolidateStatus();
00774 if (gotAll==1) {
00775
00776 double rate = rls->logData_->statusArrivalRateAll_;
00777 PushbackMessage * msg = new PushbackStatusMessage(node_->nodeid(), rls->origin_,
00778 rls->remoteQID_, rls->remoteID_,
00779 rate, rls->heightInPTree_);
00780 sendMsg(msg);
00781 timer_->cancelStatus(rls);
00782
00783 rls->logData_->resetStatus();
00784 }
00785 }
00786 }
00787
00788 void
00789 PushbackAgent::processPushbackRefresh(PushbackRefreshMessage *msg) {
00790
00791 int qid = getQID(msg->senderID_);
00792 sprintf(prnMsg, " pushback refresh from %d for qid=%d with limit=%g\n", msg->senderID_, qid, msg->limit_);
00793 printMsg(prnMsg,0);
00794
00795 RateLimitSession * rls = queue_list_[qid].pbq_->rlsList_->getSessionByRemoteID(msg->rlsID_);
00796
00797 if (rls == NULL) {
00798 sprintf(prnMsg, " session %d not found\n", msg->rlsID_);
00799 printMsg(prnMsg,0);
00800 exit(-1);
00801 }
00802
00803
00804 rls->setAggSpec(msg->aggSpec_);
00805 delete(msg->aggSpec_);
00806 double newLimit = msg->limit_;
00807 rls->setLimit(newLimit);
00808
00809
00810 if (rls->pushbackON_) {
00811 refreshUpstreamLimits(rls);
00812 }
00813
00814
00815 PushbackEvent * event = new PushbackEvent(PUSHBACK_CYCLE_TIME - 0.1*rls->depthInPTree_,
00816 PUSHBACK_STATUS_EVENT, rls);
00817 timer_->insert(event);
00818 }
00819
00820 void
00821 PushbackAgent::processPushbackCancel(PushbackCancelMessage *msg) {
00822
00823 int qid = getQID(msg->senderID_);
00824 sprintf(prnMsg, " pushback cancel from %d for queue index %d\n", msg->senderID_, qid);
00825 printMsg(prnMsg,0);
00826
00827 RateLimitSession * rls = queue_list_[qid].pbq_->rlsList_->getSessionByRemoteID(msg->rlsID_);
00828
00829 if (rls == NULL) {
00830 sprintf(prnMsg, " session %d not found\n", msg->rlsID_);
00831 printMsg(prnMsg,0);
00832 exit(-1);
00833 }
00834 pushbackCancel(rls);
00835
00836 }
00837
00838 void
00839 PushbackAgent::refreshUpstreamLimits(RateLimitSession * rls) {
00840
00841 double totalRate = rls->rlStrategy_->target_rate_;
00842 int count = rls->logData_->count_;
00843 double fairShare = totalRate/count;
00844 int done = count;
00845 double arrRate = rls->getArrivalRateForStatus();
00846 sprintf(prnMsg, "Sending refresh messages to %d nodes. Limit = %g arrRate = %g\n", count, totalRate, arrRate);
00847 printMsg(prnMsg,0);
00848
00849 int excess = 0;
00850 if (totalRate > arrRate) {
00851 excess = 1;
00852 }
00853
00854
00855 while (done != 0) {
00856 LoggingDataStructNode * lgdsNode = rls->logData_->first_;
00857 int countThisRound=0;
00858 while (lgdsNode != NULL) {
00859 double rate;
00860 rate = lgdsNode->statusArrivalRate_;
00861 if (rate <= fairShare && !lgdsNode->sentRefresh_) {
00862 AggSpec * aggSpec = rls->aggSpec_->clone();
00863 PushbackMessage * msg;
00864 if (rate < fairShare/2.0) {
00865 msg = new PushbackRefreshMessage(node_->nodeid(), lgdsNode->nid_, rls->localQID_,
00866 rls->localID_, aggSpec, INFINITE_LIMIT);
00867 lgdsNode->sentRefresh(INFINITE_LIMIT);
00868 }
00869 else if (!excess) {
00870 msg = new PushbackRefreshMessage(node_->nodeid(), lgdsNode->nid_, rls->localQID_,
00871 rls->localID_, aggSpec, rate);
00872 lgdsNode->sentRefresh(rate);
00873 }
00874 else {
00875 msg = new PushbackRefreshMessage(node_->nodeid(), lgdsNode->nid_, rls->localQID_,
00876 rls->localID_, aggSpec, fairShare);
00877 lgdsNode->sentRefresh(fairShare);
00878 rate = fairShare;
00879 }
00880 sendMsg(msg);
00881 countThisRound++;
00882 done--;
00883 totalRate -= rate;
00884 }
00885 lgdsNode = lgdsNode->next_;
00886 }
00887 if (done == 0) break;
00888 if (countThisRound==0) {
00889
00890 LoggingDataStructNode * lgdsNode = rls->logData_->first_;
00891 while (lgdsNode != NULL) {
00892 if (!lgdsNode->sentRefresh_) {
00893 AggSpec * aggSpec = rls->aggSpec_->clone();
00894 PushbackMessage * msg = new PushbackRefreshMessage(node_->nodeid(), lgdsNode->nid_,
00895 rls->localQID_, rls->localID_,
00896 aggSpec, fairShare);
00897 lgdsNode->sentRefresh(fairShare);
00898 sendMsg(msg);
00899 done--;
00900 totalRate-=fairShare;
00901 }
00902 lgdsNode = lgdsNode->next_;
00903 }
00904 }
00905 else {
00906 fairShare = totalRate/done;
00907 }
00908 }
00909
00910
00911 LoggingDataStructNode * lgdsNode = rls->logData_->first_;
00912 while (lgdsNode != NULL) {
00913 lgdsNode->sentRefresh_ = 0;
00914 lgdsNode = lgdsNode->next_;
00915 }
00916
00917 }
00918
00919 int
00920 PushbackAgent::getQID(int sender) {
00921
00922 Tcl& tcl = Tcl::instance();
00923 intResult_ = -1;
00924 int index = 0;
00925
00926
00927 for (; index <last_index_; index++) {
00928 tcl.evalf("%s set intResult_ [%s check-queue %d %d %s]", name(), name(),
00929 node_->nodeid(), sender , queue_list_[index].pbq_->name());
00930 if (intResult_ == 1) break;
00931 }
00932
00933 if (index == last_index_) {
00934 sprintf(prnMsg, " right queue not found\n");
00935 printMsg(prnMsg,0);
00936 exit(-1);
00937 }
00938
00939 return index;
00940 }
00941
00942 void
00943 PushbackAgent::sendMsg(PushbackMessage * msg) {
00944
00945 Tcl& tcl = Tcl::instance();
00946
00947 dst_.addr_ = msg->targetID_;
00948
00949
00950 tcl.evalf("%s set intResult_ [%s get-pba-port %d]", name(), name(),dst_.addr_ );
00951
00952 if ( intResult_ == -1 ) {
00953 fprintf(stderr,"PBA: %s Pushback Agent not found on Node %d\n", name(), dst_.addr_);
00954 return;
00955 }
00956 dst_.port_ = intResult_;
00957 Packet *pkt = allocpkt();
00958 hdr_pushback * hdr_push = ((hdr_pushback*)pkt)->access(pkt);
00959 hdr_push->msg_ = msg;
00960
00961 sprintf(prnMsg, " sent %s message to %d.%d\n", PushbackMessage::type(msg), dst_.addr_, dst_.port_);
00962 printMsg(prnMsg,4);
00963 send(pkt,0);
00964 }
00965
00966 void
00967 PushbackAgent::printMsg(char * msg, int msgLevel) {
00968
00969 if (msgLevel < debugLevel) {
00970 if (verbose_) printf("PBA:%d (%g) %s", node_->nodeid(), Scheduler::instance().clock(), msg);
00971 fflush(stdout);
00972 }
00973 }
00974
00975 int
00976 PushbackAgent::checkQID(int qid) {
00977 if (qid < 0 || qid >= last_index_)
00978 return 0;
00979 else
00980 return 1;
00981 }
00982
00983
00984
00985
00986 int
00987 PushbackAgent::mergerAccept(int count, int bits, int bitsDiff) {
00988
00989
00990
00991
00992
00993 return 0;
00994 }
00995
00996
00997
00998 void
00999 PushbackTimer::expire(Event *e) {
01000
01001 if (firstEvent_ == NULL) {
01002 printf("PushbackTimer: No event found on expiry\n");
01003 exit(-1);
01004 }
01005
01006 PushbackEvent * event = firstEvent_;
01007 firstEvent_= firstEvent_->next_;
01008 schedule();
01009
01010 agent_->timeout(event);
01011 delete(event);
01012 }
01013
01014
01015 void
01016 PushbackTimer::insert(PushbackEvent * event) {
01017
01018 sprintf(agent_->prnMsg,"%s timer set\n", PushbackEvent::type(event));
01019 agent_->printMsg(agent_->prnMsg,4);
01020 if (firstEvent_ == NULL) {
01021 firstEvent_ = event;
01022 schedule();
01023 return;
01024 }
01025
01026 if (event->time_ < firstEvent_->time_) {
01027 event->setSucc(firstEvent_);
01028 firstEvent_=event;
01029 schedule();
01030 return;
01031 }
01032
01033 PushbackEvent * listItem = firstEvent_;
01034 while (listItem->next_!=NULL && listItem->next_->time_ <= event->time_) {
01035 listItem = listItem->next_;
01036 }
01037
01038 event->setSucc(listItem->next_);
01039 listItem->setSucc(event);
01040
01041
01042 sanityCheck();
01043
01044 return;
01045 }
01046
01047 void
01048 PushbackTimer::removeEvents(RateLimitSession * rls) {
01049 if (firstEvent_==NULL) return;
01050 while (firstEvent_!= NULL && firstEvent_->rls_==rls) {
01051 cancel();
01052 PushbackEvent * event = firstEvent_;
01053 firstEvent_=firstEvent_->next_;
01054 delete(event);
01055 schedule();
01056 }
01057 if (firstEvent_==NULL) return;
01058
01059 PushbackEvent * previous = firstEvent_;
01060 PushbackEvent * current = firstEvent_->next_;
01061 while (current!=NULL) {
01062 if (current->rls_==rls) {
01063 previous->next_=current->next_;
01064 delete(current);
01065 current = previous->next_;
01066 continue;
01067 }
01068 previous=current;
01069 current=current->next_;
01070 }
01071
01072 }
01073
01074 void
01075 PushbackTimer::schedule() {
01076
01077 if (firstEvent_== NULL) {
01078 sprintf(agent_->prnMsg,"Timer: Nothing to schedule\n");
01079 agent_->printMsg(agent_->prnMsg, 0);
01080 return;
01081 }
01082
01083 resched(firstEvent_->time_ - Scheduler::instance().clock());
01084 }
01085
01086 void
01087 PushbackTimer::cancelStatus(RateLimitSession * rls) {
01088
01089 if (firstEvent_==NULL) {
01090 sprintf(agent_->prnMsg, " Error timer list empty\n");
01091 agent_->printMsg(agent_->prnMsg, 0);
01092
01093 exit(-1);
01094 }
01095
01096 if (firstEvent_->eventID_==PUSHBACK_STATUS_EVENT && firstEvent_->rls_==rls) {
01097 cancel();
01098 PushbackEvent * event = firstEvent_;
01099 firstEvent_=firstEvent_->next_;
01100 delete(event);
01101 schedule();
01102 return;
01103 }
01104
01105 PushbackEvent * previous = firstEvent_;
01106 PushbackEvent * current = firstEvent_->next_;
01107
01108 while (current!=NULL) {
01109 if (current->eventID_ == PUSHBACK_STATUS_EVENT && current->rls_==rls) {
01110 previous->next_=current->next_;
01111 delete(current);
01112 return;
01113 }
01114 previous=current;
01115 current=current->next_;
01116 }
01117
01118 sprintf(agent_->prnMsg, "Error status timer not found\n");
01119 agent_->printMsg(agent_->prnMsg, 0);
01120 exit(-1);
01121 }
01122
01123 int
01124 PushbackTimer::containsRefresh(int qid) {
01125 PushbackEvent * listItem = firstEvent_;
01126 while (listItem!=NULL) {
01127 if (listItem->eventID_ == PUSHBACK_REFRESH_EVENT && listItem->qid_==qid)
01128 return 1;
01129 listItem = listItem->next_;
01130 }
01131 return 0;
01132 }
01133
01134 void
01135 PushbackTimer::sanityCheck() {
01136
01137 if (firstEvent_==NULL || firstEvent_->next_ == NULL) return;
01138
01139 PushbackEvent * listItem = firstEvent_;
01140 while (listItem->next_!=NULL) {
01141 if (listItem->time_ > listItem->next_->time_) {
01142 sprintf(agent_->prnMsg, "Sanity Check Failed\n");
01143 agent_->printMsg(agent_->prnMsg, 0);
01144 exit(-1);
01145 }
01146 listItem = listItem->next_;
01147 }
01148
01149 }
01150