#include <dr.hh>
Inheritance diagram for DiffusionRouting:


|
|
Definition at line 302 of file nr.hh. Referenced by processControlMessage(). |
|
|
Definition at line 165 of file dr.cc. References DEBUG_ALWAYS, DEFAULT_DIFFUSION_PORT, DiffPrint(), GetRand(), GetTime(), and SetSeed().
00167 {
00168 struct timeval tv;
00169 DiffusionIO *device;
00170
00171 // Initialize basic stuff
00172 next_handle_ = 1;
00173 GetTime(&tv);
00174 SetSeed(&tv);
00175 pkt_count_ = GetRand();
00176 random_id_ = GetRand();
00177 agent_id_ = 0;
00178
00179 if (port == 0)
00180 port = DEFAULT_DIFFUSION_PORT;
00181
00182 diffusion_port_ = port;
00183
00184 // Initialize timer manager
00185 timers_manager_ = new TimerManager;
00186
00187 // Initialize input device
00188 #ifdef NS_DIFFUSION
00189 device = new NsLocal(da);
00190 local_out_devices_.push_back(device);
00191 #endif // NS_DIFFUSION
00192
00193 #ifdef UDP
00194 device = new UDPLocal(&agent_id_);
00195 in_devices_.push_back(device);
00196 local_out_devices_.push_back(device);
00197 #endif // UDP
00198
00199 // Print initialization message
00200 DiffPrint(DEBUG_ALWAYS,
00201 "Diffusion Routing Agent initializing... Agent Id = %d\n",
00202 agent_id_);
00203
00204 #ifdef USE_THREADS
00205 // Initialize Semaphores
00206 dr_mtx_ = new pthread_mutex_t;
00207 pthread_mutex_init(dr_mtx_, NULL);
00208 #endif // USE_THREADS
00209 }
|
Here is the call graph for this function:

|
|
Definition at line 211 of file dr.cc. References pub_list_, and sub_list_.
00212 {
00213 HandleList::iterator itr;
00214 HandleEntry *current;
00215
00216 // Delete all Handles
00217 for (itr = sub_list_.begin(); itr != sub_list_.end(); ++itr){
00218 current = *itr;
00219 delete current;
00220 }
00221
00222 for (itr = pub_list_.begin(); itr != pub_list_.end(); ++itr){
00223 current = *itr;
00224 delete current;
00225 }
00226 }
|
|
||||||||||||||||
|
Definition at line 431 of file dr.cc. References ADD_UPDATE_FILTER, TimerManager::addTimer(), agent_id_, CalculateSize(), FilterEntry::cb_, CONTROL, ControlMsgAttr, CopyAttrs(), Message::data_len_, DEBUG_ALWAYS, DiffPrint(), DIFFUSION_VERSION, dr_mtx_, FAIL, FilterEntry::filter_attrs_, FILTER_KEEPALIVE_DELAY, filter_list_, FILTER_MAX_PRIORITY, GetLock(), handle, FilterEntry::handle_, NRAttribute::IS, LOCALHOST_ADDR, Message::msg_attr_vec_, next_handle_, Message::num_attr_, pkt_count_, random_id_, ReleaseLock(), sendMessageToDiffusion(), and timers_manager_.
00433 {
00434 FilterEntry *filter_entry;
00435 NRAttrVec *attrs;
00436 NRAttribute *ctrl_msg_attr;
00437 ControlMessage *control_blob;
00438 Message *my_message;
00439 TimerCallback *timer_callback;
00440
00441 // Check parameters
00442 if (!filter_attrs || !cb || priority < FILTER_MIN_PRIORITY || priority > FILTER_MAX_PRIORITY){
00443 DiffPrint(DEBUG_ALWAYS, "Received invalid parameters when adding filter !\n");
00444 return FAIL;
00445 }
00446
00447 // Get lock first
00448 GetLock(dr_mtx_);
00449
00450 // Create and Initialize the handle_entry structute
00451 filter_entry = new FilterEntry(next_handle_, priority, agent_id_);
00452 next_handle_++;
00453 filter_entry->cb_ = (FilterCallback *) cb;
00454 filter_list_.push_back(filter_entry);
00455
00456 // Copy attributes (keep them for matching later)
00457 filter_entry->filter_attrs_ = CopyAttrs(filter_attrs);
00458
00459 // Copy the attributes (and add the control attr)
00460 attrs = CopyAttrs(filter_attrs);
00461 control_blob = new ControlMessage(ADD_UPDATE_FILTER,
00462 priority, filter_entry->handle_);
00463
00464 ctrl_msg_attr = ControlMsgAttr.make(NRAttribute::IS,
00465 (void *)control_blob,
00466 sizeof(ControlMessage));
00467
00468 attrs->push_back(ctrl_msg_attr);
00469
00470 // Initialize message structure
00471 my_message = new Message(DIFFUSION_VERSION, CONTROL, agent_id_, 0,
00472 0, pkt_count_, random_id_, LOCALHOST_ADDR,
00473 LOCALHOST_ADDR);
00474
00475 // Increment pkt_counter
00476 pkt_count_++;
00477
00478 // Add attributes to the message
00479 my_message->msg_attr_vec_ = attrs;
00480 my_message->num_attr_ = attrs->size();
00481 my_message->data_len_ = CalculateSize(attrs);
00482
00483 // Release the lock
00484 ReleaseLock(dr_mtx_);
00485
00486 // Send Packet
00487 sendMessageToDiffusion(my_message);
00488
00489 // Add keepalive timer to the event queue
00490 timer_callback = new FilterKeepaliveCallback(this, filter_entry);
00491 timers_manager_->addTimer(FILTER_KEEPALIVE_DELAY, timer_callback);
00492
00493 // Delete message, attribute set and controlblob
00494 delete my_message;
00495 delete control_blob;
00496
00497 return filter_entry->handle_;
00498 }
|
Here is the call graph for this function:

|
||||||||||||||||
|
Definition at line 560 of file dr.cc. References addTimer(), and handle.
00561 {
00562 TimerCallback *callback;
00563
00564 callback = new OldAPITimer(cb, p);
00565
00566 return (addTimer(timeout, callback));
00567 }
|
Here is the call graph for this function:

|
||||||||||||
|
Definition at line 555 of file dr.cc. References TimerManager::addTimer(), handle, and timers_manager_. Referenced by addTimer().
00556 {
00557 return (timers_manager_->addTimer(timeout, callback));
00558 }
|
Here is the call graph for this function:

|
|
Definition at line 1163 of file dr.cc. References NRAttribute::DATA_CLASS, NRAttribute::getOp(), NRSimpleAttribute< T >::getVal(), NRAttribute::GLOBAL_SCOPE, NRAttribute::IS, NRAttribute::NODE_LOCAL_SCOPE, NRClassAttr, and NRScopeAttr. Referenced by publish().
01164 {
01165 NRSimpleAttribute<int> *nrclass = NULL;
01166 NRSimpleAttribute<int> *nrscope = NULL;
01167
01168 // We first try to locate both class and scope attributes
01169 nrclass = NRClassAttr.find(attrs);
01170 nrscope = NRScopeAttr.find(attrs);
01171
01172 // There must be a class attribute in the publication
01173 if (!nrclass)
01174 return false;
01175
01176 // In addition, the Diffusion Routing API requires the class
01177 // attribute to be set to "IS DATA_CLASS"
01178 if ((nrclass->getVal() != NRAttribute::DATA_CLASS) ||
01179 (nrclass->getOp() != NRAttribute::IS))
01180 return false;
01181
01182 if (nrscope){
01183 // Ok, so this publication has both class and scope attributes. We
01184 // now have to check if they comply to the Diffusion Routing API
01185 // semantics for publish
01186
01187 // Must check scope's operator. The API requires it to be "IS"
01188 if (nrscope->getOp() != NRAttribute::IS)
01189 return false;
01190
01191 // We accept both global and local scope data messages
01192 if ((nrscope->getVal() == NRAttribute::GLOBAL_SCOPE) ||
01193 (nrscope->getVal() == NRAttribute::NODE_LOCAL_SCOPE))
01194 return true;
01195
01196 // Just not to miss any case
01197 return false;
01198 }
01199
01200 // A publish without a scope attribute is fine, we will include a
01201 // default NODE_LOCAL_SCOPE attribute later
01202 return true;
01203 }
|
Here is the call graph for this function:

|
|
Definition at line 1205 of file dr.cc. References NRClassAttr, and NRScopeAttr. Referenced by send().
01206 {
01207 NRSimpleAttribute<int> *nrclass = NULL;
01208 NRSimpleAttribute<int> *nrscope = NULL;
01209
01210 // Currently only checks for Class and Scope attributes
01211 nrclass = NRClassAttr.find(attrs);
01212 nrscope = NRScopeAttr.find(attrs);
01213
01214 if (nrclass || nrscope)
01215 return false;
01216
01217 return true;
01218 }
|
|
|
Definition at line 1118 of file dr.cc. References NRAttribute::getOp(), NRSimpleAttribute< T >::getVal(), NRAttribute::GLOBAL_SCOPE, NRAttribute::INTEREST_CLASS, NRAttribute::IS, NRAttribute::NODE_LOCAL_SCOPE, NRClassAttr, and NRScopeAttr. Referenced by subscribe().
01119 {
01120 NRSimpleAttribute<int> *nrclass = NULL;
01121 NRSimpleAttribute<int> *nrscope = NULL;
01122
01123 // We first try to locate both class and scope attributes
01124 nrclass = NRClassAttr.find(attrs);
01125 nrscope = NRScopeAttr.find(attrs);
01126
01127 // There must be a class attribute in subscriptions
01128 if (!nrclass)
01129 return false;
01130
01131 if (nrscope){
01132 // This subcription has both class and scope attribute. So, we
01133 // check if class/scope attributes comply with the Diffusion
01134 // Routing API
01135
01136 // Must check scope's operator. The API requires it to be "IS"
01137 if (nrscope->getOp() != NRAttribute::IS)
01138 return false;
01139
01140 // Ok, so first check if this is a global subscription
01141 if ((nrscope->getVal() == NRAttribute::GLOBAL_SCOPE) &&
01142 (nrclass->getVal() == NRAttribute::INTEREST_CLASS) &&
01143 (nrclass->getOp() == NRAttribute::IS))
01144 return true;
01145
01146 // Check for local subscriptions
01147 if (nrscope->getVal() == NRAttribute::NODE_LOCAL_SCOPE)
01148 return true;
01149
01150 // Just to be sure we did not miss any case
01151 return false;
01152 }
01153
01154 // If there is no scope attribute, we will insert one later if this
01155 // subscription looks like a global subscription
01156 if ((nrclass->getVal() == NRAttribute::INTEREST_CLASS) &&
01157 (nrclass->getOp() == NRAttribute::IS))
01158 return true;
01159
01160 return false;
01161 }
|
Here is the call graph for this function:

|
|
Definition at line 123 of file dr.cc. References DEBUG_ALWAYS, DiffPrint(), and dr. Referenced by GeoRoutingFilter::GeoRoutingFilter(), GradientFilter::GradientFilter(), LogFilter::LogFilter(), PingReceiverApp::PingReceiverApp(), PingSenderApp::PingSenderApp(), PushReceiverApp::PushReceiverApp(), PushSenderApp::PushSenderApp(), SrcRtFilter::SrcRtFilter(), and TagFilter::TagFilter().
00124 {
00125 // Create Diffusion Routing Class
00126 if (dr)
00127 return dr;
00128
00129 dr = new DiffusionRouting(port);
00130
00131 #ifdef USE_THREADS
00132 int retval;
00133 pthread_t thread;
00134
00135 // Fork a thread for receiving Messages
00136 retval = pthread_create(&thread, NULL, &ReceiveThread, (void *)dr);
00137
00138 if (retval){
00139 DiffPrint(DEBUG_ALWAYS, "Error creating receiving thread ! Aborting...\n");
00140 exit(-1);
00141 }
00142 #endif // USE_THREADS
00143
00144 return dr;
00145 }
|
Here is the call graph for this function:

|
|
Definition at line 1079 of file dr.cc. References filter_list_, and FilterEntry::handle_. Referenced by filterKeepaliveTimeout().
01080 {
01081 FilterList::iterator itr;
01082 FilterEntry *entry;
01083
01084 for (itr = filter_list_.begin(); itr != filter_list_.end(); ++itr){
01085 entry = *itr;
01086 if (entry->handle_ == my_handle){
01087 filter_list_.erase(itr);
01088 return entry;
01089 }
01090 }
01091 return NULL;
01092 }
|
|
|
Definition at line 755 of file dr.cc. References run(), and WAIT_FOREVER.
00756 {
00757 run(true, WAIT_FOREVER);
00758 }
|
Here is the call graph for this function:

|
|
Definition at line 760 of file dr.cc. References run().
00761 {
00762 run(false, timeout);
00763 }
|
Here is the call graph for this function:

|
|
Definition at line 574 of file dr.cc. References ADD_UPDATE_FILTER, agent_id_, CalculateSize(), CONTROL, ControlMsgAttr, CopyAttrs(), Message::data_len_, DEBUG_ALWAYS, deleteFilter(), DiffPrint(), DIFFUSION_VERSION, dr_mtx_, FilterEntry::filter_attrs_, FILTER_KEEPALIVE_DELAY, GetLock(), FilterEntry::handle_, NRAttribute::IS, LOCALHOST_ADDR, Message::msg_attr_vec_, Message::num_attr_, pkt_count_, FilterEntry::priority_, random_id_, ReleaseLock(), sendMessageToDiffusion(), and FilterEntry::valid_. Referenced by FilterKeepaliveCallback::expire().
00575 {
00576 FilterEntry *my_entry = NULL;
00577 ControlMessage *control_blob;
00578 NRAttribute *ctrl_msg_attr;
00579 NRAttrVec *attrs;
00580 Message *my_message;
00581
00582 // Acquire lock first
00583 GetLock(dr_mtx_);
00584
00585 if (filter_entry->valid_){
00586 // Send keepalive
00587 control_blob = new ControlMessage(ADD_UPDATE_FILTER,
00588 filter_entry->priority_,
00589 filter_entry->handle_);
00590
00591 ctrl_msg_attr = ControlMsgAttr.make(NRAttribute::IS,
00592 (void *)control_blob,
00593 sizeof(ControlMessage));
00594
00595 attrs = CopyAttrs(filter_entry->filter_attrs_);
00596 attrs->push_back(ctrl_msg_attr);
00597
00598 my_message = new Message(DIFFUSION_VERSION, CONTROL, agent_id_, 0,
00599 0, pkt_count_, random_id_, LOCALHOST_ADDR,
00600 LOCALHOST_ADDR);
00601
00602 // Increment pkt_counter
00603 pkt_count_++;
00604
00605 // Add attributes to the message
00606 my_message->msg_attr_vec_ = attrs;
00607 my_message->num_attr_ = attrs->size();
00608 my_message->data_len_ = CalculateSize(attrs);
00609
00610 // Send Message
00611 sendMessageToDiffusion(my_message);
00612
00613 delete my_message;
00614 delete control_blob;
00615
00616 // Release lock
00617 ReleaseLock(dr_mtx_);
00618
00619 // Reschedule another filter keepalive timer in event queue
00620 return (FILTER_KEEPALIVE_DELAY);
00621 }
00622 else{
00623 // Filter was removed
00624 my_entry = deleteFilter(filter_entry->handle_);
00625
00626 // We should have removed the correct handle
00627 if (my_entry != filter_entry){
00628 DiffPrint(DEBUG_ALWAYS, "DiffusionRouting::KeepaliveTimeout: Handles should match !\n");
00629 exit(-1);
00630 }
00631
00632 delete my_entry;
00633
00634 // Release lock
00635 ReleaseLock(dr_mtx_);
00636
00637 return -1;
00638 }
00639 }
|
Here is the call graph for this function:

|
|
Definition at line 1094 of file dr.cc. References filter_list_, and FilterEntry::handle_. Referenced by processControlMessage(), and removeFilter().
01095 {
01096 FilterList::iterator itr;
01097 FilterEntry *entry;
01098
01099 for (itr = filter_list_.begin(); itr != filter_list_.end(); ++itr){
01100 entry = *itr;
01101 if (entry->handle_ == my_handle)
01102 return entry;
01103 }
01104 return NULL;
01105 }
|
|
||||||||||||
|
Definition at line 1066 of file dr.cc. References HandleEntry::hdl_. Referenced by send(), and unsubscribe().
01067 {
01068 HandleList::iterator itr;
01069 HandleEntry *entry;
01070
01071 for (itr = hl->begin(); itr != hl->end(); ++itr){
01072 entry = *itr;
01073 if (entry->hdl_ == my_handle)
01074 return entry;
01075 }
01076 return NULL;
01077 }
|
|
|
Definition at line 1107 of file dr.cc. References NRScopeAttr. Referenced by publish(), and subscribe().
01108 {
01109 NRAttribute *temp = NULL;
01110
01111 temp = NRScopeAttr.find(attrs);
01112 if (temp)
01113 return true;
01114
01115 return false;
01116 }
|
|
|
Definition at line 641 of file dr.cc. References agent_id_, HandleEntry::attrs_, CalculateSize(), CopyAttrs(), Message::data_len_, DEBUG_ALWAYS, DiffPrint(), DIFFUSION_VERSION, dr_mtx_, GetLock(), GetRand(), HandleEntry::hdl_, INTEREST, INTEREST_DELAY, INTEREST_JITTER, LOCALHOST_ADDR, Message::msg_attr_vec_, Message::num_attr_, pkt_count_, RAND_MAX, random_id_, ReleaseLock(), removeHandle(), sendMessageToDiffusion(), sub_list_, and HandleEntry::valid_. Referenced by InterestCallback::expire().
00642 {
00643 HandleEntry *my_handle = NULL;
00644 Message *my_message;
00645
00646 // Acquire lock first
00647 GetLock(dr_mtx_);
00648
00649 if (handle_entry->valid_){
00650 // Send the interest message if entry is still valid
00651 my_message = new Message(DIFFUSION_VERSION, INTEREST, agent_id_, 0,
00652 0, pkt_count_, random_id_, LOCALHOST_ADDR,
00653 LOCALHOST_ADDR);
00654
00655 // Increment pkt_counter
00656 pkt_count_++;
00657
00658 // Add attributes to the message
00659 my_message->msg_attr_vec_ = CopyAttrs(handle_entry->attrs_);
00660 my_message->num_attr_ = handle_entry->attrs_->size();
00661 my_message->data_len_ = CalculateSize(handle_entry->attrs_);
00662
00663 // Send Packet
00664 sendMessageToDiffusion(my_message);
00665
00666 delete my_message;
00667
00668 // Release lock
00669 ReleaseLock(dr_mtx_);
00670
00671 // Reschedule this timer in the queue
00672 return (INTEREST_DELAY +
00673 (int) ((INTEREST_JITTER * (GetRand() * 1.0 / RAND_MAX)) -
00674 (INTEREST_JITTER / 2)));
00675 }
00676 else{
00677 // Interest was canceled. Just delete it from the handle_list
00678 my_handle = removeHandle(handle_entry->hdl_, &sub_list_);
00679
00680 // We should have removed the correct handle
00681 if (my_handle != handle_entry){
00682 DiffPrint(DEBUG_ALWAYS,
00683 "Error: interestTimeout: Handles should match !\n");
00684 exit(-1);
00685 }
00686
00687 delete my_handle;
00688
00689 // Release lock
00690 ReleaseLock(dr_mtx_);
00691
00692 // Delete timer from the queue
00693 return -1;
00694 }
00695 }
|
Here is the call graph for this function:

|
|
Definition at line 1220 of file dr.cc. References DEBUG_ALWAYS, DiffPrint(), NRSimpleAttribute< T >::getVal(), NRAttribute::NODE_LOCAL_SCOPE, NRClassAttr, and NRScopeAttr. Referenced by send().
01221 {
01222 NRSimpleAttribute<int> *nrclass = NULL;
01223 NRSimpleAttribute<int> *nrscope = NULL;
01224
01225 // Currently only checks for Class and Scope attributes
01226 nrclass = NRClassAttr.find(attrs);
01227 nrscope = NRScopeAttr.find(attrs);
01228
01229 // We should have both class and scope
01230 if (nrclass && nrscope){
01231 if (nrscope->getVal() == NRAttribute::NODE_LOCAL_SCOPE)
01232 return false;
01233 return true;
01234 }
01235 else{
01236 DiffPrint(DEBUG_ALWAYS, "Error: Cannot find class/scope attributes !\n");
01237 return false;
01238 }
01239 }
|
Here is the call graph for this function:

|
|
Definition at line 951 of file dr.cc. References FilterEntry::cb_, DEBUG_ALWAYS, DEBUG_IMPORTANT, DiffPrint(), dr_mtx_, FilterEntry::filter_attrs_, findFilter(), GetLock(), NRSimpleAttribute< T >::getVal(), NR::handle, RedirectMessage::handle_, RedirectMessage::last_hop_, Message::last_hop_, Message::msg_attr_vec_, RedirectMessage::msg_type_, Message::msg_type_, RedirectMessage::new_message_, Message::new_message_, RedirectMessage::next_hop_, Message::next_hop_, RedirectMessage::next_port_, Message::next_port_, RedirectMessage::num_attr_, Message::num_attr_, OneWayMatch(), OriginalHdrAttr, RedirectMessage::pkt_num_, Message::pkt_num_, RedirectMessage::rdm_id_, Message::rdm_id_, FilterCallback::recv(), ReleaseLock(), RedirectMessage::source_port_, Message::source_port_, and FilterEntry::valid_. Referenced by recvMessage().
00952 {
00953 NRSimpleAttribute<void *> *original_header_attr = NULL;
00954 NRAttrVec::iterator place = msg->msg_attr_vec_->begin();
00955 RedirectMessage *original_header;
00956 FilterEntry *entry;
00957 handle my_handle;
00958
00959 // Find the attribute containing the original packet header
00960 original_header_attr = OriginalHdrAttr.find_from(msg->msg_attr_vec_,
00961 place, &place);
00962 if (!original_header_attr){
00963 DiffPrint(DEBUG_ALWAYS, "Error: Received an invalid REDIRECT message !\n");
00964 return;
00965 }
00966
00967 // Restore original message header
00968 original_header = (RedirectMessage *) original_header_attr->getVal();
00969 my_handle = original_header->handle_;
00970 msg->msg_type_ = original_header->msg_type_;
00971 msg->source_port_ = original_header->source_port_;
00972 msg->pkt_num_ = original_header->pkt_num_;
00973 msg->rdm_id_ = original_header->rdm_id_;
00974 msg->next_hop_ = original_header->next_hop_;
00975 msg->last_hop_ = original_header->last_hop_;
00976 msg->num_attr_ = original_header->num_attr_;
00977 msg->new_message_ = original_header->new_message_;
00978 msg->next_port_ = original_header->next_port_;
00979
00980 // Delete attribute from the original set
00981 msg->msg_attr_vec_->erase(place);
00982 delete original_header_attr;
00983
00984 // Find the right callback
00985 GetLock(dr_mtx_);
00986
00987 entry = findFilter(my_handle);
00988 if (entry && entry->valid_){
00989 // Just to confirm
00990 if (OneWayMatch(entry->filter_attrs_, msg->msg_attr_vec_)){
00991 ReleaseLock(dr_mtx_);
00992 entry->cb_->recv(msg, my_handle);
00993 return;
00994 }
00995 else{
00996 DiffPrint(DEBUG_ALWAYS,
00997 "Warning: Filter doesn't match incoming message's attributes !\n");
00998 }
00999 }
01000 else{
01001 DiffPrint(DEBUG_IMPORTANT,
01002 "Report: Cannot find filter (possibly deleted ?)\n");
01003 }
01004
01005 ReleaseLock(dr_mtx_);
01006 }
|
Here is the call graph for this function:

|
|
Definition at line 1008 of file dr.cc. References HandleEntry::attrs_, CallbackEntry::cb_, HandleEntry::cb_, ClearAttrs(), CopyAttrs(), dr_mtx_, GetLock(), HandleEntry::hdl_, MatchAttrs(), Message::msg_attr_vec_, NR::Callback::recv(), ReleaseLock(), sub_list_, CallbackEntry::subscription_handle_, and HandleEntry::valid_. Referenced by recvMessage().
01009 {
01010 CallbackList cbl;
01011 CallbackEntry *aux;
01012 HandleList::iterator sub_itr;
01013 CallbackList::iterator cbl_itr;
01014 HandleEntry *entry;
01015 NRAttrVec *callback_attrs;
01016
01017 // First, acquire the lock
01018 GetLock(dr_mtx_);
01019
01020 for (sub_itr = sub_list_.begin(); sub_itr != sub_list_.end(); ++sub_itr){
01021 entry = *sub_itr;
01022 if ((entry->valid_) && (MatchAttrs(msg->msg_attr_vec_, entry->attrs_)))
01023 if (entry->cb_){
01024 aux = new CallbackEntry(entry->cb_, entry->hdl_);
01025 cbl.push_back(aux);
01026 }
01027 }
01028
01029 // We can release the lock now
01030 ReleaseLock(dr_mtx_);
01031
01032 // Now we just call all callback functions
01033 for (cbl_itr = cbl.begin(); cbl_itr != cbl.end(); ++cbl_itr){
01034 // Copy attributes
01035 callback_attrs = CopyAttrs(msg->msg_attr_vec_);
01036
01037 // Call app-specific callback function
01038 aux = *cbl_itr;
01039 aux->cb_->recv(callback_attrs, aux->subscription_handle_);
01040 delete aux;
01041
01042 // Clean up callback attributes
01043 ClearAttrs(callback_attrs);
01044 delete callback_attrs;
01045 }
01046
01047 // We are done
01048 cbl.clear();
01049 }
|
Here is the call graph for this function:

|
|
Implements NR. Definition at line 293 of file dr.cc. References HandleEntry::attrs_, checkPublication(), CopyAttrs(), DEBUG_ALWAYS, DiffPrint(), dr_mtx_, FAIL, GetLock(), handle, hasScope(), HandleEntry::hdl_, NRAttribute::IS, next_handle_, NRAttribute::NODE_LOCAL_SCOPE, NRScopeAttr, pub_list_, and ReleaseLock().
00294 {
00295 HandleEntry *my_handle;
00296 NRAttribute *scope_attr;
00297
00298 // Get the lock first
00299 GetLock(dr_mtx_);
00300
00301 // Check the published attributes
00302 if (!checkPublication(publish_attrs)){
00303 DiffPrint(DEBUG_ALWAYS, "Error : Invalid class/scope attributes in the publish attributes !\n");
00304 ReleaseLock(dr_mtx_);
00305 return FAIL;
00306 }
00307
00308 // Create and Initialize the handle_entry structute
00309 my_handle = new HandleEntry;
00310 my_handle->hdl_ = next_handle_;
00311 next_handle_++;
00312 pub_list_.push_back(my_handle);
00313
00314 // Copy the attributes
00315 my_handle->attrs_ = CopyAttrs(publish_attrs);
00316
00317 // For publications, scope is local if not specified
00318 if (!hasScope(publish_attrs)){
00319 scope_attr = NRScopeAttr.make(NRAttribute::IS, NRAttribute::NODE_LOCAL_SCOPE);
00320 my_handle->attrs_->push_back(scope_attr);
00321 }
00322
00323 // Release the lock
00324 ReleaseLock(dr_mtx_);
00325
00326 return my_handle->hdl_;
00327 }
|
Here is the call graph for this function:

|
|
Definition at line 934 of file dr.cc. References DIFFUSION_VERSION, LOCALHOST_ADDR, Message::msg_type_, Message::next_hop_, processControlMessage(), processMessage(), REDIRECT, and Message::version_. Referenced by recvPacket().
00935 {
00936 // Check version
00937 if (msg->version_ != DIFFUSION_VERSION)
00938 return;
00939
00940 // Check destination
00941 if (msg->next_hop_ != LOCALHOST_ADDR)
00942 return;
00943
00944 // Process the incoming message
00945 if (msg->msg_type_ == REDIRECT)
00946 processControlMessage(msg);
00947 else
00948 processMessage(msg);
00949 }
|
Here is the call graph for this function:

|
|
Definition at line 899 of file dr.cc. References DATA_LEN, DIFF_VER, HDR_DIFF, int32_t, int8_t, LAST_HOP, Message::msg_attr_vec_, MSG_TYPE, NEXT_HOP, NUM_ATTR, PKT_NUM, RDM_ID, recvMessage(), SRC_PORT, u_int16_t, and UnpackAttrs(). Referenced by run().
00900 {
00901 struct hdr_diff *dfh = HDR_DIFF(pkt);
00902 Message *rcv_message = NULL;
00903 int8_t version, msg_type;
00904 u_int16_t data_len, num_attr, source_port;
00905 int32_t pkt_num, rdm_id, next_hop, last_hop;
00906
00907 // Read header
00908 version = DIFF_VER(dfh);
00909 msg_type = MSG_TYPE(dfh);
00910 source_port = ntohs(SRC_PORT(dfh));
00911 pkt_num = ntohl(PKT_NUM(dfh));
00912 rdm_id = ntohl(RDM_ID(dfh));
00913 num_attr = ntohs(NUM_ATTR(dfh));
00914 next_hop = ntohl(NEXT_HOP(dfh));
00915 last_hop = ntohl(LAST_HOP(dfh));
00916 data_len = ntohs(DATA_LEN(dfh));
00917
00918 // Create a message structure from the incoming packet
00919 rcv_message = new Message(version, msg_type, source_port, data_len,
00920 num_attr, pkt_num, rdm_id, next_hop, last_hop);
00921
00922 // Read all attributes into the Message structure
00923 rcv_message->msg_attr_vec_ = UnpackAttrs(pkt, num_attr);
00924
00925 // Process the incoming message
00926 recvMessage(rcv_message);
00927
00928 // We are done
00929 delete rcv_message;
00930 delete [] pkt;
00931 }
|
Here is the call graph for this function:

|
|
Definition at line 500 of file dr.cc. References agent_id_, CalculateSize(), CONTROL, ControlMsgAttr, Message::data_len_, DIFFUSION_VERSION, dr_mtx_, FAIL, findFilter(), GetLock(), FilterEntry::handle_, NRAttribute::IS, LOCALHOST_ADDR, Message::msg_attr_vec_, Message::num_attr_, OK, pkt_count_, random_id_, ReleaseLock(), REMOVE_FILTER, sendMessageToDiffusion(), and FilterEntry::valid_.
00501 {
00502 FilterEntry *filter_entry = NULL;
00503 ControlMessage *control_blob;
00504 NRAttribute *ctrl_msg_attr;
00505 NRAttrVec *attrs;
00506 Message *my_message;
00507
00508 // Get lock first
00509 GetLock(dr_mtx_);
00510
00511 filter_entry = findFilter(filter_handle);
00512 if (!filter_entry){
00513 // Handle doesn't exist, return FAIL
00514 ReleaseLock(dr_mtx_);
00515 return FAIL;
00516 }
00517
00518 control_blob = new ControlMessage(REMOVE_FILTER, filter_entry->handle_, 0);
00519
00520 ctrl_msg_attr = ControlMsgAttr.make(NRAttribute::IS,
00521 (void *)control_blob,
00522 sizeof(ControlMessage));
00523
00524 attrs = new NRAttrVec;
00525 attrs->push_back(ctrl_msg_attr);
00526
00527 my_message = new Message(DIFFUSION_VERSION, CONTROL, agent_id_, 0,
00528 0, pkt_count_, random_id_, LOCALHOST_ADDR,
00529 LOCALHOST_ADDR);
00530
00531 // Increment pkt_counter
00532 pkt_count_++;
00533
00534 // Add attributes to the message
00535 my_message->msg_attr_vec_ = attrs;
00536 my_message->num_attr_ = attrs->size();
00537 my_message->data_len_ = CalculateSize(attrs);
00538
00539 // Handle will be destroyed when next keepalive timer happens
00540 filter_entry->valid_ = false;
00541
00542 // Send Packet
00543 sendMessageToDiffusion(my_message);
00544
00545 // Release the lock
00546 ReleaseLock(dr_mtx_);
00547
00548 // Delete message
00549 delete my_message;
00550 delete control_blob;
00551
00552 return OK;
00553 }
|
Here is the call graph for this function:

|
||||||||||||
|
Definition at line 1051 of file dr.cc. References HandleEntry::hdl_. Referenced by interestTimeout(), and unpublish().
01052 {
01053 HandleList::iterator itr;
01054 HandleEntry *entry;
01055
01056 for (itr = hl->begin(); itr != hl->end(); ++itr){
01057 entry = *itr;
01058 if (entry->hdl_ == my_handle){
01059 hl->erase(itr);
01060 return entry;
01061 }
01062 }
01063 return NULL;
01064 }
|
|
|
Definition at line 569 of file dr.cc. References TimerManager::removeTimer(), and timers_manager_.
00570 {
00571 return (timers_manager_->removeTimer(hdl));
00572 }
|
Here is the call graph for this function:

|
||||||||||||
|
Definition at line 765 of file dr.cc. References DEBUG_IMPORTANT, DiffPacket, DiffPrint(), TimerManager::executeAllExpiredTimers(), in_devices_, MAXVALUE, TimerManager::nextTimerTime(), POLLING_INTERVAL, recvPacket(), timers_manager_, TimevalCmp(), and WAIT_FOREVER. Referenced by doIt(), and doOne().
00766 {
00767 DeviceList::iterator itr;
00768 int status, max_sock, fd;
00769 bool flag;
00770 DiffPacket in_pkt;
00771 fd_set fds;
00772 struct timeval tv;
00773 struct timeval max_tv;
00774
00775 do{
00776 FD_ZERO(&fds);
00777 max_sock = 0;
00778
00779 // Set the maximum timeout value
00780 max_tv.tv_sec = (int) (max_timeout / 1000);
00781 max_tv.tv_usec = (int) ((max_timeout % 1000) * 1000);
00782
00783 for (itr = in_devices_.begin(); itr != in_devices_.end(); ++itr){
00784 (*itr)->addInFDS(&fds, &max_sock);
00785 }
00786
00787 // Check for the next timer
00788 timers_manager_->nextTimerTime(&tv);
00789
00790 if (tv.tv_sec == MAXVALUE){
00791 // If we don't have any timers, we wait for POLLING_INTERVAL
00792 if (max_timeout == WAIT_FOREVER){
00793 tv.tv_sec = POLLING_INTERVAL;
00794 tv.tv_usec = 0;
00795 }
00796 else{
00797 tv = max_tv;
00798 }
00799 }
00800 else{
00801 if ((max_timeout != WAIT_FOREVER) && (TimevalCmp(&tv, &max_tv) > 0)){
00802 // max_timeout value is smaller than next timer's time, so we
00803 // use themax_timeout value instead
00804 tv = max_tv;
00805 }
00806 }
00807
00808 status = select(max_sock+1, &fds, NULL, NULL, &tv);
00809
00810 if (status == 0){
00811 // Process all timers that have expired
00812 timers_manager_->executeAllExpiredTimers();
00813 }
00814
00815 if (status > 0){
00816 do{
00817 flag = false;
00818 for (itr = in_devices_.begin(); itr != in_devices_.end(); ++itr){
00819 fd = (*itr)->checkInFDS(&fds);
00820 if (fd != 0){
00821 // Message waiting
00822 in_pkt = (*itr)->recvPacket(fd);
00823 recvPacket(in_pkt);
00824
00825 // Clear this fd
00826 FD_CLR(fd, &fds);
00827 status--;
00828 flag = true;
00829 }
00830 }
00831 } while ((status > 0) && (flag == true));
00832 }
00833 else
00834 if (status < 0){
00835 DiffPrint(DEBUG_IMPORTANT, "Select returned %d\n", status);
00836 }
00837 } while (wait_condition);
00838 }
|
Here is the call graph for this function:

|
||||||||||||
|
Implements NR. Definition at line 352 of file dr.cc. References AddAttrs(), agent_id_, HandleEntry::attrs_, CalculateSize(), checkSend(), CopyAttrs(), DATA, Message::data_len_, DEBUG_ALWAYS, DiffPrint(), DIFFUSION_VERSION, dr_mtx_, EXPLORATORY_DATA, EXPLORATORY_DATA_DELAY, HandleEntry::exploratory_time_, FAIL, findHandle(), GetLock(), GetTime(), int8_t, isPushData(), LOCALHOST_ADDR, Message::msg_attr_vec_, Message::num_attr_, OK, pkt_count_, pub_list_, PUSH_EXPLORATORY_DATA, PUSH_EXPLORATORY_DELAY, random_id_, ReleaseLock(), sendMessageToDiffusion(), and TimevalCmp().
00354 {
00355 Message *my_message;
00356 HandleEntry *my_handle;
00357 int8_t send_message_type = DATA;
00358 struct timeval current_time;
00359
00360 // Get the lock first
00361 GetLock(dr_mtx_);
00362
00363 // Get attributes associated with handle
00364 my_handle = findHandle(publication_handle, &pub_list_);
00365 if (!my_handle){
00366 ReleaseLock(dr_mtx_);
00367 return FAIL;
00368 }
00369
00370 // Check the send attributes
00371 if (!checkSend(send_attrs)){
00372 DiffPrint(DEBUG_ALWAYS, "Error : Invalid class/scope attributes in the send attributes !\n");
00373 ReleaseLock(dr_mtx_);
00374 return FAIL;
00375 }
00376
00377 // Check if it is time to send another exploratory data message
00378 GetTime(¤t_time);
00379
00380 if (TimevalCmp(¤t_time, &(my_handle->exploratory_time_)) >= 0){
00381
00382 // Check if it is a push data message or a regular data message
00383 if (isPushData(my_handle->attrs_)){
00384 // Push data message
00385
00386 // Update time for the next push exploratory message
00387 GetTime(&(my_handle->exploratory_time_));
00388 my_handle->exploratory_time_.tv_sec += PUSH_EXPLORATORY_DELAY;
00389
00390 send_message_type = PUSH_EXPLORATORY_DATA;
00391 }
00392 else{
00393 // Regular data message
00394
00395 // Update time for the next exploratory message
00396 GetTime(&(my_handle->exploratory_time_));
00397 my_handle->exploratory_time_.tv_sec += EXPLORATORY_DATA_DELAY;
00398
00399 send_message_type = EXPLORATORY_DATA;
00400 }
00401 }
00402
00403 // Initialize message structure
00404 my_message = new Message(DIFFUSION_VERSION, send_message_type, agent_id_,
00405 0, 0, pkt_count_, random_id_, LOCALHOST_ADDR,
00406 LOCALHOST_ADDR);
00407 // Increment pkt_counter
00408 pkt_count_++;
00409
00410 // First, we duplicate the 'publish' attributes
00411 my_message->msg_attr_vec_ = CopyAttrs(my_handle->attrs_);
00412
00413 // Now, we add the send attributes
00414 AddAttrs(my_message->msg_attr_vec_, send_attrs);
00415
00416 // Compute the total number and size of the joined attribute sets
00417 my_message->num_attr_ = my_message->msg_attr_vec_->size();
00418 my_message->data_len_ = CalculateSize(my_message->msg_attr_vec_);
00419
00420 // Release the lock
00421 ReleaseLock(dr_mtx_);
00422
00423 // Send Packet
00424 sendMessageToDiffusion(my_message);
00425
00426 delete my_message;
00427
00428 return OK;
00429 }
|
Here is the call graph for this function:

|
||||||||||||||||
|
Definition at line 697 of file dr.cc. References agent_id_, CalculateSize(), CONTROL, ControlMsgAttr, CopyAttrs(), Message::data_len_, DIFFUSION_VERSION, FAIL, FILTER_KEEP_PRIORITY, FILTER_MIN_PRIORITY, NRAttribute::IS, Message::last_hop_, LOCALHOST_ADDR, Message::msg_attr_vec_, Message::msg_type_, Message::new_message_, Message::next_hop_, Message::next_port_, Message::num_attr_, OK, OriginalHdrAttr, pkt_count_, Message::pkt_num_, random_id_, Message::rdm_id_, SEND_MESSAGE, sendMessageToDiffusion(), and Message::source_port_.
00699 {
00700 RedirectMessage *original_hdr;
00701 NRAttribute *original_attr, *ctrl_msg_attr;
00702 ControlMessage *control_blob;
00703 NRAttrVec *attrs;
00704 Message *my_message;
00705
00706 if ((priority < FILTER_MIN_PRIORITY) ||
00707 (priority > FILTER_KEEP_PRIORITY))
00708 return FAIL;
00709
00710 // Create an attribute with the original header
00711 original_hdr = new RedirectMessage(msg->new_message_, msg->msg_type_,
00712 msg->source_port_, msg->data_len_,
00713 msg->num_attr_, msg->rdm_id_,
00714 msg->pkt_num_, msg->next_hop_,
00715 msg->last_hop_, 0,
00716 msg->next_port_);
00717
00718 original_attr = OriginalHdrAttr.make(NRAttribute::IS, (void *)original_hdr,
00719 sizeof(RedirectMessage));
00720
00721 // Create the attribute with the control message
00722 control_blob = new ControlMessage(SEND_MESSAGE, h, priority);
00723
00724 ctrl_msg_attr = ControlMsgAttr.make(NRAttribute::IS, (void *)control_blob,
00725 sizeof(ControlMessage));
00726
00727 // Copy Attributes and add originalAttr and controlAttr
00728 attrs = CopyAttrs(msg->msg_attr_vec_);
00729 attrs->push_back(original_attr);
00730 attrs->push_back(ctrl_msg_attr);
00731
00732 my_message = new Message(DIFFUSION_VERSION, CONTROL, agent_id_, 0,
00733 0, pkt_count_, random_id_, LOCALHOST_ADDR,
00734 LOCALHOST_ADDR);
00735
00736 // Increment pkt_counter
00737 pkt_count_++;
00738
00739 // Add attributes to the message
00740 my_message->msg_attr_vec_ = attrs;
00741 my_message->num_attr_ = attrs->size();
00742 my_message->data_len_ = CalculateSize(attrs);
00743
00744 // Send Packet
00745 sendMessageToDiffusion(my_message);
00746
00747 delete my_message;
00748 delete control_blob;
00749 delete original_hdr;
00750
00751 return OK;
00752 }
|
Here is the call graph for this function:

|
|
Definition at line 843 of file dr.cc. References AllocateBuffer(), DATA_LEN, DIFF_VER, DiffPacket, diffusion_port_, HDR_DIFF, LAST_HOP, Message::last_hop_, len, Message::msg_attr_vec_, MSG_TYPE, Message::msg_type_, NEXT_HOP, Message::next_hop_, NUM_ATTR, Message::num_attr_, PackAttrs(), PKT_NUM, Message::pkt_num_, RDM_ID, Message::rdm_id_, sendPacketToDiffusion(), Message::source_port_, SRC_PORT, and Message::version_. Referenced by addFilter(), filterKeepaliveTimeout(), interestTimeout(), removeFilter(), send(), and sendMessage().
00844 {
00845 DiffPacket out_pkt = NULL;
00846 struct hdr_diff *dfh;
00847 char *pos;
00848 int len;
00849
00850 out_pkt = AllocateBuffer(msg->msg_attr_vec_);
00851 dfh = HDR_DIFF(out_pkt);
00852
00853 pos = (char *) out_pkt;
00854 pos = pos + sizeof(struct hdr_diff);
00855
00856 len = PackAttrs(msg->msg_attr_vec_, pos);
00857
00858 LAST_HOP(dfh) = htonl(msg->last_hop_);
00859 NEXT_HOP(dfh) = htonl(msg->next_hop_);
00860 DIFF_VER(dfh) = msg->version_;
00861 MSG_TYPE(dfh) = msg->msg_type_;
00862 DATA_LEN(dfh) = htons(len);
00863 PKT_NUM(dfh) = htonl(msg->pkt_num_);
00864 RDM_ID(dfh) = htonl(msg->rdm_id_);
00865 NUM_ATTR(dfh) = htons(msg->num_attr_);
00866 SRC_PORT(dfh) = htons(msg->source_port_);
00867
00868 sendPacketToDiffusion(out_pkt, sizeof(struct hdr_diff) + len, diffusion_port_);
00869
00870 delete [] out_pkt;
00871 }
|
Here is the call graph for this function:

|
||||||||||||||||
|
Definition at line 889 of file dr.cc. References len, and local_out_devices_. Referenced by sendMessageToDiffusion().
00890 {
00891 DeviceList::iterator itr;
00892
00893 for (itr = local_out_devices_.begin(); itr != local_out_devices_.end(); ++itr){
00894 (*itr)->sendPacket(pkt, len, dst);
00895 }
00896 }
|
|
||||||||||||
|
Implements NR. Definition at line 228 of file dr.cc. References TimerManager::addTimer(), HandleEntry::attrs_, HandleEntry::cb_, checkSubscription(), CopyAttrs(), DEBUG_ALWAYS, DiffPrint(), dr_mtx_, FAIL, GetLock(), NRAttribute::GLOBAL_SCOPE, handle, hasScope(), HandleEntry::hdl_, NRAttribute::IS, next_handle_, NRScopeAttr, ReleaseLock(), SMALL_TIMEOUT, sub_list_, and timers_manager_.
00229 {
00230 HandleEntry *my_handle;
00231 NRAttribute *scope_attr;
00232 TimerCallback *timer_callback;
00233
00234 // Get lock first
00235 GetLock(dr_mtx_);
00236
00237 // Check the published attributes
00238 if (!checkSubscription(subscribe_attrs)){
00239 DiffPrint(DEBUG_ALWAYS, "Error : Invalid class/scope attributes in the subscribe attributes !\n");
00240 ReleaseLock(dr_mtx_);
00241 return FAIL;
00242 }
00243
00244 // Create and Initialize the handle_entry structute
00245 my_handle = new HandleEntry;
00246 my_handle->hdl_ = next_handle_;
00247 next_handle_++;
00248 my_handle->cb_ = (NR::Callback *) cb;
00249 sub_list_.push_back(my_handle);
00250
00251 // Copy the attributes
00252 my_handle->attrs_ = CopyAttrs(subscribe_attrs);
00253
00254 // For subscriptions, scope is global if not specified
00255 if (!hasScope(subscribe_attrs)){
00256 scope_attr = NRScopeAttr.make(NRAttribute::IS, NRAttribute::GLOBAL_SCOPE);
00257 my_handle->attrs_->push_back(scope_attr);
00258 }
00259
00260 // Create Interest Timer and add it to the queue
00261 timer_callback = new InterestCallback(this, my_handle);
00262 timers_manager_->addTimer(SMALL_TIMEOUT, timer_callback);
00263
00264 // Release lock
00265 ReleaseLock(dr_mtx_);
00266
00267 return my_handle->hdl_;
00268 }
|
Here is the call graph for this function:

|
|
Implements NR. Definition at line 329 of file dr.cc. References dr_mtx_, FAIL, GetLock(), OK, pub_list_, ReleaseLock(), and removeHandle().
00330 {
00331 HandleEntry *my_handle = NULL;
00332
00333 // Get the lock first
00334 GetLock(dr_mtx_);
00335
00336 my_handle = removeHandle(publication_handle, &pub_list_);
00337 if (!my_handle){
00338 // Handle doesn't exist, return FAIL
00339 ReleaseLock(dr_mtx_);
00340 return FAIL;
00341 }
00342
00343 // Free structures
00344 delete my_handle;
00345
00346 // Release the lock
00347 ReleaseLock(dr_mtx_);
00348
00349 return OK;
00350 }
|
Here is the call graph for this function:

|
|
Implements NR. Definition at line 270 of file dr.cc. References dr_mtx_, FAIL, findHandle(), GetLock(), OK, ReleaseLock(), sub_list_, and HandleEntry::valid_.
00271 {
00272 HandleEntry *my_handle = NULL;
00273
00274 // Get the lock first
00275 GetLock(dr_mtx_);
00276
00277 my_handle = findHandle(subscription_handle, &sub_list_);
00278 if (!my_handle){
00279 // Handle doesn't exist, return FAIL
00280 ReleaseLock(dr_mtx_);
00281 return FAIL;
00282 }
00283
00284 // Handle will be destroyed when next interest timeout happens
00285 my_handle->valid_ = false;
00286
00287 // Release the lock
00288 ReleaseLock(dr_mtx_);
00289
00290 return OK;
00291 }
|
Here is the call graph for this function:

|
|
Definition at line 219 of file dr.hh. Referenced by addFilter(), filterKeepaliveTimeout(), interestTimeout(), removeFilter(), send(), and sendMessage(). |
|
|
Definition at line 212 of file dr.hh. Referenced by sendMessageToDiffusion(). |
|
|
Definition at line 202 of file dr.hh. Referenced by addFilter(), filterKeepaliveTimeout(), interestTimeout(), processControlMessage(), processMessage(), publish(), removeFilter(), send(), subscribe(), unpublish(), and unsubscribe(). |
|
|
Definition at line 199 of file dr.hh. Referenced by addFilter(), deleteFilter(), and findFilter(). |
|
|
Definition at line 208 of file dr.hh. Referenced by run(). |
|
|
Definition at line 209 of file dr.hh. Referenced by sendPacketToDiffusion(). |
|
|
Definition at line 196 of file dr.hh. Referenced by addFilter(), publish(), and subscribe(). |
|
|
Definition at line 213 of file dr.hh. Referenced by addFilter(), filterKeepaliveTimeout(), interestTimeout(), removeFilter(), send(), and sendMessage(). |
|
|
Definition at line 197 of file dr.hh. Referenced by publish(), send(), unpublish(), and ~DiffusionRouting(). |
|
|
Definition at line 214 of file dr.hh. Referenced by addFilter(), filterKeepaliveTimeout(), interestTimeout(), removeFilter(), send(), and sendMessage(). |
|
|
Definition at line 198 of file dr.hh. Referenced by interestTimeout(), processMessage(), subscribe(), unsubscribe(), and ~DiffusionRouting(). |
|
|
Definition at line 205 of file dr.hh. Referenced by addFilter(), addTimer(), removeTimer(), run(), and subscribe(). |
1.3.3