00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037 #include "flags.h"
00038 #include "ip.h"
00039 #include "tcp-sink.h"
00040 #include "hdr_qs.h"
00041
00042 static class TcpSinkClass : public TclClass {
00043 public:
00044 TcpSinkClass() : TclClass("Agent/TCPSink") {}
00045 TclObject* create(int, const char*const*) {
00046 return (new TcpSink(new Acker));
00047 }
00048 } class_tcpsink;
00049
00050 Acker::Acker() : next_(0), maxseen_(0), wndmask_(MWM), ecn_unacked_(0),
00051 ts_to_echo_(0)
00052 {
00053 seen_ = new int[MWS];
00054 memset(seen_, 0, (sizeof(int) * (MWS)));
00055 }
00056
00057 void Acker::reset()
00058 {
00059 next_ = 0;
00060 maxseen_ = 0;
00061 memset(seen_, 0, (sizeof(int) * (wndmask_ + 1)));
00062 }
00063
00064
00065
00066 void Acker::resize_buffers(int sz) {
00067 int* new_seen = new int[sz];
00068 int new_wndmask = sz - 1;
00069
00070 if(!new_seen){
00071 fprintf(stderr, "Unable to allocate buffer seen_[%i]\n", sz);
00072 exit(1);
00073 }
00074
00075 memset(new_seen, 0, (sizeof(int) * (sz)));
00076
00077 for(int i = next_; i <= maxseen_+1; i++){
00078 new_seen[i & new_wndmask] = seen_[i&wndmask_];
00079 }
00080
00081 delete[] seen_;
00082 seen_ = new_seen;
00083 wndmask_ = new_wndmask;
00084 return;
00085 }
00086
00087 void Acker::update_ts(int seqno, double ts)
00088 {
00089 if (ts >= ts_to_echo_ && seqno <= next_)
00090 ts_to_echo_ = ts;
00091 }
00092
00093
00094
00095 int Acker::update(int seq, int numBytes)
00096 {
00097 bool just_marked_as_seen = FALSE;
00098 is_dup_ = FALSE;
00099
00100 if (numBytes <= 0)
00101 printf("Error, received TCP packet size <= 0\n");
00102 int numToDeliver = 0;
00103 while(seq + 1 - next_ >= wndmask_) {
00104
00105
00106
00107
00108 resize_buffers((wndmask_+1)*2);
00109 }
00110
00111 if (seq > maxseen_) {
00112
00113 int i;
00114 for (i = maxseen_ + 1; i < seq; ++i)
00115 seen_[i & wndmask_] = 0;
00116
00117
00118
00119 maxseen_ = seq;
00120 seen_[maxseen_ & wndmask_] = numBytes;
00121
00122 seen_[(maxseen_ + 1) & wndmask_] = 0;
00123
00124
00125 just_marked_as_seen = TRUE;
00126
00127 }
00128 int next = next_;
00129 if (seq < next) {
00130
00131
00132
00133 #ifdef DEBUGDSACK
00134 printf("%f\t Received duplicate packet %d\n",Scheduler::instance().clock(),seq);
00135 #endif
00136 is_dup_ = TRUE;
00137 }
00138
00139 if (seq >= next && seq <= maxseen_) {
00140
00141
00142
00143
00144
00145 if (seen_[seq & wndmask_] && !just_marked_as_seen) {
00146
00147
00148
00149 is_dup_ = TRUE;
00150 #ifdef DEBUGDSACK
00151 printf("%f\t Received duplicate packet %d\n",Scheduler::instance().clock(),seq);
00152 #endif
00153 }
00154 seen_[seq & wndmask_] = numBytes;
00155
00156 while (seen_[next & wndmask_]) {
00157
00158
00159
00160
00161
00162
00163
00164
00165 numToDeliver += seen_[next & wndmask_];
00166 ++next;
00167 }
00168 next_ = next;
00169
00170 }
00171 return numToDeliver;
00172 }
00173
00174 TcpSink::TcpSink(Acker* acker) : Agent(PT_ACK), acker_(acker), save_(NULL),
00175 lastreset_(0.0)
00176 {
00177 bytes_ = 0;
00178
00179
00180
00181
00182
00183 #if defined(TCP_DELAY_BIND_ALL) && 0
00184 #else
00185 bind("maxSackBlocks_", &max_sack_blocks_);
00186 #endif
00187 }
00188
00189 void
00190 TcpSink::delay_bind_init_all()
00191 {
00192 delay_bind_init_one("packetSize_");
00193 delay_bind_init_one("ts_echo_bugfix_");
00194 delay_bind_init_one("bytes_");
00195 delay_bind_init_one("generateDSacks_");
00196 delay_bind_init_one("qs_enabled_");
00197 delay_bind_init_one("RFC2581_immediate_ack_");
00198 #if defined(TCP_DELAY_BIND_ALL) && 0
00199 delay_bind_init_one("maxSackBlocks_");
00200 #endif
00201
00202 Agent::delay_bind_init_all();
00203 }
00204
00205 int
00206 TcpSink::delay_bind_dispatch(const char *varName, const char *localName, TclObject *tracer)
00207 {
00208 if (delay_bind(varName, localName, "packetSize_", &size_, tracer)) return TCL_OK;
00209 if (delay_bind_bool(varName, localName, "ts_echo_bugfix_", &ts_echo_bugfix_, tracer)) return TCL_OK;
00210 if (delay_bind_bool(varName, localName, "generateDSacks_", &generate_dsacks_, tracer)) return TCL_OK;
00211 if (delay_bind_bool(varName, localName, "qs_enabled_", &qs_enabled_, tracer)) return TCL_OK;
00212 if (delay_bind_bool(varName, localName, "RFC2581_immediate_ack_", &RFC2581_immediate_ack_, tracer)) return TCL_OK;
00213 #if defined(TCP_DELAY_BIND_ALL) && 0
00214 if (delay_bind(varName, localName, "maxSackBlocks_", &max_sack_blocks_, tracer)) return TCL_OK;
00215 #endif
00216
00217 return Agent::delay_bind_dispatch(varName, localName, tracer);
00218 }
00219
00220 void Acker::append_ack(hdr_cmn*, hdr_tcp*, int) const
00221 {
00222 }
00223
00224 void Acker::update_ecn_unacked(int value)
00225 {
00226 ecn_unacked_ = value;
00227 }
00228
00229 int TcpSink::command(int argc, const char*const* argv)
00230 {
00231 if (argc == 2) {
00232 if (strcmp(argv[1], "reset") == 0) {
00233 reset();
00234 return (TCL_OK);
00235 }
00236 if (strcmp(argv[1], "resize_buffers") == 0) {
00237
00238 fprintf(stderr,"DEPRECIATED: resize_buffers\n");
00239 return (TCL_OK);
00240 }
00241 }
00242
00243 return (Agent::command(argc, argv));
00244 }
00245
00246 void TcpSink::reset()
00247 {
00248 acker_->reset();
00249 save_ = NULL;
00250 lastreset_ = Scheduler::instance().clock();
00251
00252 }
00253
00254 void TcpSink::ack(Packet* opkt)
00255 {
00256 Packet* npkt = allocpkt();
00257
00258
00259 double now = Scheduler::instance().clock();
00260 hdr_flags *sf;
00261
00262 hdr_tcp *otcp = hdr_tcp::access(opkt);
00263 hdr_ip *oiph = hdr_ip::access(opkt);
00264 hdr_tcp *ntcp = hdr_tcp::access(npkt);
00265
00266 if (qs_enabled_) {
00267
00268 hdr_qs *oqsh = hdr_qs::access(opkt);
00269 hdr_qs *nqsh = hdr_qs::access(npkt);
00270 if (otcp->seqno() == 0 && oqsh->flag() == QS_REQUEST) {
00271 nqsh->flag() = QS_RESPONSE;
00272 nqsh->ttl() = (oiph->ttl() - oqsh->ttl()) % 256;
00273 nqsh->rate() = (oqsh->rate() < MWS) ? oqsh->rate() : MWS;
00274 }
00275 else {
00276 nqsh->flag() = QS_DISABLE;
00277 }
00278 }
00279
00280
00281
00282 ntcp->seqno() = acker_->Seqno();
00283
00284
00285 ntcp->ts() = now;
00286
00287
00288 if (ts_echo_bugfix_)
00289 ntcp->ts_echo() = acker_->ts_to_echo();
00290 else
00291 ntcp->ts_echo() = otcp->ts();
00292
00293
00294 hdr_ip* oip = hdr_ip::access(opkt);
00295 hdr_ip* nip = hdr_ip::access(npkt);
00296
00297 nip->flowid() = oip->flowid();
00298
00299
00300 hdr_flags* of = hdr_flags::access(opkt);
00301 hdr_flags* nf = hdr_flags::access(npkt);
00302 if (save_ != NULL)
00303 sf = hdr_flags::access(save_);
00304
00305 if ( (save_ != NULL && sf->cong_action()) || of->cong_action() )
00306
00307 acker_->update_ecn_unacked(0);
00308 if ( (save_ != NULL && sf->ect() && sf->ce()) ||
00309 (of->ect() && of->ce()) )
00310
00311 acker_->update_ecn_unacked(1);
00312 if ( (save_ != NULL && sf->ect()) || of->ect() )
00313
00314 nf->ecnecho() = acker_->ecn_unacked();
00315 if (!of->ect() && of->ecnecho() ||
00316 (save_ != NULL && !sf->ect() && sf->ecnecho()) )
00317
00318
00319
00320
00321 nf->ecnecho() = 1;
00322 acker_->append_ack(hdr_cmn::access(npkt),
00323 ntcp, otcp->seqno());
00324 add_to_ack(npkt);
00325
00326
00327 send(npkt, 0);
00328
00329 }
00330
00331 void TcpSink::add_to_ack(Packet*)
00332 {
00333 return;
00334 }
00335
00336
00337 void TcpSink::recv(Packet* pkt, Handler*)
00338 {
00339 int numToDeliver;
00340 int numBytes = hdr_cmn::access(pkt)->size();
00341
00342 hdr_tcp *th = hdr_tcp::access(pkt);
00343
00344 if (th->ts() < lastreset_) {
00345
00346 Packet::free(pkt);
00347 return;
00348 }
00349 acker_->update_ts(th->seqno(),th->ts());
00350
00351
00352 numToDeliver = acker_->update(th->seqno(), numBytes);
00353
00354
00355
00356 if (numToDeliver)
00357 recvBytes(numToDeliver);
00358
00359 ack(pkt);
00360
00361 Packet::free(pkt);
00362
00363 }
00364
00365 static class DelSinkClass : public TclClass {
00366 public:
00367 DelSinkClass() : TclClass("Agent/TCPSink/DelAck") {}
00368 TclObject* create(int, const char*const*) {
00369 return (new DelAckSink(new Acker));
00370 }
00371 } class_delsink;
00372
00373 DelAckSink::DelAckSink(Acker* acker) : TcpSink(acker), delay_timer_(this)
00374 {
00375 bind_time("interval_", &interval_);
00376 bind("bytes_", &bytes_);
00377 }
00378
00379 void DelAckSink::reset() {
00380 if (delay_timer_.status() == TIMER_PENDING)
00381 delay_timer_.cancel();
00382 TcpSink::reset();
00383 }
00384
00385 void DelAckSink::recv(Packet* pkt, Handler*)
00386 {
00387 int numToDeliver;
00388 int numBytes = hdr_cmn::access(pkt)->size();
00389 hdr_tcp *th = hdr_tcp::access(pkt);
00390
00391 if (th->ts() < lastreset_) {
00392
00393 Packet::free(pkt);
00394 return;
00395 }
00396 acker_->update_ts(th->seqno(),th->ts());
00397 numToDeliver = acker_->update(th->seqno(), numBytes);
00398 if (numToDeliver) {
00399 bytes_ += numToDeliver;
00400 recvBytes(numToDeliver);
00401 }
00402
00403
00404
00405 if (delay_timer_.status() != TIMER_PENDING &&
00406 th->seqno() == acker_->Seqno()) {
00407
00408
00409
00410
00411
00412
00413
00414
00415 if (RFC2581_immediate_ack_ &&
00416 (th->seqno() < acker_->Maxseen())) {
00417
00418
00419 } else {
00420
00421 save_ = pkt;
00422 delay_timer_.resched(interval_);
00423 return;
00424 }
00425 }
00426
00427 if (delay_timer_.status() == TIMER_PENDING)
00428 delay_timer_.cancel();
00429 ack(pkt);
00430 if (save_ != NULL) {
00431 Packet::free(save_);
00432 save_ = NULL;
00433 }
00434
00435 Packet::free(pkt);
00436 }
00437
00438 void DelAckSink::timeout(int)
00439 {
00440
00441 if ( save_ != NULL ) {
00442 Packet* pkt = save_;
00443 ack(pkt);
00444 save_ = NULL;
00445 Packet::free(pkt);
00446 }
00447 }
00448
00449 void DelayTimer::expire(Event* ) {
00450 a_->timeout(0);
00451 }
00452
00453
00454
00455 class SackStack {
00456 protected:
00457 int size_;
00458 int cnt_;
00459 struct Sf_Entry {
00460 int left_;
00461 int right_;
00462 } *SFE_;
00463 public:
00464 SackStack(int);
00465 ~SackStack();
00466 int& head_right(int n = 0) { return SFE_[n].right_; }
00467 int& head_left(int n = 0) { return SFE_[n].left_; }
00468 int cnt() { return cnt_; }
00469 void reset() {
00470 register int i;
00471 for (i = 0; i < cnt_; i++)
00472 SFE_[i].left_ = SFE_[i].right_ = -1;
00473
00474 cnt_ = 0;
00475 }
00476
00477 inline void push(int n = 0) {
00478 if (cnt_ >= size_) cnt_ = size_ - 1;
00479 register int i;
00480 for (i = cnt_-1; i >= n; i--)
00481 SFE_[i+1] = SFE_[i];
00482 cnt_++;
00483 }
00484
00485 inline void pop(int n = 0) {
00486 register int i;
00487 for (i = n; i < cnt_-1; i++)
00488 SFE_[i] = SFE_[i+1];
00489 SFE_[i].left_ = SFE_[i].right_ = -1;
00490 cnt_--;
00491 }
00492 };
00493
00494 SackStack::SackStack(int sz)
00495 {
00496 register int i;
00497 size_ = sz;
00498 SFE_ = new Sf_Entry[sz];
00499 for (i = 0; i < sz; i++)
00500 SFE_[i].left_ = SFE_[i].right_ = -1;
00501 cnt_ = 0;
00502 }
00503
00504 SackStack::~SackStack()
00505 {
00506 delete SFE_;
00507 }
00508
00509 static class Sack1TcpSinkClass : public TclClass {
00510 public:
00511 Sack1TcpSinkClass() : TclClass("Agent/TCPSink/Sack1") {}
00512 TclObject* create(int, const char*const*) {
00513 Sacker* sacker = new Sacker;
00514 TcpSink* sink = new TcpSink(sacker);
00515 sacker->configure(sink);
00516 return (sink);
00517 }
00518 } class_sack1tcpsink;
00519
00520 static class Sack1DelAckTcpSinkClass : public TclClass {
00521 public:
00522 Sack1DelAckTcpSinkClass() : TclClass("Agent/TCPSink/Sack1/DelAck") {}
00523 TclObject* create(int, const char*const*) {
00524 Sacker* sacker = new Sacker;
00525 TcpSink* sink = new DelAckSink(sacker);
00526 sacker->configure(sink);
00527 return (sink);
00528 }
00529 } class_sack1delacktcpsink;
00530
00531 void Sacker::configure(TcpSink *sink)
00532 {
00533 if (sink == NULL) {
00534 fprintf(stderr, "warning: Sacker::configure(): no TCP sink!\n");
00535 return;
00536 }
00537
00538 TracedInt& nblocks = sink->max_sack_blocks_;
00539 if (int(nblocks) > NSA) {
00540 fprintf(stderr, "warning(Sacker::configure): TCP header limits number of SACK blocks to %d, not %d\n", NSA, int(nblocks));
00541 nblocks = NSA;
00542 }
00543 sf_ = new SackStack(int(nblocks));
00544 nblocks.tracer(this);
00545 base_nblocks_ = int(nblocks);
00546 dsacks_ = &(sink->generate_dsacks_);
00547 }
00548
00549 void
00550 Sacker::trace(TracedVar *v)
00551 {
00552
00553 TracedInt* ti = (TracedInt*) v;
00554
00555 if (int(*ti) > NSA) {
00556 fprintf(stderr, "warning(Sacker::trace): TCP header limits number of SACK blocks to %d, not %d\n", NSA, int(*ti));
00557 *ti = NSA;
00558 }
00559
00560 int newval = int(*ti);
00561 delete sf_;
00562 sf_ = new SackStack(newval);
00563 base_nblocks_ = newval;
00564 }
00565
00566 void Sacker::reset()
00567 {
00568 sf_->reset();
00569 Acker::reset();
00570 }
00571
00572 Sacker::~Sacker()
00573 {
00574 delete sf_;
00575 }
00576
00577 void Sacker::append_ack(hdr_cmn* ch, hdr_tcp* h, int old_seqno) const
00578 {
00579
00580
00581
00582 int sack_index, i, sack_right, sack_left;
00583 int recent_sack_left, recent_sack_right;
00584
00585 int seqno = Seqno();
00586
00587
00588 sack_index = 0;
00589 sack_left = sack_right = -1;
00590
00591
00592 if (old_seqno < 0) {
00593 printf("Error: invalid packet number %d\n", old_seqno);
00594 } else if (seqno >= maxseen_ && (sf_->cnt() != 0))
00595 sf_->reset();
00596
00597
00598
00599 else if (( (seqno < maxseen_) || is_dup_ ) && (base_nblocks_ > 0)) {
00600
00601
00602
00603
00604
00605
00606
00607 if ((*dsacks_) && is_dup_) {
00608
00609 h->sa_left(sack_index) = old_seqno;
00610 h->sa_right(sack_index) = old_seqno+1;
00611
00612 sack_index++;
00613 #ifdef DEBUGDSACK
00614 printf("%f\t Generating D-SACK for packet %d\n", Scheduler::instance().clock(),old_seqno);
00615 #endif
00616
00617
00618 }
00619
00620
00621
00622
00623
00624
00625
00626
00627
00628
00629
00630 if (sack_index >= base_nblocks_) {
00631 printf("Error: can't use DSACK with less than 2 SACK blocks\n");
00632 } else {
00633 sack_right=-1;
00634
00635
00636
00637 for (i=old_seqno; i<=maxseen_; i++) {
00638 if (!seen_[i & wndmask_]) {
00639 sack_right=i;
00640 break;
00641 }
00642 }
00643
00644
00645
00646 if (sack_right == -1) {
00647 sack_right = maxseen_+1;
00648 }
00649
00650
00651
00652 if (old_seqno <= seqno) {
00653 sack_left = 0;
00654
00655 } else {
00656
00657 for (i = sack_right-1; i > seqno; i--) {
00658 if (!seen_[i & wndmask_]) {
00659 sack_left = i+1;
00660 break;
00661 }
00662 }
00663 h->sa_left(sack_index) = sack_left;
00664 h->sa_right(sack_index) = sack_right;
00665
00666
00667
00668 sack_index++;
00669 }
00670
00671 recent_sack_left = sack_left;
00672 recent_sack_right = sack_right;
00673
00674
00675
00676
00677
00678 int k = 0;
00679 while (sack_index < base_nblocks_) {
00680
00681 sack_left = sf_->head_left(k);
00682 sack_right = sf_->head_right(k);
00683
00684
00685 if (sack_left < 0 || sack_right < 0 ||
00686 sack_right > maxseen_ + 1)
00687 break;
00688
00689
00690
00691 if (recent_sack_left <= sack_left &&
00692 recent_sack_right >= sack_right) {
00693 sf_->pop(k);
00694 continue;
00695 }
00696
00697 h->sa_left(sack_index) = sack_left;
00698 h->sa_right(sack_index) = sack_right;
00699
00700
00701
00702
00703 sack_index++;
00704 k++;
00705 }
00706
00707
00708 if (old_seqno > seqno) {
00709
00710 sf_->push();
00711
00712
00713
00714 sf_->head_left() = recent_sack_left;
00715 sf_->head_right() = recent_sack_right;
00716
00717
00718 }
00719
00720 }
00721
00722
00723
00724 }
00725 h->sa_length() = sack_index;
00726
00727 ch->size() += sack_index * 8;
00728
00729
00730 }