00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 #ifndef lint
00032 static const char rcsid[] =
00033 "@(#) $Header: /nfs/jade/vint/CVSROOT/ns-2/mcast/srm.cc,v 1.26 2002/09/18 05:41:51 sundarra Exp $ (USC/ISI)";
00034 #endif
00035
00036 #include <stdlib.h>
00037 #include <assert.h>
00038
00039 #include "config.h"
00040 #include "agent.h"
00041 #include "ip.h"
00042 #include "srm.h"
00043 #include "trace.h"
00044 #include "rtp.h"
00045
00046
00047 int hdr_srm::offset_;
00048 int hdr_asrm::offset_;
00049
00050 static class SRMHeaderClass : public PacketHeaderClass {
00051 public:
00052 SRMHeaderClass() : PacketHeaderClass("PacketHeader/SRM",
00053 sizeof(hdr_srm)) {
00054 bind_offset(&hdr_srm::offset_);
00055 }
00056 } class_srmhdr;
00057
00058 static class ASRMHeaderClass : public PacketHeaderClass {
00059 public:
00060 ASRMHeaderClass() : PacketHeaderClass("PacketHeader/aSRM",
00061 sizeof(hdr_asrm)) {
00062 bind_offset(&hdr_asrm::offset_);
00063 }
00064 } class_adaptive_srmhdr;
00065
00066
00067 static class SRMAgentClass : public TclClass {
00068 public:
00069 SRMAgentClass() : TclClass("Agent/SRM") {}
00070 TclObject* create(int, const char*const*) {
00071 return (new SRMAgent());
00072 }
00073 } class_srm_agent;
00074
00075 static class ASRMAgentClass : public TclClass {
00076 public:
00077 ASRMAgentClass() : TclClass("Agent/SRM/Adaptive") {}
00078 TclObject* create(int, const char*const*) {
00079 return (new ASRMAgent());
00080 }
00081 } class_adaptive_srm_agent;
00082
00083
00084 SRMAgent::SRMAgent()
00085 : Agent(PT_SRM), dataCtr_(-1), sessCtr_(-1), siphash_(0), seqno_(-1),
00086 app_type_(PT_NTYPE)
00087 {
00088 sip_ = new SRMinfo(-1);
00089
00090 bind("packetSize_", &packetSize_);
00091 bind("groupSize_", &groupSize_);
00092 bind("app_fid_", &app_fid_);
00093 }
00094
00095 SRMAgent::~SRMAgent()
00096 {
00097 cleanup();
00098 }
00099
00100 int SRMAgent::command(int argc, const char*const* argv)
00101 {
00102 Tcl& tcl = Tcl::instance();
00103
00104 if (strcmp(argv[1], "send") == 0) {
00105 if (strcmp(argv[2], "session") == 0) {
00106 send_sess();
00107 return TCL_OK;
00108 }
00109 if (strcmp(argv[2], "request") == 0) {
00110 int round = atoi(argv[3]);
00111 int sender = atoi(argv[4]);
00112 int msgid = atoi(argv[5]);
00113 send_ctrl(SRM_RQST, round, sender, msgid, 0);
00114 return TCL_OK;
00115 }
00116 if (strcmp(argv[2], "repair") == 0) {
00117 int round = atoi(argv[3]);
00118 int sender = atoi(argv[4]);
00119 int msgid = atoi(argv[5]);
00120 send_ctrl(SRM_REPR, round, sender, msgid, packetSize_);
00121 return TCL_OK;
00122 }
00123 tcl.resultf("%s: invalid send request %s", name_, argv[2]);
00124 return TCL_ERROR;
00125 }
00126 if (argc == 2) {
00127 if (strcmp(argv[1], "distances?") == 0) {
00128 tcl.result("");
00129 if (sip_->sender_ >= 0) {
00130 for (SRMinfo* sp = sip_; sp; sp = sp->next_) {
00131 tcl.resultf("%s %d %f", tcl.result(),
00132 sp->sender_,
00133 sp->distance_);
00134 }
00135 }
00136 return TCL_OK;
00137 }
00138 if (strcmp(argv[1], "start") == 0) {
00139 start();
00140 return TCL_OK;
00141 }
00142 }
00143 if (argc == 3) {
00144 if (strcmp(argv[1], "distance?") == 0) {
00145 int sender = atoi(argv[2]);
00146 SRMinfo* sp = get_state(sender);
00147 tcl.resultf("%lf", sp->distance_);
00148 return TCL_OK;
00149 }
00150 if (strcmp(argv[1], "eventtrace") == 0) {
00151 return (TCL_OK);
00152 }
00153 }
00154 return Agent::command(argc, argv);
00155 }
00156
00157 void SRMAgent::recv(Packet* p, Handler* h)
00158 {
00159 hdr_ip* ih = hdr_ip::access(p);
00160 hdr_srm* sh = hdr_srm::access(p);
00161
00162 if (ih->daddr() == -1) {
00163
00164 sh->type() = SRM_DATA;
00165 sh->sender() = addr();
00166 sh->seqnum() = ++dataCtr_;
00167 addExtendedHeaders(p);
00168 ih->dst() = dst_;
00169 target_->recv(p, h);
00170 } else {
00171
00172 #if 0
00173 static char *foo[] = {"NONE", "DATA", "SESS", "RQST", "REPR"};
00174 fprintf(stderr, "%7.4f %s %d recvd SRM_%s <%d, %d> from %d\n",
00175 Scheduler::instance().clock(), name_, addr_,
00176 foo[sh->type()],
00177 sh->sender(), sh->seqnum(), ih->src());
00178 #endif
00179
00180 parseExtendedHeaders(p);
00181 switch (sh->type()) {
00182 case SRM_DATA:
00183 recv_data(sh->sender(), sh->seqnum(), p->accessdata());
00184 break;
00185 case SRM_RQST:
00186 recv_rqst(ih->saddr(),
00187 sh->round(), sh->sender(), sh->seqnum());
00188 break;
00189 case SRM_REPR:
00190 recv_repr(sh->round(), sh->sender(), sh->seqnum(),
00191 p->accessdata());
00192 break;
00193 case SRM_SESS:
00194
00195
00196 recv_sess(p, sh->seqnum(), (int*) p->accessdata());
00197 break;
00198 }
00199 Packet::free(p);
00200 }
00201 }
00202
00203 void SRMAgent::sendmsg(int nbytes, const char* )
00204 {
00205 if (nbytes == -1) {
00206 printf("Error: sendmsg() for SRM should not be -1\n");
00207 return;
00208 }
00209
00210
00211
00212
00213 if (type_ != PT_SRM) {
00214 app_type_ = type_;
00215 type_ = PT_SRM;
00216 }
00217 size_ = nbytes;
00218 Packet *p;
00219 p = allocpkt();
00220 hdr_ip* ih = hdr_ip::access(p);
00221 hdr_srm* sh = hdr_srm::access(p);
00222 hdr_rtp* rh = hdr_rtp::access(p);
00223 hdr_cmn* ch = hdr_cmn::access(p);
00224
00225
00226 ch->ptype() = app_type_;
00227 ch->size() = size_;
00228 ih->flowid() = app_fid_;
00229 rh->seqno() = ++seqno_;
00230
00231 sh->type() = SRM_DATA;
00232 sh->sender() = addr();
00233 sh->seqnum() = ++dataCtr_;
00234 addExtendedHeaders(p);
00235 ih->dst() = dst_;
00236 target_->recv(p);
00237 }
00238
00239
00240 void SRMAgent::send_ctrl(int type, int round, int sender, int msgid, int size)
00241 {
00242 Packet* p = Agent::allocpkt();
00243 hdr_srm* sh = hdr_srm::access(p);
00244 sh->type() = type;
00245 sh->sender() = sender;
00246 sh->seqnum() = msgid;
00247 sh->round() = round;
00248 addExtendedHeaders(p);
00249
00250 hdr_cmn* ch = hdr_cmn::access(p);
00251 ch->size() = sizeof(hdr_srm) + size;
00252 target_->recv(p);
00253 }
00254
00255 void SRMAgent::recv_data(int sender, int msgid, u_char*)
00256 {
00257 Tcl& tcl = Tcl::instance();
00258 SRMinfo* sp = get_state(sender);
00259 if (msgid > sp->ldata_) {
00260 (void) request(sp, msgid - 1);
00261 sp->setReceived(msgid);
00262 sp->ldata_ = msgid;
00263 } else {
00264 tcl.evalf("%s recv data %d %d", name_, sender, msgid);
00265 }
00266 }
00267
00268 void SRMAgent::recv_rqst(int requestor, int round, int sender, int msgid)
00269 {
00270 Tcl& tcl = Tcl::instance();
00271 SRMinfo* sp = get_state(sender);
00272 if (msgid > sp->ldata_) {
00273 (void) request(sp, msgid);
00274 sp->ldata_ = msgid;
00275 } else {
00276 tcl.evalf("%s recv request %d %d %d %d", name_,
00277 requestor, round, sender, msgid);
00278 }
00279 }
00280
00281 void SRMAgent::recv_repr(int round, int sender, int msgid, u_char*)
00282 {
00283 Tcl& tcl = Tcl::instance();
00284 SRMinfo* sp = get_state(sender);
00285 if (msgid > sp->ldata_) {
00286 (void) request(sp, msgid - 1);
00287 sp->setReceived(msgid);
00288 sp->ldata_ = msgid;
00289 } else {
00290 tcl.evalf("%s recv repair %d %d %d", name_,
00291 round, sender, msgid);
00292 }
00293
00294
00295 }
00296
00297 void SRMAgent::send_sess()
00298 {
00299 int size = (1 + groupSize_ * 4) * sizeof(int);
00300 Packet* p = Agent::allocpkt(size);
00301 hdr_srm* sh = hdr_srm::access(p);
00302 sh->type() = SRM_SESS;
00303 sh->sender() = addr();
00304 sh->seqnum() = ++sessCtr_;
00305 addExtendedHeaders(p);
00306
00307 int* data = (int*) p->accessdata();
00308 *data++ = groupSize_;
00309 for (SRMinfo* sp = sip_; sp; sp = sp->next_) {
00310 *data++ = sp->sender_;
00311 *data++ = sp->ldata_;
00312 *data++ = sp->recvTime_;
00313 *data++ = sp->sendTime_;
00314 }
00315 data = (int*) p->accessdata();
00316 data[4] = (int) (Scheduler::instance().clock()*1000);
00317
00318 hdr_cmn* ch = hdr_cmn::access(p);
00319 ch->size() = size+ sizeof(hdr_srm);
00320
00321 target_->recv(p, (Handler*)NULL);
00322 }
00323
00324 #define GET_SESSION_INFO \
00325 sender = *data++; \
00326 dataCnt = *data++; \
00327 rtime = *data++; \
00328 stime = *data++
00329
00330 void SRMAgent::recv_sess(Packet*, int sessCtr, int* data)
00331 {
00332 SRMinfo* sp;
00333
00334 int sender, dataCnt, rtime, stime;
00335 int now, sentAt, sentBy;
00336 int cnt = *data++;
00337 int i;
00338
00339
00340 GET_SESSION_INFO;
00341 if (sender == addr())
00342 return;
00343
00344 sp = get_state(sender);
00345 if (sp->lsess_ > sessCtr)
00346 return;
00347
00348 now = (int) (Scheduler::instance().clock() * 1000);
00349 sentBy = sender;
00350 sentAt = stime;
00351
00352 sp->lsess_ = sessCtr;
00353 sp->recvTime_ = now;
00354 sp->sendTime_ = stime;
00355 (void) request(sp, dataCnt);
00356 if (sp->ldata_ < dataCnt)
00357 sp->ldata_ = dataCnt;
00358
00359 for (i = 1; i < cnt; i++) {
00360 GET_SESSION_INFO;
00361 if (sender == addr() && now) {
00362
00363
00364
00365
00366
00367
00368
00369
00370
00371 int rtt = (now - sentAt) + (rtime - stime);
00372 sp = get_state(sentBy);
00373 sp->distance_ = (double) rtt / 2 / 1000;
00374 #if 0
00375 fprintf(stderr,
00376 "%7.4f %s compute distance to %d: %f\n",
00377 Scheduler::instance().clock(), name_,
00378 sentBy, sp->distance_);
00379 #endif
00380 continue;
00381 }
00382 sp = get_state(sender);
00383 (void) request(sp, dataCnt);
00384 if (sp->ldata_ < dataCnt)
00385 sp->ldata_ = dataCnt;
00386 }
00387 }