00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037 #include <stdlib.h>
00038 #include <math.h>
00039 #include "ip.h"
00040 #include "flags.h"
00041 #include "random.h"
00042 #include "template.h"
00043 #include "nilist.h"
00044 #include "tcp.h"
00045 #include "tcp-int.h"
00046 #include "tcp-session.h"
00047
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058
00059
00060
00061
00062
00063
00064
00065
00066
00067
00068
00069
00070
00071
00072
00073 static class TcpSessionClass : public TclClass {
00074 public:
00075 TcpSessionClass() : TclClass("Agent/TCP/Session") {}
00076 TclObject* create(int, const char*const*) {
00077 return (new TcpSessionAgent());
00078 }
00079 } class_tcpsession;
00080
00081 TcpSessionAgent::TcpSessionAgent() : CorresHost(),
00082 rtx_timer_(this), burstsnd_timer_(this), sessionSeqno_(0),
00083 last_send_time_(-1), curConn_(0), numConsecSegs_(0),
00084 schedDisp_(FINE_ROUND_ROBIN), wtSum_(0), dynWtSum_(0)
00085 {
00086 bind("ownd_", &ownd_);
00087 bind("owndCorr_", &owndCorrection_);
00088 bind_bool("proxyopt_", &proxyopt_);
00089 bind_bool("fixedIw_", &fixedIw_);
00090 bind("schedDisp_", &schedDisp_);
00091 bind_bool("disableIntLossRecov_", &disableIntLossRecov_);
00092
00093 sessionList_.append(this);
00094 }
00095
00096 int
00097 TcpSessionAgent::command(int argc, const char*const* argv)
00098 {
00099 if (argc == 2) {
00100 if (!strcmp(argv[1], "resetwt")) {
00101 Islist_iter<IntTcpAgent> conn_iter(conns_);
00102 IntTcpAgent *tcp;
00103
00104 while ((tcp = conn_iter()) != NULL)
00105 tcp->wt_ = 1;
00106 wtSum_ = conn_iter.count();
00107 return (TCL_OK);
00108 }
00109 }
00110 return (CorresHost::command(argc, argv));
00111 }
00112
00113 void
00114 SessionRtxTimer::expire(Event*)
00115 {
00116 a_->timeout(TCP_TIMER_RTX);
00117 }
00118
00119 void
00120 SessionResetTimer::expire(Event*)
00121 {
00122 a_->timeout(TCP_TIMER_RESET);
00123 }
00124
00125 void
00126 SessionBurstSndTimer::expire(Event*)
00127 {
00128 a_->timeout(TCP_TIMER_BURSTSND);
00129 }
00130
00131 void
00132 TcpSessionAgent::reset_rtx_timer(int , int backoff)
00133 {
00134 if (backoff)
00135 rtt_backoff();
00136 set_rtx_timer();
00137 rtt_active_ = 0;
00138 }
00139
00140 void
00141 TcpSessionAgent::set_rtx_timer()
00142 {
00143 if (rtx_timer_.status() == TIMER_PENDING)
00144 rtx_timer_.cancel();
00145 if (reset_timer_.status() == TIMER_PENDING)
00146 reset_timer_.cancel();
00147 if (fs_enable_ && fs_mode_)
00148 reset_timer_.resched(rtt_exact_timeout());
00149 else
00150 rtx_timer_.resched(rtt_timeout());
00151 }
00152
00153 void
00154 TcpSessionAgent::cancel_rtx_timer()
00155 {
00156 rtx_timer_.force_cancel();
00157 reset_timer_.force_cancel();
00158 }
00159
00160 void
00161 TcpSessionAgent::cancel_timers()
00162 {
00163 rtx_timer_.force_cancel();
00164 reset_timer_.force_cancel();
00165 burstsnd_timer_.force_cancel();
00166 delsnd_timer_.force_cancel();
00167 }
00168
00169 int
00170 TcpSessionAgent::fs_pkt() {
00171 return (fs_enable_ && fs_mode_ && sessionSeqno_-1 >= fs_startseq_ &&
00172 sessionSeqno_-1 < fs_endseq_);
00173 }
00174
00175 void
00176 TcpSessionAgent::rtt_update_exact(double tao)
00177 {
00178 double g = 1/8;
00179 double h = 1/4;
00180 double delta;
00181
00182 if (t_exact_srtt_ != 0) {
00183 delta = tao - t_exact_srtt_;
00184 if (delta < 0)
00185 delta = -delta;
00186
00187 if (t_exact_srtt_ != 0)
00188 t_exact_srtt_ = g*tao + (1-g)*t_exact_srtt_;
00189 else
00190 t_exact_srtt_ = tao;
00191
00192 delta -= t_exact_rttvar_;
00193 t_exact_rttvar_ += h*delta;
00194 }
00195 else {
00196 t_exact_srtt_ = tao;
00197 t_exact_rttvar_ = tao/2;
00198 }
00199 }
00200
00201 void
00202 TcpSessionAgent::newack(Packet *pkt)
00203 {
00204 double now = Scheduler::instance().clock();
00205 Islist_iter<Segment> seg_iter(seglist_);
00206 hdr_tcp *tcph = hdr_tcp::access(pkt);
00207 hdr_flags *fh = hdr_flags::access(pkt);
00208
00209 if (!fh->no_ts_) {
00210
00211 if (ts_option_) {
00212 rtt_update(now - tcph->ts_echo());
00213 rtt_update_exact(now - tcph->ts_echo());
00214 }
00215
00216 if (rtt_active_ && rtt_seg_ == NULL) {
00217 t_backoff_ = 1;
00218 rtt_active_ = 0;
00219 if (!ts_option_)
00220 rtt_update(now - rtt_ts_);
00221 }
00222 }
00223 if (seg_iter.count() > 0)
00224 set_rtx_timer();
00225 else
00226 cancel_rtx_timer();
00227 }
00228
00229 void
00230 TcpSessionAgent::timeout(int tno)
00231 {
00232 if (tno == TCP_TIMER_BURSTSND)
00233 send_much(NULL,0,0);
00234 else if (tno == TCP_TIMER_RESET) {
00235 Islist_iter<Segment> seg_iter(seglist_);
00236 Segment *curseg;
00237 Islist_iter<IntTcpAgent> conn_iter(conns_);
00238 IntTcpAgent *curconn;
00239
00240 fs_mode_ = 0;
00241 if (seg_iter.count() == 0 && !slow_start_restart_) {
00242 return;
00243 }
00244 recover_ = sessionSeqno_ - 1;
00245 last_cwnd_action_ = CWND_ACTION_TIMEOUT;
00246 ownd_ = 0;
00247 owndCorrection_ = 0;
00248 while ((curconn = conn_iter()) != NULL) {
00249 curconn->maxseq_ = curconn->highest_ack_;
00250 curconn->t_seqno_ = curconn->highest_ack_ + 1;
00251 curconn->recover_ = curconn->maxseq_;
00252 curconn->last_cwnd_action_ = CWND_ACTION_TIMEOUT;
00253 }
00254 while ((curseg = seg_iter()) != NULL) {
00255
00256 curseg->size_ = 0;
00257 }
00258
00259
00260
00261
00262
00263
00264 if (connWithPktBeforeFS_) {
00265 connWithPktBeforeFS_ = NULL;
00266 timeout(TCP_TIMER_RTX);
00267 }
00268 else {
00269 slowdown(CLOSE_CWND_INIT);
00270 reset_rtx_timer(0,0);
00271 send_much(NULL, 0, TCP_REASON_TIMEOUT);
00272 }
00273 }
00274 else if (tno == TCP_TIMER_RTX) {
00275 Islist_iter<Segment> seg_iter(seglist_);
00276 Segment *curseg;
00277 Islist_iter<IntTcpAgent> conn_iter(conns_);
00278 IntTcpAgent *curconn;
00279
00280 if (seg_iter.count() == 0 && !slow_start_restart_) {
00281 return;
00282 }
00283 recover_ = sessionSeqno_ - 1;
00284 last_cwnd_action_ = CWND_ACTION_TIMEOUT;
00285 if (seg_iter.count() == 0 && restart_bugfix_) {
00286 slowdown(CLOSE_CWND_INIT);
00287 reset_rtx_timer(0,0);
00288 }
00289 else {
00290 slowdown(CLOSE_CWND_RESTART|CLOSE_SSTHRESH_HALF);
00291 reset_rtx_timer(0,1);
00292 }
00293 nrexmit_++;
00294 ownd_ = 0;
00295 owndCorrection_ = 0;
00296 while ((curconn = conn_iter()) != NULL) {
00297 curconn->t_seqno_ = curconn->highest_ack_ + 1;
00298 curconn->recover_ = curconn->maxseq_;
00299 curconn->last_cwnd_action_ = CWND_ACTION_TIMEOUT;
00300 }
00301 while ((curseg = seg_iter()) != NULL) {
00302
00303 curseg->size_ = 0;
00304 }
00305
00306 send_much(NULL, 0, TCP_REASON_TIMEOUT);
00307 }
00308 else
00309 printf("TcpSessionAgent::timeout(): ignoring unknown timer %d\n", tno);
00310 }
00311
00312 Segment*
00313 TcpSessionAgent::add_pkts(int size, int seqno, int sessionSeqno, int daddr,
00314 int dport, int sport, double ts, IntTcpAgent *sender)
00315 {
00316
00317
00318
00319
00320 if (!(rtx_timer_.status() == TIMER_PENDING) || seglist_.count() == 0)
00321 set_rtx_timer();
00322 last_seg_sent_ = CorresHost::add_pkts(size, seqno, sessionSeqno, daddr, dport, sport, ts, sender);
00323 return last_seg_sent_;
00324 }
00325
00326 void
00327 TcpSessionAgent::add_agent(IntTcpAgent *agent, int size, double winMult,
00328 int winInc, int ssthresh)
00329 {
00330 CorresHost::add_agent(agent,size,winMult,winInc,ssthresh);
00331 wtSum_ += agent->wt_;
00332 reset_dyn_weights();
00333 }
00334
00335 int
00336 TcpSessionAgent::window()
00337 {
00338 if (maxcwnd_ == 0)
00339 return (int(cwnd_));
00340 else
00341 return (int(min(cwnd_,maxcwnd_)));
00342 }
00343
00344 void
00345 TcpSessionAgent::set_weight(IntTcpAgent *tcp, int wt)
00346 {
00347 wtSum_ -= tcp->wt_;
00348 tcp->wt_ = wt;
00349 wtSum_ += tcp->wt_;
00350 }
00351
00352 void
00353 TcpSessionAgent::reset_dyn_weights()
00354 {
00355 IntTcpAgent *tcp;
00356 Islist_iter<IntTcpAgent> conn_iter(conns_);
00357
00358 while ((tcp = conn_iter()) != NULL)
00359 tcp->dynWt_ = tcp->wt_;
00360 dynWtSum_ = wtSum_;
00361 }
00362
00363 IntTcpAgent *
00364 TcpSessionAgent::who_to_snd(int how)
00365 {
00366 int i = 0;
00367 switch (how) {
00368
00369 case FINE_ROUND_ROBIN: {
00370 IntTcpAgent *next;
00371 int wtOK = 0;
00372
00373 if (dynWtSum_ == 0)
00374 reset_dyn_weights();
00375 do {
00376 wtOK = 0;
00377 if ((next = (*connIter_)()) == NULL) {
00378 connIter_->set_cur(connIter_->get_last());
00379 next = (*connIter_)();
00380 }
00381 i++;
00382 if (next && next->dynWt_>0) {
00383 next->dynWt_--;
00384 dynWtSum_--;
00385 wtOK = 1;
00386 }
00387 } while (next && (!next->data_left_to_send() || !wtOK)
00388 && (i < connIter_->count()));
00389 if (!next->data_left_to_send())
00390 next = NULL;
00391 return next;
00392 }
00393
00394 case COARSE_ROUND_ROBIN: {
00395 int maxConsecSegs;
00396 if (curConn_)
00397 maxConsecSegs = (window()*curConn_->wt_)/wtSum_;
00398 if (curConn_ && numConsecSegs_++ < maxConsecSegs &&
00399 curConn_->data_left_to_send())
00400 return curConn_;
00401 else {
00402 numConsecSegs_ = 0;
00403 curConn_ = who_to_snd(FINE_ROUND_ROBIN);
00404 if (curConn_)
00405 numConsecSegs_++;
00406 }
00407 return curConn_;
00408 }
00409 case RANDOM: {
00410 IntTcpAgent *next;
00411
00412 do {
00413 int foo = int(Random::uniform() * nActive_ + 1);
00414
00415 connIter_->set_cur(connIter_->get_last());
00416
00417 for (;foo > 0; foo--)
00418 (*connIter_)();
00419 next = (*connIter_)();
00420 } while (next && !next->data_left_to_send());
00421 return(next);
00422 }
00423 default:
00424 return NULL;
00425 }
00426 }
00427
00428 void
00429 TcpSessionAgent::send_much(IntTcpAgent* , int force, int reason)
00430 {
00431 int npackets = 0;
00432 Islist_iter<Segment> seg_iter(seglist_);
00433
00434 if (reason != TCP_REASON_TIMEOUT &&
00435 burstsnd_timer_.status() == TIMER_PENDING)
00436 return;
00437
00438 if ((seg_iter.count() == 0) && (last_send_time_ != -1) &&
00439 (Scheduler::instance().clock() - last_send_time_ >= t_rtxcur_)) {
00440 if (slow_start_restart_ && restart_bugfix_)
00441 slowdown(CLOSE_CWND_INIT);
00442 else if (slow_start_restart_)
00443 slowdown(CLOSE_CWND_RESTART|CLOSE_SSTHRESH_HALF);
00444 else if (fs_enable_) {
00445 if (cwnd_ < ssthresh_)
00446 cwnd_ = int(cwnd_/2);
00447 else
00448 cwnd_ -= 1;
00449 fs_startseq_ = sessionSeqno_ + 1;
00450 fs_endseq_ = sessionSeqno_ + window();
00451 fs_mode_ = 1;
00452 }
00453 }
00454
00455 while (ok_to_snd(size_)) {
00456 {
00457 IntTcpAgent *sender = who_to_snd(schedDisp_);
00458 if (sender) {
00459
00460
00461
00462
00463 if (fs_enable_ && fs_mode_ &&
00464 sessionSeqno_ == fs_startseq_)
00465 connWithPktBeforeFS_ = sender;
00466
00467
00468 if (sender->t_seqno_ < sender->maxseq_) {
00469 int i =
00470 findSessionSeqno(sender, sender->t_seqno_);
00471 removeSessionSeqno(i);
00472 sender->send_one(i);
00473 }
00474 else {
00475 sender->send_one(sessionSeqno_++);
00476 if (!rtt_active_) {
00477 rtt_active_ = 1;
00478 rtt_seg_ = last_seg_sent_;
00479 }
00480 }
00481 npackets++;
00482 }
00483 else
00484 break;
00485 }
00486 reason = 0;
00487 force = 0;
00488 if (maxburst_ && npackets == maxburst_) {
00489 if (ok_to_snd(size_))
00490 burstsnd_timer_.resched(t_exact_srtt_*maxburst_/window());
00491 break;
00492 }
00493 }
00494 if (npackets > 0)
00495 last_send_time_ = Scheduler::instance().clock();
00496 }
00497
00498 void
00499 TcpSessionAgent::recv(IntTcpAgent *agent, Packet *pkt, int amt_data_acked)
00500 {
00501 hdr_tcp *tcph = hdr_tcp::access(pkt);
00502
00503 if (hdr_flags::access(pkt)->ecnecho() && ecn_)
00504 quench(1, agent, tcph->seqno());
00505 clean_segs(size_, pkt, agent, sessionSeqno_,amt_data_acked);
00506
00507
00508 if (amt_data_acked > 0 && (tcph->seqno() >= agent->recover_ ||
00509 agent->last_cwnd_action_ != CWND_ACTION_DUPACK )
00510 && !dontIncrCwnd_) {
00511 int i = count_bytes_acked_ ? amt_data_acked:1;
00512 while (i-- > 0)
00513 opencwnd(size_,agent);
00514 }
00515 dontIncrCwnd_ = 0;
00516 if (amt_data_acked > 0) {
00517 if (fs_enable_ && fs_mode_ && connWithPktBeforeFS_ == agent)
00518 connWithPktBeforeFS_ = NULL;
00519 newack(pkt);
00520 }
00521 Packet::free(pkt);
00522 send_much(NULL,0,0);
00523 }
00524
00525 void
00526 TcpSessionAgent::setflags(Packet *pkt)
00527 {
00528 hdr_flags *hf = hdr_flags::access(pkt);
00529 if (ecn_)
00530 hf->ect() = 1;
00531 }
00532
00533 int
00534 TcpSessionAgent::findSessionSeqno(IntTcpAgent *sender, int seqno)
00535 {
00536 Islist_iter<Segment> seg_iter(seglist_);
00537 Segment *cur;
00538 int min = sessionSeqno_;
00539
00540 while ((cur = seg_iter()) != NULL) {
00541 if (sender == cur->sender_ && cur->seqno_ >= seqno &&
00542 cur->sessionSeqno_ < min)
00543 min = cur->sessionSeqno_;
00544 }
00545 if (min == sessionSeqno_) {
00546 printf("In TcpSessionAgent::findSessionSeqno: search unsuccessful\n");
00547 min = sessionSeqno_ - 1;
00548 }
00549 return (min);
00550 }
00551
00552
00553 void
00554 TcpSessionAgent::removeSessionSeqno(int sessionSeqno)
00555 {
00556 Islist_iter<Segment> seg_iter(seglist_);
00557 Segment *cur, *prev=NULL;
00558
00559 while ((cur = seg_iter()) != NULL) {
00560 if (cur->sessionSeqno_ == sessionSeqno) {
00561 seglist_.remove(cur, prev);
00562 adjust_ownd(cur->size_);
00563 return;
00564 }
00565 prev = cur;
00566 }
00567 printf("In removeSessionSeqno(): unable to find segment with sessionSeqno = %d\n", sessionSeqno);
00568 }
00569
00570 void
00571 TcpSessionAgent::quench(int how, IntTcpAgent *sender, int seqno)
00572 {
00573 int i = findSessionSeqno(sender,seqno);
00574
00575 if (i > recover_) {
00576 recover_ = sessionSeqno_ - 1;
00577 last_cwnd_action_ = CWND_ACTION_ECN;
00578 sender->recover_ = sender->maxseq_;
00579 sender->last_cwnd_action_ = CWND_ACTION_ECN;
00580 closecwnd(how,sender);
00581 }
00582 }
00583
00584 void
00585 TcpSessionAgent::traceVar(TracedVar* v)
00586 {
00587 double curtime;
00588 Scheduler& s = Scheduler::instance();
00589 char wrk[500];
00590 int n;
00591
00592 curtime = &s ? s.clock() : 0;
00593 if (!strcmp(v->name(), "ownd_") || !strcmp(v->name(), "owndCorr_")) {
00594 if (!strcmp(v->name(), "ownd_"))
00595 sprintf(wrk,"%-8.5f %-2d %-2d %-2d %-2d %s %-6.3f",
00596 curtime, addr(), port(), daddr(), dport(),
00597 v->name(), double(*((TracedDouble*) v)));
00598 else if (!strcmp(v->name(), "owndCorr_"))
00599 sprintf(wrk,"%-8.5f %-2d %-2d %-2d %-2d %s %d",
00600 curtime, addr(), port(), daddr(), dport(),
00601 v->name(), int(*((TracedInt*) v)));
00602 n = strlen(wrk);
00603 wrk[n] = '\n';
00604 wrk[n+1] = 0;
00605 if (channel_)
00606 (void)Tcl_Write(channel_, wrk, n+1);
00607 wrk[n] = 0;
00608 }
00609 else
00610 TcpAgent::traceVar(v);
00611 }
00612
00613
00614