00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 #include "config.h"
00032 #include <tclcl.h>
00033 #include <iostream>
00034
00035 #include "node.h"
00036 #include "pagepool.h"
00037 #include "webtraf.h"
00038
00039
00040
00041
00042
00043
00044
00045 class WebPage : public TimerHandler {
00046 public:
00047 WebPage(int id, WebTrafSession* sess, int nObj, Node* dst) :
00048 id_(id), sess_(sess), nObj_(nObj), curObj_(0), doneObj_(0),
00049 dst_(dst) {}
00050 virtual ~WebPage() {}
00051
00052 inline void start() {
00053
00054 status_ = TIMER_PENDING;
00055 handle(&event_);
00056 }
00057 inline int id() const { return id_; }
00058 Node* dst() { return dst_; }
00059
00060 void doneObject() {
00061 if (++doneObj_ >= nObj_) {
00062
00063 sess_->donePage((void*)this);
00064 }
00065 }
00066 inline int curObj() const { return curObj_; }
00067 inline int doneObj() const { return doneObj_; }
00068
00069 private:
00070 virtual void expire(Event* = 0) {
00071
00072 if (curObj_ >= nObj_)
00073 return;
00074 sess_->launchReq(this, LASTOBJ_++,
00075 (int)ceil(sess_->objSize()->value()));
00076 if (sess_->mgr()->isdebug())
00077 printf("Session %d launched page %d obj %d\n",
00078 sess_->id(), id_, curObj_);
00079 }
00080 virtual void handle(Event *e) {
00081
00082
00083
00084
00085
00086
00087
00088
00089
00090
00091 if (curObj_ < nObj_) {
00092
00093
00094 TimerHandler::handle(e);
00095 curObj_++;
00096
00097
00098
00099
00100
00101
00102 if (curObj_ < nObj_) sched(sess_->interObj()->value());
00103 }
00104 }
00105 int id_;
00106 WebTrafSession* sess_;
00107 int nObj_, curObj_, doneObj_;
00108 Node* dst_;
00109 static int LASTOBJ_;
00110 };
00111
00112 int WebPage::LASTOBJ_ = 1;
00113
00114 int WebTrafSession::LASTPAGE_ = 1;
00115
00116
00117 WebTrafSession::WebTrafSession(WebTrafPool *mgr, Node *src, int np, int id, int ftcp_, int recycle_p) :
00118 rvInterPage_(NULL), rvPageSize_(NULL),
00119 rvInterObj_(NULL), rvObjSize_(NULL),
00120 mgr_(mgr), src_(src), nPage_(np), curPage_(0), donePage_(0),
00121 id_(id), interPageOption_(1), fulltcp_(0) {
00122 fulltcp_ = ftcp_;
00123 recycle_page_ = recycle_p;
00124 }
00125
00126
00127 WebTrafSession::~WebTrafSession()
00128 {
00129 if (donePage_ != curPage_) {
00130 fprintf(stderr, "done pages %d != all pages %d\n",
00131 donePage_, curPage_);
00132 abort();
00133 }
00134 if (status_ != TIMER_IDLE) {
00135 fprintf(stderr, "WebTrafSession must be idle when deleted.\n");
00136 abort();
00137 }
00138
00139
00140
00141 if (recycle_page_) {
00142 if (rvInterPage_ != NULL)
00143 Tcl::instance().evalf("delete %s", rvInterPage_->name());
00144 if (rvPageSize_ != NULL)
00145 Tcl::instance().evalf("delete %s", rvPageSize_->name());
00146 if (rvInterObj_ != NULL)
00147 Tcl::instance().evalf("delete %s", rvInterObj_->name());
00148 if (rvObjSize_ != NULL)
00149 Tcl::instance().evalf("delete %s", rvObjSize_->name());
00150 }
00151 }
00152
00153 void WebTrafSession::donePage(void* ClntData)
00154 {
00155 WebPage* pg = (WebPage*)ClntData;
00156 if (mgr_->isdebug())
00157 printf("Session %d done page %d\n", id_, pg->id());
00158 if (pg->doneObj() != pg->curObj()) {
00159 fprintf(stderr, "done objects %d != all objects %d\n",
00160 pg->doneObj(), pg->curObj());
00161 abort();
00162 }
00163 delete pg;
00164
00165
00166 if (++donePage_ >= nPage_)
00167 mgr_->doneSession(id_);
00168 else if (interPageOption_) {
00169
00170
00171
00172 sched(rvInterPage_->value());
00173
00174 }
00175 }
00176
00177
00178 void WebTrafSession::expire(Event *)
00179 {
00180
00181 Node* dst = mgr_->pickdst();
00182
00183 WebPage* pg = new WebPage(LASTPAGE_++, this,
00184 (int)ceil(rvPageSize_->value()), dst);
00185 if (mgr_->isdebug())
00186 printf("Session %d starting page %d, curpage %d\n",
00187 id_, LASTPAGE_-1, curPage_);
00188 pg->start();
00189 }
00190
00191 void WebTrafSession::handle(Event *e)
00192 {
00193
00194 TimerHandler::handle(e);
00195 ++curPage_;
00196
00197
00198
00199
00200
00201
00202
00203
00204
00205 if (!interPageOption_) {
00206 if (curPage_ < nPage_) {
00207 sched(rvInterPage_->value());
00208
00209 }
00210 }
00211 }
00212
00213
00214 void WebTrafSession::launchReq(void* ClntData, int obj, int size) {
00215 mgr_->launchReq(src_, ClntData, obj, size);
00216 }
00217
00218 static class WebTrafPoolClass : public TclClass {
00219 public:
00220 WebTrafPoolClass() : TclClass("PagePool/WebTraf") {
00221
00222 }
00223 TclObject* create(int, const char*const*) {
00224 return (new WebTrafPool());
00225 }
00226 } class_webtrafpool;
00227
00228 WebTrafPool::~WebTrafPool()
00229 {
00230 if (session_ != NULL) {
00231 for (int i = 0; i < nSession_; i++)
00232 delete session_[i];
00233 delete []session_;
00234 }
00235 if (server_ != NULL)
00236 delete []server_;
00237 if (client_ != NULL)
00238 delete []client_;
00239
00240 }
00241
00242 void WebTrafPool::delay_bind_init_all()
00243 {
00244 delay_bind_init_one("debug_");
00245 PagePool::delay_bind_init_all();
00246 }
00247
00248 int WebTrafPool::delay_bind_dispatch(const char *varName,const char *localName,
00249 TclObject *tracer)
00250 {
00251 if (delay_bind_bool(varName, localName, "debug_", &debug_, tracer))
00252 return TCL_OK;
00253 return PagePool::delay_bind_dispatch(varName, localName, tracer);
00254 }
00255
00256
00257 WebTrafPool::WebTrafPool() :
00258 session_(NULL), nServer_(0), server_(NULL), nClient_(0), client_(NULL),
00259 nTcp_(0), nSink_(0), fulltcp_(0), recycle_page_(0)
00260 {
00261 bind("fulltcp_", &fulltcp_);
00262 bind("recycle_page_", &recycle_page_);
00263
00264 asimflag_=0;
00265 LIST_INIT(&tcpPool_);
00266 LIST_INIT(&sinkPool_);
00267 dbTcp_a = dbTcp_r = dbTcp_cr = 0;
00268 }
00269
00270 TcpAgent* WebTrafPool::picktcp()
00271 {
00272 TcpAgent* a = (TcpAgent*)detachHead(&tcpPool_);
00273 if (a == NULL) {
00274 Tcl& tcl = Tcl::instance();
00275 tcl.evalf("%s alloc-tcp", name());
00276 a = (TcpAgent*)lookup_obj(tcl.result());
00277 if (a == NULL) {
00278 fprintf(stderr, "Failed to allocate a TCP agent\n");
00279 abort();
00280 }
00281 } else
00282 nTcp_--;
00283
00284 return a;
00285 }
00286
00287 TcpSink* WebTrafPool::picksink()
00288 {
00289 TcpSink* a = (TcpSink*)detachHead(&sinkPool_);
00290 if (a == NULL) {
00291 Tcl& tcl = Tcl::instance();
00292 tcl.evalf("%s alloc-tcp-sink", name());
00293 a = (TcpSink*)lookup_obj(tcl.result());
00294 if (a == NULL) {
00295 fprintf(stderr, "Failed to allocate a TCP sink\n");
00296 abort();
00297 }
00298 } else
00299 nSink_--;
00300 return a;
00301 }
00302
00303 Node* WebTrafPool::picksrc() {
00304 int n = int(floor(Random::uniform(0, nClient_)));
00305 assert((n >= 0) && (n < nClient_));
00306 return client_[n];
00307 }
00308
00309 Node* WebTrafPool::pickdst() {
00310 int n = int(floor(Random::uniform(0, nServer_)));
00311 assert((n >= 0) && (n < nServer_));
00312 return(server_[n].get_node());
00313 }
00314
00315
00316 void WebTrafPool::pick_ep(TcpAgent** tcp, Agent** snk) {
00317
00318 *tcp = picktcp();
00319
00320
00321 if (fulltcp_) {
00322 *snk = picktcp();
00323 } else {
00324 *snk = picksink();
00325 }
00326 }
00327
00328
00329 void WebTrafPool::launchReq(Node *src_, void* ClntData, int obj, int size) {
00330 TcpAgent *ctcp;
00331 Agent *csnk;
00332
00333
00334 pick_ep(&ctcp, &csnk);
00335
00336 WebPage* pg = (WebPage*)ClntData;
00337
00338
00339 Tcl::instance().evalf("%s launch-req %d %d %s %s %s %s %d %d",
00340 name(), obj, pg->id(),
00341 src_->name(), pg->dst()->name(),
00342 ctcp->name(), csnk->name(), size, ClntData);
00343
00344
00345
00346 #if 0
00347 printf("%d \t %d \t %d \t %d \t %g %d %d\n", size, obj, pg->id(), id_,
00348 Scheduler::instance().clock(),
00349 src_->address(), pg->dst()->address());
00350 printf("** Tcp agents %d, Tcp sinks %d\n", nTcp(),nSink());
00351 #endif
00352 }
00353
00354
00355 void WebTrafPool::launchResp(int obj_id, Node *svr_, Node *clnt_, Agent *tcp, Agent* snk, int size, void *ClntData) {
00356 int pid;
00357 pid = obj_id;
00358
00359
00360 if (ClntData) {
00361 WebPage* pg = (WebPage*)ClntData;
00362 pid = pg->id();
00363 }
00364
00365
00366 Tcl::instance().evalf("%s launch-resp %d %d %s %s %s %s %d %d",
00367 name(), obj_id, pid, svr_->name(), clnt_->name(),
00368 tcp->name(), snk->name(), size, ClntData);
00369
00370
00371
00372 #if 0
00373 printf("%d \t %d \t %d \t %d \t %g %d %d\n", size, obj, pg->id(), id_,
00374 Scheduler::instance().clock(),
00375 src_->address(), pg->dst()->address());
00376 printf("** Tcp agents %d, Tcp sinks %d\n", nTcp(),nSink());
00377 #endif
00378 }
00379
00380
00381 int WebTrafPool::find_server(int sid) {
00382 int n = 0;
00383 while (server_[n].get_nid() != sid && n < nServer_) {
00384 n++;
00385 }
00386
00387 return(n);
00388 }
00389
00390 int WebTrafPool::command(int argc, const char*const* argv) {
00391
00392
00393 if (argc == 2){
00394 if (strcmp(argv[1], "use-asim") == 0) {
00395 asimflag_ = 1;
00396
00397 return (TCL_OK);
00398 }
00399 }
00400 else if (argc == 3) {
00401 if (strcmp(argv[1], "set-num-session") == 0) {
00402 if (session_ != NULL) {
00403 for (int i = 0; i < nSession_; i++)
00404 delete session_[i];
00405 delete []session_;
00406 }
00407 nSession_ = atoi(argv[2]);
00408 session_ = new WebTrafSession*[nSession_];
00409 memset(session_, 0, sizeof(WebTrafSession*)*nSession_);
00410 return (TCL_OK);
00411 } else if (strcmp(argv[1], "set-num-server") == 0) {
00412 nServer_ = atoi(argv[2]);
00413 if (server_ != NULL)
00414 delete []server_;
00415 server_ = new WebServer[nServer_](this);
00416
00417 return (TCL_OK);
00418 } else if (strcmp(argv[1], "set-num-client") == 0) {
00419 nClient_ = atoi(argv[2]);
00420 if (client_ != NULL)
00421 delete []client_;
00422 client_ = new Node*[nClient_];
00423 return (TCL_OK);
00424 } else if (strcmp(argv[1], "set-interPageOption") == 0) {
00425 int option = atoi(argv[2]);
00426 if (session_ != NULL) {
00427 for (int i = 0; i < nSession_; i++) {
00428 WebTrafSession* p = session_[i];
00429 p->set_interPageOption(option);
00430 }
00431 }
00432 return (TCL_OK);
00433 } else if (strcmp(argv[1], "doneObj") == 0) {
00434 WebPage* p = (WebPage*)atoi(argv[2]);
00435
00436 p->doneObject();
00437 return (TCL_OK);
00438 } else if (strcmp(argv[1], "set-server-mode") == 0) {
00439
00440 int mode = atoi(argv[2]);
00441 for (int n = 0; n < nServer_; n++) {
00442 server_[n].set_mode(mode);
00443
00444 }
00445
00446 return (TCL_OK);
00447 } else if (strcmp(argv[1], "set-server-rate") == 0) {
00448
00449 int rate = atoi(argv[2]);
00450
00451 for (int n = 0; n < nServer_; n++) {
00452 server_[n].set_rate(rate);
00453 }
00454
00455 return (TCL_OK);
00456 } else if (strcmp(argv[1], "set-server-qlimit") == 0) {
00457
00458 int qlimit = atoi(argv[2]);
00459
00460 for (int n = 0; n < nServer_; n++) {
00461 server_[n].set_queue_limit(qlimit);
00462 }
00463
00464 return (TCL_OK);
00465 }
00466 } else if (argc == 4) {
00467 if (strcmp(argv[1], "set-server") == 0) {
00468 Node* s = (Node*)lookup_obj(argv[3]);
00469 if (s == NULL)
00470 return (TCL_ERROR);
00471 int n = atoi(argv[2]);
00472 if (n >= nServer_) {
00473 fprintf(stderr, "Wrong server index %d\n", n);
00474 return TCL_ERROR;
00475 }
00476 server_[n].set_node(s);
00477 return (TCL_OK);
00478 } else if (strcmp(argv[1], "set-client") == 0) {
00479 Node* c = (Node*)lookup_obj(argv[3]);
00480 if (c == NULL)
00481 return (TCL_ERROR);
00482 int n = atoi(argv[2]);
00483 if (n >= nClient_) {
00484 fprintf(stderr, "Wrong client index %d\n", n);
00485 return TCL_ERROR;
00486 }
00487 client_[n] = c;
00488 return (TCL_OK);
00489 } else if (strcmp(argv[1], "recycle") == 0) {
00490
00491
00492
00493 Agent* tcp = (Agent*)lookup_obj(argv[2]);
00494 Agent* snk = (Agent*)lookup_obj(argv[3]);
00495
00496 if ((tcp == NULL) || (snk == NULL))
00497 return (TCL_ERROR);
00498
00499 if (fulltcp_) {
00500 delete tcp;
00501 delete snk;
00502 } else {
00503
00504 nTcp_++;
00505
00506 insertAgent(&tcpPool_, tcp);
00507 nSink_++;
00508 insertAgent(&sinkPool_, snk);
00509
00510 }
00511
00512 return (TCL_OK);
00513 } else if (strcmp(argv[1], "set-server-rate") == 0) {
00514
00515 int sid = atoi(argv[2]);
00516 int rate = atoi(argv[3]);
00517
00518 int n = find_server(sid);
00519 if (n >= nServer_)
00520 return (TCL_ERROR);
00521
00522 server_[n].set_rate(rate);
00523 return (TCL_OK);
00524 } else if (strcmp(argv[1], "set-server-mode") == 0) {
00525
00526 int sid = atoi(argv[2]);
00527 int mode = atoi(argv[3]);
00528
00529 int n = find_server(sid);
00530 if (n >= nServer_)
00531 return (TCL_ERROR);
00532
00533 server_[n].set_mode(mode);
00534 return (TCL_OK);
00535 } else if (strcmp(argv[1], "set-server-qlimit") == 0) {
00536
00537 int sid = atoi(argv[2]);
00538 int qlimit = atoi(argv[3]);
00539
00540 int n = find_server(sid);
00541 if (n >= nServer_)
00542 return (TCL_ERROR);
00543
00544 server_[n].set_queue_limit(qlimit);
00545 return (TCL_OK);
00546 }
00547 } else if (argc == 9) {
00548 if (strcmp(argv[1], "create-session") == 0) {
00549
00550
00551
00552
00553 int n = atoi(argv[2]);
00554 if ((n < 0)||(n >= nSession_)||(session_[n] != NULL)) {
00555 fprintf(stderr,"Invalid session index %d\n",n);
00556 return (TCL_ERROR);
00557 }
00558 int npg = (int)strtod(argv[3], NULL);
00559 double lt = strtod(argv[4], NULL);
00560 WebTrafSession* p =
00561 new WebTrafSession(this, picksrc(), npg, n, fulltcp_, recycle_page_);
00562 int res = lookup_rv(p->interPage(), argv[5]);
00563 res = (res == TCL_OK) ?
00564 lookup_rv(p->pageSize(), argv[6]) : TCL_ERROR;
00565 res = (res == TCL_OK) ?
00566 lookup_rv(p->interObj(), argv[7]) : TCL_ERROR;
00567 res = (res == TCL_OK) ?
00568 lookup_rv(p->objSize(), argv[8]) : TCL_ERROR;
00569 if (res == TCL_ERROR) {
00570 delete p;
00571 fprintf(stderr, "Invalid random variable\n");
00572 return (TCL_ERROR);
00573 }
00574 p->sched(lt);
00575 session_[n] = p;
00576
00577
00578 if(asimflag_){
00579
00580
00581 double lambda = (1/(p->interPage())->avg())/(nServer_*nClient_);
00582 double mu = ((p->objSize())->value());
00583
00584 for (int i=0; i<nServer_; i++){
00585 for(int j=0; j<nClient_; j++){
00586
00587 Tcl::instance().evalf("%s add2asim %d %d %lf %lf", this->name(),server_[i].get_nid(),client_[j]->nodeid(),lambda, mu);
00588 }
00589 }
00590
00591 }
00592
00593 return (TCL_OK);
00594 } else if (strcmp(argv[1], "job_arrival") == 0) {
00595
00596 int obj_id = atoi(argv[2]);
00597 Node* clnt_ = (Node*)lookup_obj(argv[3]);
00598 Node* svr_ = (Node*)lookup_obj(argv[4]);
00599
00600 Agent* tcp = (Agent*)lookup_obj(argv[5]);
00601 Agent* snk = (Agent*)lookup_obj(argv[6]);
00602 int size = atoi(argv[7]);
00603 void* data = (void *)atoi(argv[8]);
00604
00605 int sid = svr_->nodeid();
00606 int n = find_server(sid);
00607 if (n >= nServer_)
00608 return (TCL_ERROR);
00609
00610 double delay = server_[n].job_arrival(obj_id, clnt_, tcp, snk, size, data);
00611
00612 Tcl::instance().resultf("%f", delay);
00613 return (TCL_OK);
00614 }
00615 }
00616 return PagePool::command(argc, argv);
00617 }
00618