00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
#include <assert.h>
#include <stdarg.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
00028
00029 #include "tclcl.h"
00030 #include "agent.h"
00031 #include "app.h"
00032 #include "tcp-simple.h"
00033 #include "http.h"
00034 #include "http-aux.h"
00035 #include "trace.h"
00036 #include "tcpapp.h"
00037 #include "mcache.h"
00038
00039
00040
00041
00042
00043
00044 static class HttpAppClass : public TclClass {
00045 public:
00046 HttpAppClass() : TclClass("Http") {}
00047 TclObject* create(int, const char*const*) {
00048 return (new HttpApp());
00049 }
00050 } class_http_app;
00051
00052
00053 HttpApp::HttpApp() : log_(0)
00054 {
00055 bind("id_", &id_);
00056
00057 tpa_ = new Tcl_HashTable;
00058 Tcl_InitHashTable(tpa_, TCL_ONE_WORD_KEYS);
00059 }
00060
00061 HttpApp::~HttpApp()
00062 {
00063 if (tpa_ != NULL) {
00064 Tcl_DeleteHashTable(tpa_);
00065 delete tpa_;
00066 }
00067 }
00068
00069 int HttpApp::add_cnc(HttpApp* client, TcpApp *agt)
00070 {
00071 int newEntry = 1;
00072 Tcl_HashEntry *he = Tcl_CreateHashEntry(tpa_,
00073 (const char *)client->id(),
00074 &newEntry);
00075 if (he == NULL)
00076 return -1;
00077 if (newEntry)
00078 Tcl_SetHashValue(he, (ClientData)agt);
00079 return 0;
00080 }
00081
00082 void HttpApp::delete_cnc(HttpApp* client)
00083 {
00084 Tcl_HashEntry *he = Tcl_FindHashEntry(tpa_,(const char *)client->id());
00085 if (he != NULL) {
00086 TcpApp *cnc = (TcpApp *)Tcl_GetHashValue(he);
00087 Tcl_DeleteHashEntry(he);
00088 delete cnc;
00089 }
00090 }
00091
00092 TcpApp* HttpApp::lookup_cnc(HttpApp* client)
00093 {
00094 Tcl_HashEntry *he =
00095 Tcl_FindHashEntry(tpa_, (const char *)client->id());
00096 if (he == NULL)
00097 return NULL;
00098 return (TcpApp *)Tcl_GetHashValue(he);
00099 }
00100
00101
00102 int HttpApp::command(int argc, const char*const* argv)
00103 {
00104 Tcl& tcl = Tcl::instance();
00105
00106 if (argc == 2) {
00107 if (strcmp(argv[1], "id") == 0) {
00108 if (argc == 3) {
00109 id_ = atoi(argv[2]);
00110 tcl.resultf("%d", id_);
00111 } else
00112 tcl.resultf("%d", id_);
00113 return TCL_OK;
00114 } else if (strcmp(argv[1], "log") == 0) {
00115
00116 if (log_ != NULL)
00117 tcl.resultf("%s", Tcl_GetChannelName(log_));
00118 else
00119 tcl.result("");
00120 return TCL_OK;
00121 }
00122 } else if (argc == 3) {
00123 if (strcmp(argv[1], "get-modtime") == 0) {
00124 double mt;
00125 if (pool_->get_mtime(argv[2], mt) != -1) {
00126 tcl.resultf("%.17g", mt);
00127 return TCL_OK;
00128 } else
00129 return TCL_ERROR;
00130 } else if (strcmp(argv[1], "exist-page") == 0) {
00131 tcl.resultf("%d", pool_->exist_page(argv[2]));
00132 return TCL_OK;
00133 } else if (strcmp(argv[1], "get-size") == 0) {
00134 int size;
00135 if (pool_->get_size(argv[2], size) != -1) {
00136 tcl.resultf("%d", size);
00137 return TCL_OK;
00138 } else
00139 return TCL_ERROR;
00140 } else if (strcmp(argv[1], "get-age") == 0) {
00141 double age;
00142 if (pool_->get_age(argv[2], age) != -1) {
00143 tcl.resultf("%.17g", age);
00144 return TCL_OK;
00145 } else
00146 return TCL_ERROR;
00147 } else if (strcmp(argv[1], "get-cachetime") == 0) {
00148 double et;
00149 if (pool_->get_etime(argv[2], et) != -1) {
00150 tcl.resultf("%.17g", et);
00151 return TCL_OK;
00152 } else
00153 return TCL_ERROR;
00154 } else if (strcmp(argv[1], "get-page") == 0) {
00155 char buf[4096];
00156 if (pool_->get_pageinfo(argv[2], buf) != -1) {
00157 tcl.resultf("%s", buf);
00158 return TCL_OK;
00159 } else
00160 return TCL_ERROR;
00161 } else if (strcmp(argv[1], "get-cnc") == 0) {
00162
00163
00164
00165
00166
00167
00168 HttpApp *client =
00169 (HttpApp *)TclObject::lookup(argv[2]);
00170 TcpApp *cnc = (TcpApp *)lookup_cnc(client);
00171 if (cnc == NULL)
00172 tcl.result("");
00173 else
00174 tcl.resultf("%s", cnc->name());
00175 return TCL_OK;
00176
00177 } else if (strcmp(argv[1], "set-pagepool") == 0) {
00178 pool_ = (ClientPagePool*)TclObject::lookup(argv[2]);
00179 if (pool_ != NULL)
00180 return TCL_OK;
00181 else
00182 return TCL_ERROR;
00183 } else if (strcmp(argv[1], "is-connected") == 0) {
00184
00185
00186
00187 HttpApp *a = (HttpApp*)TclObject::lookup(argv[2]);
00188 TcpApp *cnc = (TcpApp*)lookup_cnc(a);
00189 if (cnc == NULL)
00190 tcl.result("0");
00191 else
00192 tcl.result("1");
00193 return TCL_OK;
00194 } else if (strcmp(argv[1], "is-valid") == 0) {
00195 ClientPage *pg =
00196 (ClientPage *)pool_->get_page(argv[2]);
00197 if (pg == NULL) {
00198 tcl.resultf("%d is-valid: No page %s",
00199 id_, argv[2]);
00200 return TCL_ERROR;
00201 }
00202 tcl.resultf("%d", pg->is_valid());
00203 return TCL_OK;
00204 } else if (strcmp(argv[1], "log") == 0) {
00205 int mode;
00206 log_ = Tcl_GetChannel(tcl.interp(),
00207 (char*)argv[2], &mode);
00208 if (log_ == 0) {
00209 tcl.resultf("%d: invalid log file handle %s\n",
00210 id_, argv[2]);
00211 return TCL_ERROR;
00212 }
00213 return TCL_OK;
00214 } else if (strcmp(argv[1], "disconnect") == 0) {
00215
00216
00217
00218
00219 HttpApp *client =
00220 (HttpApp *)TclObject::lookup(argv[2]);
00221 delete_cnc(client);
00222 return TCL_OK;
00223 } else if (strcmp(argv[1], "get-pagetype") == 0) {
00224
00225
00226
00227
00228 ClientPage *pg =
00229 (ClientPage*)pool_->get_page(argv[2]);
00230 if (pg == NULL) {
00231 tcl.resultf("%d get-pagetype: No page %s",
00232 id_, argv[2]);
00233 return TCL_ERROR;
00234 }
00235 switch (pg->type()) {
00236 case HTML:
00237 tcl.result("HTML");
00238 break;
00239 case MEDIA:
00240 tcl.result("MEDIA");
00241 break;
00242 default:
00243 fprintf(stderr, "Unknown page type %d",
00244 pg->type());
00245 return TCL_ERROR;
00246 }
00247 return TCL_OK;
00248 } else if (strcmp(argv[1], "get-layer") == 0) {
00249
00250 MediaPage *pg = (MediaPage *)pool_->get_page(argv[2]);
00251 if (pg == NULL) {
00252 tcl.resultf("%d get-layer: No page %s",
00253 id_, argv[2]);
00254 return TCL_ERROR;
00255 }
00256 if (pg->type() != MEDIA) {
00257 tcl.resultf("%d get-layer %s not a media page",
00258 id_, argv[2]);
00259 return TCL_ERROR;
00260 }
00261 tcl.resultf("%d", pg->num_layer());
00262 return TCL_OK;
00263 }
00264 } else if (argc == 4) {
00265 if (strcmp(argv[1], "connect") == 0) {
00266
00267
00268
00269
00270
00271
00272
00273
00274 HttpApp *client =
00275 (HttpApp *)TclObject::lookup(argv[2]);
00276 TcpApp *cnc = (TcpApp *)TclObject::lookup(argv[3]);
00277 if (add_cnc(client, cnc)) {
00278 tcl.resultf("%s: failed to connect to %s",
00279 name_, argv[2]);
00280 return TCL_ERROR;
00281 }
00282
00283 cnc->target() = (Process*)this;
00284 return TCL_OK;
00285 } else if (strcmp(argv[1], "set-modtime") == 0) {
00286 double mt = strtod(argv[3], NULL);
00287 if (pool_->set_mtime(argv[2], mt) != -1)
00288 return TCL_OK;
00289 else
00290 return TCL_ERROR;
00291 } else if (strcmp(argv[1], "set-cachetime") == 0) {
00292 double et = Scheduler::instance().clock();
00293 if (pool_->set_etime(argv[2], et) != -1)
00294 return TCL_OK;
00295 else
00296 return TCL_ERROR;
00297 }
00298 } else {
00299 if (strcmp(argv[1], "send") == 0) {
00300
00301
00302
00303 HttpApp *client =
00304 (HttpApp *)TclObject::lookup(argv[2]);
00305 if (client == NULL) {
00306 tcl.add_errorf("%s: bad client name %s",
00307 name_, argv[2]);
00308 return TCL_ERROR;
00309 }
00310 int bytes = atoi(argv[3]);
00311 TcpApp *cnc = (TcpApp *)lookup_cnc(client);
00312 if (cnc == NULL) {
00313
00314
00315
00316 return TCL_OK;
00317 }
00318 char *buf = strdup(argv[4]);
00319 HttpNormalData *d =
00320 new HttpNormalData(id_, bytes, buf);
00321 cnc->send(bytes, d);
00322
00323 free(buf);
00324 return TCL_OK;
00325
00326 } else if (strcmp(argv[1], "enter-page") == 0) {
00327 ClientPage* pg = pool_->enter_page(argc, argv);
00328 if (pg == NULL)
00329 return TCL_ERROR;
00330 else
00331 return TCL_OK;
00332
00333 } else if (strcmp(argv[1], "evTrace") == 0) {
00334 char buf[1024], *p;
00335 if (log_ != 0) {
00336 sprintf(buf, TIME_FORMAT" i %d ",
00337 BaseTrace::round(Scheduler::instance().clock()),
00338 id_);
00339 p = &(buf[strlen(buf)]);
00340 for (int i = 2; i < argc; i++) {
00341 strcpy(p, argv[i]);
00342 p += strlen(argv[i]);
00343 *(p++) = ' ';
00344 }
00345
00346 *(p++) = '\n', *p = 0;
00347 Tcl_Write(log_, buf, p-buf);
00348 }
00349 return TCL_OK;
00350 }
00351 }
00352
00353 return TclObject::command(argc, argv);
00354 }
00355
00356 void HttpApp::log(const char* fmt, ...)
00357 {
00358
00359 if (log_ == 0)
00360 return;
00361
00362 char buf[10240], *p;
00363 sprintf(buf, TIME_FORMAT" i %d ",
00364 BaseTrace::round(Scheduler::instance().clock()), id_);
00365 p = &(buf[strlen(buf)]);
00366 va_list ap;
00367 va_start(ap, fmt);
00368 vsprintf(p, fmt, ap);
00369 Tcl_Write(log_, buf, strlen(buf));
00370 }
00371
00372 void HttpApp::process_data(int, AppData* data)
00373 {
00374 if (data == NULL)
00375 return;
00376
00377 switch (data->type()) {
00378 case HTTP_NORMAL: {
00379 HttpNormalData *tmp = (HttpNormalData*)data;
00380 Tcl::instance().eval(tmp->str());
00381 break;
00382 }
00383 default:
00384 fprintf(stderr, "Bad http invalidation data type %d\n",
00385 data->type());
00386 abort();
00387 break;
00388 }
00389 }
00390
00391
00392
00393
00394
00395
00396 static class HttpClientClass : public TclClass {
00397 public:
00398 HttpClientClass() : TclClass("Http/Client") {}
00399 TclObject* create(int, const char*const*) {
00400 return (new HttpClient());
00401 }
00402 } class_httpclient_app;
00403
00404
00405
00406
00407
00408
00409 static class HttpServerClass : public TclClass {
00410 public:
00411 HttpServerClass() : TclClass("Http/Server") {}
00412 TclObject* create(int, const char*const*) {
00413 return (new HttpServer());
00414 }
00415 } class_httpserver_app;
00416
00417 static class HttpInvalServerClass : public TclClass {
00418 public:
00419 HttpInvalServerClass() : TclClass("Http/Server/Inval") {}
00420 TclObject* create(int, const char*const*) {
00421 return (new HttpInvalServer());
00422 }
00423 } class_httpinvalserver_app;
00424
00425 static class HttpYucInvalServerClass : public TclClass {
00426 public:
00427 HttpYucInvalServerClass() : TclClass("Http/Server/Inval/Yuc") {}
00428 TclObject* create(int, const char*const*) {
00429 return (new HttpYucInvalServer());
00430 }
00431 } class_httpyucinvalserver_app;
00432
00433 HttpYucInvalServer::HttpYucInvalServer() :
00434 inv_sender_(0), invlist_(0), num_inv_(0)
00435 {
00436 bind("hb_interval_", &hb_interval_);
00437 bind("enable_upd_", &enable_upd_);
00438 bind("Ca_", &Ca_);
00439 bind("Cb_", &Cb_);
00440 bind("push_thresh_", &push_thresh_);
00441 bind("push_high_bound_", &push_high_bound_);
00442 bind("push_low_bound_", &push_low_bound_);
00443 }
00444
00445 int HttpYucInvalServer::command(int argc, const char*const* argv)
00446 {
00447 Tcl& tcl = Tcl::instance();
00448
00449 switch (argv[1][0]) {
00450 case 'a':
00451 if (strcmp(argv[1], "add-inval-sender") == 0) {
00452 HttpUInvalAgent *tmp =
00453 (HttpUInvalAgent *)TclObject::lookup(argv[2]);
00454 if (tmp == NULL) {
00455 tcl.resultf("Non-existent agent %s", argv[2]);
00456 return TCL_ERROR;
00457 }
00458 inv_sender_ = tmp;
00459 return TCL_OK;
00460 } if (strcmp(argv[1], "add-inv") == 0) {
00461
00462
00463
00464 double mtime = strtod(argv[3], NULL);
00465 add_inv(argv[2], mtime);
00466 return TCL_OK;
00467 }
00468 break;
00469 case 'c':
00470 if (strcmp(argv[1], "count-request") == 0) {
00471 ClientPage *pg =
00472 (ClientPage *)pool_->get_page(argv[2]);
00473 if (pg == NULL) {
00474 tcl.resultf("%d count-request: No page %s",
00475 id_, argv[2]);
00476 return TCL_ERROR;
00477 }
00478 pg->count_request(Cb_, push_high_bound_);
00479 log("S NTF p %s v %d\n", argv[2], pg->counter());
00480 return TCL_OK;
00481 } else if (strcmp(argv[1], "count-inval") == 0) {
00482 ClientPage *pg =
00483 (ClientPage *)pool_->get_page(argv[2]);
00484 if (pg == NULL) {
00485 tcl.resultf("%d count-inval: No page %s",
00486 id_, argv[2]);
00487 return TCL_ERROR;
00488 }
00489 pg->count_inval(Ca_, push_low_bound_);
00490 log("S NTF p %s v %d\n", argv[2], pg->counter());
00491 return TCL_OK;
00492 }
00493 break;
00494 case 'i':
00495 if (strcmp(argv[1], "is-pushable") == 0) {
00496 ClientPage *pg =
00497 (ClientPage *)pool_->get_page(argv[2]);
00498 if (pg == NULL) {
00499 tcl.resultf("%d is-pushable: No page %s",
00500 id_, argv[2]);
00501 return TCL_ERROR;
00502 }
00503 if (pg->is_mpush() &&
00504 (Scheduler::instance().clock() - pg->mpush_time() >
00505 HTTP_HBEXPIRE_COUNT*hb_interval_)) {
00506
00507 pg->clear_mpush();
00508 fprintf(stderr,
00509 "server %d timeout mpush\n", id_);
00510 }
00511 tcl.resultf("%d", (enable_upd_ &&
00512 (pg->counter() >= push_thresh_) ||
00513 pg->is_mpush()));
00514 return TCL_OK;
00515 }
00516 break;
00517 case 'r':
00518 if ((strcmp(argv[1], "request-mpush") == 0) ||
00519 (strcmp(argv[1], "refresh-mpush") == 0)) {
00520 ClientPage *pg =
00521 (ClientPage *)pool_->get_page(argv[2]);
00522 if (pg == NULL) {
00523 tcl.resultf("%d is-valid: No page %s",
00524 id_, argv[2]);
00525 return TCL_ERROR;
00526 }
00527 pg->set_mpush(Scheduler::instance().clock());
00528 return TCL_OK;
00529 }
00530 break;
00531 case 's':
00532 if (strcmp(argv[1], "send-hb") == 0) {
00533 send_heartbeat();
00534 return TCL_OK;
00535 } else if (strcmp(argv[1], "stop-mpush") == 0) {
00536 ClientPage *pg =
00537 (ClientPage *)pool_->get_page(argv[2]);
00538 if (pg == NULL) {
00539 tcl.resultf("%d is-valid: No page %s",
00540 id_, argv[2]);
00541 return TCL_ERROR;
00542 }
00543 pg->clear_mpush();
00544 fprintf(stderr, "server %d stopped mpush\n", id_);
00545 return TCL_OK;
00546 }
00547 break;
00548 }
00549
00550 return HttpApp::command(argc, argv);
00551 }
00552
00553 void HttpYucInvalServer::add_inv(const char *name, double mtime)
00554 {
00555 InvalidationRec *p = get_invrec(name);
00556 if ((p != NULL) && (p->mtime() < mtime)) {
00557 p->detach();
00558 delete p;
00559 p = NULL;
00560 num_inv_--;
00561 }
00562 if (p == NULL) {
00563 p = new InvalidationRec(name, mtime);
00564 p->insert(&invlist_);
00565 num_inv_++;
00566 }
00567 }
00568
00569 InvalidationRec* HttpYucInvalServer::get_invrec(const char *name)
00570 {
00571
00572
00573
00574 InvalidationRec *r = invlist_;
00575 for (r = invlist_; r != NULL; r = r->next())
00576 if (strcmp(name, r->pg()) == 0)
00577 return r;
00578 return NULL;
00579 }
00580
00581 HttpHbData* HttpYucInvalServer::pack_heartbeat()
00582 {
00583 HttpHbData *data = new HttpHbData(id_, num_inv_);
00584 InvalidationRec *p = invlist_, *q;
00585 int i = 0;
00586 while (p != NULL) {
00587 data->add(i++, p);
00588
00589 if (!p->dec_scount()) {
00590
00591
00592
00593
00594 q = p;
00595 p = p->next();
00596 q->detach();
00597 delete q;
00598 num_inv_--;
00599 } else
00600 p = p->next();
00601 }
00602 return data;
00603 }
00604
00605 void HttpYucInvalServer::send_hb_helper(int size, AppData *data)
00606 {
00607 inv_sender_->send(size, data);
00608 }
00609
00610 void HttpYucInvalServer::send_heartbeat()
00611 {
00612 if (inv_sender_ == NULL)
00613 return;
00614
00615 HttpHbData* d = pack_heartbeat();
00616 send_hb_helper(d->cost(), d);
00617 }
00618
00619
00620
00621
00622
00623
00624
00625
00626
00627
00628
00629 static class HttpCacheClass : public TclClass {
00630 public:
00631 HttpCacheClass() : TclClass("Http/Cache") {}
00632 TclObject* create(int, const char*const*) {
00633 return (new HttpCache());
00634 }
00635 } class_httpcache_app;
00636
00637 static class HttpInvalCacheClass : public TclClass {
00638 public:
00639 HttpInvalCacheClass() : TclClass("Http/Cache/Inval") {}
00640 TclObject* create(int, const char*const*) {
00641 return (new HttpInvalCache());
00642 }
00643 } class_httpinvalcache_app;
00644
00645 static class HttpMInvalCacheClass : public TclClass {
00646 public:
00647 HttpMInvalCacheClass() : TclClass("Http/Cache/Inval/Mcast") {}
00648 TclObject* create(int, const char*const*) {
00649 return (new HttpMInvalCache());
00650 }
00651 } class_HttpMInvalCache_app;
00652
00653
00654 HttpMInvalCache** HttpMInvalCache::CacheRepository_ = NULL;
00655 int HttpMInvalCache::NumCache_ = 0;
00656
00657 void HttpMInvalCache::add_cache(HttpMInvalCache *c)
00658 {
00659 if (CacheRepository_ == NULL) {
00660 CacheRepository_ = new HttpMInvalCache* [c->id() + 1];
00661 CacheRepository_[c->id()] = c;
00662 NumCache_ = c->id();
00663 } else if (NumCache_ < c->id()) {
00664 HttpMInvalCache** p = new HttpMInvalCache* [c->id()+1];
00665 memcpy(p, CacheRepository_,
00666 (c->id()+1)*sizeof(HttpMInvalCache*));
00667 delete[]CacheRepository_;
00668 CacheRepository_ = p;
00669 NumCache_ = c->id();
00670 p[c->id()] = c;
00671 } else
00672 CacheRepository_[c->id()] = c;
00673 }
00674
00675 HttpMInvalCache::HttpMInvalCache() :
00676 hb_timer_(this, HTTP_HBINTERVAL),
00677 inv_sender_(0), num_sender_(0), size_sender_(0),
00678 invlist_(0), num_inv_(0), inv_parent_(NULL),
00679 upd_sender_(NULL), num_updater_(0), size_updater_(0)
00680 {
00681 bind("hb_interval_", &hb_interval_);
00682 bind("enable_upd_", &enable_upd_);
00683 bind("Ca_", &Ca_);
00684 bind("Cb_", &Cb_);
00685 bind("push_thresh_", &push_thresh_);
00686 bind("push_high_bound_", &push_high_bound_);
00687 bind("push_low_bound_", &push_low_bound_);
00688
00689 hb_timer_.set_interval(hb_interval_);
00690 Tcl_InitHashTable(&sstate_, TCL_ONE_WORD_KEYS);
00691 Tcl_InitHashTable(&nbr_, TCL_ONE_WORD_KEYS);
00692 }
00693
00694 HttpMInvalCache::~HttpMInvalCache()
00695 {
00696 if (num_sender_ > 0)
00697 delete []inv_sender_;
00698 Tcl_DeleteHashTable(&sstate_);
00699 Tcl_DeleteHashTable(&nbr_);
00700 }
00701
00702 int HttpMInvalCache::command(int argc, const char*const* argv)
00703 {
00704 Tcl& tcl = Tcl::instance();
00705 if (argc < 2)
00706 return HttpInvalCache::command(argc, argv);
00707
00708 switch (argv[1][0]) {
00709 case 'a':
00710 if ((strcmp(argv[1], "add-inval-listener") == 0) ||
00711 (strcmp(argv[1], "add-upd-listener") == 0)) {
00712 HttpInvalAgent *tmp =
00713 (HttpInvalAgent *)TclObject::lookup(argv[2]);
00714 tmp->attachApp((Application *)this);
00715 return TCL_OK;
00716 } else if (strcmp(argv[1], "add-inval-sender") == 0) {
00717 HttpInvalAgent *tmp =
00718 (HttpInvalAgent *)TclObject::lookup(argv[2]);
00719 if (tmp == NULL) {
00720 tcl.resultf("Non-existent agent %s", argv[2]);
00721 return TCL_ERROR;
00722 }
00723 if (num_sender_ == size_sender_) {
00724 HttpInvalAgent **tt =
00725 new HttpInvalAgent*[size_sender_+5];
00726 memcpy(tt, inv_sender_,
00727 sizeof(HttpInvalAgent*)*size_sender_);
00728 delete []inv_sender_;
00729 size_sender_ += 5;
00730 inv_sender_ = tt;
00731 }
00732 inv_sender_[num_sender_++] = tmp;
00733 return TCL_OK;
00734 } else if (strcmp(argv[1], "add-to-map") == 0) {
00735 add_cache(this);
00736 return TCL_OK;
00737 } else if (strcmp(argv[1], "add-upd-sender") == 0) {
00738 HttpInvalAgent *tmp =
00739 (HttpInvalAgent *)TclObject::lookup(argv[2]);
00740 if (tmp == NULL) {
00741 tcl.resultf("Non-existent agent %s", argv[2]);
00742 return TCL_ERROR;
00743 }
00744 if (num_updater_ == size_updater_) {
00745 HttpInvalAgent **tt =
00746 new HttpInvalAgent*[size_updater_+5];
00747 memcpy(tt, upd_sender_,
00748 sizeof(HttpInvalAgent*)*size_updater_);
00749 delete []upd_sender_;
00750 size_updater_ += 5;
00751 upd_sender_ = tt;
00752 }
00753 upd_sender_[num_updater_++] = tmp;
00754 return TCL_OK;
00755 }
00756 break;
00757
00758 case 'c':
00759 if (strcmp(argv[1], "count-request") == 0) {
00760 ClientPage *pg =
00761 (ClientPage *)pool_->get_page(argv[2]);
00762 if (pg == NULL) {
00763 tcl.resultf("%d count-request: No page %s",
00764 id_, argv[2]);
00765 return TCL_ERROR;
00766 }
00767 pg->count_request(Cb_, push_high_bound_);
00768 log("E NTF p %s v %d\n", argv[2], pg->counter());
00769 return TCL_OK;
00770 } else if (strcmp(argv[1], "check-sstate") == 0) {
00771
00772
00773
00774
00775 int sid = atoi(argv[2]);
00776 int cid = atoi(argv[3]);
00777 check_sstate(sid, cid);
00778 return TCL_OK;
00779 }
00780 break;
00781
00782 case 'i':
00783
00784 if (strcmp(argv[1], "is-unread") == 0) {
00785 ClientPage *pg =
00786 (ClientPage *)pool_->get_page(argv[2]);
00787 if (pg == NULL) {
00788 tcl.resultf("%d is-unread: No page %s",
00789 id_, argv[2]);
00790 return TCL_ERROR;
00791 }
00792 tcl.resultf("%d", pg->is_unread());
00793 return TCL_OK;
00794 }
00795 break;
00796
00797 case 'j':
00798 if (strcmp(argv[1], "join") == 0) {
00799
00800
00801
00802
00803
00804
00805 int sid = atoi(argv[2]);
00806 HttpMInvalCache *cache =
00807 (HttpMInvalCache*)TclObject::lookup(argv[3]);
00808 if (cache == NULL) {
00809 tcl.add_errorf("Non-existent cache %s", argv[3]);
00810 return TCL_ERROR;
00811 }
00812
00813 NeighborCache *c = lookup_nbr(cache->id());
00814 if (c == NULL)
00815 add_nbr(cache);
00816
00817 check_sstate(sid, cache->id());
00818 return TCL_OK;
00819 }
00820 break;
00821
00822 case 'p':
00823 if (strcmp(argv[1], "parent-cache") == 0) {
00824
00825
00826
00827
00828
00829 int sid = atoi(argv[2]);
00830 SState *sst = lookup_sstate(sid);
00831 if (sst == NULL)
00832 tcl.result("");
00833 else {
00834
00835 NeighborCache *c = lookup_nbr(sst->cache()->cache()->id());
00836 tcl.resultf("%s", c->cache()->name());
00837 }
00838 return TCL_OK;
00839 } else if (strcmp(argv[1], "push-children") == 0) {
00840
00841 ClientPage *pg =
00842 (ClientPage *)pool_->get_page(argv[2]);
00843 if (pg == NULL) {
00844 tcl.resultf("%d is-valid: No page %s",
00845 id_, argv[2]);
00846 return TCL_ERROR;
00847 }
00848 send_upd(pg);
00849 return TCL_OK;
00850 }
00851 break;
00852
00853 case 'r':
00854 if (strcmp(argv[1], "recv-inv") == 0) {
00855
00856
00857
00858
00859
00860
00861
00862 HttpHbData *d = new HttpHbData(id_, 1);
00863 strcpy(d->rec_pg(0), argv[2]);
00864 d->rec_mtime(0) = strtod(argv[3], NULL);
00865
00866 tcl.resultf("%d", recv_inv(d));
00867 delete d;
00868 return TCL_OK;
00869 } else if (strcmp(argv[1], "recv-push") == 0) {
00870
00871
00872
00873 HttpUpdateData *d = new HttpUpdateData(id_, 1);
00874 strcpy(d->rec_page(0), argv[2]);
00875 for (int i = 3; i < argc; i+=2) {
00876 if (strcmp(argv[i], "modtime") == 0)
00877 d->rec_mtime(0) = strtod(argv[i+1], NULL);
00878 else if (strcmp(argv[i], "size") == 0) {
00879 d->rec_size(0) = atoi(argv[i+1]);
00880
00881 d->set_pgsize(d->rec_size(0));
00882 } else if (strcmp(argv[i], "age") == 0)
00883 d->rec_age(0) = strtod(argv[i+1], NULL);
00884 }
00885 tcl.resultf("%d", recv_upd(d));
00886 delete d;
00887 return TCL_OK;
00888 } else if (strcmp(argv[1], "register-server") == 0) {
00889
00890
00891
00892
00893
00894 int cid = atoi(argv[2]);
00895 int sid = atoi(argv[3]);
00896
00897 check_sstate(sid, cid);
00898 return TCL_OK;
00899 }
00900 break;
00901
00902 case 's':
00903 if (strcmp(argv[1], "start-hbtimer") == 0) {
00904 if (hb_timer_.status() == TIMER_IDLE)
00905 hb_timer_.sched();
00906 return TCL_OK;
00907 } else if (strcmp(argv[1], "server-hb") == 0) {
00908 int id = atoi(argv[2]);
00909 recv_heartbeat(id);
00910 return TCL_OK;
00911 } else if (strcmp(argv[1], "set-pinv-agent") == 0) {
00912 inv_parent_ =
00913 (HttpUInvalAgent*)TclObject::lookup(argv[2]);
00914 return TCL_OK;
00915 } else if (strcmp(argv[1], "set-parent") == 0) {
00916 HttpMInvalCache *c =
00917 (HttpMInvalCache*)TclObject::lookup(argv[2]);
00918 if (c == NULL) {
00919 tcl.add_errorf("Non-existent cache %s", argv[2]);
00920 return TCL_ERROR;
00921 }
00922
00923 add_nbr(c);
00924 return TCL_OK;
00925 } else if (strcmp(argv[1], "set-unread") == 0) {
00926 ClientPage *pg =
00927 (ClientPage *)pool_->get_page(argv[2]);
00928 if (pg == NULL) {
00929 tcl.resultf("%d is-valid: No page %s",
00930 id_, argv[2]);
00931 return TCL_ERROR;
00932 }
00933 pg->set_unread();
00934 return TCL_OK;
00935 } else if (strcmp(argv[1], "set-read") == 0) {
00936 ClientPage *pg =
00937 (ClientPage *)pool_->get_page(argv[2]);
00938 if (pg == NULL) {
00939 tcl.resultf("%d is-valid: No page %s",
00940 id_, argv[2]);
00941 return TCL_ERROR;
00942 }
00943 pg->set_read();
00944 return TCL_OK;
00945 } else if (strcmp(argv[1], "set-mandatory-push") == 0) {
00946 ClientPage *pg =
00947 (ClientPage *)pool_->get_page(argv[2]);
00948 if (pg == NULL) {
00949 tcl.resultf("%d is-valid: No page %s",
00950 id_, argv[2]);
00951 return TCL_ERROR;
00952 }
00953 pg->set_mpush(Scheduler::instance().clock());
00954 return TCL_OK;
00955 } else if (strcmp(argv[1], "stop-mpush") == 0) {
00956 ClientPage *pg =
00957 (ClientPage *)pool_->get_page(argv[2]);
00958 if (pg == NULL) {
00959 tcl.resultf("%d is-valid: No page %s",
00960 id_, argv[2]);
00961 return TCL_ERROR;
00962 }
00963 pg->clear_mpush();
00964 return TCL_OK;
00965 }
00966 break;
00967
00968 default:
00969 break;
00970 }
00971 return HttpInvalCache::command(argc, argv);
00972 }
00973
00974 void HttpMInvalCache::check_sstate(int sid, int cid)
00975 {
00976 if ((sid == cid) && (cid == id_))
00977
00978 return;
00979 SState *sst = lookup_sstate(sid);
00980 NeighborCache *c = lookup_nbr(cid);
00981 if (sst == NULL) {
00982 if (c == NULL) {
00983 fprintf(stderr,
00984 "%g: cache %d: No neighbor cache for received invalidation from %d via %d\n",
00985 Scheduler::instance().clock(), id_, sid, cid);
00986 abort();
00987 }
00988 #ifdef WEBCACHE_DEBUG
00989 fprintf(stderr,
00990 "%g: cache %d: registered server %d via cache %d\n",
00991 Scheduler::instance().clock(), id_, sid, cid);
00992 #endif
00993 sst = new SState(c);
00994 add_sstate(sid, sst);
00995 c->add_server(sid);
00996 } else if (sst->is_down()) {
00997 sst->up();
00998 if (cid != id_) {
00999 if (c == NULL) {
01000 fprintf(stderr,
01001 "[%g]: Cache %d has an invalid neighbor cache %d\n",
01002 Scheduler::instance().clock(), id_, cid);
01003 abort();
01004 }
01005 c->server_up(sid);
01006 }
01007 #ifdef WEBCACHE_DEBUG
01008 fprintf(stderr,
01009 "[%g] Cache %d reconnected to server %d via cache %d\n",
01010 Scheduler::instance().clock(), id_,
01011 sid, cid);
01012 #endif
01013 Tcl::instance().evalf("%s mark-rejoin", name_);
01014 }
01015 }
01016
01017 void HttpMInvalCache::add_sstate(int sid, SState *sst)
01018 {
01019 int newEntry = 1;
01020 Tcl_HashEntry *he =
01021 Tcl_CreateHashEntry(&sstate_, (const char *)sid, &newEntry);
01022 if (he == NULL)
01023 return;
01024 if (newEntry)
01025 Tcl_SetHashValue(he, (ClientData)sst);
01026 }
01027
01028 HttpMInvalCache::SState* HttpMInvalCache::lookup_sstate(int sid)
01029 {
01030 Tcl_HashEntry *he = Tcl_FindHashEntry(&sstate_, (const char *)sid);
01031 if (he == NULL)
01032 return NULL;
01033 return (SState *)Tcl_GetHashValue(he);
01034 }
01035
01036 NeighborCache* HttpMInvalCache::lookup_nbr(int id)
01037 {
01038 Tcl_HashEntry *he = Tcl_FindHashEntry(&nbr_, (const char *)id);
01039 if (he == NULL)
01040 return NULL;
01041 return (NeighborCache *)Tcl_GetHashValue(he);
01042 }
01043
01044
01045 void HttpMInvalCache::add_nbr(HttpMInvalCache *cache)
01046 {
01047 int newEntry = 1;
01048 Tcl_HashEntry *he =
01049 Tcl_CreateHashEntry(&nbr_, (const char *)cache->id(),
01050 &newEntry);
01051 if (he == NULL)
01052 return;
01053
01054 if (!newEntry)
01055 return;
01056
01057
01058 LivenessTimer *timer =
01059 new LivenessTimer(this,HTTP_HBEXPIRE_COUNT*hb_interval_,
01060 cache->id());
01061
01062 double time = Scheduler::instance().clock();
01063 NeighborCache *c = new NeighborCache(cache, time, timer);
01064 Tcl_SetHashValue(he, (ClientData)c);
01065 }
01066
01067
01068
01069
01070 void HttpMInvalCache::recv_heartbeat(int id)
01071 {
01072
01073 double time = Scheduler::instance().clock();
01074
01075 NeighborCache *c = lookup_nbr(id);
01076 if (c == NULL) {
01077
01078
01079
01080
01081
01082
01083 if (id == id_)
01084 return;
01085 add_nbr(map_cache(id));
01086 #ifdef WEBCACHE_DEBUG
01087 fprintf(stderr, "TLC %d discovered TLC %d\n", id_, id);
01088 #endif
01089 return;
01090 } else if (c->is_down()) {
01091
01092
01093 c->up();
01094 #ifdef WEBCACHE_DEBUG
01095 fprintf(stderr, "[%g] Cache %d reconnected to cache %d\n",
01096 Scheduler::instance().clock(), id_, id);
01097 #endif
01098 Tcl::instance().evalf("%s mark-rejoin", name_);
01099 } else
01100
01101 c->reset_timer(time);
01102 }
01103
01104 void HttpMInvalCache::invalidate_server(int sid)
01105 {
01106 SState *sst = lookup_sstate(sid);
01107 if (sst->is_down())
01108
01109 return;
01110 sst->down();
01111 pool_->invalidate_server(sid);
01112 }
01113
01114 void HttpMInvalCache::handle_node_failure(int cid)
01115 {
01116 #ifdef WEBCACHE_DEBUG
01117 fprintf(stderr, "[%g] Cache %d disconnected from cache %d\n",
01118 Scheduler::instance().clock(), id_, cid);
01119 #endif
01120 Tcl::instance().evalf("%s mark-leave", name_);
01121
01122 NeighborCache *c = lookup_nbr(cid);
01123 if (c == NULL) {
01124 fprintf(stderr, "%s: An unknown neighbor cache %d failed.\n",
01125 name_, cid);
01126 }
01127
01128 c->down();
01129
01130
01131 c->invalidate(this);
01132
01133
01134 HttpLeaveData* data = new HttpLeaveData(id_, c->num());
01135 c->pack_leave(*data);
01136 send_leave(data);
01137 }
01138
01139 void HttpMInvalCache::recv_leave(HttpLeaveData *d)
01140 {
01141 #ifdef WEBCACHE_DEBUG
01142 fprintf(stderr, "[%g] Cache %d gets a LEAVE from cache %d\n",
01143 Scheduler::instance().clock(), id_, d->id());
01144 #endif
01145
01146 if (d->num() == 0) {
01147 fprintf(stderr,
01148 "%s (%g) gets a leave from cache without server!\n",
01149 name_, Scheduler::instance().clock());
01150 return;
01151 }
01152
01153 SState *sst;
01154 HttpLeaveData* data = new HttpLeaveData(id_, d->num());
01155 NeighborCache *c = lookup_nbr(d->id());
01156 int i, j;
01157 for (i = 0, j = 0; i < d->num(); i++) {
01158 sst = lookup_sstate(d->rec_id(i));
01159
01160
01161
01162 if (sst == NULL)
01163 continue;
01164
01165 if (sst->is_down())
01166 continue;
01167
01168
01169
01170 if (c != sst->cache())
01171 continue;
01172
01173
01174
01175 sst->down();
01176 data->add(j++, d->rec_id(i));
01177 pool_->invalidate_server(d->rec_id(i));
01178 Tcl::instance().evalf("%s mark-leave", name_);
01179 }
01180
01181 if (j > 0)
01182 send_leave(data);
01183 delete data;
01184 }
01185
01186 void HttpMInvalCache::send_leave(HttpLeaveData *d)
01187 {
01188 send_hb_helper(d->cost(), d);
01189 }
01190
01191 void HttpMInvalCache::timeout(int reason)
01192 {
01193 switch (reason) {
01194 case HTTP_INVALIDATION:
01195
01196 send_heartbeat();
01197 break;
01198 case HTTP_UPDATE:
01199
01200
01201 break;
01202 default:
01203 fprintf(stderr, "%s: Unknown reason %d", name_, reason);
01204 break;
01205 }
01206 }
01207
01208 void HttpMInvalCache::process_data(int size, AppData* data)
01209 {
01210 if (data == NULL)
01211 return;
01212
01213 switch (data->type()) {
01214 case HTTP_INVALIDATION: {
01215
01216 HttpHbData *inv = (HttpHbData*)data;
01217 recv_heartbeat(inv->id());
01218 recv_inv(inv);
01219 break;
01220 }
01221 case HTTP_UPDATE: {
01222
01223 HttpUpdateData *pg = (HttpUpdateData*)data;
01224 recv_upd(pg);
01225 break;
01226 }
01227
01228 case HTTP_LEAVE: {
01229 HttpLeaveData *l = (HttpLeaveData*)data;
01230 recv_leave(l);
01231 break;
01232 }
01233 default:
01234 HttpApp::process_data(size, data);
01235 return;
01236 }
01237 }
01238
01239 void HttpMInvalCache::add_inv(const char *name, double mtime)
01240 {
01241 InvalidationRec *p = get_invrec(name);
01242 if ((p != NULL) && (p->mtime() < mtime)) {
01243 p->detach();
01244 delete p;
01245 p = NULL;
01246 num_inv_--;
01247 }
01248 if (p == NULL) {
01249 p = new InvalidationRec(name, mtime);
01250 p->insert(&invlist_);
01251 num_inv_++;
01252 }
01253 }
01254
01255 InvalidationRec* HttpMInvalCache::get_invrec(const char *name)
01256 {
01257
01258
01259
01260 InvalidationRec *r = invlist_;
01261 for (r = invlist_; r != NULL; r = r->next())
01262 if (strcmp(name, r->pg()) == 0)
01263 return r;
01264 return NULL;
01265 }
01266
01267 HttpHbData* HttpMInvalCache::pack_heartbeat()
01268 {
01269 HttpHbData *data = new HttpHbData(id_, num_inv_);
01270 InvalidationRec *p = invlist_, *q;
01271 int i = 0;
01272 while (p != NULL) {
01273 data->add(i++, p);
01274
01275 if (!p->dec_scount()) {
01276
01277
01278
01279
01280 q = p;
01281 p = p->next();
01282 q->detach();
01283 delete q;
01284 num_inv_--;
01285 } else
01286 p = p->next();
01287 }
01288 return data;
01289 }
01290
01291 int HttpMInvalCache::recv_inv(HttpHbData *data)
01292 {
01293 if (data->num_inv() == 0)
01294 return 0;
01295
01296 InvalidationRec *head;
01297 data->extract(head);
01298 int old_inv = num_inv_;
01299 process_inv(data->num_inv(), head, data->id());
01300
01301 if (old_inv < num_inv_)
01302
01303 return 1;
01304 else
01305 return 0;
01306 }
01307
01308
01309
01310
01311 void HttpMInvalCache::process_inv(int, InvalidationRec *ivlist, int cache)
01312 {
01313 InvalidationRec *p = ivlist, *q, *r;
01314
01315 while (p != NULL) {
01316 ClientPage* pg = (ClientPage *)pool_->get_page(p->pg());
01317
01318
01319
01320
01321
01322
01323 if (pg != NULL) {
01324 check_sstate(pg->server()->id(), cache);
01325
01326
01327
01328 SState *sst = lookup_sstate(pg->server()->id());
01329 if (sst == NULL) {
01330
01331 fprintf(stderr,
01332 "%s %d: couldn't find the server.\n",
01333 __FILE__, __LINE__);
01334 abort();
01335 }
01336 if ((sst->cache()->cache()->id() == cache) &&
01337 (pg->mtime() > p->mtime())) {
01338
01339 pg->count_inval(Ca_, push_low_bound_);
01340 log("E NTF p %s v %d\n",p->pg(),pg->counter());
01341 }
01342 }
01343
01344
01345 if (recv_inv_filter(pg, p) == HTTP_INVALCACHE_FILTERED) {
01346
01347
01348
01349
01350
01351
01352 q = p;
01353 p = p->next();
01354 q->detach();
01355 delete q;
01356 } else {
01357
01358
01359 pg->invalidate(p->mtime());
01360
01361 q = get_invrec(p->pg());
01362 if ((q != NULL) && (q->mtime() < p->mtime())) {
01363 q->detach();
01364 delete q;
01365 q = NULL;
01366 num_inv_--;
01367 }
01368 r = p;
01369 p = p->next();
01370 r->detach();
01371
01372 if (q == NULL) {
01373 r->insert(&invlist_);
01374 num_inv_++;
01375
01376 Tcl::instance().evalf("%s mark-invalid",name_);
01377 log("E GINV p %s m %.17g\n", r->pg(), r->mtime());
01378 } else
01379 delete r;
01380 }
01381 }
01382 }
01383
01384 void HttpMInvalCache::send_hb_helper(int size, AppData *data)
01385 {
01386 if (inv_parent_ != NULL)
01387 inv_parent_->send(size, data->copy());
01388 for (int i = 0; i < num_sender_; i++)
01389 inv_sender_[i]->send(size, data->copy());
01390 }
01391
01392 void HttpMInvalCache::send_heartbeat()
01393 {
01394 if ((num_sender_ == 0) && (inv_parent_ == NULL))
01395 return;
01396
01397 HttpHbData* d = pack_heartbeat();
01398 send_hb_helper(d->cost(), d);
01399 delete d;
01400 }
01401
01402 int HttpMInvalCache::recv_upd(HttpUpdateData *d)
01403 {
01404 if (d->num() != 1) {
01405 fprintf(stderr,
01406 "%d gets an update which contain !=1 pages.\n", id_);
01407 abort();
01408 }
01409
01410 ClientPage *pg = pool_->get_page(d->rec_page(0));
01411 if (pg != NULL)
01412 if (pg->mtime() >= d->rec_mtime(0)) {
01413
01414
01415
01416
01417
01418 return 0;
01419 } else {
01420
01421
01422 add_inv(d->rec_page(0), d->rec_mtime(0));
01423 pg->count_inval(Ca_, push_low_bound_);
01424 log("E NTF p %s v %d\n", d->rec_page(0),pg->counter());
01425 }
01426
01427
01428 ClientPage *q = pool_->enter_page(d->rec_page(0), d->rec_size(0),
01429 d->rec_mtime(0),
01430 Scheduler::instance().clock(),
01431 d->rec_age(0));
01432
01433 q->set_unread();
01434
01435 log("E GUPD m %.17g z %d\n", d->rec_mtime(0), d->pgsize());
01436 Tcl::instance().evalf("%s mark-valid", name_);
01437
01438
01439
01440 if (q->is_mpush() && (Scheduler::instance().clock() - q->mpush_time()
01441 > HTTP_HBEXPIRE_COUNT*hb_interval_)) {
01442
01443 q->clear_mpush();
01444 Tcl::instance().evalf("%s cancel-mpush-refresh %s",
01445 name_, d->rec_page(0));
01446 }
01447
01448 if (enable_upd_ && (q->counter() >= push_thresh_) || q->is_mpush())
01449
01450
01451 return 1;
01452 else
01453 return 0;
01454 }
01455
01456 HttpUpdateData* HttpMInvalCache::pack_upd(ClientPage* page)
01457 {
01458 HttpUpdateData *data = new HttpUpdateData(id_, 1);
01459 data->add(0, page);
01460 return data;
01461 }
01462
01463 void HttpMInvalCache::send_upd_helper(int pgsize, AppData* data)
01464 {
01465 for (int i = 0; i < num_updater_; i++)
01466 upd_sender_[i]->send(pgsize, data->copy());
01467 }
01468
01469 void HttpMInvalCache::send_upd(ClientPage *page)
01470 {
01471 if ((num_updater_ == 0) || !enable_upd_)
01472 return;
01473
01474 HttpUpdateData* d = pack_upd(page);
01475 send_upd_helper(d->pgsize(), d);
01476 delete d;
01477 }
01478
01479
01480
01481
01482
01483
01484 static class HttpPercInvalCacheClass : public TclClass {
01485 public:
01486 HttpPercInvalCacheClass() : TclClass("Http/Cache/Inval/Mcast/Perc") {}
01487 TclObject* create(int, const char*const*) {
01488 return (new HttpPercInvalCache());
01489 }
01490 } class_HttpPercInvalCache_app;
01491
01492 HttpPercInvalCache::HttpPercInvalCache()
01493 {
01494 bind("direct_request_", &direct_request_);
01495 }
01496
01497 int HttpPercInvalCache::command(int argc, const char*const* argv)
01498 {
01499 Tcl& tcl = Tcl::instance();
01500
01501 if (strcmp(argv[1], "is-header-valid") == 0) {
01502 ClientPage *pg =
01503 (ClientPage *)pool_->get_page(argv[2]);
01504 if (pg == NULL) {
01505 tcl.resultf("%d is-valid: No page %s",
01506 id_, argv[2]);
01507 return TCL_ERROR;
01508 }
01509 tcl.resultf("%d", pg->is_header_valid());
01510 return TCL_OK;
01511 } else if (strcmp(argv[1], "enter-metadata") == 0) {
01512
01513
01514
01515
01516
01517
01518 ClientPage *pg = pool_->enter_metadata(argc, argv);
01519 if (pg == NULL)
01520 return TCL_ERROR;
01521 else
01522 return TCL_OK;
01523 }
01524
01525 return HttpMInvalCache::command(argc, argv);
01526 }