Main Page | Namespace List | Class Hierarchy | Alphabetical List | Compound List | File List | Compound Members | File Members

dr.cc

Go to the documentation of this file.
00001 //
00002 // dr.cc           : Diffusion Routing Class
00003 // authors         : John Heidemann and Fabio Silva
00004 //
00005 // Copyright (C) 2000-2002 by the University of Southern California
00006 // $Id: dr.cc,v 1.14 2002/11/26 22:45:38 haldar Exp $
00007 //
00008 // This program is free software; you can redistribute it and/or
00009 // modify it under the terms of the GNU General Public License,
00010 // version 2, as published by the Free Software Foundation.
00011 //
00012 // This program is distributed in the hope that it will be useful,
00013 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00014 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00015 // GNU General Public License for more details.
00016 //
00017 // You should have received a copy of the GNU General Public License along
00018 // with this program; if not, write to the Free Software Foundation, Inc.,
00019 // 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
00020 //
00021 //
00022 
00023 #include <stdlib.h>
00024 #include <stdio.h>
00025 
00026 #include "dr.hh"
00027 
00028 class CallbackEntry {
00029 public:
00030   NR::Callback *cb_;
00031   NR::handle subscription_handle_;
00032 
00033   CallbackEntry(NR::Callback *cb, NR::handle subscription_handle) :
00034     cb_(cb), subscription_handle_(subscription_handle) {};
00035 };
00036 
00037 class HandleEntry {
00038 public:
00039   handle hdl_;
00040   bool valid_;
00041   NRAttrVec *attrs_;
00042   NR::Callback *cb_;
00043   struct timeval exploratory_time_;
00044 
00045   HandleEntry()
00046   {
00047     GetTime(&exploratory_time_);
00048     valid_ = true;
00049     cb_ = NULL;
00050   };
00051 
00052   ~HandleEntry(){
00053 
00054     ClearAttrs(attrs_);
00055     delete attrs_;
00056   };
00057 };
00058 
00059 int InterestCallback::expire()
00060 {
00061   int retval;
00062 
00063   // Call the interestTimeout function
00064   retval = drt_->interestTimeout(handle_entry_);
00065 
00066   if (retval < 0)
00067     delete this;
00068 
00069   return retval;
00070 }
00071 
00072 int FilterKeepaliveCallback::expire()
00073 {
00074   int retval;
00075 
00076   // Call the filterTimeout function
00077   retval = drt_->filterKeepaliveTimeout(filter_entry_);
00078 
00079   if (retval < 0)
00080     delete this;
00081 
00082   return retval;
00083 }
00084 
00085 int OldAPITimer::expire()
00086 {
00087   int retval;
00088 
00089   // Call the callback function with the provided API
00090   retval = cb_->expire(0, p_);
00091 
00092   if (retval < 0)
00093     delete this;
00094 
00095   return retval;
00096 }
00097 
00098 #ifdef NS_DIFFUSION
00099 class DiffEventQueue;
00100 
00101 int DiffusionRouting::getAgentId(int id) {
00102   if (id != -1)
00103     agent_id_ = id;
00104   return agent_id_;
00105 }
00106 
00107 NR * NR::create_ns_NR(u_int16_t port, DiffAppAgent *da) {
00108   return(new DiffusionRouting(port, da));
00109 }
00110 #else
00111 NR *dr = NULL;
00112 
00113 #ifdef USE_THREADS
00114 void * ReceiveThread(void *dr)
00115 {
00116   // Never returns
00117   ((DiffusionRouting *)dr)->run(true, WAIT_FOREVER);
00118 
00119   return NULL;
00120 }
00121 #endif // USE_THREADS
00122 
00123 NR * NR::createNR(u_int16_t port)
00124 {
00125   // Create Diffusion Routing Class
00126   if (dr)
00127     return dr;
00128 
00129   dr = new DiffusionRouting(port);
00130 
00131 #ifdef USE_THREADS
00132   int retval;
00133   pthread_t thread;
00134 
00135   // Fork a thread for receiving Messages
00136   retval = pthread_create(&thread, NULL, &ReceiveThread, (void *)dr);
00137 
00138   if (retval){
00139     DiffPrint(DEBUG_ALWAYS, "Error creating receiving thread ! Aborting...\n");
00140     exit(-1);
00141   }
00142 #endif // USE_THREADS
00143 
00144   return dr;
00145 }
00146 #endif // NS_DIFFUSION
00147 
// Locks mutex when built with USE_THREADS; does nothing otherwise.
void GetLock(pthread_mutex_t *mutex)
{
#ifdef USE_THREADS
  (void) pthread_mutex_lock(mutex);
#endif // USE_THREADS
}
00154 
// Unlocks mutex when built with USE_THREADS; does nothing otherwise.
void ReleaseLock(pthread_mutex_t *mutex)
{
#ifdef USE_THREADS
  (void) pthread_mutex_unlock(mutex);
#endif // USE_THREADS
}
00161 
00162 #ifdef NS_DIFFUSION
00163 DiffusionRouting::DiffusionRouting(u_int16_t port, DiffAppAgent *da)
00164 #else
00165 DiffusionRouting::DiffusionRouting(u_int16_t port)
00166 #endif // NS_DIFFUSION
00167 {
00168   struct timeval tv;
00169   DiffusionIO *device;
00170 
00171   // Initialize basic stuff
00172   next_handle_ = 1;
00173   GetTime(&tv);
00174   SetSeed(&tv);
00175   pkt_count_ = GetRand();
00176   random_id_ = GetRand();
00177   agent_id_ = 0;
00178 
00179   if (port == 0)
00180     port = DEFAULT_DIFFUSION_PORT;
00181 
00182   diffusion_port_ = port;
00183 
00184   // Initialize timer manager
00185   timers_manager_ = new TimerManager;
00186 
00187   // Initialize input device
00188 #ifdef NS_DIFFUSION
00189   device = new NsLocal(da);
00190   local_out_devices_.push_back(device);
00191 #endif // NS_DIFFUSION
00192 
00193 #ifdef UDP
00194   device = new UDPLocal(&agent_id_);
00195   in_devices_.push_back(device);
00196   local_out_devices_.push_back(device);
00197 #endif // UDP
00198 
00199   // Print initialization message
00200   DiffPrint(DEBUG_ALWAYS,
00201             "Diffusion Routing Agent initializing... Agent Id = %d\n",
00202             agent_id_);
00203 
00204 #ifdef USE_THREADS
00205   // Initialize Semaphores
00206   dr_mtx_ = new pthread_mutex_t;
00207   pthread_mutex_init(dr_mtx_, NULL);
00208 #endif // USE_THREADS
00209 }
00210 
00211 DiffusionRouting::~DiffusionRouting()
00212 {
00213   HandleList::iterator itr;
00214   HandleEntry *current;
00215 
00216   // Delete all Handles
00217   for (itr = sub_list_.begin(); itr != sub_list_.end(); ++itr){
00218     current = *itr;
00219     delete current;
00220   }
00221 
00222   for (itr = pub_list_.begin(); itr != pub_list_.end(); ++itr){
00223     current = *itr;
00224     delete current;
00225   }
00226 }
00227 
00228 handle DiffusionRouting::subscribe(NRAttrVec *subscribe_attrs, NR::Callback *cb)
00229 {
00230   HandleEntry *my_handle;
00231   NRAttribute *scope_attr;
00232   TimerCallback *timer_callback;
00233 
00234   // Get lock first
00235   GetLock(dr_mtx_);
00236 
00237   // Check the published attributes
00238   if (!checkSubscription(subscribe_attrs)){
00239     DiffPrint(DEBUG_ALWAYS, "Error : Invalid class/scope attributes in the subscribe attributes !\n");
00240     ReleaseLock(dr_mtx_);
00241     return FAIL;
00242   }
00243 
00244   // Create and Initialize the handle_entry structute
00245   my_handle = new HandleEntry;
00246   my_handle->hdl_ = next_handle_;
00247   next_handle_++;
00248   my_handle->cb_ = (NR::Callback *) cb;
00249   sub_list_.push_back(my_handle);
00250 
00251   // Copy the attributes   
00252   my_handle->attrs_ = CopyAttrs(subscribe_attrs);
00253 
00254   // For subscriptions, scope is global if not specified
00255   if (!hasScope(subscribe_attrs)){
00256     scope_attr = NRScopeAttr.make(NRAttribute::IS, NRAttribute::GLOBAL_SCOPE);
00257     my_handle->attrs_->push_back(scope_attr);
00258   }
00259 
00260   // Create Interest Timer and add it to the queue
00261   timer_callback = new InterestCallback(this, my_handle);
00262   timers_manager_->addTimer(SMALL_TIMEOUT, timer_callback);
00263 
00264   // Release lock
00265   ReleaseLock(dr_mtx_);
00266 
00267   return my_handle->hdl_;
00268 }
00269 
00270 int DiffusionRouting::unsubscribe(handle subscription_handle)
00271 {
00272   HandleEntry *my_handle = NULL;
00273 
00274   // Get the lock first
00275   GetLock(dr_mtx_);
00276 
00277   my_handle = findHandle(subscription_handle, &sub_list_);
00278   if (!my_handle){
00279     // Handle doesn't exist, return FAIL
00280     ReleaseLock(dr_mtx_);
00281     return FAIL;
00282   }
00283 
00284   // Handle will be destroyed when next interest timeout happens
00285   my_handle->valid_ = false;
00286 
00287   // Release the lock
00288   ReleaseLock(dr_mtx_);
00289 
00290   return OK;
00291 }
00292 
00293 handle DiffusionRouting::publish(NRAttrVec *publish_attrs)
00294 {
00295   HandleEntry *my_handle;
00296   NRAttribute *scope_attr;
00297 
00298   // Get the lock first
00299   GetLock(dr_mtx_);
00300 
00301   // Check the published attributes
00302   if (!checkPublication(publish_attrs)){
00303     DiffPrint(DEBUG_ALWAYS, "Error : Invalid class/scope attributes in the publish attributes !\n");
00304     ReleaseLock(dr_mtx_);
00305     return FAIL;
00306   }
00307 
00308   // Create and Initialize the handle_entry structute
00309   my_handle = new HandleEntry;
00310   my_handle->hdl_ = next_handle_;
00311   next_handle_++;
00312   pub_list_.push_back(my_handle);
00313 
00314   // Copy the attributes
00315   my_handle->attrs_ = CopyAttrs(publish_attrs);
00316 
00317   // For publications, scope is local if not specified
00318   if (!hasScope(publish_attrs)){
00319     scope_attr = NRScopeAttr.make(NRAttribute::IS, NRAttribute::NODE_LOCAL_SCOPE);
00320     my_handle->attrs_->push_back(scope_attr);
00321   }
00322 
00323   // Release the lock
00324   ReleaseLock(dr_mtx_);
00325 
00326   return my_handle->hdl_;
00327 }
00328 
00329 int DiffusionRouting::unpublish(handle publication_handle)
00330 {
00331   HandleEntry *my_handle = NULL;
00332 
00333   // Get the lock first
00334   GetLock(dr_mtx_);
00335 
00336   my_handle = removeHandle(publication_handle, &pub_list_);
00337   if (!my_handle){
00338     // Handle doesn't exist, return FAIL
00339     ReleaseLock(dr_mtx_);
00340     return FAIL;
00341   }
00342 
00343   // Free structures
00344   delete my_handle;
00345 
00346   // Release the lock
00347   ReleaseLock(dr_mtx_);
00348 
00349   return OK;
00350 }
00351 
00352 int DiffusionRouting::send(handle publication_handle,
00353                            NRAttrVec *send_attrs)
00354 {
00355   Message *my_message;
00356   HandleEntry *my_handle;
00357   int8_t send_message_type = DATA;
00358   struct timeval current_time;
00359 
00360   // Get the lock first
00361   GetLock(dr_mtx_);
00362 
00363   // Get attributes associated with handle
00364   my_handle = findHandle(publication_handle, &pub_list_);
00365   if (!my_handle){
00366     ReleaseLock(dr_mtx_);
00367     return FAIL;
00368   }
00369 
00370   // Check the send attributes
00371   if (!checkSend(send_attrs)){
00372     DiffPrint(DEBUG_ALWAYS, "Error : Invalid class/scope attributes in the send attributes !\n");
00373     ReleaseLock(dr_mtx_);
00374     return FAIL;
00375   }
00376 
00377   // Check if it is time to send another exploratory data message
00378   GetTime(&current_time);
00379 
00380   if (TimevalCmp(&current_time, &(my_handle->exploratory_time_)) >= 0){
00381 
00382     // Check if it is a push data message or a regular data message
00383     if (isPushData(my_handle->attrs_)){
00384       // Push data message
00385 
00386       // Update time for the next push exploratory message
00387       GetTime(&(my_handle->exploratory_time_));
00388       my_handle->exploratory_time_.tv_sec += PUSH_EXPLORATORY_DELAY;
00389 
00390       send_message_type = PUSH_EXPLORATORY_DATA;
00391     }
00392     else{
00393       // Regular data message
00394 
00395       // Update time for the next exploratory message
00396       GetTime(&(my_handle->exploratory_time_));
00397       my_handle->exploratory_time_.tv_sec += EXPLORATORY_DATA_DELAY;
00398     
00399       send_message_type = EXPLORATORY_DATA;
00400     }
00401   }
00402 
00403   // Initialize message structure
00404   my_message = new Message(DIFFUSION_VERSION, send_message_type, agent_id_,
00405                            0, 0, pkt_count_, random_id_, LOCALHOST_ADDR,
00406                            LOCALHOST_ADDR);
00407   // Increment pkt_counter
00408   pkt_count_++;
00409 
00410   // First, we duplicate the 'publish' attributes
00411   my_message->msg_attr_vec_ = CopyAttrs(my_handle->attrs_);
00412 
00413   // Now, we add the send attributes
00414   AddAttrs(my_message->msg_attr_vec_, send_attrs);
00415 
00416   // Compute the total number and size of the joined attribute sets
00417   my_message->num_attr_ = my_message->msg_attr_vec_->size();
00418   my_message->data_len_ = CalculateSize(my_message->msg_attr_vec_);
00419 
00420   // Release the lock
00421   ReleaseLock(dr_mtx_);
00422 
00423   // Send Packet
00424   sendMessageToDiffusion(my_message);
00425 
00426   delete my_message;
00427 
00428   return OK;
00429 }
00430 
00431 handle DiffusionRouting::addFilter(NRAttrVec *filter_attrs, u_int16_t priority,
00432                                    FilterCallback *cb)
00433 {
00434   FilterEntry *filter_entry;
00435   NRAttrVec *attrs;
00436   NRAttribute *ctrl_msg_attr;
00437   ControlMessage *control_blob;
00438   Message *my_message;
00439   TimerCallback *timer_callback;
00440 
00441   // Check parameters
00442   if (!filter_attrs || !cb || priority < FILTER_MIN_PRIORITY || priority > FILTER_MAX_PRIORITY){
00443     DiffPrint(DEBUG_ALWAYS, "Received invalid parameters when adding filter !\n");
00444     return FAIL;
00445   }
00446 
00447   // Get lock first
00448   GetLock(dr_mtx_);
00449 
00450   // Create and Initialize the handle_entry structute
00451   filter_entry = new FilterEntry(next_handle_, priority, agent_id_);
00452   next_handle_++;
00453   filter_entry->cb_ = (FilterCallback *) cb;
00454   filter_list_.push_back(filter_entry);
00455 
00456   // Copy attributes (keep them for matching later)
00457   filter_entry->filter_attrs_ = CopyAttrs(filter_attrs);
00458 
00459   // Copy the attributes (and add the control attr)
00460   attrs = CopyAttrs(filter_attrs);
00461   control_blob = new ControlMessage(ADD_UPDATE_FILTER,
00462                                     priority, filter_entry->handle_);
00463 
00464   ctrl_msg_attr = ControlMsgAttr.make(NRAttribute::IS,
00465                                       (void *)control_blob,
00466                                       sizeof(ControlMessage));
00467 
00468   attrs->push_back(ctrl_msg_attr);
00469 
00470   // Initialize message structure
00471   my_message = new Message(DIFFUSION_VERSION, CONTROL, agent_id_, 0,
00472                            0, pkt_count_, random_id_, LOCALHOST_ADDR,
00473                            LOCALHOST_ADDR);
00474 
00475   // Increment pkt_counter
00476   pkt_count_++;
00477 
00478   // Add attributes to the message
00479   my_message->msg_attr_vec_ = attrs;
00480   my_message->num_attr_ = attrs->size();
00481   my_message->data_len_ = CalculateSize(attrs);
00482 
00483   // Release the lock
00484   ReleaseLock(dr_mtx_);
00485 
00486   // Send Packet
00487   sendMessageToDiffusion(my_message);
00488 
00489   // Add keepalive timer to the event queue
00490   timer_callback = new FilterKeepaliveCallback(this, filter_entry);
00491   timers_manager_->addTimer(FILTER_KEEPALIVE_DELAY, timer_callback);
00492 
00493   // Delete message, attribute set and controlblob
00494   delete my_message;
00495   delete control_blob;
00496 
00497   return filter_entry->handle_;
00498 }
00499 
00500 int DiffusionRouting::removeFilter(handle filter_handle)
00501 {
00502   FilterEntry *filter_entry = NULL;
00503   ControlMessage *control_blob;
00504   NRAttribute *ctrl_msg_attr;
00505   NRAttrVec *attrs;
00506   Message *my_message;
00507 
00508   // Get lock first
00509   GetLock(dr_mtx_);
00510 
00511   filter_entry = findFilter(filter_handle);
00512   if (!filter_entry){
00513     // Handle doesn't exist, return FAIL
00514     ReleaseLock(dr_mtx_);
00515     return FAIL;
00516   }
00517 
00518   control_blob = new ControlMessage(REMOVE_FILTER, filter_entry->handle_, 0);
00519 
00520   ctrl_msg_attr = ControlMsgAttr.make(NRAttribute::IS,
00521                                       (void *)control_blob,
00522                                       sizeof(ControlMessage));
00523 
00524   attrs = new NRAttrVec;
00525   attrs->push_back(ctrl_msg_attr);
00526 
00527   my_message = new Message(DIFFUSION_VERSION, CONTROL, agent_id_, 0,
00528                            0, pkt_count_, random_id_, LOCALHOST_ADDR,
00529                            LOCALHOST_ADDR);
00530 
00531   // Increment pkt_counter
00532   pkt_count_++;
00533 
00534   // Add attributes to the message
00535   my_message->msg_attr_vec_ = attrs;
00536   my_message->num_attr_ = attrs->size();
00537   my_message->data_len_ = CalculateSize(attrs);
00538 
00539   // Handle will be destroyed when next keepalive timer happens
00540   filter_entry->valid_ = false;
00541 
00542   // Send Packet
00543   sendMessageToDiffusion(my_message);
00544 
00545   // Release the lock
00546   ReleaseLock(dr_mtx_);
00547 
00548   // Delete message
00549   delete my_message;
00550   delete control_blob;
00551 
00552   return OK;
00553 }
00554 
00555 handle DiffusionRouting::addTimer(int timeout, TimerCallback *callback)
00556 {
00557   return (timers_manager_->addTimer(timeout, callback));
00558 }
00559 
00560 handle DiffusionRouting::addTimer(int timeout, void *p, TimerCallbacks *cb)
00561 {
00562   TimerCallback *callback;
00563 
00564   callback = new OldAPITimer(cb, p);
00565 
00566   return (addTimer(timeout, callback));
00567 }
00568 
00569 bool DiffusionRouting::removeTimer(handle hdl)
00570 {
00571   return (timers_manager_->removeTimer(hdl));
00572 }
00573 
00574 int DiffusionRouting::filterKeepaliveTimeout(FilterEntry *filter_entry)
00575 {
00576   FilterEntry *my_entry = NULL;
00577   ControlMessage *control_blob;
00578   NRAttribute *ctrl_msg_attr;
00579   NRAttrVec *attrs;
00580   Message *my_message;
00581 
00582   // Acquire lock first
00583   GetLock(dr_mtx_);
00584 
00585   if (filter_entry->valid_){
00586     // Send keepalive
00587     control_blob = new ControlMessage(ADD_UPDATE_FILTER,
00588                                       filter_entry->priority_,
00589                                       filter_entry->handle_);
00590 
00591     ctrl_msg_attr = ControlMsgAttr.make(NRAttribute::IS,
00592                                         (void *)control_blob,
00593                                         sizeof(ControlMessage));
00594 
00595     attrs = CopyAttrs(filter_entry->filter_attrs_);
00596     attrs->push_back(ctrl_msg_attr);
00597 
00598     my_message = new Message(DIFFUSION_VERSION, CONTROL, agent_id_, 0,
00599                              0, pkt_count_, random_id_, LOCALHOST_ADDR,
00600                              LOCALHOST_ADDR);
00601 
00602     // Increment pkt_counter
00603     pkt_count_++;
00604 
00605     // Add attributes to the message
00606     my_message->msg_attr_vec_ = attrs;
00607     my_message->num_attr_ = attrs->size();
00608     my_message->data_len_ = CalculateSize(attrs);
00609 
00610     // Send Message
00611     sendMessageToDiffusion(my_message);
00612 
00613     delete my_message;
00614     delete control_blob;
00615 
00616     // Release lock
00617     ReleaseLock(dr_mtx_);
00618 
00619     // Reschedule another filter keepalive timer in event queue
00620     return (FILTER_KEEPALIVE_DELAY);
00621   }
00622   else{
00623     // Filter was removed
00624     my_entry = deleteFilter(filter_entry->handle_);
00625 
00626     // We should have removed the correct handle
00627     if (my_entry != filter_entry){
00628       DiffPrint(DEBUG_ALWAYS, "DiffusionRouting::KeepaliveTimeout: Handles should match !\n");
00629       exit(-1);
00630     }
00631 
00632     delete my_entry;
00633 
00634     // Release lock
00635     ReleaseLock(dr_mtx_);
00636 
00637     return -1;
00638   }
00639 }
00640 
00641 int DiffusionRouting::interestTimeout(HandleEntry *handle_entry)
00642 {
00643   HandleEntry *my_handle = NULL;
00644   Message *my_message;
00645 
00646   // Acquire lock first
00647   GetLock(dr_mtx_);
00648 
00649   if (handle_entry->valid_){
00650     // Send the interest message if entry is still valid
00651     my_message = new Message(DIFFUSION_VERSION, INTEREST, agent_id_, 0,
00652                              0, pkt_count_, random_id_, LOCALHOST_ADDR,
00653                              LOCALHOST_ADDR);
00654 
00655     // Increment pkt_counter
00656     pkt_count_++;
00657 
00658     // Add attributes to the message
00659     my_message->msg_attr_vec_ = CopyAttrs(handle_entry->attrs_);
00660     my_message->num_attr_ = handle_entry->attrs_->size();
00661     my_message->data_len_ = CalculateSize(handle_entry->attrs_);
00662 
00663     // Send Packet
00664     sendMessageToDiffusion(my_message);
00665 
00666     delete my_message;
00667 
00668     // Release lock
00669     ReleaseLock(dr_mtx_);
00670 
00671     // Reschedule this timer in the queue
00672     return (INTEREST_DELAY +
00673             (int) ((INTEREST_JITTER * (GetRand() * 1.0 / RAND_MAX)) -
00674                    (INTEREST_JITTER / 2)));
00675   }
00676   else{
00677     // Interest was canceled. Just delete it from the handle_list
00678     my_handle = removeHandle(handle_entry->hdl_, &sub_list_);
00679 
00680     // We should have removed the correct handle
00681     if (my_handle != handle_entry){
00682       DiffPrint(DEBUG_ALWAYS,
00683                 "Error: interestTimeout: Handles should match !\n");
00684       exit(-1);
00685     }
00686 
00687     delete my_handle;
00688 
00689     // Release lock
00690     ReleaseLock(dr_mtx_);
00691 
00692     // Delete timer from the queue
00693     return -1;
00694   }
00695 }
00696 
00697 int DiffusionRouting::sendMessage(Message *msg, handle h,
00698                                   u_int16_t priority)
00699 {
00700   RedirectMessage *original_hdr;
00701   NRAttribute *original_attr, *ctrl_msg_attr;
00702   ControlMessage *control_blob;
00703   NRAttrVec *attrs;
00704   Message *my_message;
00705 
00706   if ((priority < FILTER_MIN_PRIORITY) ||
00707       (priority > FILTER_KEEP_PRIORITY))
00708     return FAIL;
00709 
00710   // Create an attribute with the original header
00711   original_hdr = new RedirectMessage(msg->new_message_, msg->msg_type_,
00712                                      msg->source_port_, msg->data_len_,
00713                                      msg->num_attr_, msg->rdm_id_,
00714                                      msg->pkt_num_, msg->next_hop_,
00715                                      msg->last_hop_, 0,
00716                                      msg->next_port_);
00717 
00718   original_attr = OriginalHdrAttr.make(NRAttribute::IS, (void *)original_hdr,
00719                                        sizeof(RedirectMessage));
00720 
00721   // Create the attribute with the control message
00722   control_blob = new ControlMessage(SEND_MESSAGE, h, priority);
00723 
00724   ctrl_msg_attr = ControlMsgAttr.make(NRAttribute::IS, (void *)control_blob,
00725                                       sizeof(ControlMessage));
00726 
00727   // Copy Attributes and add originalAttr and controlAttr
00728   attrs = CopyAttrs(msg->msg_attr_vec_);
00729   attrs->push_back(original_attr);
00730   attrs->push_back(ctrl_msg_attr);
00731 
00732   my_message = new Message(DIFFUSION_VERSION, CONTROL, agent_id_, 0,
00733                            0, pkt_count_, random_id_, LOCALHOST_ADDR,
00734                            LOCALHOST_ADDR);
00735 
00736   // Increment pkt_counter
00737   pkt_count_++;
00738 
00739   // Add attributes to the message
00740   my_message->msg_attr_vec_ = attrs;
00741   my_message->num_attr_ = attrs->size();
00742   my_message->data_len_ = CalculateSize(attrs);
00743 
00744   // Send Packet
00745   sendMessageToDiffusion(my_message);
00746 
00747   delete my_message;
00748   delete control_blob;
00749   delete original_hdr;
00750 
00751   return OK;
00752 }
00753 
00754 #ifndef NS_DIFFUSION
00755 void DiffusionRouting::doIt()
00756 {
00757   run(true, WAIT_FOREVER);
00758 }
00759 
00760 void DiffusionRouting::doOne(long timeout)
00761 {
00762   run(false, timeout);
00763 }
00764 
00765 void DiffusionRouting::run(bool wait_condition, long max_timeout)
00766 {
00767   DeviceList::iterator itr;
00768   int status, max_sock, fd;
00769   bool flag;
00770   DiffPacket in_pkt;
00771   fd_set fds;
00772   struct timeval tv;
00773   struct timeval max_tv;
00774 
00775   do{
00776     FD_ZERO(&fds);
00777     max_sock = 0;
00778 
00779     // Set the maximum timeout value
00780     max_tv.tv_sec = (int) (max_timeout / 1000);
00781     max_tv.tv_usec = (int) ((max_timeout % 1000) * 1000);
00782 
00783     for (itr = in_devices_.begin(); itr != in_devices_.end(); ++itr){
00784       (*itr)->addInFDS(&fds, &max_sock);
00785     }
00786 
00787     // Check for the next timer
00788     timers_manager_->nextTimerTime(&tv);
00789 
00790     if (tv.tv_sec == MAXVALUE){
00791       // If we don't have any timers, we wait for POLLING_INTERVAL
00792       if (max_timeout == WAIT_FOREVER){
00793         tv.tv_sec = POLLING_INTERVAL;
00794         tv.tv_usec = 0;
00795       }
00796       else{
00797         tv = max_tv;
00798       }
00799     }
00800     else{
00801       if ((max_timeout != WAIT_FOREVER) && (TimevalCmp(&tv, &max_tv) > 0)){
00802         // max_timeout value is smaller than next timer's time, so we
00803         // use themax_timeout value instead
00804         tv = max_tv;
00805       }
00806     }
00807 
00808     status = select(max_sock+1, &fds, NULL, NULL, &tv);
00809 
00810     if (status == 0){
00811       // Process all timers that have expired
00812       timers_manager_->executeAllExpiredTimers();
00813     }
00814 
00815     if (status > 0){
00816       do{
00817         flag = false;
00818         for (itr = in_devices_.begin(); itr != in_devices_.end(); ++itr){
00819           fd = (*itr)->checkInFDS(&fds);
00820           if (fd != 0){
00821             // Message waiting
00822             in_pkt = (*itr)->recvPacket(fd);
00823             recvPacket(in_pkt);
00824 
00825             // Clear this fd
00826             FD_CLR(fd, &fds);
00827             status--;
00828             flag = true;
00829           }
00830         }
00831       } while ((status > 0) && (flag == true));
00832     }
00833     else
00834       if (status < 0){
00835         DiffPrint(DEBUG_IMPORTANT, "Select returned %d\n", status);
00836       }
00837   } while (wait_condition);
00838 }
00839 
00840 #endif // NS_DIFFUSION
00841 
00842 #ifndef NS_DIFFUSION
00843 void DiffusionRouting::sendMessageToDiffusion(Message *msg)
00844 {
00845   DiffPacket out_pkt = NULL;
00846   struct hdr_diff *dfh;
00847   char *pos;
00848   int len;
00849 
00850   out_pkt = AllocateBuffer(msg->msg_attr_vec_);
00851   dfh = HDR_DIFF(out_pkt);
00852 
00853   pos = (char *) out_pkt;
00854   pos = pos + sizeof(struct hdr_diff);
00855 
00856   len = PackAttrs(msg->msg_attr_vec_, pos);
00857 
00858   LAST_HOP(dfh) = htonl(msg->last_hop_);
00859   NEXT_HOP(dfh) = htonl(msg->next_hop_);
00860   DIFF_VER(dfh) = msg->version_;
00861   MSG_TYPE(dfh) = msg->msg_type_;
00862   DATA_LEN(dfh) = htons(len);
00863   PKT_NUM(dfh) = htonl(msg->pkt_num_);
00864   RDM_ID(dfh) = htonl(msg->rdm_id_);
00865   NUM_ATTR(dfh) = htons(msg->num_attr_);
00866   SRC_PORT(dfh) = htons(msg->source_port_);
00867 
00868   sendPacketToDiffusion(out_pkt, sizeof(struct hdr_diff) + len, diffusion_port_);
00869 
00870   delete [] out_pkt;
00871 }
00872 #else
00873 void DiffusionRouting::sendMessageToDiffusion(Message *msg)
00874 {
00875   Message *my_msg;
00876   DeviceList::iterator itr;
00877   int len;
00878 
00879   my_msg = CopyMessage(msg);
00880   len = CalculateSize(my_msg->msg_attr_vec_);
00881   len = len + sizeof(struct hdr_diff);
00882 
00883   for (itr = local_out_devices_.begin(); itr != local_out_devices_.end(); ++itr){
00884     (*itr)->sendPacket((DiffPacket) my_msg, len, diffusion_port_);
00885   }
00886 }
00887 #endif // !NS_DIFFUSION
00888 
00889 void DiffusionRouting::sendPacketToDiffusion(DiffPacket pkt, int len, int dst)
00890 {
00891   DeviceList::iterator itr;
00892 
00893   for (itr = local_out_devices_.begin(); itr != local_out_devices_.end(); ++itr){
00894     (*itr)->sendPacket(pkt, len, dst);
00895   }
00896 }
00897 
00898 #ifndef NS_DIFFUSION
00899 void DiffusionRouting::recvPacket(DiffPacket pkt)
00900 {
00901   struct hdr_diff *dfh = HDR_DIFF(pkt);
00902   Message *rcv_message = NULL;
00903   int8_t version, msg_type;
00904   u_int16_t data_len, num_attr, source_port;
00905   int32_t pkt_num, rdm_id, next_hop, last_hop;
00906 
00907   // Read header
00908   version = DIFF_VER(dfh);
00909   msg_type = MSG_TYPE(dfh);
00910   source_port = ntohs(SRC_PORT(dfh));
00911   pkt_num = ntohl(PKT_NUM(dfh));
00912   rdm_id = ntohl(RDM_ID(dfh));
00913   num_attr = ntohs(NUM_ATTR(dfh));
00914   next_hop = ntohl(NEXT_HOP(dfh));
00915   last_hop = ntohl(LAST_HOP(dfh));
00916   data_len = ntohs(DATA_LEN(dfh));
00917 
00918   // Create a message structure from the incoming packet
00919   rcv_message = new Message(version, msg_type, source_port, data_len,
00920                             num_attr, pkt_num, rdm_id, next_hop, last_hop);
00921 
00922   // Read all attributes into the Message structure
00923   rcv_message->msg_attr_vec_ = UnpackAttrs(pkt, num_attr);
00924 
00925   // Process the incoming message
00926   recvMessage(rcv_message);
00927 
00928   // We are done
00929   delete rcv_message;
00930   delete [] pkt;
00931 }
00932 #endif // !NS_DIFFUSION
00933 
00934 void DiffusionRouting::recvMessage(Message *msg)
00935 {
00936   // Check version
00937   if (msg->version_ != DIFFUSION_VERSION)
00938     return;
00939 
00940   // Check destination
00941   if (msg->next_hop_ != LOCALHOST_ADDR)
00942     return;
00943 
00944   // Process the incoming message
00945   if (msg->msg_type_ == REDIRECT)
00946     processControlMessage(msg);
00947   else
00948     processMessage(msg);
00949 }
00950 
00951 void DiffusionRouting::processControlMessage(Message *msg)
00952 {
00953   NRSimpleAttribute<void *> *original_header_attr = NULL;
00954   NRAttrVec::iterator place = msg->msg_attr_vec_->begin();
00955   RedirectMessage *original_header;
00956   FilterEntry *entry;
00957   handle my_handle;
00958 
00959   // Find the attribute containing the original packet header
00960   original_header_attr = OriginalHdrAttr.find_from(msg->msg_attr_vec_,
00961                                                    place, &place);
00962   if (!original_header_attr){
00963     DiffPrint(DEBUG_ALWAYS, "Error: Received an invalid REDIRECT message !\n");
00964     return;
00965   }
00966 
00967   // Restore original message header
00968   original_header = (RedirectMessage *) original_header_attr->getVal();
00969   my_handle = original_header->handle_;
00970   msg->msg_type_ = original_header->msg_type_;
00971   msg->source_port_ = original_header->source_port_;
00972   msg->pkt_num_ = original_header->pkt_num_;
00973   msg->rdm_id_ = original_header->rdm_id_;
00974   msg->next_hop_ = original_header->next_hop_;
00975   msg->last_hop_ = original_header->last_hop_;
00976   msg->num_attr_ = original_header->num_attr_;
00977   msg->new_message_ = original_header->new_message_;
00978   msg->next_port_ = original_header->next_port_;
00979 
00980   // Delete attribute from the original set
00981   msg->msg_attr_vec_->erase(place);
00982   delete original_header_attr;
00983 
00984   // Find the right callback
00985   GetLock(dr_mtx_);
00986 
00987   entry = findFilter(my_handle);
00988   if (entry && entry->valid_){
00989     // Just to confirm
00990     if (OneWayMatch(entry->filter_attrs_, msg->msg_attr_vec_)){
00991       ReleaseLock(dr_mtx_);
00992       entry->cb_->recv(msg, my_handle);
00993       return;
00994     }
00995     else{
00996       DiffPrint(DEBUG_ALWAYS,
00997                 "Warning: Filter doesn't match incoming message's attributes !\n");
00998     }
00999   }
01000   else{
01001     DiffPrint(DEBUG_IMPORTANT,
01002               "Report: Cannot find filter (possibly deleted ?)\n");
01003   }
01004 
01005   ReleaseLock(dr_mtx_);
01006 }
01007 
01008 void DiffusionRouting::processMessage(Message *msg)
01009 {
01010   CallbackList cbl;
01011   CallbackEntry *aux;
01012   HandleList::iterator sub_itr;
01013   CallbackList::iterator cbl_itr;
01014   HandleEntry *entry; 
01015   NRAttrVec *callback_attrs;
01016 
01017   // First, acquire the lock
01018   GetLock(dr_mtx_);
01019 
01020   for (sub_itr = sub_list_.begin(); sub_itr != sub_list_.end(); ++sub_itr){
01021     entry = *sub_itr;
01022     if ((entry->valid_) && (MatchAttrs(msg->msg_attr_vec_, entry->attrs_)))
01023       if (entry->cb_){
01024         aux = new CallbackEntry(entry->cb_, entry->hdl_);
01025         cbl.push_back(aux);
01026       }
01027   }
01028 
01029   // We can release the lock now
01030   ReleaseLock(dr_mtx_);
01031 
01032   // Now we just call all callback functions
01033   for (cbl_itr = cbl.begin(); cbl_itr != cbl.end(); ++cbl_itr){
01034     // Copy attributes
01035     callback_attrs = CopyAttrs(msg->msg_attr_vec_);
01036 
01037     // Call app-specific callback function
01038     aux = *cbl_itr;
01039     aux->cb_->recv(callback_attrs, aux->subscription_handle_);
01040     delete aux;
01041 
01042     // Clean up callback attributes
01043     ClearAttrs(callback_attrs);
01044     delete callback_attrs;
01045   }
01046 
01047   // We are done
01048   cbl.clear();
01049 }
01050 
01051 HandleEntry * DiffusionRouting::removeHandle(handle my_handle, HandleList *hl)
01052 {
01053   HandleList::iterator itr;
01054   HandleEntry *entry;
01055 
01056   for (itr = hl->begin(); itr != hl->end(); ++itr){
01057     entry = *itr;
01058     if (entry->hdl_ == my_handle){
01059       hl->erase(itr);
01060       return entry;
01061     }
01062   }
01063   return NULL;
01064 }
01065 
01066 HandleEntry * DiffusionRouting::findHandle(handle my_handle, HandleList *hl)
01067 {
01068   HandleList::iterator itr;
01069   HandleEntry *entry;
01070 
01071   for (itr = hl->begin(); itr != hl->end(); ++itr){
01072     entry = *itr;
01073     if (entry->hdl_ == my_handle)
01074       return entry;
01075   }
01076   return NULL;
01077 }
01078 
01079 FilterEntry * DiffusionRouting::deleteFilter(handle my_handle)
01080 {
01081   FilterList::iterator itr;
01082   FilterEntry *entry;
01083 
01084   for (itr = filter_list_.begin(); itr != filter_list_.end(); ++itr){
01085     entry = *itr;
01086     if (entry->handle_ == my_handle){
01087       filter_list_.erase(itr);
01088       return entry;
01089     }
01090   }
01091   return NULL;
01092 }
01093 
01094 FilterEntry * DiffusionRouting::findFilter(handle my_handle)
01095 {
01096   FilterList::iterator itr;
01097   FilterEntry *entry;
01098 
01099   for (itr = filter_list_.begin(); itr != filter_list_.end(); ++itr){
01100     entry = *itr;
01101     if (entry->handle_ == my_handle)
01102       return entry;
01103   }
01104   return NULL;
01105 }
01106 
01107 bool DiffusionRouting::hasScope(NRAttrVec *attrs)
01108 {
01109   NRAttribute *temp = NULL;
01110 
01111   temp = NRScopeAttr.find(attrs);
01112   if (temp)
01113     return true;
01114 
01115   return false;
01116 }
01117 
01118 bool DiffusionRouting::checkSubscription(NRAttrVec *attrs)
01119 {
01120   NRSimpleAttribute<int> *nrclass = NULL;
01121   NRSimpleAttribute<int> *nrscope = NULL;
01122 
01123   // We first try to locate both class and scope attributes
01124   nrclass = NRClassAttr.find(attrs);
01125   nrscope = NRScopeAttr.find(attrs);
01126 
01127   // There must be a class attribute in subscriptions
01128   if (!nrclass)
01129     return false;
01130 
01131   if (nrscope){
01132     // This subcription has both class and scope attribute. So, we
01133     // check if class/scope attributes comply with the Diffusion
01134     // Routing API
01135 
01136     // Must check scope's operator. The API requires it to be "IS"
01137     if (nrscope->getOp() != NRAttribute::IS)
01138       return false;
01139 
01140     // Ok, so first check if this is a global subscription
01141     if ((nrscope->getVal() == NRAttribute::GLOBAL_SCOPE) &&
01142         (nrclass->getVal() == NRAttribute::INTEREST_CLASS) &&
01143         (nrclass->getOp() == NRAttribute::IS))
01144       return true;
01145 
01146     // Check for local subscriptions
01147     if (nrscope->getVal() == NRAttribute::NODE_LOCAL_SCOPE)
01148       return true;
01149 
01150     // Just to be sure we did not miss any case
01151     return false;
01152   }
01153 
01154   // If there is no scope attribute, we will insert one later if this
01155   // subscription looks like a global subscription
01156   if ((nrclass->getVal() == NRAttribute::INTEREST_CLASS) &&
01157       (nrclass->getOp() == NRAttribute::IS))
01158     return true;
01159 
01160   return false;
01161 }
01162 
01163 bool DiffusionRouting::checkPublication(NRAttrVec *attrs)
01164 {
01165   NRSimpleAttribute<int> *nrclass = NULL;
01166   NRSimpleAttribute<int> *nrscope = NULL;
01167 
01168   // We first try to locate both class and scope attributes
01169   nrclass = NRClassAttr.find(attrs);
01170   nrscope = NRScopeAttr.find(attrs);
01171 
01172   // There must be a class attribute in the publication
01173   if (!nrclass)
01174     return false;
01175 
01176   // In addition, the Diffusion Routing API requires the class
01177   // attribute to be set to "IS DATA_CLASS"
01178   if ((nrclass->getVal() != NRAttribute::DATA_CLASS) ||
01179       (nrclass->getOp() != NRAttribute::IS))
01180     return false;
01181 
01182   if (nrscope){
01183     // Ok, so this publication has both class and scope attributes. We
01184     // now have to check if they comply to the Diffusion Routing API
01185     // semantics for publish
01186 
01187     // Must check scope's operator. The API requires it to be "IS"
01188     if (nrscope->getOp() != NRAttribute::IS)
01189       return false;
01190 
01191     // We accept both global and local scope data messages
01192     if ((nrscope->getVal() == NRAttribute::GLOBAL_SCOPE) ||
01193         (nrscope->getVal() == NRAttribute::NODE_LOCAL_SCOPE))
01194       return true;
01195 
01196     // Just not to miss any case
01197     return false;
01198   }
01199 
01200   // A publish without a scope attribute is fine, we will include a
01201   // default NODE_LOCAL_SCOPE attribute later
01202   return true;
01203 }
01204 
01205 bool DiffusionRouting::checkSend(NRAttrVec *attrs)
01206 {
01207   NRSimpleAttribute<int> *nrclass = NULL;
01208   NRSimpleAttribute<int> *nrscope = NULL;
01209 
01210   // Currently only checks for Class and Scope attributes
01211   nrclass = NRClassAttr.find(attrs);
01212   nrscope = NRScopeAttr.find(attrs);
01213 
01214   if (nrclass || nrscope)
01215     return false;
01216 
01217   return true;
01218 }
01219 
01220 bool DiffusionRouting::isPushData(NRAttrVec *attrs)
01221 {
01222   NRSimpleAttribute<int> *nrclass = NULL;
01223   NRSimpleAttribute<int> *nrscope = NULL;
01224 
01225   // Currently only checks for Class and Scope attributes
01226   nrclass = NRClassAttr.find(attrs);
01227   nrscope = NRScopeAttr.find(attrs);
01228 
01229   // We should have both class and scope
01230   if (nrclass && nrscope){
01231     if (nrscope->getVal() == NRAttribute::NODE_LOCAL_SCOPE)
01232       return false;
01233     return true;
01234   }
01235   else{
01236     DiffPrint(DEBUG_ALWAYS, "Error: Cannot find class/scope attributes !\n");
01237     return false;
01238   }
01239 }

Generated on Tue Apr 20 12:14:14 2004 for NS2.26SourcesOriginal by doxygen 1.3.3