Main Page | Namespace List | Class Hierarchy | Alphabetical List | Compound List | File List | Compound Members | File Members

lms-sender.cc

Go to the documentation of this file.
00001 /*
00002  * Copyright (c) 2001 University of Southern California.
00003  * All rights reserved.
00004  *
00005  * Redistribution and use in source and binary forms are permitted
00006  * provided that the above copyright notice and this paragraph are
00007  * duplicated in all such forms and that any documentation, advertising
00008  * materials, and other materials related to such distribution and use
00009  * acknowledge that the software was developed by the University of
00010  * Southern California, Information Sciences Institute.  The name of the
00011  * University may not be used to endorse or promote products derived from
00012  * this software without specific prior written permission.
00013  *
00014  * THIS SOFTWARE IS PROVIDED "AS IS" AND WITHOUT ANY EXPRESS OR IMPLIED
00015  * WARRANTIES, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTIES OF
00016  * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE.
00017  *
00018  **
00019  * Light-Weight Multicast Services (LMS), Reliable Multicast
00020  *
00021  * lms-sender.cc
00022  *
00023  * This implements the Sending LMS agent, "Agent/LMS/Sender".
00024  *
00025  * Christos Papadopoulos. 
00026  * christos@isi.edu
00027  */
00028 
00029 #include "agent.h"
00030 #include "tclcl.h"
00031 #include "packet.h"
00032 #include "ip.h"
00033 #include "rtp.h"
00034 #include "config.h"
00035 #include "random.h"
00036 #include "trafgen.h"
00037 #include "lms.h"
00038 
00039 static int snd_uid_ = 0;
00040 
00041 class LmsSender;
00042 
00043 class LmsSender: public Agent {
00044  public:
00045         LmsSender();
00046         LmsSender *next_;
00047         int  command (int argc, const char*const* argv);
00048         void recv (Packet* pkt, Handler*);
00049 
00050  protected:
00051         void handle_lms_pkt (Packet* pkt);
00052         virtual void sendmsg(int nbytes, const char *flags = 0);
00053         void send_lms_pkt (int, int);
00054         void send_dmcast (hdr_lms* lh, int seqno, int fid);
00055         int  add_req (Packet *rq);
00056         void send_spm ();
00057         void solicit_naks ();
00058         void print_stats ();
00059         void print_all_stats (int drops);
00060     
00061         char    uname_[16];     // unique name in the form r#
00062         int     spm_seqno_;     // spm sequence number
00063         int     lms_cost_;      // for sender this is usually zero
00064         int     lms_ttl_;       // solicit nacks this many hops away
00065         int     packetSize_;    // remember pkt size for retransmissions
00066         int     lmsPacketSize_; // packet size for lms packets
00067         Packet  *req_list_;     // request list to detect duplicate requests
00068         int     req_list_sz_;   // current size of above list
00069 
00070         // Stats
00071         int     req_rcvd_;      // number of requests received
00072         int     dup_reqs_;      // number of duplicate requests received
00073         int     dmcasts_sent_;  // number of dmcasts sent
00074 
00075  static int     max_dup_naks_;  // max num of duplicate naks    
00076     
00077         // int  maxpkts_;
00078         int     seqno_;         // XXX: RTP should handle this
00079 
00080         //LmsSenderTimer lms_sender_timer_;
00081         
00082         // int  off_lms_;
00083         // int  off_ip_;
00084         // int  off_rtp_;
00085         // int  off_cmn_;
00086 };
00087 
00088 static class LmsSenderClass : public TclClass {
00089 public:
00090         LmsSenderClass() : TclClass("Agent/LMS/Sender") {}
00091         TclObject* create(int, const char*const*) {
00092                 return (new LmsSender());
00093         }
00094 } class_lms_sender;
00095 
00096 
00097 // Obligatory definition of the class static
00098 int LmsSender::max_dup_naks_ = 0;
00099 
00100 LmsSender::LmsSender(): Agent(PT_LMS), seqno_(-1)
00101 {
00102         sprintf (uname_, "sender%d", snd_uid_++);
00103         
00104         lms_cost_ = 0;
00105         spm_seqno_ = 0;
00106         req_list_ = NULL;
00107         req_rcvd_ = dup_reqs_ = max_dup_naks_ = 0;
00108         dmcasts_sent_ = 0;
00109         bind("lmsPacketSize_", &lmsPacketSize_);
00110         bind("packetSize_", &packetSize_);
00111         // bind("off_ip_", &off_ip_);
00112         // bind("off_lms_", &off_lms_);
00113         // bind("off_rtp_", &off_rtp_);
00114         // bind("off_cmn_", &off_cmn_);
00115         // bind("maxpkts_", &maxpkts_);
00116 }
00117 
00118 //
00119 // Send a data packet
00120 //
00121 void LmsSender::sendmsg(int nbytes, const char* flags)
00122 {
00123         Packet*   p = allocpkt ();
00124         hdr_ip* ih = HDR_IP(p);
00125         hdr_cmn* th = HDR_CMN(p);
00126         hdr_rtp* rh = HDR_RTP(p); // XXX
00127 
00128         bzero (HDR_LMS(p), sizeof (struct hdr_lms));
00129         packetSize_ = nbytes;   
00130         th->size_ = packetSize_;   
00131         rh->seqno() = ++seqno_; // XXX
00132 
00133         ih->flowid() = 1; //for coloring in NAM
00134 
00135 #ifdef LMS_DEBUG
00136 packet_t t = th->ptype();
00137 const char* nname = packet_info.name(t);
00138 double now = Scheduler::instance().clock();
00139 printf("SNDR: at %f %s sendmsg pkt %d type is %s, size is %d\n\n",
00140   now, uname_, seqno_, nname, th->size_);
00141 #endif
00142 
00143         target_->recv(p);
00144 }
00145 
00146 //
00147 // Send an LMS packet
00148 // set LMS header and adjust size_
00149 //
00150 void LmsSender::send_lms_pkt (int type, int cost)
00151 {
00152         Packet* p = allocpkt();
00153 
00154         (HDR_CMN(p))->ptype_ = PT_LMS;
00155         (HDR_IP(p))->flowid() = 7; //Coloring in NAM
00156 
00157         if (type != LMS_SETUP)
00158                 (HDR_CMN(p))->size_ = lmsPacketSize_; 
00159                 //(HDR_CMN(p))->size_ = sizeof(hdr_lms); 
00160         else 
00161                 (HDR_CMN(p))->size_ = packetSize_; 
00162                 //(HDR_CMN(p))->size_ = sizeof(hdr_lms); 
00163 
00164         hdr_lms* lh = HDR_LMS(p);
00165         lh->type_ = type;
00166         lh->cost_ = cost;
00167         lh->from_ = addr();
00168 //      lh->from_ = port();
00169         lh->src_ = addr();
00170         lh->group_ = daddr();
00171         lh->tp_addr_  = LMS_NOADDR;
00172         lh->tp_port_ = -1;
00173         lh->tp_iface_ = LMS_NOIFACE;
00174         lh->lo_ = lh->hi_ = -1;
00175         lh->ts_ = Scheduler::instance().clock();
00176 
00177 #ifdef LMS_DEBUG
00178 printf ("SNDR: %s send_lms_pkt from %d src %d dst %x type %d cost %d size %d at %f\n\n",
00179   uname_, lh->from_, lh->src_, lh->group_, lh->type_, lh->cost_,
00180   (HDR_CMN(p))->size_, lh->ts_);
00181 #endif 
00182         target_->recv(p);
00183 }
00184 
00185 //
00186 // Receive a packet
00187 //
00188 void LmsSender::recv(Packet* pkt, Handler*)
00189 {
00190         hdr_cmn* h = HDR_CMN(pkt);
00191 
00192         if (h->ptype_ == PT_LMS)
00193                 handle_lms_pkt (pkt);
00194         else    {
00195                 printf ("ERROR: %s received non LMS pkt type %d\n", uname_, h->ptype_);
00196                 Packet::free(pkt);
00197                 }
00198 }
00199 
00200 /*
00201  * Handle LMS packets
00202  */
00203 void LmsSender::handle_lms_pkt (Packet* pkt)
00204 {
00205         int st = 0;
00206 
00207         hdr_lms* lh = HDR_LMS(pkt);
00208         hdr_ip* iph = HDR_IP(pkt);
00209 #ifdef LMS_DEBUG
00210 int a1, a2;
00211 #endif
00212         switch (lh->type())
00213                 {
00214                 case LMS_REQ:
00215                         req_rcvd_++;
00216                         if (lh->src_ != addr())
00217                                 {
00218                                 printf ("ERROR: %s REQ with wrong source addr %d:%d\n",
00219                                         uname_, lh->src_>>8, lh->src_&0xff);
00220                                 abort ();
00221                                 }
00222 #ifdef LMS_DEBUG
00223 a1 = lh->from(); a2 = lh->src();
00224 printf ("SNDR: %s got LMS_REQ from %d:%d src %d:%d group 0x%x\n\n",
00225   uname_, a1>>8, a1&0xff, a2>>8, a2&0xff, lh->group());
00226 printf ("SNDR: TP: (%d, %d)\n\n", lh->tp_addr_, lh->tp_iface_);
00227 #endif
00228                         if ((st = add_req (pkt)) != 0)
00229                                 for (int i = lh->lo_; i <= lh->hi_; i++)
00230                                         send_dmcast (lh, i, 3);
00231                         else dup_reqs_++;
00232 
00233                         break;
00234                 case LMS_REFRESH:
00235                         break;
00236                 default:
00237                         printf ("***ERROR: %s Unexpected LMS packet type: %d\n\n", uname_, lh->type());
00238                         abort ();
00239                 }
00240         if (!st)
00241                 Packet::free(pkt);
00242 }
00243 
00244 int LmsSender::command(int argc, const char*const* argv)
00245 {
00246     if (argc == 2) {
00247         if (strcmp(argv[1], "lms-setup") == 0) {
00248             send_lms_pkt (LMS_SETUP, packetSize_);      // XXX
00249             return (TCL_OK);
00250         }
00251         if (strcmp(argv[1], "gather-links") == 0) {
00252             send_lms_pkt (LMS_LINKS, 0);
00253             return (TCL_OK);
00254         }
00255         if (strcmp(argv[1], "print-stats") == 0) {
00256             print_stats ();
00257             return (TCL_OK);
00258         }
00259     }
00260     if (argc == 3)
00261     {
00262         if (strcmp(argv[1], "solicit-naks") == 0) {
00263             lms_ttl_ = atoi (argv[2]);
00264             solicit_naks ();
00265             return (TCL_OK);
00266         }
00267         if (strcmp(argv[1], "set-replier-cost") == 0)
00268         {
00269             lms_cost_  = atoi (argv[2]);
00270             send_lms_pkt (LMS_REFRESH, lms_cost_);
00271             return (TCL_OK);
00272         }
00273         if (strcmp(argv[1], "print-all-stats") == 0) {
00274             print_all_stats ( atoi(argv[2]) );
00275             return (TCL_OK);
00276         }         
00277         if (strcmp(argv[1], "send") == 0) {
00278             sendmsg( atoi (argv[2]), 0);
00279             return (TCL_OK);
00280         }
00281     }
00282     return (Agent::command(argc, argv));
00283 }
00284 
00285 /*
00286  * Send a LMS_DMCAST in response to LMS_REQ packet
00287  */
00288 void LmsSender::send_dmcast (hdr_lms* lh, int seqno, int fid)
00289 {
00290         Packet          *p = allocpkt();
00291         hdr_rtp         *rh = HDR_RTP(p);
00292         hdr_ip          *piph = HDR_IP(p);
00293         hdr_cmn         *ch = HDR_CMN(p);
00294         hdr_lms         *plh = HDR_LMS(p);
00295 
00296         dmcasts_sent_++;
00297         rh->seqno() = seqno;
00298         piph->daddr() = lh->tp_addr_;
00299         piph->dport() = lh->tp_port_;
00300         ch->size_= packetSize_;
00301         piph->flowid() = fid;           // Green for repair; to color packets in NAM
00302 
00303         plh->type_ = LMS_DMCAST;
00304         plh->from_ = addr();
00305 //      plh->from_ = port();
00306         plh->src_ = addr();
00307         plh->group_ = daddr();
00308         plh->tp_addr_ = lh->tp_addr_;
00309         plh->tp_port_ = lh->tp_port_;
00310         plh->tp_iface_ = lh->tp_iface_;
00311         packet_t t = ch->ptype();
00312         const char* nname = packet_info.name(t);
00313         
00314         target_->recv(p);
00315 }
00316 
00317 // Add a new request to the request list.
00318 // Return 1 if the request was added,
00319 // 0 if this was a duplicate request and was not added.
00320 // Requests are added to the head of the list
00321 // and the list size is limited to 10 (XXX: should have time limit instead)
00322 //
00323 int LmsSender::add_req (Packet *rq)
00324 {
00325         hdr_lms *lh = HDR_LMS(rq);
00326         Packet  *p = req_list_;
00327         int     i = 0;
00328 
00329         if (!p)
00330         {
00331             req_list_ = rq;
00332             rq->next_ = 0;
00333             req_list_sz_ = 1;
00334             return 1;
00335         }
00336         while (p)
00337         {
00338             if (i++ > 10)
00339                 break;
00340             hdr_lms *plh = HDR_LMS(p);
00341             if ((plh->lo_ == lh->lo_) &&
00342                 (plh->hi_ == lh->hi_) &&
00343                 (plh->tp_addr_ == lh->tp_addr_) &&
00344                 (plh->tp_iface_ == lh->tp_iface_))
00345             {
00346                 struct lms_nak *nh = (struct lms_nak *)p->accessdata();
00347                 // increments the dup_cnt_ for this NAK, and updates
00348                 // max_dup_naks_ if appropriate
00349                 ++nh->dup_cnt_;
00350 
00351 #ifdef LMS_DEBUG
00352    printf ("SNDR: %s got %d dup reqs, max is %d\n", uname_, nh->dup_cnt_, max_dup_naks_);
00353 #endif                
00354                 
00355                 if( nh->dup_cnt_ > max_dup_naks_)
00356                     max_dup_naks_ = nh->dup_cnt_;
00357                                 
00358                 return 0;                
00359             }
00360             
00361             p = p->next_;
00362         }
00363         if (i > 10 && p && p->next_)
00364         {
00365             Packet::free (p->next_);
00366             p->next_ = 0;
00367             req_list_sz_--;
00368         }
00369         rq->next_ = req_list_;
00370         req_list_ = rq;
00371         req_list_sz_++;
00372         return 1;
00373 }
00374 
00375 
00376 void LmsSender::send_spm ()
00377 {
00378         Packet* p = allocpkt (sizeof (struct lms_spm));
00379         (HDR_CMN(p))->ptype_ = PT_LMS;
00380                 (HDR_CMN(p))->size_ = sizeof(struct lms_spm) + sizeof(hdr_lms);
00381         (HDR_IP(p))->flowid() = 7;
00382 
00383         hdr_lms* lh = HDR_LMS(p);
00384         
00385         lh->type_ = LMS_SPM;
00386 
00387         struct lms_spm *spm = (struct lms_spm *)p->accessdata();
00388         spm->spm_seqno_ = spm_seqno_++;
00389         spm->spm_path_ = addr();
00390         spm->spm_ts_ = Scheduler::instance().clock();
00391 
00392         target_->recv(p);
00393 }
00394 
00395 void LmsSender::solicit_naks ()
00396 {
00397         Packet* p = allocpkt(sizeof (struct lms_ctl));
00398         struct lms_ctl  *ctl = (struct lms_ctl*)p->accessdata ();
00399         (HDR_CMN(p))->size_ = sizeof(struct lms_ctl) + sizeof(hdr_lms); 
00400         (HDR_CMN(p))->ptype_ = PT_LMS;
00401     (HDR_IP(p))->flowid() = 7;
00402 
00403         hdr_lms* lh = HDR_LMS(p);
00404         lh->type_ = LMS_SRC_REFRESH;
00405         lh->ttl_  = lms_ttl_;
00406         lh->from_ = addr();
00407         lh->src_  = addr();
00408         lh->group_ = daddr();
00409 
00410         ctl->cost_ = lms_cost_;
00411         ctl->hop_cnt_ = 0;
00412 
00413         target_->recv(p);
00414 }
00415 
00416 void LmsSender::print_stats ()
00417 {
00418         printf ("%s:\n", uname_);
00419         printf ("\tPackets sent:\t\t%d\n", seqno_);
00420         printf ("\tRequests received:\t%d\n", req_rcvd_);
00421         printf ("\tDuplicate Requests:\t%d\n", dup_reqs_);
00422         printf ("\tDMCASTs sent:\t\t%d\n", dmcasts_sent_);
00423         printf ("\n");
00424 }
00425 
00426 void LmsSender::print_all_stats (int drops)
00427 {
00428     if (drops) 
00429         printf ("\t%.5lf\t  %d", float(dup_reqs_)/float(drops), max_dup_naks_);
00430     else 
00431         printf ("\t0.0\t  0");
00432 }

Generated on Tue Apr 20 12:14:22 2004 for NS2.26SourcesOriginal by doxygen 1.3.3