Main Page | Namespace List | Class Hierarchy | Alphabetical List | Compound List | File List | Compound Members | File Members

lms-agent.cc

Go to the documentation of this file.
00001 /*
00002  * Copyright (c) 2001 University of Southern California.
00003  * All rights reserved.
00004  *
00005  * Redistribution and use in source and binary forms are permitted
00006  * provided that the above copyright notice and this paragraph are
00007  * duplicated in all such forms and that any documentation, advertising
00008  * materials, and other materials related to such distribution and use
00009  * acknowledge that the software was developed by the University of
00010  * Southern California, Information Sciences Institute.  The name of the
00011  * University may not be used to endorse or promote products derived from
00012  * this software without specific prior written permission.
00013  *
00014  * THIS SOFTWARE IS PROVIDED "AS IS" AND WITHOUT ANY EXPRESS OR IMPLIED
00015  * WARRANTIES, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTIES OF
00016  * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE.
00017  *
00018  **
00019  * Light-Weight Multicast Services (LMS), Reliable Multicast
00020  *
00021  * lms-agent.cc
00022  *
00023  * This implements the network element LMS agent, "Agent/LMS".
00024  *
00025  * Christos Papadopoulos. 
00026  * christos@isi.edu
00027  */
00028 
00029 #include <assert.h>
00030 
00031 #include "agent.h"
00032 #include "tclcl.h"
00033 #include "packet.h"
00034 #include "random.h"
00035 #include "lms.h"
00036 #include "ip.h"
00037 
00038 static int lms_agent_uid_ = 0;  // each LMS agent has a uid
00039 
00040 class LmsAgent : public Agent {
00041  public:
00042         LmsAgent ();
00043         int     command (int argc, const char*const* argv);
00044         void    recv (Packet*, Handler*);
00045  protected:
00046         //inline int    off_lms() { return off_lms_; };
00047         NsObject*       iface2link (int iface);
00048         NsObject*       pkt2agent (Packet *pkt);
00049         void            send_upstream (Packet *p);
00050         void            send_downstream (Packet *p);
00051 
00052         inline void
00053          send2replier (Packet *p)
00054                 {
00055                 hdr_ip* piph = HDR_IP(p);
00056                 hdr_cmn* c = HDR_CMN(p);
00057 #ifdef LMS_DEBUG
00058 printf("%s send2replier: Got piph->saddr: %d, downstream_addr: %d. Replier: %d\n", uname_, piph->saddr(), downstream_lms_.addr_, replier_ );
00059 #endif
00060                 if (replier_ != NULL)
00061                         replier_->recv (p);
00062                 else if ((downstream_lms_.addr_ != LMS_NOADDR) &&
00063                          (piph->saddr() != downstream_lms_.addr_) )
00064                         send_downstream(p);
00065                 else    send_upstream (p); 
00066                 };
00067 
00068         char    uname_[16];     // agent's unique name
00069         int     lms_enabled_;   // is this agent enabled? Default is YES
00070         NsObject* replier_;     // agent's replier (link or receiver)
00071         int     replier_iface_; // replier interface (-2 if replier is local)
00072         int     replier_cost_;  // replier cost
00073         int     replier_dist_;  // replier distance, in #of hops
00074         nsaddr_t upstream_lms_; // upstream LMS entity
00075         ns_addr_t downstream_lms_;// needed for incr.deployment
00076     
00077         int     upstream_iface_;// upstream interface (-2 if source is local)
00078         int     have_rcvr_;     // flag the presence of a receiver on this node
00079 
00080         // Header offsets
00081         // int  off_ip_;
00082         // int  off_lms_;
00083 };
00084 
00085 //
00086 // Declaration of Agent/LMS
00087 //
00088 static class LmsClass : public TclClass {
00089 public:
00090         LmsClass() : TclClass("Agent/LMS") {}
00091         TclObject* create(int, const char*const*) {
00092                 return (new LmsAgent());
00093         }
00094 } class_lms_agent;
00095 
00096 
00097 //
00098 // Declaration of PacketHeader/Lms
00099 //
00100 
00101 int hdr_lms::offset_;
00102 
00103 static class LmsHeaderClass : public PacketHeaderClass {
00104 public:
00105         LmsHeaderClass() : PacketHeaderClass("PacketHeader/Lms",
00106                                              sizeof(hdr_lms)) {
00107                 bind_offset(&hdr_lms::offset_);
00108         }
00109 } class_lmshdr;
00110 
00111 
00112 //
00113 // Initialization routine
00114 //
00115 LmsAgent::LmsAgent() : Agent(PT_LMS)
00116 {
00117         sprintf (uname_, "lms%d", lms_agent_uid_++);
00118         replier_  = NULL;
00119         replier_iface_ = LMS_NOIFACE;
00120         replier_cost_  = LMS_INFINITY;
00121         replier_dist_  = 0;
00122         upstream_lms_  = LMS_NOADDR;
00123         downstream_lms_.addr_ = LMS_NOADDR;
00124         downstream_lms_.port_ = LMS_NOPORT;
00125         have_rcvr_ = 0;
00126 
00127         bind("lms_enabled_", &lms_enabled_);
00128         bind("packetSize_", &size_);
00129         // bind("off_ip_", &off_ip_);
00130         // bind("off_lms_", &off_lms_);
00131 }
00132 
00133 //
00134 // Main LMS receive function
00135 //
00136 void LmsAgent::recv (Packet* pkt, Handler*)
00137 {
00138     hdr_cmn* h = HDR_CMN(pkt);
00139     hdr_lms* lh = HDR_LMS(pkt);
00140     double now = Scheduler::instance().clock();
00141     
00142 #ifdef LMS_DEBUG
00143 int a1 = lh->from_; int a2 = lh->src_;
00144 printf ("at %f %s received packet type %d from %d:%d src %d:%d group %x iface %d\n\n", now,uname_, lh->type(), a1>>8, a1&0xff, a2>>8, a2&0xff, lh->group_,h->iface());
00145 #endif
00146 
00147         // if this agent is not enabled, simply pass on the packet
00148         if (!lms_enabled_)
00149                 {
00150                 target_->recv (pkt);
00151                 return;
00152                 }
00153 
00154         switch (lh->type())
00155         {
00156          case LMS_SRC_REFRESH:
00157                 {
00158                 // If we have a replier already, and the upstream adv cost
00159                 // is less than our current cost and the upstream distance
00160                 // is less than our current distance, then choose the
00161                 // upstream link as the replier link.
00162                 struct lms_ctl *c = (struct lms_ctl *)pkt->accessdata ();
00163                 c->hop_cnt_++;
00164 #ifdef LMS_DEBUG
00165 printf ("%s LMS_SRC_REFRESH from iface %d, cost %d, ttl %d\n\n",
00166 uname_, h->iface(), c->cost_, lh->ttl_);
00167 #endif
00168                  assert (upstream_iface_ == h->iface());
00169 
00170                  if (replier_iface_ == h->iface())
00171                         {
00172                         if (replier_cost_ != c->cost_)
00173                                 replier_cost_ = c->cost_;
00174                         if (replier_dist_ != c->hop_cnt_)
00175                                 replier_dist_ = c->hop_cnt_;
00176                         }
00177                  else if (replier_cost_ >= c->cost_ && replier_dist_ > c->hop_cnt_)
00178                         {
00179 #ifdef LMS_DEBUG
00180 printf ("%s chose upstream replier\n\n", uname_);
00181 #endif
00182                         replier_cost_ = c->cost_;
00183                         replier_dist_ = c->hop_cnt_;
00184                         replier_iface_ = h->iface();
00185                         if (h->iface() < 0)
00186                                 {
00187                                 replier_ = pkt2agent (pkt);
00188                                 downstream_lms_.addr_ = LMS_NOADDR;
00189                                 downstream_lms_.port_ = LMS_NOPORT;
00190                                 }
00191                         else    {
00192                                 replier_ = iface2link (h->iface());
00193                                 downstream_lms_.addr_ = LMS_NOADDR;
00194                                 downstream_lms_.port_ = LMS_NOPORT;
00195                                 }
00196                         if (--lh->ttl_ > 0)
00197                                 target_->recv (pkt);
00198                         else    Packet::free(pkt);
00199                         return;
00200                         }
00201                  break;
00202            }
00203            
00204          case LMS_REFRESH:
00205              {
00206                  struct lms_ctl *c = (struct lms_ctl *)pkt->accessdata ();
00207                  c->hop_cnt_++;
00208 #ifdef LMS_DEBUG
00209 printf("%s replier iface %d, h->iface %d, replier cost %d, c cost %d\n",
00210   uname_, replier_iface_, h->iface(), replier_cost_, c->cost_);
00211 printf("replier_= %d, down_addr_= %d, down_port_ = %d\n\n",
00212   replier_, downstream_lms_.addr_, downstream_lms_.port_);
00213 #endif     
00214 
00215                  // update from existing replier
00216                  // adjust cost and distance and send upstream
00217                  if (replier_iface_ == h->iface())
00218                  {
00219                      if (replier_cost_ != c->cost_)
00220                          replier_cost_ = c->cost_;
00221                      if (replier_dist_ != c->hop_cnt_)
00222                          replier_dist_ = c->hop_cnt_;
00223 #ifdef LMS_DEBUG
00224 printf ("%s REPLIER_UPDATE iface %d, cost %d, hops %d\n\n",
00225    uname_, replier_iface_, replier_cost_, replier_dist_);
00226 #endif
00227 
00228     
00229                      if (h->iface() < 0)
00230                      {
00231                          replier_ = pkt2agent (pkt);
00232                          have_rcvr_ = 1;
00233                          downstream_lms_.addr_ = LMS_NOADDR;
00234                          downstream_lms_.port_ = LMS_NOPORT;
00235                      }
00236                      else
00237                      {  
00238                          // updates our entry for downstream_lms_ 
00239                          downstream_lms_ = c->downstream_lms_;
00240                          // places our own address in downstream_lms_
00241                          // before sending it upstream.
00242                          c->downstream_lms_.addr_ = addr();
00243                          c->downstream_lms_.port_ = port();
00244                          
00245                          replier_ = NULL;
00246                      }
00247                      send_upstream (pkt);
00248                  }
00249 
00250                  // found a better replier
00251                  else if (replier_cost_ > c->cost_)
00252                  {
00253                      replier_cost_ = c->cost_;
00254                      replier_dist_ = c->hop_cnt_;
00255                      replier_iface_ = h->iface();
00256                      
00257                      if (h->iface() < 0)
00258                      {
00259                          replier_ = pkt2agent (pkt);
00260                          have_rcvr_ = 1;
00261                          downstream_lms_.addr_ = LMS_NOADDR;
00262                          downstream_lms_.port_ = LMS_NOPORT;
00263                      }
00264                      else
00265                      {  
00266                          // updates our entry for downstream_lms_ 
00267                          downstream_lms_ = c->downstream_lms_;
00268                          // places our own address in downstream_lms_
00269                          // before sending it upstream.
00270                          c->downstream_lms_.addr_ = addr();
00271                          c->downstream_lms_.port_ = port();
00272                                 
00273                          replier_ = NULL;                                    
00274                      }
00275 #ifdef LMS_DEBUG
00276 printf ("%s REPLIER iface %d, cost %d, hops %d\n\n",
00277   uname_, replier_iface_, replier_cost_, replier_dist_);
00278 #endif
00279                      send_upstream (pkt);
00280                  }
00281                  else
00282                      Packet::free(pkt);
00283                  return;
00284              }
00285 
00286          case LMS_REQ:
00287              if (replier_iface_ == h->iface())
00288                  send_upstream (pkt);
00289              else {
00290                  // fill turning point info, but only
00291                  // if no-one else has done so already.
00292                  if (lh->tp_addr_ == LMS_NOADDR)
00293                         {
00294                                         lh->tp_addr_  = addr();
00295                                         lh->tp_port_  = port();
00296                                         lh->tp_iface_ = h->iface_;
00297                         }
00298                  send2replier (pkt);
00299                 }
00300              break;
00301 
00302          case LMS_DMCAST:
00303              {
00304 #ifdef LMS_DEBUG
00305 a2 = lh->src_;
00306 printf ("LMS-DMCAST at agent %s, iface %d src %d:%d group %d\n\n",
00307   uname_, lh->tp_iface_, a2>>8, a2&0xff, lh->group_);
00308 #endif
00309                  hdr_ip* iph = HDR_IP(pkt);
00310                  NsObject* tgt = iface2link (lh->tp_iface_);
00311 
00312                  if (tgt)
00313                  {
00314                      packet_t t = h->ptype();
00315                      iph->saddr() = lh->src_;
00316                      iph->daddr() = lh->group_;
00317                      tgt->recv (pkt);
00318                  }
00319                  else   {
00320                      printf ("FATAL: %s no such iface %d\n", uname_, lh->tp_iface_);
00321                      abort ();
00322                  }
00323              }
00324              break;
00325              
00326          case LMS_LEAVE:
00327              {
00328                  
00329                  if (replier_iface_ == h->iface())
00330                  {
00331                      downstream_lms_.addr_ = LMS_NOADDR;
00332                      downstream_lms_.port_ = LMS_NOPORT;
00333                      
00334                      replier_ = NULL;
00335                      replier_cost_ = 1000000;
00336                      replier_iface_ = LMS_NOIFACE;
00337                      send_upstream (pkt);
00338                  }
00339                  else
00340                      Packet::free(pkt);
00341                  
00342                  return;
00343              }
00344              
00345          case LMS_SETUP:
00346              {
00347                  
00348                  if (h->iface() < 0)
00349                  {
00350                      // Source is attached - mark it as the replier
00351                      replier_ = pkt2agent (pkt);
00352                      replier_iface_ = h->iface();
00353                      downstream_lms_.addr_ = LMS_NOADDR;
00354                      downstream_lms_.port_ = LMS_NOPORT;
00355                      
00356                      replier_cost_ = 0;         // XXX
00357                      
00358 #ifdef LMS_DEBUG
00359 printf ("%s REPLIER iface %d, cost %d\n\n", uname_, replier_iface_, replier_cost_);
00360 #endif
00361 
00362                  }
00363                  upstream_lms_ = lh->from_;
00364                  upstream_iface_ = h->iface();
00365                  
00366 #ifdef LMS_DEBUG    
00367 printf ("%s upstream %d\n\n", uname_, upstream_lms_);
00368 #endif
00369  
00370                  lh->from_ = addr();
00371                  Tcl::instance().evalf("[%s set node_] agent %d", name(), port());
00372                  target_->recv (pkt);
00373                  return;
00374              }
00375              
00376          case LMS_SPM:
00377              {
00378                  if (upstream_lms_ < 0)
00379                  {
00380                      struct lms_spm *spm = (struct lms_spm *)pkt->accessdata ();
00381                      nsaddr_t adr = spm->spm_path_;
00382 #ifdef LMS_DEBUG    
00383 printf ("%s LMS_SPM seqno %d, upstream %d:%d\n\n",
00384   uname_, spm->spm_seqno_, adr>>8, adr&0xff);
00385 #endif
00386 
00387                       if (upstream_lms_ != spm->spm_path_)
00388                           upstream_lms_ = spm->spm_path_;
00389 
00390                       spm->spm_path_ = addr();
00391                  }
00392                  break;
00393              }
00394              
00395          case LMS_LINKS:
00396              {
00397                  Tcl& tcl = Tcl::instance();
00398                  char wrk[64];
00399                  int  n1 = lh->from();
00400                  int  n2 = addr();
00401                  
00402                  if (n1 != n2 && !have_rcvr_)
00403                  {
00404                      sprintf (wrk, "lappend tree_links {%d %d}", n1, n2);
00405                      tcl.eval (wrk);
00406                  }
00407                  
00408                  lh->from_ = addr();
00409                  target_->recv (pkt);
00410                  return;
00411              }
00412          default:
00413              printf ("FATAL: %s uknown LMS packet type: %d\n", uname_, lh->type());
00414              abort ();
00415      }          
00416 }
00417 
00418      
00419 
00420 int LmsAgent::command (int argc, const char*const* argv)
00421 {
00422         Tcl& tcl = Tcl::instance();
00423 
00424         if (argc == 6) {
00425                 if (strcmp(argv[1], "send-lms") == 0) {
00426                         Packet* pkt = allocpkt();
00427                         hdr_lms* ph = HDR_LMS(pkt);
00428 
00429                         ph->type() = atoi (argv[2]);
00430                         ph->from() = atoi(argv[3]);
00431                         ph->src() = atoi(argv[4]);
00432                         ph->group() = atoi(argv[5]);
00433 
00434                         send(pkt, 0);
00435                         return (TCL_OK);
00436                 }
00437         }
00438         
00439         return (Agent::command(argc, argv));
00440 }
00441 
00442 
00443 /*
00444  * Given an interface number,
00445  * return its outgoing link
00446  */
00447 NsObject* LmsAgent::iface2link (int iface)
00448 {
00449         Tcl&    tcl = Tcl::instance();
00450         char    wrk[64];
00451 
00452         sprintf (wrk, "[%s set node_] ifaceGetOutLink %d", name (), iface);
00453         tcl.evalc (wrk);
00454         char* result = tcl.result ();
00455 
00456 #ifdef LMS_DEBUG
00457 printf ("[iface2link] agent %s\n", result);
00458 #endif
00459 
00460         NsObject* obj = (NsObject*)TclObject::lookup(result);
00461         return (obj);
00462 }
00463 
00464 /*
00465  * Given a packet, determine which agent sent it
00466  * Agent MUST be attached to this node
00467  */
00468 NsObject* LmsAgent::pkt2agent (Packet *pkt)
00469 {
00470         Tcl&            tcl = Tcl::instance();
00471         char            wrk[64];
00472         char            *result;
00473         int             port;
00474         NsObject*       agent;
00475         hdr_ip*         ih = HDR_IP(pkt);
00476         nsaddr_t        src = ih->saddr();
00477 
00478         port = ih->sport();
00479 
00480         sprintf (wrk, "[%s set node_] agent %d", name (), port);
00481         tcl.evalc (wrk);
00482         result = tcl.result ();
00483 
00484 #ifdef LMS_DEBUG
00485 printf ("[pkt2agent] port %d, agent %s\n", port, result);
00486 #endif
00487 
00488         agent = (NsObject*)TclObject::lookup (result);
00489         return (agent);
00490 }
00491 
00492 void LmsAgent::send_upstream (Packet *p)
00493 {
00494         if (upstream_lms_ < 0)
00495                 {
00496                 printf ("FATAL: %s upstream_lms_ not set!\n", uname_);
00497                 abort ();
00498                 }
00499         hdr_ip* ih = HDR_IP(p);
00500         hdr_lms* lh = HDR_LMS(p);
00501 
00502         lh->from_ = addr();
00503         ih->daddr() = upstream_lms_;
00504         target_->recv(p);
00505 }
00506 
00507 
00508 void LmsAgent::send_downstream (Packet *p)
00509 {
00510     if (downstream_lms_.addr_ < 0)
00511     {
00512         printf ("FATAL: %s downstream_lms_ not set!\n", uname_);
00513         abort ();
00514     }
00515     hdr_ip* ih = HDR_IP(p);
00516     hdr_lms* lh = HDR_LMS(p);
00517     
00518     lh->from_ = addr();
00519     ih->daddr() = downstream_lms_.addr_;
00520     ih->dport() = downstream_lms_.port_;
00521     
00522     target_->recv(p);
00523 }

Generated on Tue Apr 20 12:14:22 2004 for NS2.26SourcesOriginal by doxygen 1.3.3