00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033 #include <tclcl.h>
00034
00035 #include "empweb.h"
00036
00037
00038
00039
00040
00041
00042
00043
00044 class EmpWebPage : public TimerHandler {
00045 public:
00046 EmpWebPage(int id, EmpWebTrafSession* sess, int nObj, Node* dst, int svrId) :
00047 persistOption_(0), id_(id), sess_(sess), nObj_(nObj), curObj_(0), doneObj_(0), dst_(dst), svrId_(svrId) {}
00048 virtual ~EmpWebPage() {}
00049
00050 inline void start() {
00051
00052 status_ = TIMER_PENDING;
00053 handle(&event_);
00054 }
00055 inline int id() const { return id_; }
00056 inline int svrId() const { return svrId_; }
00057 Node* dst() { return dst_; }
00058
00059 void doneObject() {
00060 if (sess_->mgr()->isdebug())
00061 printf("doneObject: %g done=%d total=%d \n", Scheduler::instance().clock(), doneObj_, nObj_);
00062
00063 if (++doneObj_ >= nObj_) {
00064 printf("doneObject: %g %d %d \n", Scheduler::instance().clock(), doneObj_, nObj_);
00065 sess_->donePage((void*)this);
00066
00067
00068 } else if (persistOption_) {
00069 sched(sess_->interObj()->value());
00070 }
00071
00072 }
00073 inline int curObj() const { return curObj_; }
00074 inline int doneObj() const { return doneObj_; }
00075
00076 inline void set_persistOption(int opt) { persistOption_ = opt; }
00077 int persistOption_ ;
00078
00079 private:
00080 virtual void expire(Event* = 0) {
00081
00082 if (curObj_ >= nObj_)
00083 return;
00084 sess_->launchReq(this, LASTOBJ_++,
00085 (int)ceil(sess_->objSize()->value()),
00086 (int)ceil(sess_->reqSize()->value()), sess_->id(), persistOption_);
00087 if (sess_->mgr()->isdebug())
00088 printf("expire: Session %d launched page %d obj %d nObj %d \n",
00089 sess_->id(), id_, curObj_, nObj_);
00090 }
00091 virtual void handle(Event *e) {
00092 if (sess_->mgr()->isdebug())
00093 printf("handle: Session %d launched page %d obj %d\n",
00094 sess_->id(), id_, curObj_);
00095
00096 TimerHandler::handle(e);
00097 curObj_++;
00098 if (!persistOption_) {
00099 if (curObj_ < nObj_) sched(sess_->interObj()->value());
00100 }
00101 }
00102 int id_;
00103 EmpWebTrafSession* sess_;
00104 int nObj_, curObj_;
00105 int doneObj_;
00106 Node* dst_;
00107 int svrId_ ;
00108 static int LASTOBJ_;
00109 };
00110
00111 int EmpWebPage::LASTOBJ_ = 1;
00112
00113 int EmpWebTrafSession::LASTPAGE_ = 1;
00114
00115 int EmpWebTrafPool::LASTFLOW_ = 1;
00116
00117
00118 EmpWebTrafSession::~EmpWebTrafSession()
00119 {
00120 if (donePage_ != curPage_) {
00121 fprintf(stderr, "done pages %d != all pages %d\n",
00122 donePage_, curPage_);
00123 abort();
00124 }
00125 if (status_ != TIMER_IDLE) {
00126 fprintf(stderr, "EmpWebTrafSession must be idle when deleted.\n");
00127 abort();
00128 }
00129
00130
00131
00132
00133
00134
00135
00136
00137
00138
00139
00140
00141
00142
00143
00144
00145
00146
00147 }
00148
00149 void EmpWebTrafSession::donePage(void* ClntData)
00150 {
00151 EmpWebPage* pg = (EmpWebPage*)ClntData;
00152 if (mgr_->isdebug())
00153 printf("Session %d done page %d\n", id_, pg->id());
00154
00155 if (pg->doneObj() != pg->curObj()) {
00156 fprintf(stderr, "done objects %d != all objects %d\n",
00157 pg->doneObj(), pg->curObj());
00158 abort();
00159 }
00160
00161
00162
00163 if (pg->persistOption_) {
00164 if (!fulltcp_) {
00165
00166 mgr_->recycleTcp(ctcp_);
00167 mgr_->recycleTcp(stcp_);
00168 mgr_->recycleSink(csnk_);
00169 mgr_->recycleSink(ssnk_);
00170 } else {
00171 Tcl::instance().evalf("%s disconnect-full %s %s %s %s",
00172 mgr_->name(),
00173 src_->name(), pg->dst()->name(),
00174 ctcp_->name(), stcp_->name());
00175 }
00176 }
00177
00178 delete pg;
00179
00180
00181 if (++donePage_ >= nPage_) {
00182 mgr_->doneSession(id_);
00183 } else if (interPageOption_) {
00184 sched(rvInterPage_->value());
00185
00186 }
00187 }
00188
00189
00190 void EmpWebTrafSession::expire(Event *)
00191 {
00192
00193
00194 int n;
00195 if (clientIdx_ < mgr()->nClientL_) n = 0 ;
00196 else
00197 n = int(ceil(serverSel()->value()));
00198
00199 assert((n >= 0) && (n < mgr()->nSrc_));
00200 Node* dst = mgr()->server_[n];
00201
00202
00203
00204 EmpWebPage* pg = new EmpWebPage(LASTPAGE_++, this,
00205 (int)ceil(rvPageSize_->value()), dst, n);
00206
00207
00208 int opt = (int)ceil(this->persistSel()->value());
00209 pg->set_persistOption(opt);
00210
00211 if (mgr_->isdebug())
00212 printf("Session %d starting page %d, curpage %d \n",
00213 id_, LASTPAGE_-1, curPage_);
00214
00215 if (pg->persistOption_) {
00216
00217 mgr_->LASTFLOW_++;
00218
00219 int wins = int(ceil(serverWin()->value()));
00220 int winc = int(ceil(clientWin()->value()));
00221 int window = (wins >= winc) ? wins : winc;
00222
00223 int m = int(ceil(mtu()->value()));
00224
00225
00226 if (fulltcp_) {
00227 ctcp_ = mgr_->picktcp(window,m);
00228 stcp_ = mgr_->picktcp(window,m);
00229
00230 Tcl::instance().evalf("%s connect-full %s %s %s %s",
00231 mgr_->name(),
00232 src_->name(), pg->dst()->name(),
00233 ctcp_->name(), stcp_->name());
00234
00235 } else {
00236 ctcp_ = mgr_->picktcp(window,m);
00237 stcp_ = mgr_->picktcp(window,m);
00238 csnk_ = mgr_->picksink();
00239 ssnk_ = mgr_->picksink();
00240 }
00241
00242 Tcl::instance().evalf("%s set-fid %d %s %s", mgr_->name(), mgr_->LASTFLOW_-1, ctcp_->name(), stcp_->name());
00243
00244 }
00245
00246 pg->start();
00247 }
00248
00249 void EmpWebTrafSession::handle(Event *e)
00250 {
00251
00252 TimerHandler::handle(e);
00253 ++curPage_;
00254
00255
00256
00257
00258 if (!interPageOption_) {
00259 if (curPage_ < nPage_) {
00260 sched(rvInterPage_->value());
00261
00262 }
00263 }
00264 }
00265
00266
00267 void EmpWebTrafSession::launchReq(void* ClntData, int obj, int size, int reqSize, int sid, int persist)
00268 {
00269
00270 TcpAgent* ctcp;
00271 TcpAgent* stcp;
00272 TcpSink* csnk;
00273 TcpSink* ssnk;
00274
00275 EmpWebPage* pg = (EmpWebPage*)ClntData;
00276
00277 if (persist) {
00278 if (mgr_->isdebug()) {
00279 printf("HTTP1.1\n");
00280 }
00281
00282
00283 if (fulltcp_) {
00284 ctcp = ctcp_;
00285 stcp = stcp_;
00286 } else {
00287 ctcp = ctcp_;
00288 stcp = stcp_;
00289 csnk = csnk_;
00290 ssnk = ssnk_;
00291 }
00292
00293
00294 } else {
00295 if (mgr_->isdebug()) {
00296 printf("HTTP1.0\n");
00297 }
00298
00299 mgr_->LASTFLOW_++;
00300
00301 int wins = int(ceil(serverWin()->value()));
00302 int winc = int(ceil(clientWin()->value()));
00303 int window = (wins >= winc) ? wins : winc;
00304
00305 int m = int(ceil(mtu()->value()));
00306
00307
00308
00309 if (fulltcp_) {
00310 ctcp = mgr_->picktcp(window,m);
00311 stcp = mgr_->picktcp(window,m);
00312 } else {
00313 ctcp = mgr_->picktcp(window,m);
00314 stcp = mgr_->picktcp(window,m);
00315 csnk = mgr_->picksink();
00316 ssnk = mgr_->picksink();
00317 }
00318
00319 Tcl::instance().evalf("%s set-fid %d %s %s", mgr_->name(), mgr_->LASTFLOW_-1, ctcp->name(), stcp->name());
00320
00321 }
00322
00323
00324
00325
00326 if (fulltcp_) {
00327
00328 Tcl::instance().evalf("%s launch-req-full %d %d %s %s %s %s %d %d %d %d", mgr_->name(), obj, pg->id(),
00329 src_->name(), pg->dst()->name(),
00330 ctcp->name(),
00331 stcp->name(),
00332 size, reqSize, ClntData,persist);
00333
00334 } else {
00335
00336 Tcl::instance().evalf("%s launch-req %d %d %s %s %s %s %s %s %d %d %d %d", mgr_->name(), obj, pg->id(),
00337 src_->name(), pg->dst()->name(),
00338 ctcp->name(), csnk->name(),
00339 stcp->name(), ssnk->name(),
00340 size, reqSize, ClntData,
00341 persist);
00342 }
00343
00344
00345
00346 if (mgr_->isdebug()) {
00347 printf("size=%d obj=%d page=%d sess=%d %g src=%d dst=%d\n", size, obj, pg->id(), id_, Scheduler::instance().clock(), src_->address(), pg->dst()->address());
00348 }
00349 }
00350
00351
00352 static class EmpWebTrafPoolClass : public TclClass {
00353 public:
00354 EmpWebTrafPoolClass() : TclClass("PagePool/EmpWebTraf") {}
00355 TclObject* create(int, const char*const*) {
00356 return (new EmpWebTrafPool());
00357 }
00358 } class_empwebtrafpool;
00359
00360 EmpWebTrafPool::~EmpWebTrafPool()
00361 {
00362 if (session_ != NULL) {
00363 for (int i = 0; i < nSession_; i++)
00364 delete session_[i];
00365 delete []session_;
00366 }
00367 if (server_ != NULL)
00368 delete []server_;
00369 if (client_ != NULL)
00370 delete []client_;
00371
00372 }
00373
00374 void EmpWebTrafPool::delay_bind_init_all()
00375 {
00376 delay_bind_init_one("debug_");
00377 PagePool::delay_bind_init_all();
00378 }
00379
00380 int EmpWebTrafPool::delay_bind_dispatch(const char *varName,const char *localName,
00381 TclObject *tracer)
00382 {
00383 if (delay_bind_bool(varName, localName, "debug_", &debug_, tracer))
00384 return TCL_OK;
00385 return PagePool::delay_bind_dispatch(varName, localName, tracer);
00386 }
00387
00388 EmpWebTrafPool::EmpWebTrafPool() :
00389 concurrentSess_(0), nSrc_(0), server_(NULL), session_(NULL), nClient_(0), client_(NULL), nTcp_(0), nSink_(0), fulltcp_(0)
00390 {
00391 bind("fulltcp_", &fulltcp_);
00392
00393 LIST_INIT(&tcpPool_);
00394 LIST_INIT(&sinkPool_);
00395 }
00396
00397 TcpAgent* EmpWebTrafPool::picktcp(int win, int mtu)
00398 {
00399
00400
00401 TcpAgent* a = (TcpAgent*)detachHead(&tcpPool_);
00402 if (a == NULL) {
00403 Tcl& tcl = Tcl::instance();
00404 tcl.evalf("%s alloc-tcp %d %d", name(), win, mtu);
00405 a = (TcpAgent*)lookup_obj(tcl.result());
00406 if (a == NULL) {
00407 fprintf(stderr, "Failed to allocate a TCP agent\n");
00408 abort();
00409 }
00410 } else
00411 nTcp_--;
00412 return a;
00413 }
00414
00415 TcpSink* EmpWebTrafPool::picksink()
00416 {
00417 TcpSink* a = (TcpSink*)detachHead(&sinkPool_);
00418 if (a == NULL) {
00419 Tcl& tcl = Tcl::instance();
00420 tcl.evalf("%s alloc-tcp-sink", name());
00421 a = (TcpSink*)lookup_obj(tcl.result());
00422 if (a == NULL) {
00423 fprintf(stderr, "Failed to allocate a TCP sink\n");
00424 abort();
00425 }
00426 } else
00427 nSink_--;
00428 return a;
00429 }
00430
00431 void EmpWebTrafPool::recycleTcp(Agent* a)
00432 {
00433 if (fulltcp_) {
00434 delete a;
00435 } else {
00436
00437 if (a == NULL) {
00438 fprintf(stderr, "Failed to recycle TCP agent\n");
00439 abort();
00440 }
00441 nTcp_++;
00442 insertAgent(&tcpPool_, a);
00443 }
00444 }
00445
00446 void EmpWebTrafPool::recycleSink(Agent* a)
00447 {
00448 if (fulltcp_) {
00449 delete a;
00450 } else {
00451
00452 if (a == NULL) {
00453 fprintf(stderr, "Failed to recycle Sink agent\n");
00454 abort();
00455 }
00456 nSink_++;
00457 insertAgent(&sinkPool_, a);
00458 }
00459 }
00460
00461 int EmpWebTrafPool::command(int argc, const char*const* argv)
00462 {
00463 if (argc == 3) {
00464 if (strcmp(argv[1], "set-num-session") == 0) {
00465 if (session_ != NULL) {
00466 for (int i = 0; i < nSession_; i++)
00467 delete session_[i];
00468 delete []session_;
00469 }
00470 nSession_ = atoi(argv[2]);
00471 session_ = new EmpWebTrafSession*[nSession_];
00472 memset(session_, 0, sizeof(EmpWebTrafSession*)*nSession_);
00473 return (TCL_OK);
00474 } else if (strcmp(argv[1], "set-num-server-lan") == 0) {
00475 nSrcL_ = atoi(argv[2]);
00476 if (nSrcL_ > nSrc_) {
00477 fprintf(stderr, "Wrong server index %d\n", nSrcL_);
00478 return TCL_ERROR;
00479 }
00480 return (TCL_OK);
00481 } else if (strcmp(argv[1], "set-num-remote-client") == 0) {
00482 nClientL_ = atoi(argv[2]);
00483 if (nClientL_ > nClient_) {
00484 fprintf(stderr, "Wrong client index %d\n", nClientL_);
00485 return TCL_ERROR;
00486 }
00487 return (TCL_OK);
00488 } else if (strcmp(argv[1], "set-num-server") == 0) {
00489 nSrc_ = atoi(argv[2]);
00490 if (server_ != NULL)
00491 delete []server_;
00492 server_ = new Node*[nSrc_];
00493 return (TCL_OK);
00494 } else if (strcmp(argv[1], "set-num-client") == 0) {
00495 nClient_ = atoi(argv[2]);
00496 if (client_ != NULL)
00497 delete []client_;
00498 client_ = new Node*[nClient_];
00499 return (TCL_OK);
00500 } else if (strcmp(argv[1], "set-interPageOption") == 0) {
00501 int option = atoi(argv[2]);
00502 if (session_ != NULL) {
00503 for (int i = 0; i < nSession_; i++) {
00504 EmpWebTrafSession* p = session_[i];
00505 p->set_interPageOption(option);
00506 }
00507 }
00508 return (TCL_OK);
00509 } else if (strcmp(argv[1], "doneObj") == 0) {
00510 EmpWebPage* p = (EmpWebPage*)atoi(argv[2]);
00511
00512 p->doneObject();
00513
00514 return (TCL_OK);
00515 }
00516 } else if (argc == 4) {
00517 if (strcmp(argv[1], "set-server") == 0) {
00518 Node* cli = (Node*)lookup_obj(argv[3]);
00519 if (cli == NULL)
00520 return (TCL_ERROR);
00521 int nc = atoi(argv[2]);
00522 if (nc >= nSrc_) {
00523 fprintf(stderr, "Wrong server index %d\n", nc);
00524 return TCL_ERROR;
00525 }
00526 server_[nc] = cli;
00527 return (TCL_OK);
00528 } else if (strcmp(argv[1], "set-client") == 0) {
00529 Node* s = (Node*)lookup_obj(argv[3]);
00530 if (s == NULL)
00531 return (TCL_ERROR);
00532 int n = atoi(argv[2]);
00533 if (n >= nClient_) {
00534 fprintf(stderr, "Wrong client index %d\n", n);
00535 return TCL_ERROR;
00536 }
00537 client_[n] = s;
00538 return (TCL_OK);
00539 } else if (strcmp(argv[1], "recycle") == 0) {
00540
00541
00542
00543 Agent* tcp = (Agent*)lookup_obj(argv[2]);
00544 Agent* snk = (Agent*)lookup_obj(argv[3]);
00545 if ((tcp == NULL) || (snk == NULL))
00546 return (TCL_ERROR);
00547
00548 if (fulltcp_) {
00549 delete tcp;
00550 delete snk;
00551 } else {
00552 nTcp_++, nSink_++;
00553 insertAgent(&tcpPool_, tcp);
00554 insertAgent(&sinkPool_, snk);
00555 }
00556 return (TCL_OK);
00557 }
00558 } else if (argc == 16) {
00559 if (strcmp(argv[1], "create-session") == 0) {
00560
00561
00562
00563
00564
00565
00566
00567 int n = atoi(argv[2]);
00568 if ((n < 0)||(n >= nSession_)||(session_[n] != NULL)) {
00569 fprintf(stderr,"Invalid session index %d\n",n);
00570 return (TCL_ERROR);
00571 }
00572 int npg = (int)strtod(argv[3], NULL);
00573 double lt = strtod(argv[4], NULL);
00574
00575 int flip = atoi(argv[15]);
00576 if ((flip < 0)||(flip > 1)) {
00577 fprintf(stderr,"Invalid I/O flag %d\n",flip);
00578 return (TCL_ERROR);
00579 }
00580
00581 int cl;
00582 if (flip == 1)
00583 cl = int(floor(Random::uniform(0, nClientL_)));
00584 else
00585 cl = int(floor(Random::uniform(nClientL_, nClient_)));
00586
00587 assert((cl >= 0) && (cl < nClient_));
00588 Node* c=client_[cl];
00589
00590 EmpWebTrafSession* p =
00591 new EmpWebTrafSession(this, c, npg, n, nSrc_, cl,fulltcp_);
00592
00593 int res = lookup_rv(p->interPage(), argv[5]);
00594 res = (res == TCL_OK) ?
00595 lookup_rv(p->pageSize(), argv[6]) : TCL_ERROR;
00596 res = (res == TCL_OK) ?
00597 lookup_rv(p->interObj(), argv[7]) : TCL_ERROR;
00598 res = (res == TCL_OK) ?
00599 lookup_rv(p->objSize(), argv[8]) : TCL_ERROR;
00600 res = (res == TCL_OK) ?
00601 lookup_rv(p->reqSize(), argv[9]) : TCL_ERROR;
00602 res = (res == TCL_OK) ?
00603 lookup_rv(p->persistSel(), argv[10]) : TCL_ERROR;
00604 res = (res == TCL_OK) ?
00605 lookup_rv(p->serverSel(), argv[11]) : TCL_ERROR;
00606 res = (res == TCL_OK) ?
00607 lookup_rv(p->serverWin(), argv[12]) : TCL_ERROR;
00608 res = (res == TCL_OK) ?
00609 lookup_rv(p->clientWin(), argv[13]) : TCL_ERROR;
00610 res = (res == TCL_OK) ?
00611 lookup_rv(p->mtu(), argv[14]) : TCL_ERROR;
00612 if (res == TCL_ERROR) {
00613 delete p;
00614 fprintf(stderr, "Invalid random variable\n");
00615 return (TCL_ERROR);
00616 }
00617 p->sched(lt);
00618 session_[n] = p;
00619
00620
00621 return (TCL_OK);
00622 }
00623 }
00624 return PagePool::command(argc, argv);
00625 }
00626
00627