Main Page | Namespace List | Class Hierarchy | Alphabetical List | Compound List | File List | Compound Members | File Members

DiffusionRouting Class Reference

#include <dr.hh>

Inheritance diagram for DiffusionRouting:

Inheritance graph
[legend]
Collaboration diagram for DiffusionRouting:

Collaboration graph
[legend]
List of all members.

Public Types

typedef long handle

Public Member Functions

 DiffusionRouting (u_int16_t port)
void run (bool wait_condition, long max_timeout)
virtual ~DiffusionRouting ()
handle subscribe (NRAttrVec *subscribe_attrs, NR::Callback *cb)
int unsubscribe (handle subscription_handle)
handle publish (NRAttrVec *publish_attrs)
int unpublish (handle publication_handle)
int send (handle publication_handle, NRAttrVec *send_attrs)
handle addFilter (NRAttrVec *filter_attrs, u_int16_t priority, FilterCallback *cb)
int removeFilter (handle filter_handle)
int sendMessage (Message *msg, handle h, u_int16_t priority=FILTER_KEEP_PRIORITY)
handle addTimer (int timeout, TimerCallback *callback)
handle addTimer (int timeout, void *param, TimerCallbacks *cb)
bool removeTimer (handle hdl)
void doIt ()
void doOne (long timeout=WAIT_FOREVER)
int interestTimeout (HandleEntry *handle_entry)
int filterKeepaliveTimeout (FilterEntry *filter_entry)

Static Public Member Functions

NR * createNR (u_int16_t port=0)

Protected Member Functions

void recvPacket (DiffPacket pkt)
void recvMessage (Message *msg)
void sendMessageToDiffusion (Message *msg)
void sendPacketToDiffusion (DiffPacket pkt, int len, int dst)
void processMessage (Message *msg)
void processControlMessage (Message *msg)
bool checkSubscription (NRAttrVec *attrs)
bool checkPublication (NRAttrVec *attrs)
bool checkSend (NRAttrVec *attrs)
bool isPushData (NRAttrVec *attrs)
HandleEntry * removeHandle (handle my_handle, HandleList *hl)
HandleEntry * findHandle (handle my_handle, HandleList *hl)
FilterEntry * deleteFilter (handle my_handle)
FilterEntry * findFilter (handle my_handle)
bool hasScope (NRAttrVec *attrs)

Protected Attributes

int next_handle_
HandleList pub_list_
HandleList sub_list_
FilterList filter_list_
pthread_mutex_t * dr_mtx_
TimerManager * timers_manager_
DeviceList in_devices_
DeviceList local_out_devices_
u_int16_t diffusion_port_
int pkt_count_
int random_id_
u_int16_t agent_id_

Member Typedef Documentation

typedef long NR::handle [inherited]
 

Definition at line 302 of file nr.hh.

Referenced by processControlMessage().


Constructor & Destructor Documentation

DiffusionRouting::DiffusionRouting (u_int16_t port)
 

Definition at line 165 of file dr.cc.

References DEBUG_ALWAYS, DEFAULT_DIFFUSION_PORT, DiffPrint(), GetRand(), GetTime(), and SetSeed().

00167 {
00168   struct timeval tv;
00169   DiffusionIO *device;
00170 
00171   // Initialize basic stuff
00172   next_handle_ = 1;
00173   GetTime(&tv);
00174   SetSeed(&tv);
00175   pkt_count_ = GetRand();
00176   random_id_ = GetRand();
00177   agent_id_ = 0;
00178 
00179   if (port == 0)
00180     port = DEFAULT_DIFFUSION_PORT;
00181 
00182   diffusion_port_ = port;
00183 
00184   // Initialize timer manager
00185   timers_manager_ = new TimerManager;
00186 
00187   // Initialize input device
00188 #ifdef NS_DIFFUSION
00189   device = new NsLocal(da);
00190   local_out_devices_.push_back(device);
00191 #endif // NS_DIFFUSION
00192 
00193 #ifdef UDP
00194   device = new UDPLocal(&agent_id_);
00195   in_devices_.push_back(device);
00196   local_out_devices_.push_back(device);
00197 #endif // UDP
00198 
00199   // Print initialization message
00200   DiffPrint(DEBUG_ALWAYS,
00201             "Diffusion Routing Agent initializing... Agent Id = %d\n",
00202             agent_id_);
00203 
00204 #ifdef USE_THREADS
00205   // Initialize Semaphores
00206   dr_mtx_ = new pthread_mutex_t;
00207   pthread_mutex_init(dr_mtx_, NULL);
00208 #endif // USE_THREADS
00209 }

Here is the call graph for this function:

DiffusionRouting::~DiffusionRouting ()  [virtual]
 

Definition at line 211 of file dr.cc.

References pub_list_, and sub_list_.

00212 {
00213   HandleList::iterator itr;
00214   HandleEntry *current;
00215 
00216   // Delete all Handles
00217   for (itr = sub_list_.begin(); itr != sub_list_.end(); ++itr){
00218     current = *itr;
00219     delete current;
00220   }
00221 
00222   for (itr = pub_list_.begin(); itr != pub_list_.end(); ++itr){
00223     current = *itr;
00224     delete current;
00225   }
00226 }


Member Function Documentation

handle DiffusionRouting::addFilter (NRAttrVec * filter_attrs,
u_int16_t priority,
FilterCallback * cb)
 

Definition at line 431 of file dr.cc.

References ADD_UPDATE_FILTER, TimerManager::addTimer(), agent_id_, CalculateSize(), FilterEntry::cb_, CONTROL, ControlMsgAttr, CopyAttrs(), Message::data_len_, DEBUG_ALWAYS, DiffPrint(), DIFFUSION_VERSION, dr_mtx_, FAIL, FilterEntry::filter_attrs_, FILTER_KEEPALIVE_DELAY, filter_list_, FILTER_MAX_PRIORITY, GetLock(), handle, FilterEntry::handle_, NRAttribute::IS, LOCALHOST_ADDR, Message::msg_attr_vec_, next_handle_, Message::num_attr_, pkt_count_, random_id_, ReleaseLock(), sendMessageToDiffusion(), and timers_manager_.

00433 {
00434   FilterEntry *filter_entry;
00435   NRAttrVec *attrs;
00436   NRAttribute *ctrl_msg_attr;
00437   ControlMessage *control_blob;
00438   Message *my_message;
00439   TimerCallback *timer_callback;
00440 
00441   // Check parameters
00442   if (!filter_attrs || !cb || priority < FILTER_MIN_PRIORITY || priority > FILTER_MAX_PRIORITY){
00443     DiffPrint(DEBUG_ALWAYS, "Received invalid parameters when adding filter !\n");
00444     return FAIL;
00445   }
00446 
00447   // Get lock first
00448   GetLock(dr_mtx_);
00449 
00450   // Create and Initialize the handle_entry structute
00451   filter_entry = new FilterEntry(next_handle_, priority, agent_id_);
00452   next_handle_++;
00453   filter_entry->cb_ = (FilterCallback *) cb;
00454   filter_list_.push_back(filter_entry);
00455 
00456   // Copy attributes (keep them for matching later)
00457   filter_entry->filter_attrs_ = CopyAttrs(filter_attrs);
00458 
00459   // Copy the attributes (and add the control attr)
00460   attrs = CopyAttrs(filter_attrs);
00461   control_blob = new ControlMessage(ADD_UPDATE_FILTER,
00462                                     priority, filter_entry->handle_);
00463 
00464   ctrl_msg_attr = ControlMsgAttr.make(NRAttribute::IS,
00465                                       (void *)control_blob,
00466                                       sizeof(ControlMessage));
00467 
00468   attrs->push_back(ctrl_msg_attr);
00469 
00470   // Initialize message structure
00471   my_message = new Message(DIFFUSION_VERSION, CONTROL, agent_id_, 0,
00472                            0, pkt_count_, random_id_, LOCALHOST_ADDR,
00473                            LOCALHOST_ADDR);
00474 
00475   // Increment pkt_counter
00476   pkt_count_++;
00477 
00478   // Add attributes to the message
00479   my_message->msg_attr_vec_ = attrs;
00480   my_message->num_attr_ = attrs->size();
00481   my_message->data_len_ = CalculateSize(attrs);
00482 
00483   // Release the lock
00484   ReleaseLock(dr_mtx_);
00485 
00486   // Send Packet
00487   sendMessageToDiffusion(my_message);
00488 
00489   // Add keepalive timer to the event queue
00490   timer_callback = new FilterKeepaliveCallback(this, filter_entry);
00491   timers_manager_->addTimer(FILTER_KEEPALIVE_DELAY, timer_callback);
00492 
00493   // Delete message, attribute set and controlblob
00494   delete my_message;
00495   delete control_blob;
00496 
00497   return filter_entry->handle_;
00498 }

Here is the call graph for this function:

handle DiffusionRouting::addTimer (int timeout,
void * param,
TimerCallbacks * cb)
 

Definition at line 560 of file dr.cc.

References addTimer(), and handle.

00561 {
00562   TimerCallback *callback;
00563 
00564   callback = new OldAPITimer(cb, p);
00565 
00566   return (addTimer(timeout, callback));
00567 }

Here is the call graph for this function:

handle DiffusionRouting::addTimer (int timeout,
TimerCallback * callback)
 

Definition at line 555 of file dr.cc.

References TimerManager::addTimer(), handle, and timers_manager_.

Referenced by addTimer().

00556 {
00557   return (timers_manager_->addTimer(timeout, callback));
00558 }

Here is the call graph for this function:

bool DiffusionRouting::checkPublication (NRAttrVec * attrs)  [protected]
 

Definition at line 1163 of file dr.cc.

References NRAttribute::DATA_CLASS, NRAttribute::getOp(), NRSimpleAttribute< T >::getVal(), NRAttribute::GLOBAL_SCOPE, NRAttribute::IS, NRAttribute::NODE_LOCAL_SCOPE, NRClassAttr, and NRScopeAttr.

Referenced by publish().

01164 {
01165   NRSimpleAttribute<int> *nrclass = NULL;
01166   NRSimpleAttribute<int> *nrscope = NULL;
01167 
01168   // We first try to locate both class and scope attributes
01169   nrclass = NRClassAttr.find(attrs);
01170   nrscope = NRScopeAttr.find(attrs);
01171 
01172   // There must be a class attribute in the publication
01173   if (!nrclass)
01174     return false;
01175 
01176   // In addition, the Diffusion Routing API requires the class
01177   // attribute to be set to "IS DATA_CLASS"
01178   if ((nrclass->getVal() != NRAttribute::DATA_CLASS) ||
01179       (nrclass->getOp() != NRAttribute::IS))
01180     return false;
01181 
01182   if (nrscope){
01183     // Ok, so this publication has both class and scope attributes. We
01184     // now have to check if they comply to the Diffusion Routing API
01185     // semantics for publish
01186 
01187     // Must check scope's operator. The API requires it to be "IS"
01188     if (nrscope->getOp() != NRAttribute::IS)
01189       return false;
01190 
01191     // We accept both global and local scope data messages
01192     if ((nrscope->getVal() == NRAttribute::GLOBAL_SCOPE) ||
01193         (nrscope->getVal() == NRAttribute::NODE_LOCAL_SCOPE))
01194       return true;
01195 
01196     // Just not to miss any case
01197     return false;
01198   }
01199 
01200   // A publish without a scope attribute is fine, we will include a
01201   // default NODE_LOCAL_SCOPE attribute later
01202   return true;
01203 }

Here is the call graph for this function:

bool DiffusionRouting::checkSend (NRAttrVec * attrs)  [protected]
 

Definition at line 1205 of file dr.cc.

References NRClassAttr, and NRScopeAttr.

Referenced by send().

01206 {
01207   NRSimpleAttribute<int> *nrclass = NULL;
01208   NRSimpleAttribute<int> *nrscope = NULL;
01209 
01210   // Currently only checks for Class and Scope attributes
01211   nrclass = NRClassAttr.find(attrs);
01212   nrscope = NRScopeAttr.find(attrs);
01213 
01214   if (nrclass || nrscope)
01215     return false;
01216 
01217   return true;
01218 }

bool DiffusionRouting::checkSubscription (NRAttrVec * attrs)  [protected]
 

Definition at line 1118 of file dr.cc.

References NRAttribute::getOp(), NRSimpleAttribute< T >::getVal(), NRAttribute::GLOBAL_SCOPE, NRAttribute::INTEREST_CLASS, NRAttribute::IS, NRAttribute::NODE_LOCAL_SCOPE, NRClassAttr, and NRScopeAttr.

Referenced by subscribe().

01119 {
01120   NRSimpleAttribute<int> *nrclass = NULL;
01121   NRSimpleAttribute<int> *nrscope = NULL;
01122 
01123   // We first try to locate both class and scope attributes
01124   nrclass = NRClassAttr.find(attrs);
01125   nrscope = NRScopeAttr.find(attrs);
01126 
01127   // There must be a class attribute in subscriptions
01128   if (!nrclass)
01129     return false;
01130 
01131   if (nrscope){
01132     // This subcription has both class and scope attribute. So, we
01133     // check if class/scope attributes comply with the Diffusion
01134     // Routing API
01135 
01136     // Must check scope's operator. The API requires it to be "IS"
01137     if (nrscope->getOp() != NRAttribute::IS)
01138       return false;
01139 
01140     // Ok, so first check if this is a global subscription
01141     if ((nrscope->getVal() == NRAttribute::GLOBAL_SCOPE) &&
01142         (nrclass->getVal() == NRAttribute::INTEREST_CLASS) &&
01143         (nrclass->getOp() == NRAttribute::IS))
01144       return true;
01145 
01146     // Check for local subscriptions
01147     if (nrscope->getVal() == NRAttribute::NODE_LOCAL_SCOPE)
01148       return true;
01149 
01150     // Just to be sure we did not miss any case
01151     return false;
01152   }
01153 
01154   // If there is no scope attribute, we will insert one later if this
01155   // subscription looks like a global subscription
01156   if ((nrclass->getVal() == NRAttribute::INTEREST_CLASS) &&
01157       (nrclass->getOp() == NRAttribute::IS))
01158     return true;
01159 
01160   return false;
01161 }

Here is the call graph for this function:

NR * NR::createNR (u_int16_t port = 0)  [static, inherited]
 

Definition at line 123 of file dr.cc.

References DEBUG_ALWAYS, DiffPrint(), and dr.

Referenced by GeoRoutingFilter::GeoRoutingFilter(), GradientFilter::GradientFilter(), LogFilter::LogFilter(), PingReceiverApp::PingReceiverApp(), PingSenderApp::PingSenderApp(), PushReceiverApp::PushReceiverApp(), PushSenderApp::PushSenderApp(), SrcRtFilter::SrcRtFilter(), and TagFilter::TagFilter().

00124 {
00125   // Create Diffusion Routing Class
00126   if (dr)
00127     return dr;
00128 
00129   dr = new DiffusionRouting(port);
00130 
00131 #ifdef USE_THREADS
00132   int retval;
00133   pthread_t thread;
00134 
00135   // Fork a thread for receiving Messages
00136   retval = pthread_create(&thread, NULL, &ReceiveThread, (void *)dr);
00137 
00138   if (retval){
00139     DiffPrint(DEBUG_ALWAYS, "Error creating receiving thread ! Aborting...\n");
00140     exit(-1);
00141   }
00142 #endif // USE_THREADS
00143 
00144   return dr;
00145 }

Here is the call graph for this function:

FilterEntry * DiffusionRouting::deleteFilter (handle my_handle)  [protected]
 

Definition at line 1079 of file dr.cc.

References filter_list_, and FilterEntry::handle_.

Referenced by filterKeepaliveTimeout().

01080 {
01081   FilterList::iterator itr;
01082   FilterEntry *entry;
01083 
01084   for (itr = filter_list_.begin(); itr != filter_list_.end(); ++itr){
01085     entry = *itr;
01086     if (entry->handle_ == my_handle){
01087       filter_list_.erase(itr);
01088       return entry;
01089     }
01090   }
01091   return NULL;
01092 }

void DiffusionRouting::doIt ()
 

Definition at line 755 of file dr.cc.

References run(), and WAIT_FOREVER.

00756 {
00757   run(true, WAIT_FOREVER);
00758 }

Here is the call graph for this function:

void DiffusionRouting::doOne (long timeout = WAIT_FOREVER)
 

Definition at line 760 of file dr.cc.

References run().

00761 {
00762   run(false, timeout);
00763 }

Here is the call graph for this function:

int DiffusionRouting::filterKeepaliveTimeout (FilterEntry * filter_entry)
 

Definition at line 574 of file dr.cc.

References ADD_UPDATE_FILTER, agent_id_, CalculateSize(), CONTROL, ControlMsgAttr, CopyAttrs(), Message::data_len_, DEBUG_ALWAYS, deleteFilter(), DiffPrint(), DIFFUSION_VERSION, dr_mtx_, FilterEntry::filter_attrs_, FILTER_KEEPALIVE_DELAY, GetLock(), FilterEntry::handle_, NRAttribute::IS, LOCALHOST_ADDR, Message::msg_attr_vec_, Message::num_attr_, pkt_count_, FilterEntry::priority_, random_id_, ReleaseLock(), sendMessageToDiffusion(), and FilterEntry::valid_.

Referenced by FilterKeepaliveCallback::expire().

00575 {
00576   FilterEntry *my_entry = NULL;
00577   ControlMessage *control_blob;
00578   NRAttribute *ctrl_msg_attr;
00579   NRAttrVec *attrs;
00580   Message *my_message;
00581 
00582   // Acquire lock first
00583   GetLock(dr_mtx_);
00584 
00585   if (filter_entry->valid_){
00586     // Send keepalive
00587     control_blob = new ControlMessage(ADD_UPDATE_FILTER,
00588                                       filter_entry->priority_,
00589                                       filter_entry->handle_);
00590 
00591     ctrl_msg_attr = ControlMsgAttr.make(NRAttribute::IS,
00592                                         (void *)control_blob,
00593                                         sizeof(ControlMessage));
00594 
00595     attrs = CopyAttrs(filter_entry->filter_attrs_);
00596     attrs->push_back(ctrl_msg_attr);
00597 
00598     my_message = new Message(DIFFUSION_VERSION, CONTROL, agent_id_, 0,
00599                              0, pkt_count_, random_id_, LOCALHOST_ADDR,
00600                              LOCALHOST_ADDR);
00601 
00602     // Increment pkt_counter
00603     pkt_count_++;
00604 
00605     // Add attributes to the message
00606     my_message->msg_attr_vec_ = attrs;
00607     my_message->num_attr_ = attrs->size();
00608     my_message->data_len_ = CalculateSize(attrs);
00609 
00610     // Send Message
00611     sendMessageToDiffusion(my_message);
00612 
00613     delete my_message;
00614     delete control_blob;
00615 
00616     // Release lock
00617     ReleaseLock(dr_mtx_);
00618 
00619     // Reschedule another filter keepalive timer in event queue
00620     return (FILTER_KEEPALIVE_DELAY);
00621   }
00622   else{
00623     // Filter was removed
00624     my_entry = deleteFilter(filter_entry->handle_);
00625 
00626     // We should have removed the correct handle
00627     if (my_entry != filter_entry){
00628       DiffPrint(DEBUG_ALWAYS, "DiffusionRouting::KeepaliveTimeout: Handles should match !\n");
00629       exit(-1);
00630     }
00631 
00632     delete my_entry;
00633 
00634     // Release lock
00635     ReleaseLock(dr_mtx_);
00636 
00637     return -1;
00638   }
00639 }

Here is the call graph for this function:

FilterEntry * DiffusionRouting::findFilter (handle my_handle)  [protected]
 

Definition at line 1094 of file dr.cc.

References filter_list_, and FilterEntry::handle_.

Referenced by processControlMessage(), and removeFilter().

01095 {
01096   FilterList::iterator itr;
01097   FilterEntry *entry;
01098 
01099   for (itr = filter_list_.begin(); itr != filter_list_.end(); ++itr){
01100     entry = *itr;
01101     if (entry->handle_ == my_handle)
01102       return entry;
01103   }
01104   return NULL;
01105 }

HandleEntry * DiffusionRouting::findHandle (handle my_handle,
HandleList * hl)
[protected]
 

Definition at line 1066 of file dr.cc.

References HandleEntry::hdl_.

Referenced by send(), and unsubscribe().

01067 {
01068   HandleList::iterator itr;
01069   HandleEntry *entry;
01070 
01071   for (itr = hl->begin(); itr != hl->end(); ++itr){
01072     entry = *itr;
01073     if (entry->hdl_ == my_handle)
01074       return entry;
01075   }
01076   return NULL;
01077 }

bool DiffusionRouting::hasScope (NRAttrVec * attrs)  [protected]
 

Definition at line 1107 of file dr.cc.

References NRScopeAttr.

Referenced by publish(), and subscribe().

01108 {
01109   NRAttribute *temp = NULL;
01110 
01111   temp = NRScopeAttr.find(attrs);
01112   if (temp)
01113     return true;
01114 
01115   return false;
01116 }

int DiffusionRouting::interestTimeout (HandleEntry * handle_entry)
 

Definition at line 641 of file dr.cc.

References agent_id_, HandleEntry::attrs_, CalculateSize(), CopyAttrs(), Message::data_len_, DEBUG_ALWAYS, DiffPrint(), DIFFUSION_VERSION, dr_mtx_, GetLock(), GetRand(), HandleEntry::hdl_, INTEREST, INTEREST_DELAY, INTEREST_JITTER, LOCALHOST_ADDR, Message::msg_attr_vec_, Message::num_attr_, pkt_count_, RAND_MAX, random_id_, ReleaseLock(), removeHandle(), sendMessageToDiffusion(), sub_list_, and HandleEntry::valid_.

Referenced by InterestCallback::expire().

00642 {
00643   HandleEntry *my_handle = NULL;
00644   Message *my_message;
00645 
00646   // Acquire lock first
00647   GetLock(dr_mtx_);
00648 
00649   if (handle_entry->valid_){
00650     // Send the interest message if entry is still valid
00651     my_message = new Message(DIFFUSION_VERSION, INTEREST, agent_id_, 0,
00652                              0, pkt_count_, random_id_, LOCALHOST_ADDR,
00653                              LOCALHOST_ADDR);
00654 
00655     // Increment pkt_counter
00656     pkt_count_++;
00657 
00658     // Add attributes to the message
00659     my_message->msg_attr_vec_ = CopyAttrs(handle_entry->attrs_);
00660     my_message->num_attr_ = handle_entry->attrs_->size();
00661     my_message->data_len_ = CalculateSize(handle_entry->attrs_);
00662 
00663     // Send Packet
00664     sendMessageToDiffusion(my_message);
00665 
00666     delete my_message;
00667 
00668     // Release lock
00669     ReleaseLock(dr_mtx_);
00670 
00671     // Reschedule this timer in the queue
00672     return (INTEREST_DELAY +
00673             (int) ((INTEREST_JITTER * (GetRand() * 1.0 / RAND_MAX)) -
00674                    (INTEREST_JITTER / 2)));
00675   }
00676   else{
00677     // Interest was canceled. Just delete it from the handle_list
00678     my_handle = removeHandle(handle_entry->hdl_, &sub_list_);
00679 
00680     // We should have removed the correct handle
00681     if (my_handle != handle_entry){
00682       DiffPrint(DEBUG_ALWAYS,
00683                 "Error: interestTimeout: Handles should match !\n");
00684       exit(-1);
00685     }
00686 
00687     delete my_handle;
00688 
00689     // Release lock
00690     ReleaseLock(dr_mtx_);
00691 
00692     // Delete timer from the queue
00693     return -1;
00694   }
00695 }

Here is the call graph for this function:

bool DiffusionRouting::isPushData (NRAttrVec * attrs)  [protected]
 

Definition at line 1220 of file dr.cc.

References DEBUG_ALWAYS, DiffPrint(), NRSimpleAttribute< T >::getVal(), NRAttribute::NODE_LOCAL_SCOPE, NRClassAttr, and NRScopeAttr.

Referenced by send().

01221 {
01222   NRSimpleAttribute<int> *nrclass = NULL;
01223   NRSimpleAttribute<int> *nrscope = NULL;
01224 
01225   // Currently only checks for Class and Scope attributes
01226   nrclass = NRClassAttr.find(attrs);
01227   nrscope = NRScopeAttr.find(attrs);
01228 
01229   // We should have both class and scope
01230   if (nrclass && nrscope){
01231     if (nrscope->getVal() == NRAttribute::NODE_LOCAL_SCOPE)
01232       return false;
01233     return true;
01234   }
01235   else{
01236     DiffPrint(DEBUG_ALWAYS, "Error: Cannot find class/scope attributes !\n");
01237     return false;
01238   }
01239 }

Here is the call graph for this function:

void DiffusionRouting::processControlMessage (Message * msg)  [protected]
 

Definition at line 951 of file dr.cc.

References FilterEntry::cb_, DEBUG_ALWAYS, DEBUG_IMPORTANT, DiffPrint(), dr_mtx_, FilterEntry::filter_attrs_, findFilter(), GetLock(), NRSimpleAttribute< T >::getVal(), NR::handle, RedirectMessage::handle_, RedirectMessage::last_hop_, Message::last_hop_, Message::msg_attr_vec_, RedirectMessage::msg_type_, Message::msg_type_, RedirectMessage::new_message_, Message::new_message_, RedirectMessage::next_hop_, Message::next_hop_, RedirectMessage::next_port_, Message::next_port_, RedirectMessage::num_attr_, Message::num_attr_, OneWayMatch(), OriginalHdrAttr, RedirectMessage::pkt_num_, Message::pkt_num_, RedirectMessage::rdm_id_, Message::rdm_id_, FilterCallback::recv(), ReleaseLock(), RedirectMessage::source_port_, Message::source_port_, and FilterEntry::valid_.

Referenced by recvMessage().

00952 {
00953   NRSimpleAttribute<void *> *original_header_attr = NULL;
00954   NRAttrVec::iterator place = msg->msg_attr_vec_->begin();
00955   RedirectMessage *original_header;
00956   FilterEntry *entry;
00957   handle my_handle;
00958 
00959   // Find the attribute containing the original packet header
00960   original_header_attr = OriginalHdrAttr.find_from(msg->msg_attr_vec_,
00961                                                    place, &place);
00962   if (!original_header_attr){
00963     DiffPrint(DEBUG_ALWAYS, "Error: Received an invalid REDIRECT message !\n");
00964     return;
00965   }
00966 
00967   // Restore original message header
00968   original_header = (RedirectMessage *) original_header_attr->getVal();
00969   my_handle = original_header->handle_;
00970   msg->msg_type_ = original_header->msg_type_;
00971   msg->source_port_ = original_header->source_port_;
00972   msg->pkt_num_ = original_header->pkt_num_;
00973   msg->rdm_id_ = original_header->rdm_id_;
00974   msg->next_hop_ = original_header->next_hop_;
00975   msg->last_hop_ = original_header->last_hop_;
00976   msg->num_attr_ = original_header->num_attr_;
00977   msg->new_message_ = original_header->new_message_;
00978   msg->next_port_ = original_header->next_port_;
00979 
00980   // Delete attribute from the original set
00981   msg->msg_attr_vec_->erase(place);
00982   delete original_header_attr;
00983 
00984   // Find the right callback
00985   GetLock(dr_mtx_);
00986 
00987   entry = findFilter(my_handle);
00988   if (entry && entry->valid_){
00989     // Just to confirm
00990     if (OneWayMatch(entry->filter_attrs_, msg->msg_attr_vec_)){
00991       ReleaseLock(dr_mtx_);
00992       entry->cb_->recv(msg, my_handle);
00993       return;
00994     }
00995     else{
00996       DiffPrint(DEBUG_ALWAYS,
00997                 "Warning: Filter doesn't match incoming message's attributes !\n");
00998     }
00999   }
01000   else{
01001     DiffPrint(DEBUG_IMPORTANT,
01002               "Report: Cannot find filter (possibly deleted ?)\n");
01003   }
01004 
01005   ReleaseLock(dr_mtx_);
01006 }

Here is the call graph for this function:

void DiffusionRouting::processMessage (Message * msg)  [protected]
 

Definition at line 1008 of file dr.cc.

References HandleEntry::attrs_, CallbackEntry::cb_, HandleEntry::cb_, ClearAttrs(), CopyAttrs(), dr_mtx_, GetLock(), HandleEntry::hdl_, MatchAttrs(), Message::msg_attr_vec_, NR::Callback::recv(), ReleaseLock(), sub_list_, CallbackEntry::subscription_handle_, and HandleEntry::valid_.

Referenced by recvMessage().

01009 {
01010   CallbackList cbl;
01011   CallbackEntry *aux;
01012   HandleList::iterator sub_itr;
01013   CallbackList::iterator cbl_itr;
01014   HandleEntry *entry; 
01015   NRAttrVec *callback_attrs;
01016 
01017   // First, acquire the lock
01018   GetLock(dr_mtx_);
01019 
01020   for (sub_itr = sub_list_.begin(); sub_itr != sub_list_.end(); ++sub_itr){
01021     entry = *sub_itr;
01022     if ((entry->valid_) && (MatchAttrs(msg->msg_attr_vec_, entry->attrs_)))
01023       if (entry->cb_){
01024         aux = new CallbackEntry(entry->cb_, entry->hdl_);
01025         cbl.push_back(aux);
01026       }
01027   }
01028 
01029   // We can release the lock now
01030   ReleaseLock(dr_mtx_);
01031 
01032   // Now we just call all callback functions
01033   for (cbl_itr = cbl.begin(); cbl_itr != cbl.end(); ++cbl_itr){
01034     // Copy attributes
01035     callback_attrs = CopyAttrs(msg->msg_attr_vec_);
01036 
01037     // Call app-specific callback function
01038     aux = *cbl_itr;
01039     aux->cb_->recv(callback_attrs, aux->subscription_handle_);
01040     delete aux;
01041 
01042     // Clean up callback attributes
01043     ClearAttrs(callback_attrs);
01044     delete callback_attrs;
01045   }
01046 
01047   // We are done
01048   cbl.clear();
01049 }

Here is the call graph for this function:

handle DiffusionRouting::publish (NRAttrVec * publish_attrs)  [virtual]
 

Implements NR.

Definition at line 293 of file dr.cc.

References HandleEntry::attrs_, checkPublication(), CopyAttrs(), DEBUG_ALWAYS, DiffPrint(), dr_mtx_, FAIL, GetLock(), handle, hasScope(), HandleEntry::hdl_, NRAttribute::IS, next_handle_, NRAttribute::NODE_LOCAL_SCOPE, NRScopeAttr, pub_list_, and ReleaseLock().

00294 {
00295   HandleEntry *my_handle;
00296   NRAttribute *scope_attr;
00297 
00298   // Get the lock first
00299   GetLock(dr_mtx_);
00300 
00301   // Check the published attributes
00302   if (!checkPublication(publish_attrs)){
00303     DiffPrint(DEBUG_ALWAYS, "Error : Invalid class/scope attributes in the publish attributes !\n");
00304     ReleaseLock(dr_mtx_);
00305     return FAIL;
00306   }
00307 
00308   // Create and Initialize the handle_entry structute
00309   my_handle = new HandleEntry;
00310   my_handle->hdl_ = next_handle_;
00311   next_handle_++;
00312   pub_list_.push_back(my_handle);
00313 
00314   // Copy the attributes
00315   my_handle->attrs_ = CopyAttrs(publish_attrs);
00316 
00317   // For publications, scope is local if not specified
00318   if (!hasScope(publish_attrs)){
00319     scope_attr = NRScopeAttr.make(NRAttribute::IS, NRAttribute::NODE_LOCAL_SCOPE);
00320     my_handle->attrs_->push_back(scope_attr);
00321   }
00322 
00323   // Release the lock
00324   ReleaseLock(dr_mtx_);
00325 
00326   return my_handle->hdl_;
00327 }

Here is the call graph for this function:

void DiffusionRouting::recvMessage (Message * msg)  [protected]
 

Definition at line 934 of file dr.cc.

References DIFFUSION_VERSION, LOCALHOST_ADDR, Message::msg_type_, Message::next_hop_, processControlMessage(), processMessage(), REDIRECT, and Message::version_.

Referenced by recvPacket().

00935 {
00936   // Check version
00937   if (msg->version_ != DIFFUSION_VERSION)
00938     return;
00939 
00940   // Check destination
00941   if (msg->next_hop_ != LOCALHOST_ADDR)
00942     return;
00943 
00944   // Process the incoming message
00945   if (msg->msg_type_ == REDIRECT)
00946     processControlMessage(msg);
00947   else
00948     processMessage(msg);
00949 }

Here is the call graph for this function:

void DiffusionRouting::recvPacket (DiffPacket pkt)  [protected]
 

Definition at line 899 of file dr.cc.

References DATA_LEN, DIFF_VER, HDR_DIFF, int32_t, int8_t, LAST_HOP, Message::msg_attr_vec_, MSG_TYPE, NEXT_HOP, NUM_ATTR, PKT_NUM, RDM_ID, recvMessage(), SRC_PORT, u_int16_t, and UnpackAttrs().

Referenced by run().

00900 {
00901   struct hdr_diff *dfh = HDR_DIFF(pkt);
00902   Message *rcv_message = NULL;
00903   int8_t version, msg_type;
00904   u_int16_t data_len, num_attr, source_port;
00905   int32_t pkt_num, rdm_id, next_hop, last_hop;
00906 
00907   // Read header
00908   version = DIFF_VER(dfh);
00909   msg_type = MSG_TYPE(dfh);
00910   source_port = ntohs(SRC_PORT(dfh));
00911   pkt_num = ntohl(PKT_NUM(dfh));
00912   rdm_id = ntohl(RDM_ID(dfh));
00913   num_attr = ntohs(NUM_ATTR(dfh));
00914   next_hop = ntohl(NEXT_HOP(dfh));
00915   last_hop = ntohl(LAST_HOP(dfh));
00916   data_len = ntohs(DATA_LEN(dfh));
00917 
00918   // Create a message structure from the incoming packet
00919   rcv_message = new Message(version, msg_type, source_port, data_len,
00920                             num_attr, pkt_num, rdm_id, next_hop, last_hop);
00921 
00922   // Read all attributes into the Message structure
00923   rcv_message->msg_attr_vec_ = UnpackAttrs(pkt, num_attr);
00924 
00925   // Process the incoming message
00926   recvMessage(rcv_message);
00927 
00928   // We are done
00929   delete rcv_message;
00930   delete [] pkt;
00931 }

Here is the call graph for this function:

int DiffusionRouting::removeFilter (handle filter_handle)
 

Definition at line 500 of file dr.cc.

References agent_id_, CalculateSize(), CONTROL, ControlMsgAttr, Message::data_len_, DIFFUSION_VERSION, dr_mtx_, FAIL, findFilter(), GetLock(), FilterEntry::handle_, NRAttribute::IS, LOCALHOST_ADDR, Message::msg_attr_vec_, Message::num_attr_, OK, pkt_count_, random_id_, ReleaseLock(), REMOVE_FILTER, sendMessageToDiffusion(), and FilterEntry::valid_.

00501 {
00502   FilterEntry *filter_entry = NULL;
00503   ControlMessage *control_blob;
00504   NRAttribute *ctrl_msg_attr;
00505   NRAttrVec *attrs;
00506   Message *my_message;
00507 
00508   // Get lock first
00509   GetLock(dr_mtx_);
00510 
00511   filter_entry = findFilter(filter_handle);
00512   if (!filter_entry){
00513     // Handle doesn't exist, return FAIL
00514     ReleaseLock(dr_mtx_);
00515     return FAIL;
00516   }
00517 
00518   control_blob = new ControlMessage(REMOVE_FILTER, filter_entry->handle_, 0);
00519 
00520   ctrl_msg_attr = ControlMsgAttr.make(NRAttribute::IS,
00521                                       (void *)control_blob,
00522                                       sizeof(ControlMessage));
00523 
00524   attrs = new NRAttrVec;
00525   attrs->push_back(ctrl_msg_attr);
00526 
00527   my_message = new Message(DIFFUSION_VERSION, CONTROL, agent_id_, 0,
00528                            0, pkt_count_, random_id_, LOCALHOST_ADDR,
00529                            LOCALHOST_ADDR);
00530 
00531   // Increment pkt_counter
00532   pkt_count_++;
00533 
00534   // Add attributes to the message
00535   my_message->msg_attr_vec_ = attrs;
00536   my_message->num_attr_ = attrs->size();
00537   my_message->data_len_ = CalculateSize(attrs);
00538 
00539   // Handle will be destroyed when next keepalive timer happens
00540   filter_entry->valid_ = false;
00541 
00542   // Send Packet
00543   sendMessageToDiffusion(my_message);
00544 
00545   // Release the lock
00546   ReleaseLock(dr_mtx_);
00547 
00548   // Delete message
00549   delete my_message;
00550   delete control_blob;
00551 
00552   return OK;
00553 }

Here is the call graph for this function:

HandleEntry * DiffusionRouting::removeHandle (handle my_handle, HandleList *hl) [protected]
 

Definition at line 1051 of file dr.cc.

References HandleEntry::hdl_.

Referenced by interestTimeout(), and unpublish().

01052 {
01053   HandleList::iterator itr;
01054   HandleEntry *entry;
01055 
01056   for (itr = hl->begin(); itr != hl->end(); ++itr){
01057     entry = *itr;
01058     if (entry->hdl_ == my_handle){
01059       hl->erase(itr);
01060       return entry;
01061     }
01062   }
01063   return NULL;
01064 }

bool DiffusionRouting::removeTimer (handle hdl)
 

Definition at line 569 of file dr.cc.

References TimerManager::removeTimer(), and timers_manager_.

00570 {
00571   return (timers_manager_->removeTimer(hdl));
00572 }

Here is the call graph for this function:

void DiffusionRouting::run (bool wait_condition, long max_timeout)
 

Definition at line 765 of file dr.cc.

References DEBUG_IMPORTANT, DiffPacket, DiffPrint(), TimerManager::executeAllExpiredTimers(), in_devices_, MAXVALUE, TimerManager::nextTimerTime(), POLLING_INTERVAL, recvPacket(), timers_manager_, TimevalCmp(), and WAIT_FOREVER.

Referenced by doIt(), and doOne().

00766 {
00767   DeviceList::iterator itr;
00768   int status, max_sock, fd;
00769   bool flag;
00770   DiffPacket in_pkt;
00771   fd_set fds;
00772   struct timeval tv;
00773   struct timeval max_tv;
00774 
00775   do{
00776     FD_ZERO(&fds);
00777     max_sock = 0;
00778 
00779     // Set the maximum timeout value
00780     max_tv.tv_sec = (int) (max_timeout / 1000);
00781     max_tv.tv_usec = (int) ((max_timeout % 1000) * 1000);
00782 
00783     for (itr = in_devices_.begin(); itr != in_devices_.end(); ++itr){
00784       (*itr)->addInFDS(&fds, &max_sock);
00785     }
00786 
00787     // Check for the next timer
00788     timers_manager_->nextTimerTime(&tv);
00789 
00790     if (tv.tv_sec == MAXVALUE){
00791       // If we don't have any timers, we wait for POLLING_INTERVAL
00792       if (max_timeout == WAIT_FOREVER){
00793         tv.tv_sec = POLLING_INTERVAL;
00794         tv.tv_usec = 0;
00795       }
00796       else{
00797         tv = max_tv;
00798       }
00799     }
00800     else{
00801       if ((max_timeout != WAIT_FOREVER) && (TimevalCmp(&tv, &max_tv) > 0)){
00802         // max_timeout value is smaller than next timer's time, so we
00803         // use the max_timeout value instead
00804         tv = max_tv;
00805       }
00806     }
00807 
00808     status = select(max_sock+1, &fds, NULL, NULL, &tv);
00809 
00810     if (status == 0){
00811       // Process all timers that have expired
00812       timers_manager_->executeAllExpiredTimers();
00813     }
00814 
00815     if (status > 0){
00816       do{
00817         flag = false;
00818         for (itr = in_devices_.begin(); itr != in_devices_.end(); ++itr){
00819           fd = (*itr)->checkInFDS(&fds);
00820           if (fd != 0){
00821             // Message waiting
00822             in_pkt = (*itr)->recvPacket(fd);
00823             recvPacket(in_pkt);
00824 
00825             // Clear this fd
00826             FD_CLR(fd, &fds);
00827             status--;
00828             flag = true;
00829           }
00830         }
00831       } while ((status > 0) && (flag == true));
00832     }
00833     else
00834       if (status < 0){
00835         DiffPrint(DEBUG_IMPORTANT, "Select returned %d\n", status);
00836       }
00837   } while (wait_condition);
00838 }

Here is the call graph for this function:

int DiffusionRouting::send (handle publication_handle, NRAttrVec *send_attrs) [virtual]
 

Implements NR.

Definition at line 352 of file dr.cc.

References AddAttrs(), agent_id_, HandleEntry::attrs_, CalculateSize(), checkSend(), CopyAttrs(), DATA, Message::data_len_, DEBUG_ALWAYS, DiffPrint(), DIFFUSION_VERSION, dr_mtx_, EXPLORATORY_DATA, EXPLORATORY_DATA_DELAY, HandleEntry::exploratory_time_, FAIL, findHandle(), GetLock(), GetTime(), int8_t, isPushData(), LOCALHOST_ADDR, Message::msg_attr_vec_, Message::num_attr_, OK, pkt_count_, pub_list_, PUSH_EXPLORATORY_DATA, PUSH_EXPLORATORY_DELAY, random_id_, ReleaseLock(), sendMessageToDiffusion(), and TimevalCmp().

00354 {
00355   Message *my_message;
00356   HandleEntry *my_handle;
00357   int8_t send_message_type = DATA;
00358   struct timeval current_time;
00359 
00360   // Get the lock first
00361   GetLock(dr_mtx_);
00362 
00363   // Get attributes associated with handle
00364   my_handle = findHandle(publication_handle, &pub_list_);
00365   if (!my_handle){
00366     ReleaseLock(dr_mtx_);
00367     return FAIL;
00368   }
00369 
00370   // Check the send attributes
00371   if (!checkSend(send_attrs)){
00372     DiffPrint(DEBUG_ALWAYS, "Error : Invalid class/scope attributes in the send attributes !\n");
00373     ReleaseLock(dr_mtx_);
00374     return FAIL;
00375   }
00376 
00377   // Check if it is time to send another exploratory data message
00378   GetTime(&current_time);
00379 
00380   if (TimevalCmp(&current_time, &(my_handle->exploratory_time_)) >= 0){
00381 
00382     // Check if it is a push data message or a regular data message
00383     if (isPushData(my_handle->attrs_)){
00384       // Push data message
00385 
00386       // Update time for the next push exploratory message
00387       GetTime(&(my_handle->exploratory_time_));
00388       my_handle->exploratory_time_.tv_sec += PUSH_EXPLORATORY_DELAY;
00389 
00390       send_message_type = PUSH_EXPLORATORY_DATA;
00391     }
00392     else{
00393       // Regular data message
00394 
00395       // Update time for the next exploratory message
00396       GetTime(&(my_handle->exploratory_time_));
00397       my_handle->exploratory_time_.tv_sec += EXPLORATORY_DATA_DELAY;
00398     
00399       send_message_type = EXPLORATORY_DATA;
00400     }
00401   }
00402 
00403   // Initialize message structure
00404   my_message = new Message(DIFFUSION_VERSION, send_message_type, agent_id_,
00405                            0, 0, pkt_count_, random_id_, LOCALHOST_ADDR,
00406                            LOCALHOST_ADDR);
00407   // Increment pkt_counter
00408   pkt_count_++;
00409 
00410   // First, we duplicate the 'publish' attributes
00411   my_message->msg_attr_vec_ = CopyAttrs(my_handle->attrs_);
00412 
00413   // Now, we add the send attributes
00414   AddAttrs(my_message->msg_attr_vec_, send_attrs);
00415 
00416   // Compute the total number and size of the joined attribute sets
00417   my_message->num_attr_ = my_message->msg_attr_vec_->size();
00418   my_message->data_len_ = CalculateSize(my_message->msg_attr_vec_);
00419 
00420   // Release the lock
00421   ReleaseLock(dr_mtx_);
00422 
00423   // Send Packet
00424   sendMessageToDiffusion(my_message);
00425 
00426   delete my_message;
00427 
00428   return OK;
00429 }

Here is the call graph for this function:

int DiffusionRouting::sendMessage (Message *msg, handle h, u_int16_t priority = FILTER_KEEP_PRIORITY)
 

Definition at line 697 of file dr.cc.

References agent_id_, CalculateSize(), CONTROL, ControlMsgAttr, CopyAttrs(), Message::data_len_, DIFFUSION_VERSION, FAIL, FILTER_KEEP_PRIORITY, FILTER_MIN_PRIORITY, NRAttribute::IS, Message::last_hop_, LOCALHOST_ADDR, Message::msg_attr_vec_, Message::msg_type_, Message::new_message_, Message::next_hop_, Message::next_port_, Message::num_attr_, OK, OriginalHdrAttr, pkt_count_, Message::pkt_num_, random_id_, Message::rdm_id_, SEND_MESSAGE, sendMessageToDiffusion(), and Message::source_port_.

00699 {
00700   RedirectMessage *original_hdr;
00701   NRAttribute *original_attr, *ctrl_msg_attr;
00702   ControlMessage *control_blob;
00703   NRAttrVec *attrs;
00704   Message *my_message;
00705 
00706   if ((priority < FILTER_MIN_PRIORITY) ||
00707       (priority > FILTER_KEEP_PRIORITY))
00708     return FAIL;
00709 
00710   // Create an attribute with the original header
00711   original_hdr = new RedirectMessage(msg->new_message_, msg->msg_type_,
00712                                      msg->source_port_, msg->data_len_,
00713                                      msg->num_attr_, msg->rdm_id_,
00714                                      msg->pkt_num_, msg->next_hop_,
00715                                      msg->last_hop_, 0,
00716                                      msg->next_port_);
00717 
00718   original_attr = OriginalHdrAttr.make(NRAttribute::IS, (void *)original_hdr,
00719                                        sizeof(RedirectMessage));
00720 
00721   // Create the attribute with the control message
00722   control_blob = new ControlMessage(SEND_MESSAGE, h, priority);
00723 
00724   ctrl_msg_attr = ControlMsgAttr.make(NRAttribute::IS, (void *)control_blob,
00725                                       sizeof(ControlMessage));
00726 
00727   // Copy Attributes and add originalAttr and controlAttr
00728   attrs = CopyAttrs(msg->msg_attr_vec_);
00729   attrs->push_back(original_attr);
00730   attrs->push_back(ctrl_msg_attr);
00731 
00732   my_message = new Message(DIFFUSION_VERSION, CONTROL, agent_id_, 0,
00733                            0, pkt_count_, random_id_, LOCALHOST_ADDR,
00734                            LOCALHOST_ADDR);
00735 
00736   // Increment pkt_counter
00737   pkt_count_++;
00738 
00739   // Add attributes to the message
00740   my_message->msg_attr_vec_ = attrs;
00741   my_message->num_attr_ = attrs->size();
00742   my_message->data_len_ = CalculateSize(attrs);
00743 
00744   // Send Packet
00745   sendMessageToDiffusion(my_message);
00746 
00747   delete my_message;
00748   delete control_blob;
00749   delete original_hdr;
00750 
00751   return OK;
00752 }

Here is the call graph for this function:

void DiffusionRouting::sendMessageToDiffusion (Message *msg) [protected]
 

Definition at line 843 of file dr.cc.

References AllocateBuffer(), DATA_LEN, DIFF_VER, DiffPacket, diffusion_port_, HDR_DIFF, LAST_HOP, Message::last_hop_, len, Message::msg_attr_vec_, MSG_TYPE, Message::msg_type_, NEXT_HOP, Message::next_hop_, NUM_ATTR, Message::num_attr_, PackAttrs(), PKT_NUM, Message::pkt_num_, RDM_ID, Message::rdm_id_, sendPacketToDiffusion(), Message::source_port_, SRC_PORT, and Message::version_.

Referenced by addFilter(), filterKeepaliveTimeout(), interestTimeout(), removeFilter(), send(), and sendMessage().

00844 {
00845   DiffPacket out_pkt = NULL;
00846   struct hdr_diff *dfh;
00847   char *pos;
00848   int len;
00849 
00850   out_pkt = AllocateBuffer(msg->msg_attr_vec_);
00851   dfh = HDR_DIFF(out_pkt);
00852 
00853   pos = (char *) out_pkt;
00854   pos = pos + sizeof(struct hdr_diff);
00855 
00856   len = PackAttrs(msg->msg_attr_vec_, pos);
00857 
00858   LAST_HOP(dfh) = htonl(msg->last_hop_);
00859   NEXT_HOP(dfh) = htonl(msg->next_hop_);
00860   DIFF_VER(dfh) = msg->version_;
00861   MSG_TYPE(dfh) = msg->msg_type_;
00862   DATA_LEN(dfh) = htons(len);
00863   PKT_NUM(dfh) = htonl(msg->pkt_num_);
00864   RDM_ID(dfh) = htonl(msg->rdm_id_);
00865   NUM_ATTR(dfh) = htons(msg->num_attr_);
00866   SRC_PORT(dfh) = htons(msg->source_port_);
00867 
00868   sendPacketToDiffusion(out_pkt, sizeof(struct hdr_diff) + len, diffusion_port_);
00869 
00870   delete [] out_pkt;
00871 }

Here is the call graph for this function:

void DiffusionRouting::sendPacketToDiffusion (DiffPacket pkt, int len, int dst) [protected]
 

Definition at line 889 of file dr.cc.

References len, and local_out_devices_.

Referenced by sendMessageToDiffusion().

00890 {
00891   DeviceList::iterator itr;
00892 
00893   for (itr = local_out_devices_.begin(); itr != local_out_devices_.end(); ++itr){
00894     (*itr)->sendPacket(pkt, len, dst);
00895   }
00896 }

handle DiffusionRouting::subscribe (NRAttrVec *subscribe_attrs, NR::Callback *cb) [virtual]
 

Implements NR.

Definition at line 228 of file dr.cc.

References TimerManager::addTimer(), HandleEntry::attrs_, HandleEntry::cb_, checkSubscription(), CopyAttrs(), DEBUG_ALWAYS, DiffPrint(), dr_mtx_, FAIL, GetLock(), NRAttribute::GLOBAL_SCOPE, handle, hasScope(), HandleEntry::hdl_, NRAttribute::IS, next_handle_, NRScopeAttr, ReleaseLock(), SMALL_TIMEOUT, sub_list_, and timers_manager_.

00229 {
00230   HandleEntry *my_handle;
00231   NRAttribute *scope_attr;
00232   TimerCallback *timer_callback;
00233 
00234   // Get lock first
00235   GetLock(dr_mtx_);
00236 
00237   // Check the subscribe attributes
00238   if (!checkSubscription(subscribe_attrs)){
00239     DiffPrint(DEBUG_ALWAYS, "Error : Invalid class/scope attributes in the subscribe attributes !\n");
00240     ReleaseLock(dr_mtx_);
00241     return FAIL;
00242   }
00243 
00244   // Create and Initialize the handle_entry structure
00245   my_handle = new HandleEntry;
00246   my_handle->hdl_ = next_handle_;
00247   next_handle_++;
00248   my_handle->cb_ = (NR::Callback *) cb;
00249   sub_list_.push_back(my_handle);
00250 
00251   // Copy the attributes   
00252   my_handle->attrs_ = CopyAttrs(subscribe_attrs);
00253 
00254   // For subscriptions, scope is global if not specified
00255   if (!hasScope(subscribe_attrs)){
00256     scope_attr = NRScopeAttr.make(NRAttribute::IS, NRAttribute::GLOBAL_SCOPE);
00257     my_handle->attrs_->push_back(scope_attr);
00258   }
00259 
00260   // Create Interest Timer and add it to the queue
00261   timer_callback = new InterestCallback(this, my_handle);
00262   timers_manager_->addTimer(SMALL_TIMEOUT, timer_callback);
00263 
00264   // Release lock
00265   ReleaseLock(dr_mtx_);
00266 
00267   return my_handle->hdl_;
00268 }

Here is the call graph for this function:

int DiffusionRouting::unpublish (handle publication_handle) [virtual]
 

Implements NR.

Definition at line 329 of file dr.cc.

References dr_mtx_, FAIL, GetLock(), OK, pub_list_, ReleaseLock(), and removeHandle().

00330 {
00331   HandleEntry *my_handle = NULL;
00332 
00333   // Get the lock first
00334   GetLock(dr_mtx_);
00335 
00336   my_handle = removeHandle(publication_handle, &pub_list_);
00337   if (!my_handle){
00338     // Handle doesn't exist, return FAIL
00339     ReleaseLock(dr_mtx_);
00340     return FAIL;
00341   }
00342 
00343   // Free structures
00344   delete my_handle;
00345 
00346   // Release the lock
00347   ReleaseLock(dr_mtx_);
00348 
00349   return OK;
00350 }

Here is the call graph for this function:

int DiffusionRouting::unsubscribe (handle subscription_handle) [virtual]
 

Implements NR.

Definition at line 270 of file dr.cc.

References dr_mtx_, FAIL, findHandle(), GetLock(), OK, ReleaseLock(), sub_list_, and HandleEntry::valid_.

00271 {
00272   HandleEntry *my_handle = NULL;
00273 
00274   // Get the lock first
00275   GetLock(dr_mtx_);
00276 
00277   my_handle = findHandle(subscription_handle, &sub_list_);
00278   if (!my_handle){
00279     // Handle doesn't exist, return FAIL
00280     ReleaseLock(dr_mtx_);
00281     return FAIL;
00282   }
00283 
00284   // Handle will be destroyed when next interest timeout happens
00285   my_handle->valid_ = false;
00286 
00287   // Release the lock
00288   ReleaseLock(dr_mtx_);
00289 
00290   return OK;
00291 }

Here is the call graph for this function:


Member Data Documentation

u_int16_t DiffusionRouting::agent_id_ [protected]
 

Definition at line 219 of file dr.hh.

Referenced by addFilter(), filterKeepaliveTimeout(), interestTimeout(), removeFilter(), send(), and sendMessage().

u_int16_t DiffusionRouting::diffusion_port_ [protected]
 

Definition at line 212 of file dr.hh.

Referenced by sendMessageToDiffusion().

pthread_mutex_t* DiffusionRouting::dr_mtx_ [protected]
 

Definition at line 202 of file dr.hh.

Referenced by addFilter(), filterKeepaliveTimeout(), interestTimeout(), processControlMessage(), processMessage(), publish(), removeFilter(), send(), subscribe(), unpublish(), and unsubscribe().

FilterList DiffusionRouting::filter_list_ [protected]
 

Definition at line 199 of file dr.hh.

Referenced by addFilter(), deleteFilter(), and findFilter().

DeviceList DiffusionRouting::in_devices_ [protected]
 

Definition at line 208 of file dr.hh.

Referenced by run().

DeviceList DiffusionRouting::local_out_devices_ [protected]
 

Definition at line 209 of file dr.hh.

Referenced by sendPacketToDiffusion().

int DiffusionRouting::next_handle_ [protected]
 

Definition at line 196 of file dr.hh.

Referenced by addFilter(), publish(), and subscribe().

int DiffusionRouting::pkt_count_ [protected]
 

Definition at line 213 of file dr.hh.

Referenced by addFilter(), filterKeepaliveTimeout(), interestTimeout(), removeFilter(), send(), and sendMessage().

HandleList DiffusionRouting::pub_list_ [protected]
 

Definition at line 197 of file dr.hh.

Referenced by publish(), send(), unpublish(), and ~DiffusionRouting().

int DiffusionRouting::random_id_ [protected]
 

Definition at line 214 of file dr.hh.

Referenced by addFilter(), filterKeepaliveTimeout(), interestTimeout(), removeFilter(), send(), and sendMessage().

HandleList DiffusionRouting::sub_list_ [protected]
 

Definition at line 198 of file dr.hh.

Referenced by interestTimeout(), processMessage(), subscribe(), unsubscribe(), and ~DiffusionRouting().

TimerManager* DiffusionRouting::timers_manager_ [protected]
 

Definition at line 205 of file dr.hh.

Referenced by addFilter(), addTimer(), removeTimer(), run(), and subscribe().


The documentation for this class was generated from the following files: dr.hh and dr.cc
Generated on Tue Apr 20 12:39:14 2004 for NS2.26SourcesOriginal by doxygen 1.3.3