Main Page | Namespace List | Class Hierarchy | Alphabetical List | Compound List | File List | Compound Members | File Members

srm.cc

Go to the documentation of this file.
00001 /* -*-  Mode:C++; c-basic-offset:8; tab-width:8; indent-tabs-mode:t -*- */
00002 //
00003 // Copyright (c) 1997 by the University of Southern California
00004 // All rights reserved.
00005 //
00006 // Permission to use, copy, modify, and distribute this software and its
00007 // documentation in source and binary forms for non-commercial purposes
00008 // and without fee is hereby granted, provided that the above copyright
00009 // notice appear in all copies and that both the copyright notice and
00010 // this permission notice appear in supporting documentation. and that
00011 // any documentation, advertising materials, and other materials related
00012 // to such distribution and use acknowledge that the software was
00013 // developed by the University of Southern California, Information
00014 // Sciences Institute.  The name of the University may not be used to
00015 // endorse or promote products derived from this software without
00016 // specific prior written permission.
00017 //
00018 // THE UNIVERSITY OF SOUTHERN CALIFORNIA makes no representations about
00019 // the suitability of this software for any purpose.  THIS SOFTWARE IS
00020 // PROVIDED "AS IS" AND WITHOUT ANY EXPRESS OR IMPLIED WARRANTIES,
00021 // INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTIES OF
00022 // MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE.
00023 //
00024 // Other copyrights might apply to parts of this software and are so
00025 // noted when applicable.
00026 //
00027 //      Maintainer:     Kannan Varadhan <kannan@isi.edu>
00028 //      Version Date:   Tue Jul 22 15:41:16 PDT 1997
00029 //
00030 
00031 #ifndef lint
00032 static const char rcsid[] =
00033     "@(#) $Header: /nfs/jade/vint/CVSROOT/ns-2/mcast/srm.cc,v 1.26 2002/09/18 05:41:51 sundarra Exp $ (USC/ISI)";
00034 #endif
00035 
00036 #include <stdlib.h>
00037 #include <assert.h>
00038 
00039 #include "config.h"
00040 #include "agent.h"
00041 #include "ip.h"
00042 #include "srm.h"
00043 #include "trace.h"
00044 #include "rtp.h"
00045 
00046 
00047 int hdr_srm::offset_;
00048 int hdr_asrm::offset_;
00049 
00050 static class SRMHeaderClass : public PacketHeaderClass {
00051 public:
00052         SRMHeaderClass() : PacketHeaderClass("PacketHeader/SRM",
00053                                              sizeof(hdr_srm)) {
00054                 bind_offset(&hdr_srm::offset_);
00055         }
00056 } class_srmhdr;
00057 
00058 static class ASRMHeaderClass : public PacketHeaderClass {
00059 public:
00060         ASRMHeaderClass() : PacketHeaderClass("PacketHeader/aSRM",
00061                                               sizeof(hdr_asrm)) {
00062                 bind_offset(&hdr_asrm::offset_);
00063         }
00064 } class_adaptive_srmhdr;
00065 
00066 
00067 static class SRMAgentClass : public TclClass {
00068 public:
00069         SRMAgentClass() : TclClass("Agent/SRM") {}
00070         TclObject* create(int, const char*const*) {
00071                 return (new SRMAgent());
00072         }
00073 } class_srm_agent;
00074 
00075 static class ASRMAgentClass : public TclClass {
00076 public:
00077         ASRMAgentClass() : TclClass("Agent/SRM/Adaptive") {}
00078         TclObject* create(int, const char*const*) {
00079                 return (new ASRMAgent());
00080         }
00081 } class_adaptive_srm_agent;
00082 
00083 
00084 SRMAgent::SRMAgent() 
00085         : Agent(PT_SRM), dataCtr_(-1), sessCtr_(-1), siphash_(0), seqno_(-1),
00086     app_type_(PT_NTYPE)
00087 {
00088         sip_ = new SRMinfo(-1);
00089 
00090         bind("packetSize_", &packetSize_);
00091         bind("groupSize_", &groupSize_);
00092         bind("app_fid_", &app_fid_);
00093 }
00094 
00095 SRMAgent::~SRMAgent()
00096 {
00097         cleanup();
00098 }
00099 
00100 int SRMAgent::command(int argc, const char*const* argv)
00101 {
00102         Tcl& tcl = Tcl::instance();
00103 
00104         if (strcmp(argv[1], "send") == 0) {
00105                 if (strcmp(argv[2], "session") == 0) {
00106                         send_sess();
00107                         return TCL_OK;
00108                 }
00109                 if (strcmp(argv[2], "request") == 0) {
00110                         int round = atoi(argv[3]);
00111                         int sender = atoi(argv[4]);
00112                         int msgid  = atoi(argv[5]);
00113                         send_ctrl(SRM_RQST, round, sender, msgid, 0);
00114                         return TCL_OK;
00115                 }
00116                 if (strcmp(argv[2], "repair") == 0) {
00117                         int round = atoi(argv[3]);
00118                         int sender = atoi(argv[4]);
00119                         int msgid  = atoi(argv[5]);
00120                         send_ctrl(SRM_REPR, round, sender, msgid, packetSize_);
00121                         return TCL_OK;
00122                 }
00123                 tcl.resultf("%s: invalid send request %s", name_, argv[2]);
00124                 return TCL_ERROR;
00125         }
00126         if (argc == 2) {
00127                 if (strcmp(argv[1], "distances?") == 0) {
00128                         tcl.result("");
00129                         if (sip_->sender_ >= 0) {  // i.e. this agent is active
00130                                 for (SRMinfo* sp = sip_; sp; sp = sp->next_) {
00131                                         tcl.resultf("%s %d %f", tcl.result(),
00132                                                     sp->sender_,
00133                                                     sp->distance_);
00134                                 }
00135                         }
00136                         return TCL_OK;
00137                 }
00138                 if (strcmp(argv[1], "start") == 0) {
00139                         start();
00140                         return TCL_OK;
00141                 }
00142         }
00143         if (argc == 3) {
00144                 if (strcmp(argv[1], "distance?") == 0) {
00145                         int sender = atoi(argv[2]);
00146                         SRMinfo* sp = get_state(sender);
00147                         tcl.resultf("%lf", sp->distance_);
00148                         return TCL_OK;
00149                 }
00150         if (strcmp(argv[1], "eventtrace") == 0) {
00151                 return (TCL_OK);
00152         }
00153         }
00154         return Agent::command(argc, argv);
00155 }
00156 
00157 void SRMAgent::recv(Packet* p, Handler* h)
00158 {
00159         hdr_ip*  ih = hdr_ip::access(p);
00160         hdr_srm* sh = hdr_srm::access(p);
00161         
00162         if (ih->daddr() == -1) {
00163                 // Packet from local agent.  Add srm headers, set dst, and fwd
00164                 sh->type() = SRM_DATA;
00165                 sh->sender() = addr();
00166                 sh->seqnum() = ++dataCtr_;
00167                 addExtendedHeaders(p);
00168                 ih->dst() = dst_;
00169                 target_->recv(p, h);
00170         } else {
00171 
00172 #if 0
00173                 static char *foo[] = {"NONE", "DATA", "SESS", "RQST", "REPR"};
00174                 fprintf(stderr, "%7.4f %s %d recvd SRM_%s <%d, %d> from %d\n",
00175                         Scheduler::instance().clock(), name_, addr_,
00176                         foo[sh->type()],
00177                         sh->sender(), sh->seqnum(), ih->src());
00178 #endif
00179                 
00180                 parseExtendedHeaders(p);
00181                 switch (sh->type()) {
00182                 case SRM_DATA:
00183                         recv_data(sh->sender(), sh->seqnum(), p->accessdata());
00184                         break;
00185                 case SRM_RQST:
00186                         recv_rqst(ih->saddr(),
00187                                   sh->round(), sh->sender(), sh->seqnum());
00188                         break;
00189                 case SRM_REPR:
00190                         recv_repr(sh->round(), sh->sender(), sh->seqnum(),
00191                                   p->accessdata());
00192                         break;
00193                 case SRM_SESS:
00194                         // This seqnum() is the session sequence number,
00195                         // not the data packet sequence numbers seen before.
00196                         recv_sess(p, sh->seqnum(), (int*) p->accessdata());
00197                         break;
00198                 }
00199                 Packet::free(p);
00200         }
00201 }
00202 
00203 void SRMAgent::sendmsg(int nbytes, const char* /*flags*/)
00204 {
00205         if (nbytes == -1) {
00206                 printf("Error:  sendmsg() for SRM should not be -1\n");
00207                 return;
00208         }
00209         // The traffic generator may have reset our payload type when it
00210         // initialized.  If so, save the current payload type as app_type_,
00211         // and set type_ to PT_SRM.  Use app_type_ for all app. packets
00212         // 
00213         if (type_ != PT_SRM) {
00214                 app_type_ = type_;
00215                 type_ = PT_SRM;
00216         }
00217         size_ = nbytes;
00218         Packet *p;
00219         p = allocpkt();
00220         hdr_ip*  ih = hdr_ip::access(p);
00221         hdr_srm* sh = hdr_srm::access(p);
00222         hdr_rtp* rh = hdr_rtp::access(p);
00223         hdr_cmn* ch = hdr_cmn::access(p);
00224         //hdr_cmn* ch = hdr_cmn::access(p);
00225         
00226         ch->ptype() = app_type_;
00227         ch->size() =  size_;
00228         ih->flowid() = app_fid_;
00229         rh->seqno() = ++seqno_;
00230         // Add srm headers, set dst, and fwd
00231         sh->type() = SRM_DATA;
00232         sh->sender() = addr();
00233         sh->seqnum() = ++dataCtr_;
00234         addExtendedHeaders(p);
00235         ih->dst() = dst_;
00236         target_->recv(p);
00237 }
00238 
00239 
00240 void SRMAgent::send_ctrl(int type, int round, int sender, int msgid, int size)
00241 {
00242         Packet* p = Agent::allocpkt();
00243         hdr_srm* sh = hdr_srm::access(p);
00244         sh->type() = type;
00245         sh->sender() = sender;
00246         sh->seqnum() = msgid;
00247         sh->round() = round;
00248         addExtendedHeaders(p);
00249 
00250         hdr_cmn* ch = hdr_cmn::access(p);
00251         ch->size() = sizeof(hdr_srm) + size;
00252         target_->recv(p);
00253 }
00254 
00255 void SRMAgent::recv_data(int sender, int msgid, u_char*)
00256 {
00257         Tcl& tcl = Tcl::instance();
00258         SRMinfo* sp = get_state(sender);
00259         if (msgid > sp->ldata_) {
00260                 (void) request(sp, msgid - 1);
00261                 sp->setReceived(msgid);
00262                 sp->ldata_ = msgid;
00263         } else {
00264                 tcl.evalf("%s recv data %d %d", name_, sender, msgid);
00265         }
00266 }
00267 
00268 void SRMAgent::recv_rqst(int requestor, int round, int sender, int msgid)
00269 {
00270         Tcl& tcl = Tcl::instance();
00271         SRMinfo* sp = get_state(sender);
00272         if (msgid > sp->ldata_) {
00273                 (void) request(sp, msgid);      // request upto msgid
00274                 sp->ldata_ = msgid;
00275         } else {
00276                 tcl.evalf("%s recv request %d %d %d %d", name_,
00277                           requestor, round, sender, msgid);
00278         }
00279 }
00280 
00281 void SRMAgent::recv_repr(int round, int sender, int msgid, u_char*)
00282 {
00283         Tcl& tcl = Tcl::instance();
00284         SRMinfo* sp = get_state(sender);
00285         if (msgid > sp->ldata_) {
00286                 (void) request(sp, msgid - 1);  // request upto msgid - 1
00287                 sp->setReceived(msgid);
00288                 sp->ldata_ = msgid;
00289         } else {
00290                 tcl.evalf("%s recv repair %d %d %d", name_,
00291                           round, sender, msgid);
00292         }
00293         // Notice that we currently make no provisions for a listener
00294         // agent to receive the data.
00295 }
00296 
00297 void SRMAgent::send_sess()
00298 {
00299         int     size = (1 + groupSize_ * 4) * sizeof(int);
00300         Packet* p = Agent::allocpkt(size);
00301         hdr_srm* sh = hdr_srm::access(p);
00302         sh->type() = SRM_SESS;
00303         sh->sender() = addr();
00304         sh->seqnum() = ++sessCtr_;
00305         addExtendedHeaders(p);
00306 
00307         int* data = (int*) p->accessdata();
00308         *data++ = groupSize_;
00309         for (SRMinfo* sp = sip_; sp; sp = sp->next_) {
00310                 *data++ = sp->sender_;
00311                 *data++ = sp->ldata_;
00312                 *data++ = sp->recvTime_;
00313                 *data++ = sp->sendTime_;
00314         }
00315         data = (int*) p->accessdata();
00316         data[4] = (int) (Scheduler::instance().clock()*1000);
00317 
00318         hdr_cmn* ch = hdr_cmn::access(p);
00319         ch->size() = size+ sizeof(hdr_srm);
00320 
00321         target_->recv(p, (Handler*)NULL);
00322 }
00323 
// Pull the next four-int state block out of a session payload.
// Expects these names in the enclosing scope: an int pointer `data', which
// is advanced past the block, and the ints `sender', `dataCnt', `rtime' and
// `stime', which receive the block's fields in that order.
// Use it as a statement, followed by a semicolon.
#define GET_SESSION_INFO                        \
        do {                                    \
                sender  = *data++;              \
                dataCnt = *data++;              \
                rtime   = *data++;              \
                stime   = *data++;              \
        } while (0)
00329 
00330 void SRMAgent::recv_sess(Packet*, int sessCtr, int* data)
00331 {
00332         SRMinfo* sp;
00333         
00334         int sender, dataCnt, rtime, stime;
00335         int now, sentAt, sentBy;
00336         int cnt = *data++;
00337         int i;
00338 
00339         /* The first block contains the sender's own state */
00340         GET_SESSION_INFO;
00341         if (sender == addr())                   // sender's own session message
00342                 return;
00343 
00344         sp = get_state(sender);
00345         if (sp->lsess_ > sessCtr)               // older session message recd.
00346                 return;
00347         
00348         now = (int) (Scheduler::instance().clock() * 1000);
00349         sentBy = sender;                        // to later compute rtt
00350         sentAt = stime;
00351         
00352         sp->lsess_ = sessCtr;
00353         sp->recvTime_ = now;
00354         sp->sendTime_ = stime;
00355         (void) request(sp, dataCnt);
00356         if (sp->ldata_ < dataCnt)
00357                 sp->ldata_ = dataCnt;
00358         
00359         for (i = 1; i < cnt; i++) {
00360                 GET_SESSION_INFO;
00361                 if (sender == addr() && now) {
00362                         //
00363                         //    This session message from sender sentBy:
00364                         //                vvvvv
00365                         //          now <=======+ sentAt
00366                         //               |     |
00367                         //        stime +=======> rtime
00368                         //                ^^^^^
00369                         //   Earlier session message sent by ``this'' agent
00370                         //
00371                         int rtt = (now - sentAt) + (rtime - stime);
00372                         sp = get_state(sentBy);
00373                         sp->distance_ = (double) rtt / 2 / 1000;
00374 #if 0
00375                         fprintf(stderr,
00376                                 "%7.4f %s compute distance to %d: %f\n",
00377                                 Scheduler::instance().clock(), name_,
00378                                 sentBy, sp->distance_);
00379 #endif
00380                         continue;
00381                 }
00382                 sp = get_state(sender);
00383                 (void) request(sp, dataCnt);
00384                 if (sp->ldata_ < dataCnt)
00385                         sp->ldata_ = dataCnt;
00386         }
00387 }

Generated on Tue Apr 20 12:14:33 2004 for NS2.26SourcesOriginal by doxygen 1.3.3