Main Page | Namespace List | Class Hierarchy | Alphabetical List | Compound List | File List | Compound Members | File Members

diffusion.cc

Go to the documentation of this file.
00001 // 
00002 // diffusion.cc  : Main Diffusion program
00003 // authors       : Chalermek Intanagonwiwat and Fabio Silva
00004 //
00005 // Copyright (C) 2000-2002 by the University of Southern California
00006 // $Id: diffusion.cc,v 1.7 2002/11/26 22:45:38 haldar Exp $
00007 //
00008 // This program is free software; you can redistribute it and/or
00009 // modify it under the terms of the GNU General Public License,
00010 // version 2, as published by the Free Software Foundation.
00011 //
00012 // This program is distributed in the hope that it will be useful,
00013 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00014 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00015 // GNU General Public License for more details.
00016 //
00017 // You should have received a copy of the GNU General Public License along
00018 // with this program; if not, write to the Free Software Foundation, Inc.,
00019 // 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
00020 //
00021 //
00022 
00023 #include "diffusion.hh"
00024 
00025 #ifndef NS_DIFFUSION
00026 DiffusionCoreAgent *agent;
00027 #endif // !NS_DIFFUSION
00028 
00029 class HashEntry {
00030 public:
00031   bool dummy;
00032 
00033   HashEntry() { 
00034     dummy  = false;
00035   }
00036 };
00037 
00038 class NeighborEntry {
00039 public:
00040   int32_t id;
00041   struct timeval tmv;
00042 
00043   NeighborEntry(int _id) : id(_id)
00044   {
00045     GetTime(&tmv);
00046   }
00047 };
00048 
00049 int NeighborsTimeoutTimer::expire()
00050 {
00051   agent_->neighborsTimeout();
00052 
00053   return 0;
00054 }
00055 
00056 int FilterTimeoutTimer::expire()
00057 {
00058   agent_->filterTimeout();
00059 
00060   return 0;
00061 }
00062 
00063 int DiffusionStopTimer::expire()
00064 {
00065   agent_->timeToStop();
00066 #ifndef NS_DIFFUSION
00067   exit(0);
00068 #endif // !NS_DIFFUSION
00069 
00070   // Never gets here !
00071   return 0;
00072 }
00073 
00074 void DiffusionCoreAgent::timeToStop()
00075 {
00076   FILE *outfile = NULL;
00077 
00078   if (global_debug_level > DEBUG_SOME_DETAILS){
00079 #ifdef NS_DIFFUSION
00080     outfile = fopen("/tmp/diffusion.out", "a");
00081 #else
00082     outfile = fopen("/tmp/diffusion.out", "w");
00083 #endif // NS_DIFFUSION
00084 
00085     if (outfile == NULL){
00086       DiffPrint(DEBUG_ALWAYS ,"Diffusion Error: Can't create /tmp/diffusion.out\n");
00087       return;
00088     }
00089   }
00090 
00091 #ifdef STATS
00092   stats_->printStats(stdout);
00093   if (outfile)
00094     stats_->printStats(outfile);
00095 #  ifndef WIRED
00096 #     ifdef USE_RPC
00097   rpcstats_->printStats(stdout);
00098   if (outfile)
00099     rpcstats_->printStats(outfile);
00100 #     endif // USE_RPC
00101 #  endif // WIRED
00102 #endif // STATS
00103 
00104   if (outfile)
00105     fclose(outfile);
00106 }
00107 
00108 #ifndef NS_DIFFUSION
00109 
00110 void signal_handler(int p)
00111 {
00112   agent->timeToStop();
00113   exit(0);
00114 }
00115 
00116 void DiffusionCoreAgent::usage()
00117 {
00118 #ifdef IO_LOG
00119   DiffPrint(DEBUG_ALWAYS, "Usage: diffusion [-d debug] [-f filename] [-t stoptime] [-v] [-h] [-l] [-p port]\n\n");
00120 #else
00121   DiffPrint(DEBUG_ALWAYS, "Usage: diffusion [-d debug] [-f filename] [-t stoptime] [-v] [-h] [-p port]\n\n");
00122 #endif // IO_LOG
00123   DiffPrint(DEBUG_ALWAYS, "\t-d - Sets debug level (0-10)\n");
00124   DiffPrint(DEBUG_ALWAYS, "\t-t - Schedule diffusion to exit after stoptime seconds\n");
00125   DiffPrint(DEBUG_ALWAYS, "\t-f - Uses filename as the config file\n");
00126   DiffPrint(DEBUG_ALWAYS, "\t-v - Prints diffusion version\n");
00127   DiffPrint(DEBUG_ALWAYS, "\t-h - Prints this information\n");
00128   DiffPrint(DEBUG_ALWAYS, "\t-p - Sets diffusion port to port\n");
00129 #ifdef IO_LOG
00130   DiffPrint(DEBUG_ALWAYS, "\t-l - Turns on i/o logging\n");
00131 #endif // IO_LOG
00132 
00133   DiffPrint(DEBUG_ALWAYS, "\n");
00134 
00135   exit(0);
00136 }
00137 
00138 void DiffusionCoreAgent::run()
00139 {
00140   DeviceList::iterator device_itr;
00141   DiffPacket in_pkt;
00142   fd_set fds;
00143   //  DiffusionEvent *e;
00144   bool flag;
00145   int status, max_sock, fd;
00146   struct timeval tv;
00147 
00148   // Main Select Loop
00149   while (1){
00150 
00151     // Wait for incoming packets
00152     FD_ZERO(&fds);
00153     max_sock = 0;
00154 
00155     // Figure out how much time to wait
00156     timers_manager_->nextTimerTime(&tv);
00157     if (tv.tv_sec == 0 && tv.tv_usec == 0){
00158       // Timer has expired !
00159       timers_manager_->executeAllExpiredTimers();
00160       continue;
00161     }
00162 
00163     for (device_itr = in_devices_.begin();
00164          device_itr != in_devices_.end(); ++device_itr){
00165       (*device_itr)->addInFDS(&fds, &max_sock);
00166     }
00167 
00168     status = select(max_sock+1, &fds, NULL, NULL, &tv);
00169 
00170     if (status == 0){
00171       // We process all expired timers
00172       timers_manager_->executeAllExpiredTimers();
00173     }
00174 
00175     // Check for new packets
00176     if (status > 0){
00177       do{
00178         flag = false;
00179         for (device_itr = in_devices_.begin();
00180              device_itr != in_devices_.end(); ++device_itr){
00181           fd = (*device_itr)->checkInFDS(&fds);
00182           if (fd != 0){
00183             // Message waiting
00184             in_pkt = (*device_itr)->recvPacket(fd);
00185 
00186             if (in_pkt)
00187               recvPacket(in_pkt);
00188 
00189             // Clear this fd
00190             FD_CLR(fd, &fds);
00191             status--;
00192             flag = true;
00193           }
00194         }
00195       } while ((status > 0) && (flag == true));
00196     }
00197 
00198     // This should not happen
00199     if (status < 0){
00200       DiffPrint(DEBUG_IMPORTANT, "Select returned %d\n", status);
00201     }
00202   }
00203 }
00204 #endif // !NS_DIFFUSION
00205 
00206 void DiffusionCoreAgent::neighborsTimeout()
00207 {
00208   struct timeval tmv;
00209   NeighborEntry *neighbor_entry;
00210   NeighborList::iterator neighbor_itr;
00211 
00212   DiffPrint(DEBUG_MORE_DETAILS, "Neighbors Timeout !\n");
00213 
00214   GetTime(&tmv);
00215 
00216   neighbor_itr = neighbor_list_.begin();
00217 
00218   while(neighbor_itr != neighbor_list_.end()){
00219     neighbor_entry = *neighbor_itr;
00220     if (tmv.tv_sec > neighbor_entry->tmv.tv_sec + NEIGHBORS_TIMEOUT){
00221       // This neighbor expired
00222       neighbor_itr = neighbor_list_.erase(neighbor_itr);
00223       delete neighbor_entry;
00224     }
00225     else{
00226       neighbor_itr++;
00227     }
00228   }
00229 }
00230 
00231 void DiffusionCoreAgent::filterTimeout()
00232 {
00233   struct timeval tmv;
00234   FilterEntry *filter_entry;
00235   FilterList::iterator filter_itr;
00236 
00237   DiffPrint(DEBUG_MORE_DETAILS, "Filter Timeout !\n");
00238 
00239   GetTime(&tmv);
00240 
00241   filter_itr = filter_list_.begin();
00242 
00243   while(filter_itr != filter_list_.end()){
00244     filter_entry = *filter_itr;
00245     if (tmv.tv_sec > filter_entry->tmv_.tv_sec + FILTER_TIMEOUT){
00246 
00247       // This filter expired
00248       DiffPrint(DEBUG_NO_DETAILS, "Filter %d, %d, %d timed out !\n",
00249                 filter_entry->agent_, filter_entry->handle_,
00250                 filter_entry->priority_);
00251       filter_itr = filter_list_.erase(filter_itr);
00252       delete filter_entry;
00253     }
00254     else{
00255       filter_itr++;
00256     }
00257   }
00258 }
00259 
00260 void DiffusionCoreAgent::sendMessage(Message *msg)
00261 {
00262   Tcl_HashEntry *tcl_hash_entry;
00263   unsigned int key[2];
00264   Message *send_message;
00265   
00266   send_message = new Message(DIFFUSION_VERSION, msg->msg_type_, diffusion_port_,
00267                              0, 0, msg->pkt_num_, msg->rdm_id_,
00268                              msg->next_hop_, 0);
00269 
00270   send_message->msg_attr_vec_ = CopyAttrs(msg->msg_attr_vec_);
00271   send_message->num_attr_ = send_message->msg_attr_vec_->size();
00272   send_message->data_len_ = CalculateSize(send_message->msg_attr_vec_);
00273 
00274   // Adjust message size for logging and check hash
00275   key[0] = msg->pkt_num_;
00276   key[1] = msg->rdm_id_;
00277   tcl_hash_entry = Tcl_FindHashEntry(&htable_, (char *) key);
00278   if (tcl_hash_entry)
00279     msg->new_message_ = 0;
00280   else
00281     msg->new_message_ = 1;
00282 
00283   send_message->new_message_ = msg->new_message_;
00284 
00285   // Check if message goes to an agent or the network
00286   if (msg->next_port_){
00287     // Message goes to an agent
00288     send_message->last_hop_ = LOCALHOST_ADDR;
00289 
00290     // If it's a local message, it has to go to a local agent
00291     if (send_message->next_hop_ != LOCALHOST_ADDR){
00292       DiffPrint(DEBUG_ALWAYS, "Error: Message destination is a local agent but next_hop != LOCALHOST_ADDR !\n");
00293       delete send_message;
00294       return;
00295     }
00296 
00297     // Send the message to the agent specified
00298     sendMessageToLibrary(send_message, msg->next_port_);
00299   }
00300   else{
00301     // Message goes to the network
00302     send_message->last_hop_ = my_id_;
00303 
00304 #ifdef STATS
00305     stats_->logOutgoingMessage(send_message);
00306 #endif // STATS
00307 
00308     // Add message to the hash table      
00309     if (tcl_hash_entry == NULL)
00310       putHash(key[0], key[1]);
00311     else
00312       DiffPrint(DEBUG_DETAILS, "Message being sent is an old message !\n");
00313 
00314     // Send Message
00315     sendMessageToNetwork(send_message);
00316   }
00317 
00318   delete send_message;
00319 }
00320 
00321 void DiffusionCoreAgent::forwardMessage(Message *msg, FilterEntry *filter_entry)
00322 {
00323   RedirectMessage *original_hdr;
00324   NRAttribute *original_header_attr;
00325   Message *send_message;
00326 
00327   // Create an attribute with the original header
00328   original_hdr = new RedirectMessage(msg->new_message_, msg->msg_type_,
00329                                      msg->source_port_, msg->data_len_,
00330                                      msg->num_attr_, msg->rdm_id_,
00331                                      msg->pkt_num_, msg->next_hop_,
00332                                      msg->last_hop_, filter_entry->handle_,
00333                                      msg->next_port_);
00334 
00335   original_header_attr = OriginalHdrAttr.make(NRAttribute::IS,
00336                                               (void *)original_hdr,
00337                                               sizeof(RedirectMessage));
00338 
00339   send_message = new Message(DIFFUSION_VERSION, REDIRECT, diffusion_port_, 0,
00340                              0, pkt_count_, random_id_, LOCALHOST_ADDR, my_id_);
00341 
00342   // Increment pkt_counter
00343   pkt_count_++;
00344 
00345   // Duplicate the message's attributes
00346   send_message->msg_attr_vec_ = CopyAttrs(msg->msg_attr_vec_);
00347   
00348   // Add the extra attribute
00349   send_message->msg_attr_vec_->push_back(original_header_attr);
00350   send_message->num_attr_ = send_message->msg_attr_vec_->size();
00351   send_message->data_len_ = CalculateSize(send_message->msg_attr_vec_);
00352 
00353   sendMessageToLibrary(send_message, filter_entry->agent_);
00354 
00355   delete send_message;
00356   delete original_hdr;
00357 }
00358 
00359 #ifndef NS_DIFFUSION
00360 void DiffusionCoreAgent::sendMessageToLibrary(Message *msg, u_int16_t agent_id)
00361 {
00362   DiffPacket out_pkt = NULL;
00363   struct hdr_diff *dfh;
00364   int len;
00365   char *pos;
00366 
00367   out_pkt = AllocateBuffer(msg->msg_attr_vec_);
00368   dfh = HDR_DIFF(out_pkt);
00369 
00370   pos = (char *) out_pkt;
00371   pos = pos + sizeof(struct hdr_diff);
00372 
00373   len = PackAttrs(msg->msg_attr_vec_, pos);
00374 
00375   LAST_HOP(dfh) = htonl(msg->last_hop_);
00376   NEXT_HOP(dfh) = htonl(msg->next_hop_);
00377   DIFF_VER(dfh) = msg->version_;
00378   MSG_TYPE(dfh) = msg->msg_type_;
00379   DATA_LEN(dfh) = htons(len);
00380   PKT_NUM(dfh) = htonl(msg->pkt_num_);
00381   RDM_ID(dfh) = htonl(msg->rdm_id_);
00382   NUM_ATTR(dfh) = htons(msg->num_attr_);
00383   SRC_PORT(dfh) = htons(msg->source_port_);
00384 
00385   sendPacketToLibrary(out_pkt, sizeof(struct hdr_diff) + len, agent_id);
00386 
00387   delete [] out_pkt;
00388 }
00389 #else
00390 void DiffusionCoreAgent::sendMessageToLibrary(Message *msg, u_int16_t agent_id)
00391 {
00392   Message *send_message;
00393   DeviceList::iterator device_itr;
00394   int len;
00395 
00396   send_message = CopyMessage(msg);
00397   len = CalculateSize(send_message->msg_attr_vec_);
00398   len = len + sizeof(struct hdr_diff);
00399 
00400   for (device_itr = local_out_devices_.begin();
00401        device_itr != local_out_devices_.end(); ++device_itr){
00402     (*device_itr)->sendPacket((DiffPacket) send_message, len, agent_id);
00403   }
00404 }
00405 #endif // !NS_DIFFUSION
00406 
00407 #ifndef NS_DIFFUSION
00408 void DiffusionCoreAgent::sendMessageToNetwork(Message *msg)
00409 {
00410   DiffPacket out_pkt = NULL;
00411   struct hdr_diff *dfh;
00412   int len;
00413   char *pos;
00414 
00415   out_pkt = AllocateBuffer(msg->msg_attr_vec_);
00416   dfh = HDR_DIFF(out_pkt);
00417 
00418   pos = (char *) out_pkt;
00419   pos = pos + sizeof(struct hdr_diff);
00420 
00421   len = PackAttrs(msg->msg_attr_vec_, pos);
00422 
00423   LAST_HOP(dfh) = htonl(msg->last_hop_);
00424   NEXT_HOP(dfh) = htonl(msg->next_hop_);
00425   DIFF_VER(dfh) = msg->version_;
00426   MSG_TYPE(dfh) = msg->msg_type_;
00427   DATA_LEN(dfh) = htons(len);
00428   PKT_NUM(dfh) = htonl(msg->pkt_num_);
00429   RDM_ID(dfh) = htonl(msg->rdm_id_);
00430   NUM_ATTR(dfh) = htons(msg->num_attr_);
00431   SRC_PORT(dfh) = htons(msg->source_port_);
00432 
00433   sendPacketToNetwork(out_pkt, sizeof(struct hdr_diff) + len, msg->next_hop_);
00434 
00435   delete [] out_pkt;
00436 }
00437 #else
00438 void DiffusionCoreAgent::sendMessageToNetwork(Message *msg)
00439 {
00440   Message *send_message;
00441   int len;
00442   int32_t dst;
00443   DeviceList::iterator device_itr;
00444 
00445   send_message = CopyMessage(msg);
00446   len = CalculateSize(send_message->msg_attr_vec_);
00447   len = len + sizeof(struct hdr_diff);
00448   dst = send_message->next_hop_;
00449 
00450   for (device_itr = out_devices_.begin();
00451        device_itr != out_devices_.end(); ++device_itr){
00452     (*device_itr)->sendPacket((DiffPacket) send_message, len, dst);
00453   }
00454 }
00455 #endif // !NS_DIFFUSION
00456 
00457 void DiffusionCoreAgent::sendPacketToLibrary(DiffPacket pkt, int len,
00458                                              u_int16_t dst)
00459 {
00460   DeviceList::iterator device_itr;
00461 
00462   for (device_itr = local_out_devices_.begin();
00463        device_itr != local_out_devices_.end(); ++device_itr){
00464     (*device_itr)->sendPacket(pkt, len, dst);
00465   }
00466 }
00467 
00468 void DiffusionCoreAgent::sendPacketToNetwork(DiffPacket pkt, int len, int dst)
00469 {
00470   DeviceList::iterator device_itr;
00471 
00472   for (device_itr = out_devices_.begin();
00473        device_itr != out_devices_.end(); ++device_itr){
00474     (*device_itr)->sendPacket(pkt, len, dst);
00475   }
00476 }
00477 
00478 void DiffusionCoreAgent::updateNeighbors(int id)
00479 {
00480   NeighborList::iterator neighbor_itr;
00481   NeighborEntry *neighbor_entry;
00482 
00483   if (id == LOCALHOST_ADDR || id == my_id_)
00484     return;
00485 
00486   for (neighbor_itr = neighbor_list_.begin();
00487        neighbor_itr != neighbor_list_.end(); ++neighbor_itr){
00488     if ((*neighbor_itr)->id == id)
00489       break;
00490   }
00491 
00492   if (neighbor_itr == neighbor_list_.end()){
00493     // This is a new neighbor
00494     neighbor_entry = new NeighborEntry(id);
00495     neighbor_list_.push_front(neighbor_entry);
00496   }
00497   else{
00498     // Just update the neighbor timeout
00499     GetTime(&((*neighbor_itr)->tmv));
00500   }
00501 }
00502 
00503 FilterEntry * DiffusionCoreAgent::findFilter(int16_t handle, u_int16_t agent)
00504 {
00505   FilterList::iterator filter_itr;
00506   FilterEntry *filter_entry;
00507 
00508   for (filter_itr = filter_list_.begin();
00509        filter_itr != filter_list_.end(); ++filter_itr){
00510     filter_entry = *filter_itr;
00511     if (handle != filter_entry->handle_ || agent != filter_entry->agent_)
00512       continue;
00513 
00514     // Found
00515     return filter_entry;
00516   }
00517   return NULL;
00518 }
00519 
00520 FilterEntry * DiffusionCoreAgent::deleteFilter(int16_t handle, u_int16_t agent)
00521 {
00522   FilterList::iterator filter_itr = filter_list_.begin();
00523   FilterEntry *filter_entry = NULL;
00524 
00525   while (filter_itr != filter_list_.end()){
00526     filter_entry = *filter_itr;
00527     if (handle == filter_entry->handle_ && agent == filter_entry->agent_){
00528       filter_list_.erase(filter_itr);
00529       break;
00530     }
00531     filter_entry = NULL;
00532     filter_itr++;
00533   }
00534   return filter_entry;
00535 }
00536 
00537 bool DiffusionCoreAgent::addFilter(NRAttrVec *attrs, u_int16_t agent,
00538                                    int16_t handle, u_int16_t priority)
00539 {
00540   FilterList::iterator filter_itr;
00541   FilterEntry *filter_entry;
00542 
00543   filter_itr = filter_list_.begin();
00544   while (filter_itr != filter_list_.end()){
00545     filter_entry = *filter_itr;
00546     if (filter_entry->priority_ == priority)
00547       return false;
00548     filter_itr++;
00549   }
00550 
00551   filter_entry = new FilterEntry(handle, priority, agent);
00552 
00553   // Copy the Attribute Vector
00554   filter_entry->filter_attrs_ = CopyAttrs(attrs);
00555 
00556   // Add this filter to the filter list
00557   filter_list_.push_back(filter_entry);
00558 
00559   return true;
00560 }
00561 
00562 FilterList::iterator DiffusionCoreAgent::findMatchingFilter(NRAttrVec *attrs,
00563                                                             FilterList::iterator filter_itr)
00564 {
00565   FilterEntry *filter_entry;
00566 
00567   for (;filter_itr != filter_list_.end(); ++filter_itr){
00568     filter_entry = *filter_itr;
00569 
00570     if (OneWayMatch(filter_entry->filter_attrs_, attrs)){
00571       // That's a match !
00572       break;
00573     }
00574   }
00575   return filter_itr;
00576 }
00577 
00578 bool DiffusionCoreAgent::restoreOriginalHeader(Message *msg)
00579 {
00580   NRAttrVec::iterator attr_itr = msg->msg_attr_vec_->begin();
00581   NRSimpleAttribute<void *> *original_header_attr = NULL;
00582   RedirectMessage *original_hdr;
00583 
00584   // Find original Header
00585   original_header_attr = OriginalHdrAttr.find_from(msg->msg_attr_vec_,
00586                                                    attr_itr, &attr_itr);
00587   if (!original_header_attr){
00588     DiffPrint(DEBUG_ALWAYS, "Error: DiffusionCoreAgent::ProcessControlMessage couldn't find the OriginalHdrAttr !\n");
00589     return false;
00590   }
00591 
00592   // Restore original Header
00593   original_hdr = (RedirectMessage *) original_header_attr->getVal();
00594 
00595   msg->msg_type_ = original_hdr->msg_type_;
00596   msg->source_port_ = original_hdr->source_port_;
00597   msg->pkt_num_ = original_hdr->pkt_num_;
00598   msg->rdm_id_ = original_hdr->rdm_id_;
00599   msg->next_hop_ = original_hdr->next_hop_;
00600   msg->last_hop_ = original_hdr->last_hop_;
00601   msg->new_message_ = original_hdr->new_message_;
00602   msg->num_attr_ = original_hdr->num_attr_;
00603   msg->data_len_ = original_hdr->data_len_;
00604   msg->next_port_ = original_hdr->next_port_;
00605 
00606   // Delete attribute from original set
00607   msg->msg_attr_vec_->erase(attr_itr);
00608   delete original_header_attr;
00609 
00610   return true;
00611 }
00612 
00613 FilterList * DiffusionCoreAgent::getFilterList(NRAttrVec *attrs)
00614 {
00615   FilterList *matching_filter_list = new FilterList;
00616   FilterList::iterator known_filters_itr, filter_list_itr;
00617   FilterEntry *matching_filter_entry, *filter_entry;
00618 
00619   // We need to come up with a list of filters to call
00620   // F1 will be called before F2 if F1->priority > F2->priority
00621 
00622   known_filters_itr = findMatchingFilter(attrs, filter_list_.begin());
00623 
00624   while (known_filters_itr != filter_list_.end()){
00625     // We have a match !
00626     matching_filter_entry = *known_filters_itr;
00627 
00628     for (filter_list_itr = matching_filter_list->begin();
00629          filter_list_itr != matching_filter_list->end(); ++filter_list_itr){
00630       filter_entry = *filter_list_itr;
00631 
00632       // Figure out where to insert 
00633       if (matching_filter_entry->priority_ > filter_entry->priority_)
00634         break;
00635     }
00636 
00637     // Insert matching filter in the list
00638     matching_filter_list->insert(filter_list_itr, matching_filter_entry);
00639 
00640     // Continue the search
00641     known_filters_itr++;
00642     known_filters_itr = findMatchingFilter(attrs, known_filters_itr);
00643   }
00644   return matching_filter_list;
00645 }
00646 
00647 u_int16_t DiffusionCoreAgent::getNextFilterPriority(int16_t handle,
00648                                                     u_int16_t priority,
00649                                                     u_int16_t agent)
00650 {
00651   FilterList::iterator filter_itr;
00652   FilterEntry *filter_entry;
00653 
00654   if ((priority < FILTER_MIN_PRIORITY) ||
00655       (priority > FILTER_KEEP_PRIORITY))
00656     return FILTER_INVALID_PRIORITY;
00657 
00658   if (priority < FILTER_KEEP_PRIORITY)
00659     return (priority - 1);
00660 
00661   filter_itr = filter_list_.begin();
00662 
00663   while (filter_itr != filter_list_.end()){
00664     filter_entry = *filter_itr;
00665 
00666     if ((filter_entry->handle_ == handle) && (filter_entry->agent_ == agent)){
00667       // Found this filter
00668       return (filter_entry->priority_ - 1);
00669     }
00670 
00671     filter_itr++;
00672   }
00673 
00674   return FILTER_INVALID_PRIORITY;
00675 }
00676 
00677 void DiffusionCoreAgent::processMessage(Message *msg)
00678 {
00679   FilterList *filter_list;
00680   FilterList::iterator filter_list_itr;
00681   FilterEntry *filter_entry;
00682 
00683   filter_list = getFilterList(msg->msg_attr_vec_);
00684 
00685   // Ok, we have a list of Filters to call. Send this message
00686   // to the first filter on this list
00687   if (filter_list->size() > 0){
00688     filter_list_itr = filter_list->begin();
00689     filter_entry = *filter_list_itr;
00690 
00691     forwardMessage(msg, filter_entry);
00692     filter_list->clear();
00693   }
00694   delete filter_list;
00695 }
00696 
00697 void DiffusionCoreAgent::processControlMessage(Message *msg)
00698 {
00699   NRSimpleAttribute<void *> *ctrl_msg_attr = NULL;
00700   NRAttrVec::iterator attr_itr;
00701   ControlMessage *control_blob = NULL;
00702   FilterList *filter_list;
00703   FilterList::iterator filter_list_itr;
00704   FilterEntry *filter_entry;
00705   int command, param1, param2;
00706   u_int16_t priority, source_port, new_priority;
00707   int16_t handle;
00708   bool filter_is_last = false;
00709 
00710   // Control messages should not come from other nodes
00711   if (msg->last_hop_ != LOCALHOST_ADDR){
00712     DiffPrint(DEBUG_ALWAYS,
00713               "Error: Received control message from another node !\n");
00714     return;
00715   }
00716 
00717   // Find the control attribute
00718   attr_itr = msg->msg_attr_vec_->begin();
00719   ctrl_msg_attr = ControlMsgAttr.find_from(msg->msg_attr_vec_,
00720                                            attr_itr, &attr_itr);
00721 
00722   if (!ctrl_msg_attr){
00723     // Control message is invalid
00724     DiffPrint(DEBUG_ALWAYS, "Error: Control message received is invalid !\n");
00725     return;
00726   }
00727 
00728   // Extract the control message info
00729   control_blob = (ControlMessage *) ctrl_msg_attr->getVal();
00730   command = control_blob->command_;
00731   param1 = control_blob->param1_;
00732   param2 = control_blob->param2_;
00733 
00734   // Filter API definitions
00735   //
00736   // command = ADD_UPDATE_FILTER
00737   // param1  = priority
00738   // param2  = handle
00739   // attrs   = other attrs specify the filter
00740   // 
00741   // Remarks: If this filter is already present for this module,
00742   //          we don't create a new one. A filter is identified
00743   //          by the handle and the originating agent. The filter
00744   //          gets refreshed if it already exists. If attrs and
00745   //          handle are the same, we update the priority.
00746   //
00747   //
00748   // command = REMOVE_FILTER
00749   // param1  = handle
00750   //
00751   // Remarks: Remove the filter identified by (agent, handle)
00752   //          If it's not found, a warning message is generated.
00753   //
00754   //
00755   // Remarks: Send message from a local App to another App or
00756   //          a neighbor. If agent_id is zero, the packet goes
00757   //          out to the network. Otherwise, it goes to the
00758   //          agent_id located on this node.
00759   //
00760   //
00761   // command = SEND_MESSAGE
00762   // param1  = handle
00763   // param2  = priority
00764   //
00765   // Remarks: Send this message to the next filter or to a local
00766   //          application. We have to assemble the list again
00767   //          and figure out the current agent's position on the
00768   //          list. Then, we send to the next guy. If there is
00769   //          no other filter in the list, we try to send it to
00770   //          the network, if next_hop contains a node address.
00771 
00772   logControlMessage(msg, command, param1, param2);
00773 
00774   // First we remove the control attribute from the message
00775   msg->msg_attr_vec_->erase(attr_itr);
00776   delete ctrl_msg_attr;
00777 
00778   switch(command){
00779   case ADD_UPDATE_FILTER:
00780 
00781     priority = param1;
00782     handle = param2;
00783 
00784     filter_entry = findFilter(handle, msg->source_port_);
00785 
00786     if (filter_entry){
00787       // Filter already present, must be an update message
00788       if (PerfectMatch(filter_entry->filter_attrs_, msg->msg_attr_vec_)){
00789         // Attrs also match, let's update the filter's timeout
00790         GetTime(&(filter_entry->tmv_));
00791 
00792         // Check if the priority has changed...
00793         if (priority == filter_entry->priority_){
00794           // Nothing to do !
00795           DiffPrint(DEBUG_SOME_DETAILS, "Filter %d, %d, %d refreshed.\n",
00796                     filter_entry->agent_, filter_entry->handle_,
00797                     filter_entry->priority_);
00798         }
00799         else{
00800           // Update the priority
00801           DiffPrint(DEBUG_NO_DETAILS,
00802                     "Updated priority of filter %d, %d, %d to %d\n",
00803                     msg->source_port_, handle, filter_entry->priority_, priority);
00804           filter_entry->priority_ = priority;
00805         }
00806 
00807         break;
00808       }
00809       else{
00810         // Filter attributes have changed ! This is not allowed !
00811         DiffPrint(DEBUG_ALWAYS,
00812                   "Filter attributes cannot change during an update !\n");
00813         break;
00814       }
00815     }
00816     else{
00817       // This is a new filter
00818       if (!addFilter(msg->msg_attr_vec_, msg->source_port_, handle, priority)){
00819         DiffPrint(DEBUG_ALWAYS, "Failed to add filter %d, %d, %d\n",
00820                   msg->source_port_, handle, priority);
00821       }
00822       else{
00823         DiffPrint(DEBUG_NO_DETAILS, "Adding filter %d, %d, %d\n",
00824                   msg->source_port_, handle, priority);
00825       }
00826     }
00827 
00828     break;
00829 
00830   case REMOVE_FILTER:
00831 
00832     handle = param1;
00833     filter_entry = deleteFilter(handle, msg->source_port_);
00834     if (filter_entry){
00835       // Filter deleted
00836       DiffPrint(DEBUG_NO_DETAILS, "Filter %d, %d, %d deleted.\n",
00837                 filter_entry->agent_, filter_entry->handle_,
00838                 filter_entry->priority_);
00839 
00840       delete filter_entry;
00841     }
00842     else{
00843       DiffPrint(DEBUG_ALWAYS, "Couldn't find filter to delete !\n");
00844     }
00845 
00846     break;
00847 
00848   case SEND_MESSAGE:
00849 
00850     handle = param1;
00851     priority = param2;
00852     source_port = msg->source_port_;
00853 
00854     if (!restoreOriginalHeader(msg))
00855       break;
00856 
00857     new_priority = getNextFilterPriority(handle, priority, source_port);
00858 
00859     if (new_priority == FILTER_INVALID_PRIORITY)
00860       break;
00861 
00862     // Now process the incoming message
00863     filter_list = getFilterList(msg->msg_attr_vec_);
00864 
00865     // Find the filter after the 'current' filter on the list
00866     if (filter_list->size() > 0){
00867       for (filter_list_itr = filter_list->begin();
00868            filter_list_itr != filter_list->end(); ++filter_list_itr){
00869         filter_entry = *filter_list_itr;
00870         if (filter_entry->priority_ <= new_priority){
00871           forwardMessage(msg, filter_entry);
00872           break;
00873         }
00874       }
00875 
00876       if (filter_list_itr == filter_list->end())
00877         filter_is_last = true;
00878 
00879     }
00880     else{
00881       filter_is_last = true;
00882     }
00883 
00884     if (filter_is_last){
00885       // Forward message to the network or the destination application
00886       sendMessage(msg);
00887     }
00888 
00889     filter_list->clear();
00890 
00891     delete filter_list;
00892 
00893     break;
00894 
00895   default:
00896 
00897     DiffPrint(DEBUG_ALWAYS, "Error: Unknown control message received !\n");
00898 
00899     break;
00900   }
00901 }
00902 
00903 void DiffusionCoreAgent::logControlMessage(Message *msg, int command,
00904                                            int param1, int param2)
00905 {
00906   // Logs the incoming message
00907 }
00908 
00909 #ifdef NS_DIFFUSION
00910 DiffusionCoreAgent::DiffusionCoreAgent(DiffRoutingAgent *diffrtg, int nodeid)
00911 {
00912 #else
00913 DiffusionCoreAgent::DiffusionCoreAgent(int argc, char **argv)
00914 {
00915   int opt;
00916   int debug_level;
00917 #endif // NS_DIFFUSION
00918   DeviceList *in_devices, *out_devices, *local_out_devices;
00919   DiffusionIO *device;
00920   TimerCallback *callback;
00921   char *scadds_env;
00922   long stop_time;
00923   struct timeval tv;
00924 #ifdef IO_LOG
00925   IOLog *pseudo_io_device;
00926   bool use_io_log = false;
00927 #endif // IO_LOG
00928 
00929   opterr = 0;
00930   config_file_ = NULL;
00931   stop_time = 0;
00932 
00933   scadds_env = getenv("scadds_addr");
00934   diffusion_port_ = DEFAULT_DIFFUSION_PORT;
00935 
00936 #ifndef NS_DIFFUSION
00937   // Parse command line options
00938   while (1){
00939     opt = getopt(argc, argv, COMMAND_LINE_ARGS);
00940 
00941     switch(opt){
00942 
00943     case 'p': 
00944 
00945       diffusion_port_ = (u_int16_t) atoi(optarg);
00946       if ((diffusion_port_ < 1024) || (diffusion_port_ >= 65535)){
00947         DiffPrint(DEBUG_ALWAYS,
00948                   "Diffusion Error: Port must be between 1024 and 65535 !\n");
00949         exit(-1);
00950       }
00951 
00952       break;
00953 
00954     case 't':
00955 
00956       stop_time = atol(optarg);
00957       if (stop_time <= 0){
00958         DiffPrint(DEBUG_ALWAYS, "Diffusion Error: stop time must be > 0\n");
00959         exit(-1);
00960       }
00961       else{
00962         DiffPrint(DEBUG_ALWAYS, "%s will stop after %ld seconds\n",
00963                   PROGRAM, stop_time);
00964       }
00965 
00966       break;
00967 
00968 #ifdef IO_LOG
00969     case 'l':
00970 
00971       use_io_log = true;
00972 
00973       break;
00974 
00975 #endif // IO_LOG
00976 
00977     case 'h':
00978 
00979       usage();
00980 
00981       break;
00982 
00983     case 'v':
00984 
00985       DiffPrint(DEBUG_ALWAYS, "\n%s %s\n", PROGRAM, RELEASE);
00986       exit(0);
00987 
00988       break;
00989 
00990     case 'd':
00991 
00992       debug_level = atoi(optarg);
00993 
00994       if (debug_level < 1 || debug_level > 10){
00995         DiffPrint(DEBUG_ALWAYS,
00996                   "Error: Debug level outside range or missing !\n");
00997         usage();
00998       }
00999 
01000       global_debug_level = debug_level;
01001 
01002       break;
01003 
01004     case 'f':
01005 
01006       if (!strncasecmp(optarg, "-", 1)){
01007         DiffPrint(DEBUG_ALWAYS, "Error: Parameter is missing !\n");
01008         usage();
01009       }
01010 
01011       config_file_ = strdup(optarg);
01012 
01013       break;
01014 
01015     case '?':
01016 
01017       DiffPrint(DEBUG_ALWAYS,
01018                 "Error: %c isn't a valid option or its parameter is missing !\n",
01019                 optopt);
01020       usage();
01021 
01022       break;
01023 
01024     case ':':
01025 
01026       DiffPrint(DEBUG_ALWAYS, "Parameter missing !\n");
01027       usage();
01028 
01029       break;
01030     }
01031 
01032     if (opt == -1)
01033       break;
01034   }
01035 
01036   if (!config_file_)
01037     config_file_ = strdup(DEFAULT_CONFIG_FILE);
01038 
01039   // Get diffusion ID
01040   if (scadds_env != NULL){
01041     my_id_ = atoi(scadds_env);
01042   }
01043   else{
01044     DiffPrint(DEBUG_ALWAYS,
01045               "Diffusion : scadds_addr not set. Using random id.\n");
01046 
01047     // Generate random ID
01048     do{
01049       GetTime(&tv);
01050       SetSeed(&tv);
01051       my_id_ = GetRand();
01052     }
01053     while(my_id_ == LOCALHOST_ADDR || my_id_ == BROADCAST_ADDR);
01054   }
01055 #else
01056   my_id_ = nodeid;
01057 #endif // !NS_DIFFUSION
01058 
01059   // Initialize variables
01060   lon_ = 0.0;
01061   lat_ = 0.0;
01062 
01063 #ifdef STATS
01064   stats_ = new DiffusionStats(my_id_);
01065 #  ifndef WIRED
01066 #     ifdef USE_RPC
01067   rpcstats_ = new RPCStats(my_id_);
01068 #     endif // USE_RPC
01069 #  endif // !WIRED
01070 #endif // STATS
01071 
01072   GetTime(&tv);
01073   SetSeed(&tv);
01074   pkt_count_ = GetRand();
01075   random_id_ = GetRand();
01076 
01077   Tcl_InitHashTable(&htable_, 2);
01078 
01079   // Initialize EventQueue
01080   timers_manager_ = new TimerManager;
01081 
01082   // Create regular timers
01083   callback = new NeighborsTimeoutTimer(this);
01084   timers_manager_->addTimer(NEIGHBORS_DELAY, callback);
01085 
01086   callback = new FilterTimeoutTimer(this);
01087   timers_manager_->addTimer(FILTER_DELAY, callback);
01088 
01089   if (stop_time > 0){
01090     callback = new DiffusionStopTimer(this);
01091     timers_manager_->addTimer((stop_time * 1000), callback);
01092   }
01093 
01094   GetTime(&tv);
01095 
01096   // Print Initialization message
01097   DiffPrint(DEBUG_ALWAYS, "Diffusion : starting at time %ld:%ld\n",
01098             tv.tv_sec, tv.tv_usec);
01099   DiffPrint(DEBUG_ALWAYS, "Diffusion : Node id = %d\n", my_id_);
01100 
01101   // Initialize diffusion io devices
01102 #ifdef IO_LOG
01103   if (use_io_log){
01104     pseudo_io_device = new IOLog(my_id_);
01105     in_devices_.push_back(pseudo_io_device);
01106     out_devices_.push_back(pseudo_io_device);
01107 
01108     in_devices = &(pseudo_io_device->in_devices_);
01109     out_devices = &(pseudo_io_device->out_devices_);
01110     local_out_devices = &(local_out_devices_);
01111   }
01112   else{
01113     in_devices = &(in_devices_);
01114     out_devices = &(out_devices_);
01115     local_out_devices = &(local_out_devices_);
01116   }
01117 #else
01118   in_devices = &(in_devices_);
01119   out_devices = &(out_devices_);
01120   local_out_devices = &(local_out_devices_);
01121 #endif // IO_LOG
01122 
01123 #ifdef NS_DIFFUSION
01124   device = new LocalApp(diffrtg);
01125   local_out_devices->push_back(device);
01126 
01127   device = new LinkLayerAbs(diffrtg);
01128   out_devices->push_back(device);
01129 #endif // NS_DIFFUSION
01130 
01131 #ifdef UDP
01132   device = new UDPLocal(&diffusion_port_);
01133   in_devices->push_back(device);
01134   local_out_devices->push_back(device);
01135 
01136 #ifdef WIRED
01137   device = new UDPWired(config_file_);
01138   out_devices->push_back(device);
01139 #endif // WIRED
01140 #endif // UDP
01141 
01142 #ifdef USE_RPC
01143   device = new RPCIO();
01144   in_devices->push_back(device);
01145   out_devices->push_back(device);
01146 #endif // USE_RPC
01147 
01148 #ifdef USE_MOTE_NIC
01149   device = new MOTEIO();
01150   in_devices->push_back(device);
01151   out_devices->push_back(device);
01152 #endif // USE_MOTE_NIC
01153 
01154 #ifdef USE_WINSNG2
01155   device = new WINSNG2();
01156   in_devices->push_back(device);
01157   out_devices->push_back(device);
01158 #endif // USE_WINSNG2
01159 }
01160 
01161 HashEntry * DiffusionCoreAgent::getHash(unsigned int pkt_num,
01162                                          unsigned int rdm_id)
01163 {
01164   unsigned int key[2];
01165 
01166   key[0] = pkt_num;
01167   key[1] = rdm_id;
01168 
01169   Tcl_HashEntry *entryPtr = Tcl_FindHashEntry(&htable_, (char *)key);
01170 
01171   if (entryPtr == NULL)
01172     return NULL;
01173 
01174   return (HashEntry *)Tcl_GetHashValue(entryPtr);
01175 }
01176 
01177 void DiffusionCoreAgent::putHash(unsigned int pkt_num,
01178                                  unsigned int rdm_id)
01179 {
01180   Tcl_HashEntry *tcl_hash_entry;
01181   HashEntry *hash_entry;
01182   HashList::iterator hash_itr;
01183   unsigned int key[2];
01184   int new_hash_key;
01185 
01186   if (hash_list_.size() == HASH_TABLE_MAX_SIZE){
01187     // Hash table reached maximum size
01188 
01189     for (int i = 0; ((i < HASH_TABLE_REMOVE_AT_ONCE)
01190                      && (hash_list_.size() > 0)); i++){
01191       hash_itr = hash_list_.begin();
01192       tcl_hash_entry = *hash_itr;
01193       hash_entry = (HashEntry *) Tcl_GetHashValue(tcl_hash_entry);
01194       delete hash_entry;
01195       hash_list_.erase(hash_itr);
01196       Tcl_DeleteHashEntry(tcl_hash_entry);
01197     }
01198   }
01199 
01200   key[0] = pkt_num;
01201   key[1] = rdm_id;
01202 
01203   tcl_hash_entry = Tcl_CreateHashEntry(&htable_, (char *)key, &new_hash_key);
01204 
01205   if (new_hash_key == 0){
01206     DiffPrint(DEBUG_IMPORTANT, "Key already exists in hash !\n");
01207     return;
01208   }
01209 
01210   hash_entry = new HashEntry;
01211 
01212   Tcl_SetHashValue(tcl_hash_entry, hash_entry);
01213   hash_list_.push_back(tcl_hash_entry);
01214 }
01215 
01216 #ifndef NS_DIFFUSION
01217 void DiffusionCoreAgent::recvPacket(DiffPacket pkt)
01218 {
01219   struct hdr_diff *dfh = HDR_DIFF(pkt);
01220   Message *rcv_message = NULL;
01221   int8_t version, msg_type;
01222   u_int16_t data_len, num_attr, source_port;
01223   int32_t rdm_id, pkt_num, next_hop, last_hop;   
01224 
01225   // Read header
01226   version = DIFF_VER(dfh);
01227   msg_type = MSG_TYPE(dfh);
01228   source_port = ntohs(SRC_PORT(dfh));
01229   pkt_num = ntohl(PKT_NUM(dfh));
01230   rdm_id = ntohl(RDM_ID(dfh));
01231   num_attr = ntohs(NUM_ATTR(dfh));
01232   next_hop = ntohl(NEXT_HOP(dfh));
01233   last_hop = ntohl(LAST_HOP(dfh));
01234   data_len = ntohs(DATA_LEN(dfh));
01235 
01236   // Packet is good, create a message
01237   rcv_message = new Message(version, msg_type, source_port, data_len,
01238                             num_attr, pkt_num, rdm_id, next_hop, last_hop);
01239 
01240   // Read all attributes into the Message structure
01241   rcv_message->msg_attr_vec_ = UnpackAttrs(pkt, num_attr);
01242 
01243   // Process the incoming message
01244   recvMessage(rcv_message);
01245 
01246   // Don't forget to message when we're done
01247   delete rcv_message;
01248 
01249   delete [] pkt;
01250 }
01251 #endif // !NS_DIFFUSION
01252 
01253 void DiffusionCoreAgent::recvMessage(Message *msg)
01254 {
01255   Tcl_HashEntry *tcl_hash_entry;
01256   unsigned int key[2];
01257 
01258   // Check version
01259   if (msg->version_ != DIFFUSION_VERSION)
01260     return;
01261 
01262   // Check for ID conflict
01263   if (msg->last_hop_ == my_id_){
01264     DiffPrint(DEBUG_ALWAYS, "Error: A diffusion ID conflict has been detected !\n");
01265     exit(-1);
01266   }
01267 
01268   // Address filtering
01269   if ((msg->next_hop_ != BROADCAST_ADDR) &&
01270       (msg->next_hop_ != LOCALHOST_ADDR) &&
01271       (msg->next_hop_ != my_id_))
01272     return;
01273 
01274   // Control Messages are unique and don't go to the hash
01275   if (msg->msg_type_ != CONTROL){
01276     // Hash table keeps info about packets
01277   
01278     key[0] = msg->pkt_num_;
01279     key[1] = msg->rdm_id_;
01280     tcl_hash_entry = Tcl_FindHashEntry(&htable_, (char *) key);
01281 
01282     if (tcl_hash_entry != NULL){
01283       DiffPrint(DEBUG_DETAILS, "Received old message !\n");
01284       msg->new_message_ = 0;
01285     }
01286     else{
01287       // Add message to the hash table
01288       putHash(key[0], key[1]);
01289       msg->new_message_ = 1;
01290     }
01291   }
01292 
01293 #ifdef STATS
01294   stats_->logIncomingMessage(msg);
01295 #endif // STATS
01296 
01297   // Check if it's a control of a regular message
01298   if (msg->msg_type_ == CONTROL)
01299     processControlMessage(msg);
01300   else
01301     processMessage(msg);
01302 }
01303 
01304 #ifndef USE_SINGLE_ADDRESS_SPACE
01305 int main(int argc, char **argv)
01306 {
01307   agent = new DiffusionCoreAgent(argc, argv);
01308 
01309   signal(SIGINT, signal_handler);
01310 
01311   agent->run();
01312 
01313   return 0;
01314 }
01315 #endif // !USE_SINGLE_ADDRESS_SPACE

Generated on Tue Apr 20 12:14:14 2004 for NS2.26SourcesOriginal by doxygen 1.3.3