00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 #include <stdio.h>
00036 #include <stdlib.h>
00037 #include <sys/types.h>
00038 #include <math.h>
00039
00040 #include "tfrc-sink.h"
00041 #include "formula-with-inverse.h"
00042 #include "flags.h"
00043
00044 static class TfrcSinkClass : public TclClass {
00045 public:
00046 TfrcSinkClass() : TclClass("Agent/TFRCSink") {}
00047 TclObject* create(int, const char*const*) {
00048 return (new TfrcSinkAgent());
00049 }
00050 } class_tfrcSink;
00051
00052
00053 TfrcSinkAgent::TfrcSinkAgent() : Agent(PT_TFRC_ACK), nack_timer_(this)
00054 {
00055 bind("packetSize_", &size_);
00056 bind("InitHistorySize_", &hsz);
00057 bind("NumFeedback_", &NumFeedback_);
00058 bind ("AdjustHistoryAfterSS_", &adjust_history_after_ss);
00059 bind ("printLoss_", &printLoss_);
00060 bind ("algo_", &algo);
00061 bind ("PreciseLoss_", &PreciseLoss_);
00062 bind ("numPkts_", &numPkts_);
00063
00064
00065 bind ("NumSamples_", &numsamples);
00066 bind ("discount_", &discount);
00067 bind ("smooth_", &smooth_);
00068
00069
00070 bind ("history_", &history);
00071
00072
00073 bind("minlc_", &minlc);
00074
00075 rtt_ = 0;
00076 tzero_ = 0;
00077 last_timestamp_ = 0;
00078 last_arrival_ = 0;
00079 last_report_sent=0;
00080
00081 maxseq = -1;
00082 maxseqList = -1;
00083 rcvd_since_last_report = 0;
00084 losses_since_last_report = 0;
00085 loss_seen_yet = 0;
00086 lastloss = 0;
00087 lastloss_round_id = -1 ;
00088 numPktsSoFar_ = 0;
00089
00090 rtvec_ = NULL;
00091 tsvec_ = NULL;
00092 lossvec_ = NULL;
00093
00094
00095 last_sample = 0;
00096
00097
00098 false_sample = 0;
00099 sample = NULL ;
00100 weights = NULL ;
00101 mult = NULL ;
00102 sample_count = 1 ;
00103 mult_factor_ = 1.0;
00104 init_WALI_flag = 0;
00105
00106
00107 avg_loss_int = -1 ;
00108 loss_int = 0 ;
00109
00110
00111 sendrate = 0 ;
00112 }
00113
00114
00115
00116
00117
00118
00119
00120
00121
00122
00123
00124 int TfrcSinkAgent::new_loss(int i, double tstamp)
00125 {
00126 if ((tsvec_[i%hsz]-lastloss > rtt_)
00127 && (PreciseLoss_ == 0 || (round_id > lastloss_round_id))) {
00128 lastloss = tstamp;
00129 lastloss_round_id = round_id ;
00130 return TRUE;
00131 } else return FALSE;
00132 }
00133
00134 double TfrcSinkAgent::estimate_tstamp(int before, int after, int i)
00135 {
00136 double delta = (tsvec_[after%hsz]-tsvec_[before%hsz])/(after-before) ;
00137 double tstamp = tsvec_[before%hsz]+(i-before)*delta ;
00138 return tstamp;
00139 }
00140
00141
00142
00143
00144 void TfrcSinkAgent::recv(Packet *pkt, Handler *)
00145 {
00146 hdr_tfrc *tfrch = hdr_tfrc::access(pkt);
00147 hdr_flags* hf = hdr_flags::access(pkt);
00148 double now = Scheduler::instance().clock();
00149 double p = -1;
00150 int ecnEvent = 0;
00151 int congestionEvent = 0;
00152 int UrgentFlag = 0;
00153 int newdata = 0;
00154
00155 rcvd_since_last_report ++;
00156
00157 if (maxseq < 0) {
00158
00159 newdata = 1;
00160 maxseq = tfrch->seqno - 1 ;
00161 maxseqList = tfrch->seqno;
00162 rtvec_=(double *)malloc(sizeof(double)*hsz);
00163 tsvec_=(double *)malloc(sizeof(double)*hsz);
00164 lossvec_=(char *)malloc(sizeof(double)*hsz);
00165 if (rtvec_ && lossvec_) {
00166 int i;
00167 for (i = 0; i < hsz ; i ++) {
00168 lossvec_[i] = UNKNOWN;
00169 rtvec_[i] = -1;
00170 tsvec_[i] = -1;
00171 }
00172 }
00173 else {
00174 printf ("error allocating memory for packet buffers\n");
00175 abort ();
00176 }
00177 }
00178
00179
00180 int seqno = tfrch->seqno ;
00181 int oldmaxseq = maxseq;
00182
00183
00184 if ((seqno > maxseq) ||
00185 (seqno > maxseqList && lossvec_[seqno%hsz] == UNKNOWN )) {
00186 if (seqno > maxseqList + 1)
00187 ++ numPktsSoFar_;
00188 UrgentFlag = tfrch->UrgentFlag;
00189 round_id = tfrch->round_id ;
00190 rtt_=tfrch->rtt;
00191 tzero_=tfrch->tzero;
00192 psize_=tfrch->psize;
00193 sendrate = tfrch->rate;
00194 last_arrival_=now;
00195 last_timestamp_=tfrch->timestamp;
00196 rtvec_[seqno%hsz]=now;
00197 tsvec_[seqno%hsz]=last_timestamp_;
00198 if (hf->ect() == 1 && hf->ce() == 1) {
00199 lossvec_[seqno%hsz] = ECN_RCVD;
00200 if (new_loss(seqno, tsvec_[seqno%hsz])) {
00201
00202 ecnEvent = 1;
00203 lossvec_[seqno%hsz] = ECNLOST;
00204 losses_since_last_report++;
00205 }
00206 } else lossvec_[seqno%hsz] = RCVD;
00207 }
00208 if (seqno > maxseqList &&
00209 (ecnEvent || numPktsSoFar_ >= numPkts_ ||
00210 tsvec_[seqno%hsz] - tsvec_[maxseqList%hsz] > rtt_)) {
00211
00212 int i = maxseqList ;
00213 while(i < seqno) {
00214 if (lossvec_[i%hsz] == UNKNOWN) {
00215 rtvec_[i%hsz]=now;
00216 tsvec_[i%hsz]=estimate_tstamp(oldmaxseq, seqno, i);
00217 if (new_loss(i, tsvec_[i%hsz])) {
00218 congestionEvent = 1;
00219 lossvec_[i%hsz] = LOST;
00220 } else {
00221
00222
00223 lossvec_[i%hsz] = NOT_RCVD;
00224 }
00225 losses_since_last_report++;
00226 }
00227 i++;
00228 }
00229 maxseqList = seqno;
00230 numPktsSoFar_ = 0;
00231 } else if (seqno == maxseqList + 1) {
00232 maxseqList = seqno;
00233 numPktsSoFar_ = 0;
00234 }
00235 if (seqno > maxseq) {
00236 maxseq = tfrch->seqno ;
00237
00238
00239 if ((algo == WALI) && (loss_seen_yet ==0) &&
00240 (tfrch->seqno - oldmaxseq > 1 || ecnEvent )) {
00241 UrgentFlag = 1 ;
00242 loss_seen_yet = 1;
00243 if (adjust_history_after_ss) {
00244 p = adjust_history(tfrch->timestamp);
00245 }
00246
00247 }
00248 if ((rtt_ > SMALLFLOAT) &&
00249 (now - last_report_sent >= rtt_/NumFeedback_)) {
00250 UrgentFlag = 1 ;
00251 }
00252 }
00253 if (UrgentFlag || ecnEvent || congestionEvent) {
00254 nextpkt(p);
00255 }
00256 Packet::free(pkt);
00257 }
00258
00259 double TfrcSinkAgent::est_loss ()
00260 {
00261 double p = 0 ;
00262 switch (algo) {
00263 case WALI:
00264 p = est_loss_WALI () ;
00265 break;
00266 case EWMA:
00267 p = est_loss_EWMA () ;
00268 break;
00269 case RBPH:
00270 p = est_loss_RBPH () ;
00271 break;
00272 case EBPH:
00273 p = est_loss_EBPH () ;
00274 break;
00275 default:
00276 printf ("invalid algo specified\n");
00277 abort();
00278 break ;
00279 }
00280 return p;
00281 }
00282
00283
00284
00285
00286 double TfrcSinkAgent::est_thput ()
00287 {
00288 double time_for_rcv_rate;
00289 double now = Scheduler::instance().clock();
00290 double thput = 1 ;
00291
00292 if ((rtt_ > 0) && ((now - last_report_sent) >= rtt_)) {
00293
00294 time_for_rcv_rate = (now - last_report_sent);
00295 if (time_for_rcv_rate > 0 && rcvd_since_last_report > 0) {
00296 thput = rcvd_since_last_report/time_for_rcv_rate;
00297 }
00298 }
00299 else {
00300
00301 if (rtt_ > 0){
00302 double last = rtvec_[maxseq%hsz];
00303 int rcvd = 0;
00304 int i = maxseq;
00305 while (i > 0) {
00306 if (lossvec_[i%hsz] == RCVD) {
00307 if ((rtvec_[i%hsz] + rtt_) > last)
00308 rcvd++;
00309 else
00310 break ;
00311 }
00312 i--;
00313 }
00314 if (rcvd > 0)
00315 thput = rcvd/rtt_;
00316 }
00317 }
00318 return thput ;
00319 }
00320
00321
00322
00323
00324 void TfrcSinkAgent::nextpkt(double p) {
00325
00326 sendpkt(p);
00327
00328
00329 if (rtt_ > 0.0 && NumFeedback_ > 0)
00330 nack_timer_.resched(1.5*rtt_/NumFeedback_);
00331 }
00332
00333
00334
00335
00336 void TfrcSinkAgent::sendpkt(double p)
00337 {
00338 double now = Scheduler::instance().clock();
00339
00340
00341
00342
00343
00344
00345
00346
00347
00348 if (last_arrival_ >= last_report_sent) {
00349
00350 Packet* pkt = allocpkt();
00351 if (pkt == NULL) {
00352 printf ("error allocating packet\n");
00353 abort();
00354 }
00355
00356 hdr_tfrc_ack *tfrc_ackh = hdr_tfrc_ack::access(pkt);
00357
00358 tfrc_ackh->seqno=maxseq;
00359 tfrc_ackh->timestamp_echo=last_timestamp_;
00360 tfrc_ackh->timestamp_offset=now-last_arrival_;
00361 tfrc_ackh->timestamp=now;
00362 tfrc_ackh->NumFeedback_ = NumFeedback_;
00363 if (p < 0)
00364 tfrc_ackh->flost = est_loss ();
00365 else
00366 tfrc_ackh->flost = p;
00367 tfrc_ackh->rate_since_last_report = est_thput ();
00368 tfrc_ackh->losses = losses_since_last_report;
00369 last_report_sent = now;
00370 rcvd_since_last_report = 0;
00371 losses_since_last_report = 0;
00372 send(pkt, 0);
00373 }
00374 }
00375
00376 int TfrcSinkAgent::command(int argc, const char*const* argv)
00377 {
00378 if (argc == 3) {
00379 if (strcmp(argv[1], "weights") == 0) {
00380
00381
00382
00383
00384
00385
00386 char *w ;
00387 w = (char *)calloc(strlen(argv[2])+1, sizeof(char)) ;
00388 if (w == NULL) {
00389 printf ("error allocating w\n");
00390 abort();
00391 }
00392 strcpy(w, (char *)argv[2]);
00393 numsamples = atoi(strtok(w,"+"));
00394 sample = (int *)malloc((numsamples+1)*sizeof(int));
00395 weights = (double *)malloc((numsamples+1)*sizeof(double));
00396 mult = (double *)malloc((numsamples+1)*sizeof(double));
00397 fflush(stdout);
00398 if (sample && weights) {
00399 int count = 0 ;
00400 while (count < numsamples) {
00401 sample[count] = 0;
00402 mult[count] = 1;
00403 char *w;
00404 w = strtok(NULL, "+");
00405 if (w == NULL)
00406 break ;
00407 else {
00408 weights[count++] = atof(w);
00409 }
00410 }
00411 if (count < numsamples) {
00412 printf ("error in weights string %s\n", argv[2]);
00413 abort();
00414 }
00415 sample[count] = 0;
00416 weights[count] = 0;
00417 mult[count] = 1;
00418 free(w);
00419 return (TCL_OK);
00420 }
00421 else {
00422 printf ("error allocating memory for smaple and weights:2\n");
00423 abort();
00424 }
00425 }
00426 }
00427 return (Agent::command(argc, argv));
00428 }
00429
00430 void TfrcNackTimer::expire(Event *) {
00431 a_->nextpkt(-1);
00432 }
00433
00434 void TfrcSinkAgent::print_loss(int sample, double ave_interval)
00435 {
00436 double now = Scheduler::instance().clock();
00437 double drops = 1/ave_interval;
00438
00439
00440 printf ("time: %7.5f loss_rate: %7.5f \n", now, drops);
00441 printf ("time: %7.5f sample 0: %5d loss_rate: %7.5f \n",
00442 now, sample, drops);
00443
00444
00445 }
00446
00447 void TfrcSinkAgent::print_loss_all(int *sample)
00448 {
00449 double now = Scheduler::instance().clock();
00450 printf ("%f: sample 0: %5d 1: %5d 2: %5d 3: %5d 4: %5d\n",
00451 now, sample[0], sample[1], sample[2], sample[3], sample[4]);
00452 }
00453
00455
00457
00458
00462 double TfrcSinkAgent::est_loss_WALI ()
00463 {
00464 int i;
00465 double ave_interval1, ave_interval2;
00466 int ds ;
00467
00468 if (!init_WALI_flag) {
00469 init_WALI () ;
00470 }
00471
00472
00473 for (i = last_sample; i <= maxseq ; i ++) {
00474 sample[0]++;
00475 if (lossvec_[i%hsz] == LOST || lossvec_[i%hsz] == ECNLOST) {
00476
00477
00478 sample_count ++;
00479 shift_array (sample, numsamples+1, 0);
00480 multiply_array(mult, numsamples+1, mult_factor_);
00481 shift_array (mult, numsamples+1, 1.0);
00482 mult_factor_ = 1.0;
00483 }
00484 }
00485 last_sample = maxseq+1 ;
00486
00487 if (sample_count>numsamples+1)
00488
00489 ds=numsamples+1;
00490 else
00491 ds=sample_count;
00492
00493 if (sample_count == 1 && false_sample == 0)
00494
00495 return 0;
00496
00497 if (sample_count > 1 && discount && sample[0] > 0) {
00498 double ave = weighted_average(1, ds, 1.0, mult, weights, sample);
00499 int factor = 2;
00500 double ratio = (factor*ave)/sample[0];
00501 double min_ratio = 0.5;
00502 if ( ratio < 1.0) {
00503
00504 mult_factor_ = ratio;
00505 if (mult_factor_ < min_ratio)
00506 mult_factor_ = min_ratio;
00507 }
00508 }
00509
00510 ave_interval1 = weighted_average(0, ds, mult_factor_, mult, weights, sample);
00511
00512
00513
00514
00515 ave_interval2 = weighted_average(1, ds, mult_factor_, mult, weights, sample);
00516 if (ave_interval2 > ave_interval1)
00517 ave_interval1 = ave_interval2;
00518 if (ave_interval1 > 0) {
00519 if (printLoss_ > 0) {
00520 print_loss(sample[0], ave_interval1);
00521 print_loss_all(sample);
00522 }
00523 return 1/ave_interval1;
00524 } else return 999;
00525 }
00526
00527
00528
00529 double TfrcSinkAgent::weighted_average(int start, int end, double factor, double *m, double *w, int *sample)
00530 {
00531 int i;
00532 double wsum = 0;
00533 double answer = 0;
00534 if (smooth_ == 1 && start == 0) {
00535 if (end == numsamples+1) {
00536
00537
00538 end = end-1;
00539 }
00540
00541 for (i = start ; i < end; i++)
00542 if (i==0)
00543 wsum += m[i]*w[i+1];
00544 else
00545 wsum += factor*m[i]*w[i+1];
00546 for (i = start ; i < end; i++)
00547 if (i==0)
00548 answer += m[i]*w[i+1]*sample[i]/wsum;
00549 else
00550 answer += factor*m[i]*w[i+1]*sample[i]/wsum;
00551 return answer;
00552
00553 } else {
00554 for (i = start ; i < end; i++)
00555 if (i==0)
00556 wsum += m[i]*w[i];
00557 else
00558 wsum += factor*m[i]*w[i];
00559 for (i = start ; i < end; i++)
00560 if (i==0)
00561 answer += m[i]*w[i]*sample[i]/wsum;
00562 else
00563 answer += factor*m[i]*w[i]*sample[i]/wsum;
00564 return answer;
00565 }
00566 }
00567
00568
00569 void TfrcSinkAgent::shift_array(int *a, int sz, int defval)
00570 {
00571 int i ;
00572 for (i = sz-2 ; i >= 0 ; i--) {
00573 a[i+1] = a[i] ;
00574 }
00575 a[0] = defval;
00576 }
00577 void TfrcSinkAgent::shift_array(double *a, int sz, double defval) {
00578 int i ;
00579 for (i = sz-2 ; i >= 0 ; i--) {
00580 a[i+1] = a[i] ;
00581 }
00582 a[0] = defval;
00583 }
00584
00585
00586
00587 void TfrcSinkAgent::multiply_array(double *a, int sz, double multiplier) {
00588 int i ;
00589 for (i = 1; i <= sz-1; i++) {
00590 double old = a[i];
00591 a[i] = old * multiplier ;
00592 }
00593 }
00594
00595
00596
00597
00598 double TfrcSinkAgent::adjust_history (double ts)
00599 {
00600 int i;
00601 double p;
00602 for (i = maxseq; i >= 0 ; i --) {
00603 if (lossvec_[i%hsz] == LOST || lossvec_[i%hsz] == ECNLOST ) {
00604 lossvec_[i%hsz] = NOT_RCVD;
00605 }
00606 }
00607 lastloss = ts;
00608 lastloss_round_id = round_id ;
00609 p=b_to_p(est_thput()*psize_, rtt_, tzero_, psize_, 1);
00610 false_sample = (int)(1.0/p);
00611 sample[1] = false_sample;
00612 sample[0] = 0;
00613 sample_count++;
00614 if (printLoss_) {
00615 print_loss_all (sample);
00616 }
00617 false_sample = -1 ;
00618 return p;
00619 }
00620
00621
00622
00623
00624
00625 void TfrcSinkAgent::init_WALI () {
00626 int i;
00627 if (numsamples < 0)
00628 numsamples = DEFAULT_NUMSAMPLES ;
00629 if (smooth_ == 1) {
00630 numsamples = numsamples + 1;
00631 }
00632 sample = (int *)malloc((numsamples+1)*sizeof(int));
00633 weights = (double *)malloc((numsamples+1)*sizeof(double));
00634 mult = (double *)malloc((numsamples+1)*sizeof(double));
00635 for (i = 0 ; i < numsamples+1 ; i ++) {
00636 sample[i] = 0 ;
00637 }
00638 if (smooth_ == 1) {
00639 int mid = int(numsamples/2);
00640 for (i = 0; i < mid; i ++) {
00641 weights[i] = 1.0;
00642 }
00643 for (i = mid; i <= numsamples; i ++){
00644 weights[i] = 1.0 - (i-mid)/(mid + 1.0);
00645 }
00646 } else {
00647 int mid = int(numsamples/2);
00648 for (i = 0; i < mid; i ++) {
00649 weights[i] = 1.0;
00650 }
00651 for (i = mid; i <= numsamples; i ++){
00652 weights[i] = 1.0 - (i+1-mid)/(mid + 1.0);
00653 }
00654 }
00655 for (i = 0; i < numsamples+1; i ++) {
00656 mult[i] = 1.0 ;
00657 }
00658 init_WALI_flag = 1;
00659 }
00660
00662
00664
00665 double TfrcSinkAgent::est_loss_EWMA () {
00666 double p1, p2 ;
00667 for (int i = last_sample; i <= maxseq ; i ++) {
00668 loss_int++;
00669 if (lossvec_[i%hsz] == LOST || lossvec_[i%hsz] == ECNLOST ) {
00670 if (avg_loss_int < 0) {
00671 avg_loss_int = loss_int ;
00672 } else {
00673 avg_loss_int = history*avg_loss_int + (1-history)*loss_int ;
00674 }
00675 loss_int = 0 ;
00676 }
00677 }
00678 last_sample = maxseq+1 ;
00679
00680 if (avg_loss_int < 0) {
00681 p1 = 0;
00682 } else {
00683 p1 = 1.0/avg_loss_int ;
00684 }
00685 if (loss_int == 0) {
00686 p2 = p1 ;
00687 } else {
00688 p2 = 1.0/(history*avg_loss_int + (1-history)*loss_int) ;
00689 }
00690 if (p2 < p1) {
00691 p1 = p2 ;
00692 }
00693 if (printLoss_ > 0) {
00694 if (p1 > 0)
00695 print_loss(loss_int, 1.0/p1);
00696 else
00697 print_loss(loss_int, 0.00001);
00698 print_loss_all(sample);
00699 }
00700 return p1 ;
00701 }
00702
00704
00706 double TfrcSinkAgent::est_loss_RBPH () {
00707
00708 double numpkts = hsz ;
00709 double p ;
00710
00711
00712 if (sendrate > 0 && rtt_ > 0) {
00713 double x = b_to_p(sendrate, rtt_, tzero_, psize_, 1);
00714 if (x > 0)
00715 numpkts = minlc/x ;
00716 else
00717 numpkts = hsz ;
00718 }
00719
00720
00721 if (numpkts > maxseq)
00722 numpkts = maxseq ;
00723 if (numpkts > hsz)
00724 numpkts = hsz ;
00725
00726 int lc = 0;
00727 int pc = 0;
00728 int i = maxseq ;
00729
00730
00731 while (pc < numpkts) {
00732 pc ++ ;
00733 if (lossvec_[i%hsz] == LOST || lossvec_[i%hsz] == ECNLOST )
00734 lc ++ ;
00735 i -- ;
00736 }
00737
00738
00739 if (lc < minlc) {
00740
00741
00742 numpkts = maxseq ;
00743 if (numpkts > hsz)
00744 numpkts = hsz ;
00745
00746 while ((lc < minlc) && (pc < numpkts)) {
00747 pc ++ ;
00748 if (lossvec_[i%hsz] == LOST || lossvec_[i%hsz] == ECNLOST )
00749 lc ++ ;
00750 i -- ;
00751
00752 }
00753 }
00754
00755 if (pc == 0)
00756 p = 0;
00757 else
00758 p = (double)lc/(double)pc ;
00759 if (printLoss_ > 0) {
00760 if (p > 0)
00761 print_loss(0, 1.0/p);
00762 else
00763 print_loss(0, 0.00001);
00764 print_loss_all(sample);
00765 }
00766 return p ;
00767 }
00768
00770
00772 double TfrcSinkAgent::est_loss_EBPH () {
00773
00774 double numpkts = hsz ;
00775 double p ;
00776
00777 int lc = 0;
00778 int pc = 0;
00779 int i = maxseq ;
00780
00781 numpkts = maxseq ;
00782 if (numpkts > hsz)
00783 numpkts = hsz ;
00784
00785 while ((lc < minlc) && (pc < numpkts)) {
00786 pc ++ ;
00787 if (lossvec_[i%hsz] == LOST || lossvec_[i%hsz] == ECNLOST)
00788 lc ++ ;
00789 i -- ;
00790 }
00791
00792 if (pc == 0)
00793 p = 0;
00794 else
00795 p = (double)lc/(double)pc ;
00796 if (printLoss_ > 0) {
00797 if (p > 0)
00798 print_loss(0, 1.0/p);
00799 else
00800 print_loss(0, 0.00001);
00801 print_loss_all(sample);
00802 }
00803 return p ;
00804 }