Main Page | Namespace List | Class Hierarchy | Alphabetical List | Compound List | File List | Compound Members | File Members

GradientFilter Class Reference

#include <gradient.hh>

Inheritance diagram for GradientFilter:

Inheritance graph
[legend]
Collaboration diagram for GradientFilter:

Collaboration graph
[legend]
List of all members.

Public Member Functions

 GradientFilter (int argc, char **argv)
void run ()
virtual ~GradientFilter ()
void recv (Message *msg, handle h)
void messageTimeout (Message *msg)
void interestTimeout (Message *msg)
void gradientTimeout ()
void reinforcementTimeout ()
int subscriptionTimeout (NRAttrVec *attrs)

Protected Member Functions

handle setupFilter ()
RoutingEntryfindRoutingEntry (NRAttrVec *attrs)
void deleteRoutingEntry (RoutingEntry *routing_entry)
RoutingEntrymatchRoutingEntry (NRAttrVec *attrs, RoutingTable::iterator start, RoutingTable::iterator *place)
AttributeEntryfindMatchingSubscription (RoutingEntry *routing_entry, NRAttrVec *attrs)
void updateGradient (RoutingEntry *routing_entry, int32_t last_hop, bool reinforced)
void updateAgent (RoutingEntry *routing_entry, u_int16_t source_port)
GradientEntryfindReinforcedGradients (GradientList *agents, GradientList::iterator start, GradientList::iterator *place)
GradientEntryfindReinforcedGradient (int32_t node_addr, RoutingEntry *routing_entry)
void deleteGradient (RoutingEntry *routing_entry, GradientEntry *gradient_entry)
void setReinforcementFlags (RoutingEntry *routing_entry, int32_t last_hop, int new_message)
void sendInterest (NRAttrVec *attrs, RoutingEntry *routing_entry)
void sendDisinterest (NRAttrVec *attrs, RoutingEntry *routing_entry)
void sendPositiveReinforcement (NRAttrVec *reinf_attrs, int32_t data_rdm_id, int32_t data_pkt_num, int32_t destination)
void forwardData (Message *msg, RoutingEntry *routing_entry, DataForwardingHistory *forwarding_history)
void forwardExploratoryData (Message *msg, RoutingEntry *routing_entry, DataForwardingHistory *forwarding_history)
void forwardPushExploratoryData (Message *msg, DataForwardingHistory *forwarding_history)
void processOldMessage (Message *msg)
void processNewMessage (Message *msg)
HashEntrygetHash (unsigned int pkt_num, unsigned int rdm_id)
void putHash (HashEntry *new_hash_entry, unsigned int pkt_num, unsigned int rdm_id)
void usage (char *s)
void parseCommandLine (int argc, char **argv)

Protected Attributes

handle filter_handle_
int pkt_count_
int random_id_
HashList hash_list_
Tcl_HashTable htable_
GradientFilterReceivefilter_callback_
RoutingTable routing_list_
NRdr_
u_int16_t diffusion_port_
char * config_file_

Constructor & Destructor Documentation

GradientFilter::GradientFilter int  argc,
char **  argv
 

Definition at line 1349 of file gradient.cc.

References agent, NR::createNR(), DEBUG_ALWAYS, DEBUG_IMPORTANT, DiffPrint(), DiffApp::diffusion_port_, DiffApp::dr_, filter_callback_, filter_handle_, GetRand(), GetTime(), GRADIENT_DELAY, htable_, DiffApp::parseCommandLine(), pkt_count_, random_id_, REINFORCEMENT_DELAY, SetSeed(), and setupFilter().

01350 {
01351 #endif // NS_DIFFUSION
01352   struct timeval tv;
01353   TimerCallback *reinforcement_timer, *gradient_timer;
01354 
01355   GetTime(&tv);
01356   SetSeed(&tv);
01357   pkt_count_ = GetRand();
01358   random_id_ = GetRand();
01359 
01360   // Create Diffusion Routing class
01361 #ifdef NS_DIFFUSION
01362   agent = (DiffAppAgent *)TclObject::lookup(diffrtg);
01363   dr_ = agent->dr();
01364 #else
01365   parseCommandLine(argc, argv);
01366   dr_ = NR::createNR(diffusion_port_);
01367 #endif // NS_DIFFUSION
01368 
01369   // Create callback classes and set up pointers
01370   filter_callback_ = new GradientFilterReceive(this);
01371 
01372   // Initialize Hashing structures
01373   Tcl_InitHashTable(&htable_, 2);
01374 
01375   // Set up the filter
01376   filter_handle_ = setupFilter();
01377 
01378   // Print filter information
01379   DiffPrint(DEBUG_IMPORTANT,
01380             "Gradient filter subscribed to *, received handle %d\n",
01381             filter_handle_);
01382 
01383   // Add timers for keeping state up-to-date
01384   gradient_timer = new GradientExpirationCheckTimer(this);
01385   ((DiffusionRouting *)dr_)->addTimer(GRADIENT_DELAY, gradient_timer);
01386 
01387   reinforcement_timer = new ReinforcementCheckTimer(this);
01388   ((DiffusionRouting *)dr_)->addTimer(REINFORCEMENT_DELAY, reinforcement_timer);
01389 
01390   GetTime(&tv);
01391 
01392   DiffPrint(DEBUG_ALWAYS, "Gradient filter initialized at time %ld:%ld!\n",
01393             tv.tv_sec, tv.tv_usec);
01394 }

Here is the call graph for this function:

virtual GradientFilter::~GradientFilter  )  [inline, virtual]
 

Definition at line 251 of file gradient.hh.

00252   {
00253     // Nothing to do
00254   };


Member Function Documentation

void GradientFilter::deleteGradient RoutingEntry routing_entry,
GradientEntry gradient_entry
[protected]
 

Definition at line 778 of file gradient.cc.

References DEBUG_ALWAYS, DiffPrint(), and RoutingEntry::gradients_.

Referenced by processNewMessage().

00780 {
00781   GradientList::iterator gradient_itr;
00782   GradientEntry *current_entry;
00783 
00784   for (gradient_itr = routing_entry->gradients_.begin();
00785        gradient_itr != routing_entry->gradients_.end(); ++gradient_itr){
00786     current_entry = *gradient_itr;
00787     if (current_entry == gradient_entry){
00788       gradient_itr = routing_entry->gradients_.erase(gradient_itr);
00789       delete gradient_entry;
00790       return;
00791     }
00792   }
00793   DiffPrint(DEBUG_ALWAYS,
00794             "Error: deleteGradient could not find gradient to delete !\n");
00795 }

Here is the call graph for this function:

void GradientFilter::deleteRoutingEntry RoutingEntry routing_entry  )  [protected]
 

Definition at line 301 of file gradient.cc.

References DEBUG_ALWAYS, DiffPrint(), and routing_list_.

00302 {
00303   RoutingTable::iterator routing_itr;
00304   RoutingEntry *current_entry;
00305 
00306   for (routing_itr = routing_list_.begin(); routing_itr != routing_list_.end(); ++routing_itr){
00307     current_entry = *routing_itr;
00308     if (current_entry == routing_entry){
00309       routing_itr = routing_list_.erase(routing_itr);
00310       delete routing_entry;
00311       return;
00312     }
00313   }
00314   DiffPrint(DEBUG_ALWAYS, "Error: deleteRoutingEntry could not find entry to delete !\n");
00315 }

Here is the call graph for this function:

AttributeEntry * GradientFilter::findMatchingSubscription RoutingEntry routing_entry,
NRAttrVec attrs
[protected]
 

Definition at line 345 of file gradient.cc.

References RoutingEntry::attr_list_, AttributeEntry::attrs_, and PerfectMatch().

Referenced by processNewMessage().

00347 {
00348   AttributeList::iterator attribute_itr;
00349   AttributeEntry *attribute_entry;
00350 
00351   for (attribute_itr = routing_entry->attr_list_.begin(); attribute_itr != routing_entry->attr_list_.end(); ++attribute_itr){
00352     attribute_entry = *attribute_itr;
00353     if (PerfectMatch(attribute_entry->attrs_, attrs))
00354       return attribute_entry;
00355   }
00356   return NULL;
00357 }

Here is the call graph for this function:

GradientEntry * GradientFilter::findReinforcedGradient int32_t  node_addr,
RoutingEntry routing_entry
[protected]
 

Definition at line 753 of file gradient.cc.

References findReinforcedGradients(), RoutingEntry::gradients_, and GradientEntry::node_addr_.

Referenced by processNewMessage().

00755 {
00756   GradientList::iterator gradient_itr;
00757   GradientEntry *gradient_entry;
00758 
00759   gradient_itr = routing_entry->gradients_.begin();
00760   gradient_entry = findReinforcedGradients(&routing_entry->gradients_,
00761                                            gradient_itr, &gradient_itr);
00762 
00763   if (gradient_entry){
00764     while(gradient_entry){
00765       if (gradient_entry->node_addr_ == node_addr)
00766         return gradient_entry;
00767 
00768       // This is not the gradient we are looking for, keep looking
00769       gradient_itr++;
00770       gradient_entry = findReinforcedGradients(&routing_entry->gradients_,
00771                                                gradient_itr, &gradient_itr);
00772     }
00773   }
00774 
00775   return NULL;
00776 }

Here is the call graph for this function:

GradientEntry * GradientFilter::findReinforcedGradients GradientList agents,
GradientList::iterator  start,
GradientList::iterator *  place
[protected]
 

Definition at line 735 of file gradient.cc.

References GradientEntry::reinforced_.

Referenced by findReinforcedGradient(), forwardData(), and processNewMessage().

00738 {
00739   GradientList::iterator gradient_itr;
00740   GradientEntry *gradient_entry;
00741 
00742   for (gradient_itr = start; gradient_itr != gradients->end(); ++gradient_itr){
00743     gradient_entry = *gradient_itr;
00744     if (gradient_entry->reinforced_){
00745       *place = gradient_itr;
00746       return gradient_entry;
00747     }
00748   }
00749 
00750   return NULL;
00751 }

RoutingEntry * GradientFilter::findRoutingEntry NRAttrVec attrs  )  [protected]
 

Definition at line 332 of file gradient.cc.

References RoutingEntry::attrs_, PerfectMatch(), and routing_list_.

Referenced by processNewMessage(), processOldMessage(), and subscriptionTimeout().

00333 {
00334   RoutingTable::iterator routing_itr;
00335   RoutingEntry *routing_entry;
00336 
00337   for (routing_itr = routing_list_.begin(); routing_itr != routing_list_.end(); ++routing_itr){
00338     routing_entry = *routing_itr;
00339     if (PerfectMatch(routing_entry->attrs_, attrs))
00340       return routing_entry;
00341   }
00342   return NULL;
00343 }

Here is the call graph for this function:

void GradientFilter::forwardData Message msg,
RoutingEntry routing_entry,
DataForwardingHistory forwarding_history
[protected]
 

Definition at line 597 of file gradient.cc.

References RoutingEntry::agents_, DataForwardingHistory::alreadyForwardedToLibrary(), DataForwardingHistory::alreadyForwardedToNetwork(), RoutingEntry::attrs_, CopyAttrs(), CopyMessage(), DEBUG_NO_DETAILS, DiffPrint(), DIFFUSION_VERSION, DiffApp::dr_, filter_handle_, findReinforcedGradients(), DataForwardingHistory::forwardingToLibrary(), DataForwardingHistory::forwardingToNetwork(), RoutingEntry::gradients_, Message::last_hop_, LOCALHOST_ADDR, Message::msg_attr_vec_, NEGATIVE_REINFORCEMENT, NEW_MESSAGE, Message::next_hop_, Message::next_port_, GradientEntry::node_addr_, pkt_count_, AgentEntry::port_, random_id_, and setReinforcementFlags().

Referenced by processNewMessage().

00599 {
00600   GradientList::iterator gradient_itr;
00601   AgentList::iterator agent_itr;
00602   GradientEntry *gradient_entry;
00603   AgentEntry *agent_entry;
00604   Message *sink_message, *negative_reinforcement_msg;
00605   bool has_sink = false;
00606 
00607   sink_message = CopyMessage(msg);
00608 
00609   // Step 1: Sink Processing
00610   for (agent_itr = routing_entry->agents_.begin(); agent_itr != routing_entry->agents_.end(); ++agent_itr){
00611     agent_entry = *agent_itr;
00612 
00613     has_sink = true;
00614 
00615     if (!forwarding_history->alreadyForwardedToLibrary(agent_entry->port_)){
00616       // Forward DATA to local sinks
00617       sink_message->next_hop_ = LOCALHOST_ADDR;
00618       sink_message->next_port_ = agent_entry->port_;
00619 
00620       // Add agent to the forwarding list
00621       forwarding_history->forwardingToLibrary(agent_entry->port_);
00622 
00623       ((DiffusionRouting *)dr_)->sendMessage(sink_message, filter_handle_);
00624     }
00625   }
00626 
00627   delete sink_message;
00628 
00629   // Step 2: Intermediate Processing
00630 
00631   // Set reinforcement flags
00632   if (msg->last_hop_ != LOCALHOST_ADDR){
00633     setReinforcementFlags(routing_entry, msg->last_hop_, NEW_MESSAGE);
00634   }
00635 
00636   // Forward DATA only to reinforced gradients
00637   gradient_itr = routing_entry->gradients_.begin();
00638   gradient_entry = findReinforcedGradients(&routing_entry->gradients_,
00639                                            gradient_itr, &gradient_itr);
00640 
00641   if (gradient_entry){
00642     while (gradient_entry){
00643 
00644       // Found reinforced gradient, forward data message to this
00645       // neighbor only if the messages comes from a different neighbor
00646       if (gradient_entry->node_addr_ != msg->last_hop_){
00647         msg->next_hop_ = gradient_entry->node_addr_;
00648 
00649         // Check if we have forwarded the message to this neighbor already
00650         if (!forwarding_history->alreadyForwardedToNetwork(msg->next_hop_)){
00651           DiffPrint(DEBUG_NO_DETAILS,
00652                     "Forwarding data using Reinforced Gradient to node %d !\n",
00653                     gradient_entry->node_addr_);
00654 
00655           ((DiffusionRouting *)dr_)->sendMessage(msg, filter_handle_);
00656 
00657           // Add the node to the forwarding history
00658           forwarding_history->forwardingToNetwork(msg->next_hop_);
00659         }
00660       }
00661 
00662       // Move to the next one
00663       gradient_itr++;
00664       gradient_entry = findReinforcedGradients(&routing_entry->gradients_,
00665                                                gradient_itr, &gradient_itr);
00666     }
00667   }
00668   else{
00669     // We could not find a reinforced path, so we send a negative
00670     // reinforcement to last_hop
00671     if ((!has_sink) && (msg->last_hop_ != LOCALHOST_ADDR)){
00672       negative_reinforcement_msg = new Message(DIFFUSION_VERSION,
00673                                                NEGATIVE_REINFORCEMENT,
00674                                                0, 0,
00675                                                routing_entry->attrs_->size(),
00676                                                pkt_count_,
00677                                                random_id_,
00678                                                msg->last_hop_,
00679                                                LOCALHOST_ADDR);
00680       negative_reinforcement_msg->msg_attr_vec_ = CopyAttrs(routing_entry->attrs_);
00681 
00682       DiffPrint(DEBUG_NO_DETAILS,
00683                 "Sending Negative Reinforcement to node %d !\n",
00684                 msg->last_hop_);
00685 
00686       ((DiffusionRouting *)dr_)->sendMessage(negative_reinforcement_msg,
00687                                              filter_handle_);
00688 
00689       pkt_count_++;
00690       delete negative_reinforcement_msg;
00691     }
00692   }
00693 }

Here is the call graph for this function:

void GradientFilter::forwardExploratoryData Message msg,
RoutingEntry routing_entry,
DataForwardingHistory forwarding_history
[protected]
 

Definition at line 492 of file gradient.cc.

References RoutingEntry::agents_, DataForwardingHistory::alreadyForwardedToLibrary(), DataForwardingHistory::alreadyForwardedToNetwork(), DataForwardingHistory::alreadyReinforced(), RoutingEntry::attrs_, BROADCAST_ADDR, CopyMessage(), DiffApp::dr_, filter_handle_, DataForwardingHistory::forwardingToLibrary(), DataForwardingHistory::forwardingToNetwork(), GetRand(), RoutingEntry::gradients_, Message::last_hop_, LOCALHOST_ADDR, NEW_MESSAGE, Message::next_hop_, Message::next_port_, GradientEntry::node_addr_, Message::pkt_num_, AgentEntry::port_, putHash(), RAND_MAX, Message::rdm_id_, DataForwardingHistory::sendingReinforcement(), sendPositiveReinforcement(), and setReinforcementFlags().

Referenced by processNewMessage().

00495 {
00496 #ifdef USE_BROADCAST_TO_MULTIPLE_RECEIPTENTS
00497   Message *data_msg;
00498   TimerCallback *data_timer;
00499 #else
00500   GradientList::iterator gradient_itr;
00501   GradientEntry *gradient_entry;
00502 #endif // USE_BROADCAST_TO_MULTIPLE_RECEIPTENTS
00503   AgentList::iterator agent_itr;
00504   AgentEntry *agent_entry;
00505   Message *sink_message;
00506   unsigned int key[2];
00507   HashEntry *hash_entry;
00508 
00509   sink_message = CopyMessage(msg);
00510 
00511   // Step 1: Sink Processing
00512   for (agent_itr = routing_entry->agents_.begin();
00513        agent_itr != routing_entry->agents_.end(); ++agent_itr){
00514     agent_entry = *agent_itr;
00515 
00516     if (!forwarding_history->alreadyForwardedToLibrary(agent_entry->port_)){
00517       // Forward the data message to local sinks
00518       sink_message->next_hop_ = LOCALHOST_ADDR;
00519       sink_message->next_port_ = agent_entry->port_;
00520 
00521       // Add agent to the forwarding list
00522       forwarding_history->forwardingToLibrary(agent_entry->port_);
00523 
00524       ((DiffusionRouting *)dr_)->sendMessage(sink_message, filter_handle_);
00525     }
00526   }
00527 
00528   delete sink_message;
00529 
00530   // Step 1A: Reinforcement Processing
00531   if ((!forwarding_history->alreadyReinforced()) &&
00532       (routing_entry->agents_.size() > 0) &&
00533       (msg->last_hop_ != LOCALHOST_ADDR)){
00534     // Send reinforcement to 'last_hop'
00535     sendPositiveReinforcement(routing_entry->attrs_, msg->rdm_id_,
00536                               msg->pkt_num_, msg->last_hop_);
00537     // Record reinforcement in the forwarding history so we do it only
00538     // once per received data message
00539     forwarding_history->sendingReinforcement();
00540   }
00541 
00542   // Step 2: Intermediate Processing
00543 
00544   // Set reinforcement flags
00545   if (msg->last_hop_ != LOCALHOST_ADDR){
00546     setReinforcementFlags(routing_entry, msg->last_hop_, NEW_MESSAGE);
00547   }
00548 
00549   // Add message information to the hash table
00550   if (msg->last_hop_ != LOCALHOST_ADDR){
00551     key[0] = msg->pkt_num_;
00552     key[1] = msg->rdm_id_;
00553 
00554     hash_entry = new HashEntry(msg->last_hop_);
00555 
00556     putHash(hash_entry, key[0], key[1]);
00557   }
00558 
00559   // Forward the EXPLORATORY message
00560 #ifdef USE_BROADCAST_TO_MULTIPLE_RECEIPTENTS
00561   if (!forwarding_history->alreadyForwardedToNetwork(BROADCAST_ADDR)){
00562     if (routing_entry->gradients_.size() > 0){
00563       // Broadcast DATA message
00564       data_msg = CopyMessage(msg);
00565       data_msg->next_hop_ = BROADCAST_ADDR;
00566 
00567       // Add to the forwarding history
00568       forwarding_history->forwardingToNetwork(BROADCAST_ADDR);
00569 
00570       data_timer = new MessageSendTimer(this, data_msg);
00571 
00572       // Add timer for forwarding the data packet
00573       ((DiffusionRouting *)dr_)->addTimer(DATA_FORWARD_DELAY +
00574                                           (int) ((DATA_FORWARD_JITTER * (GetRand() * 1.0 / RAND_MAX) - (DATA_FORWARD_JITTER / 2))),
00575                                           data_timer);
00576     }
00577   }
00578 #else
00579   // Forward DATA to all output gradients
00580   for (gradient_itr = routing_entry->gradients_.begin();
00581        gradient_itr != routing_entry->gradients_.end(); ++gradient_itr){
00582 
00583     gradient_entry = *gradient_itr;
00584 
00585     // Check forwarding history
00586     if (!forwarding_history->alreadyForwardedToNetwork(gradient_entry->node_addr_)){
00587       msg->next_hop_ = gradient_entry->node_addr_;
00588       ((DiffusionRouting *)dr_)->sendMessage(msg, filter_handle_);
00589 
00590       // Add to the forwarding history
00591       forwarding_history->forwardingToNetwork(gradient_entry->node_addr_);
00592     }
00593   }
00594 #endif // USE_BROADCAST_TO_MULTIPLE_RECEIPTENTS
00595 }

Here is the call graph for this function:

void GradientFilter::forwardPushExploratoryData Message msg,
DataForwardingHistory forwarding_history
[protected]
 

Definition at line 405 of file gradient.cc.

References RoutingEntry::agents_, DataForwardingHistory::alreadyForwardedToLibrary(), DataForwardingHistory::alreadyForwardedToNetwork(), DataForwardingHistory::alreadyReinforced(), RoutingEntry::attrs_, BROADCAST_ADDR, CopyMessage(), DiffApp::dr_, filter_handle_, DataForwardingHistory::forwardingToLibrary(), DataForwardingHistory::forwardingToNetwork(), GetRand(), Message::last_hop_, LOCALHOST_ADDR, matchRoutingEntry(), Message::msg_attr_vec_, Message::next_hop_, Message::next_port_, Message::pkt_num_, AgentEntry::port_, putHash(), RAND_MAX, Message::rdm_id_, routing_list_, DataForwardingHistory::sendingReinforcement(), and sendPositiveReinforcement().

Referenced by processNewMessage().

00407 {
00408   RoutingTable::iterator routing_itr;
00409   RoutingEntry *routing_entry;
00410   AgentList::iterator agent_itr;
00411   AgentEntry *agent_entry;
00412   Message *data_msg, *sink_message;
00413   TimerCallback *data_timer;
00414   unsigned int key[2];
00415   HashEntry *hash_entry;
00416 
00417   // Sink processing
00418   routing_itr = routing_list_.begin();
00419   routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,
00420                                     &routing_itr);
00421 
00422   sink_message = CopyMessage(msg);
00423 
00424   while (routing_entry){
00425 
00426     // Forward message to all local sinks
00427     for (agent_itr = routing_entry->agents_.begin();
00428          agent_itr != routing_entry->agents_.end(); ++agent_itr){
00429       agent_entry = *agent_itr;
00430 
00431       if (!forwarding_history->alreadyForwardedToLibrary(agent_entry->port_)){
00432         // Send DATA message to local sinks
00433         sink_message->next_hop_ = LOCALHOST_ADDR;
00434         sink_message->next_port_ = agent_entry->port_;
00435 
00436         ((DiffusionRouting *)dr_)->sendMessage(sink_message, filter_handle_);
00437 
00438         // Add agent to the forwarding history
00439         forwarding_history->forwardingToLibrary(agent_entry->port_);
00440       }
00441     }
00442 
00443     if ((!forwarding_history->alreadyReinforced()) &&
00444         (routing_entry->agents_.size() > 0) &&
00445         (msg->last_hop_ != LOCALHOST_ADDR)){
00446       // Send a positive reinforcement if we have sinks
00447       sendPositiveReinforcement(routing_entry->attrs_, msg->rdm_id_,
00448                                 msg->pkt_num_, msg->last_hop_);
00449       // Record reinforcement in the forwarding history so we do it
00450       // only once per received data message
00451       forwarding_history->sendingReinforcement();
00452     }
00453 
00454     // Look for other matching data types
00455     routing_itr++;
00456     routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,
00457                                       &routing_itr);
00458   }
00459 
00460   // Delete sink_message after sink processing
00461   delete sink_message;
00462 
00463   // Intermediate node processing
00464 
00465   // Add message information to the hash table
00466   if (msg->last_hop_ != LOCALHOST_ADDR){
00467     key[0] = msg->pkt_num_;
00468     key[1] = msg->rdm_id_;
00469 
00470     hash_entry = new HashEntry(msg->last_hop_);
00471 
00472     putHash(hash_entry, key[0], key[1]);
00473   }
00474 
00475   // Rebroadcast the exploratory push data message
00476   if (!forwarding_history->alreadyForwardedToNetwork(BROADCAST_ADDR)){
00477     data_msg = CopyMessage(msg);
00478     data_msg->next_hop_ = BROADCAST_ADDR;
00479 
00480     data_timer = new MessageSendTimer(this, data_msg);
00481 
00482     // Add data timer to the queue
00483     ((DiffusionRouting *)dr_)->addTimer(PUSH_DATA_FORWARD_DELAY +
00484                                         (int) ((PUSH_DATA_FORWARD_JITTER * (GetRand() * 1.0 / RAND_MAX) - (PUSH_DATA_FORWARD_JITTER / 2))),
00485                                         data_timer);
00486 
00487     // Add broadcast information to forwarding history
00488     forwarding_history->forwardingToNetwork(BROADCAST_ADDR);
00489   }
00490 }

Here is the call graph for this function:

HashEntry * GradientFilter::getHash unsigned int  pkt_num,
unsigned int  rdm_id
[protected]
 

Definition at line 1262 of file gradient.cc.

References htable_.

Referenced by processNewMessage().

01264 {
01265    unsigned int key[2];
01266    
01267    key[0] = pkt_num;
01268    key[1] = rdm_id;
01269    
01270    Tcl_HashEntry *entryPtr = Tcl_FindHashEntry(&htable_, (char *)key);
01271    
01272    if (entryPtr == NULL)
01273       return NULL;
01274    
01275    return ((HashEntry *) Tcl_GetHashValue(entryPtr));
01276 }

void GradientFilter::gradientTimeout  ) 
 

Definition at line 125 of file gradient.cc.

References RoutingEntry::agents_, DEBUG_DETAILS, DEBUG_MORE_DETAILS, DEBUG_NO_DETAILS, DiffPrint(), GetTime(), GRADIENT_TIMEOUT, RoutingEntry::gradients_, GradientEntry::node_addr_, AgentEntry::port_, routing_list_, AgentEntry::tv_, and GradientEntry::tv_.

Referenced by GradientExpirationCheckTimer::expire().

00126 {
00127   RoutingTable::iterator routing_itr;
00128   GradientList::iterator grad_itr;
00129   AgentList::iterator agent_itr;
00130   RoutingEntry *routing_entry;
00131   GradientEntry *gradient_entry;
00132   AgentEntry *agent_entry;
00133   struct timeval tmv;
00134 
00135   DiffPrint(DEBUG_MORE_DETAILS, "Gradient Timeout !\n");
00136 
00137   GetTime(&tmv);
00138 
00139   routing_itr = routing_list_.begin();
00140 
00141   while (routing_itr != routing_list_.end()){
00142     routing_entry = *routing_itr;
00143 
00144     // Step 1: Delete expired gradients
00145     grad_itr = routing_entry->gradients_.begin();
00146     while (grad_itr != routing_entry->gradients_.end()){
00147       gradient_entry = *grad_itr;
00148       if (tmv.tv_sec > (gradient_entry->tv_.tv_sec + GRADIENT_TIMEOUT)){
00149 
00150         DiffPrint(DEBUG_NO_DETAILS, "Deleting Gradient to node %d !\n",
00151                   gradient_entry->node_addr_);
00152 
00153         grad_itr = routing_entry->gradients_.erase(grad_itr);
00154         delete gradient_entry;
00155       }
00156       else{
00157         grad_itr++;
00158       }
00159     }
00160 
00161     // Step 2: Remove non-active agents
00162     agent_itr = routing_entry->agents_.begin();
00163     while (agent_itr != routing_entry->agents_.end()){
00164       agent_entry = *agent_itr;
00165       if (tmv.tv_sec > (agent_entry->tv_.tv_sec + GRADIENT_TIMEOUT)){
00166 
00167         DiffPrint(DEBUG_NO_DETAILS,
00168                   "Deleting Gradient to agent %d !\n", agent_entry->port_);
00169 
00170         agent_itr = routing_entry->agents_.erase(agent_itr);
00171         delete agent_entry;
00172       }
00173       else{
00174         agent_itr++;
00175       }
00176     }
00177 
00178     // Remove the Routing Entry if no gradients and no agents
00179     if ((routing_entry->gradients_.size() == 0) &&
00180         (routing_entry->agents_.size() == 0)){
00181       // Deleting Routing Entry
00182       DiffPrint(DEBUG_DETAILS,
00183                 "Nothing left for this data type, cleaning up !\n");
00184       routing_itr = routing_list_.erase(routing_itr);
00185       delete routing_entry;
00186     }
00187     else{
00188       routing_itr++;
00189     }
00190   }
00191 }

Here is the call graph for this function:

void GradientFilter::interestTimeout Message msg  ) 
 

Definition at line 108 of file gradient.cc.

References BROADCAST_ADDR, DEBUG_MORE_DETAILS, DiffPrint(), DiffApp::dr_, filter_handle_, Message::last_hop_, LOCALHOST_ADDR, and Message::next_hop_.

Referenced by InterestForwardTimer::expire().

00109 {
00110   DiffPrint(DEBUG_MORE_DETAILS, "Interest Timeout !\n");
00111 
00112   msg->last_hop_ = LOCALHOST_ADDR;
00113   msg->next_hop_ = BROADCAST_ADDR;
00114  
00115   ((DiffusionRouting *)dr_)->sendMessage(msg, filter_handle_);
00116 }

Here is the call graph for this function:

RoutingEntry * GradientFilter::matchRoutingEntry NRAttrVec attrs,
RoutingTable::iterator  start,
RoutingTable::iterator *  place
[protected]
 

Definition at line 317 of file gradient.cc.

References RoutingEntry::attrs_, MatchAttrs(), and routing_list_.

Referenced by forwardPushExploratoryData(), processNewMessage(), and processOldMessage().

00318 {
00319   RoutingTable::iterator routing_itr;
00320   RoutingEntry *routing_entry;
00321 
00322   for (routing_itr = start; routing_itr != routing_list_.end(); ++routing_itr){
00323     routing_entry = *routing_itr;
00324     if (MatchAttrs(routing_entry->attrs_, attrs)){
00325       *place = routing_itr;
00326       return routing_entry;
00327     }
00328   }
00329   return NULL;
00330 }

Here is the call graph for this function:

void GradientFilter::messageTimeout Message msg  ) 
 

Definition at line 118 of file gradient.cc.

References DEBUG_MORE_DETAILS, DiffPrint(), DiffApp::dr_, and filter_handle_.

Referenced by MessageSendTimer::expire().

00119 {
00120   DiffPrint(DEBUG_MORE_DETAILS, "Message Timeout !\n");
00121 
00122   ((DiffusionRouting *)dr_)->sendMessage(msg, filter_handle_);
00123 }

Here is the call graph for this function:

void DiffApp::parseCommandLine int  argc,
char **  argv
[protected, inherited]
 

Definition at line 59 of file diffapp.cc.

References DiffApp::config_file_, DEBUG_ALWAYS, DEFAULT_DIFFUSION_PORT, DiffPrint(), DiffApp::diffusion_port_, global_debug_level, optarg, u_int16_t, and DiffApp::usage().

Referenced by GeoRoutingFilter::GeoRoutingFilter(), GradientFilter(), PingSenderApp::PingSenderApp(), PushSenderApp::PushSenderApp(), and SrcRtFilter::SrcRtFilter().

00060 {
00061   u_int16_t diff_port = DEFAULT_DIFFUSION_PORT;
00062   int debug_level;
00063   int opt;
00064 
00065   config_file_ = NULL;
00066   opterr = 0;
00067 
00068   while (1){
00069     opt = getopt(argc, argv, "f:hd:p:");
00070     switch (opt){
00071 
00072     case 'p':
00073 
00074       diff_port = (u_int16_t) atoi(optarg);
00075       if ((diff_port < 1024) || (diff_port >= 65535)){
00076         DiffPrint(DEBUG_ALWAYS, "Error: Diffusion port must be between 1024 and 65535 !\n");
00077         exit(-1);
00078       }
00079 
00080       break;
00081 
00082     case 'h':
00083 
00084       usage(argv[0]);
00085 
00086       break;
00087 
00088     case 'd':
00089 
00090       debug_level = atoi(optarg);
00091 
00092       if (debug_level < 1 || debug_level > 10){
00093         DiffPrint(DEBUG_ALWAYS, "Error: Debug level outside range or missing !\n");
00094         usage(argv[0]);
00095       }
00096 
00097       global_debug_level = debug_level;
00098 
00099       break;
00100 
00101     case 'f':
00102 
00103       if (!strncasecmp(optarg, "-", 1)){
00104         DiffPrint(DEBUG_ALWAYS, "Error: Parameter missing !\n");
00105         usage(argv[0]);
00106       }
00107 
00108       config_file_ = strdup(optarg);
00109 
00110       break;
00111 
00112     case '?':
00113 
00114       DiffPrint(DEBUG_ALWAYS,
00115                 "Error: %c isn't a valid option or its parameter is missing !\n", optopt);
00116       usage(argv[0]);
00117 
00118       break;
00119 
00120     case ':':
00121 
00122       DiffPrint(DEBUG_ALWAYS, "Parameter missing !\n");
00123       usage(argv[0]);
00124 
00125       break;
00126 
00127     }
00128 
00129     if (opt == -1)
00130       break;
00131   }
00132 
00133   diffusion_port_ = diff_port;
00134 }

Here is the call graph for this function:

void GradientFilter::processNewMessage Message msg  )  [protected]
 

Definition at line 971 of file gradient.cc.

References RoutingEntry::attr_list_, AttributeEntry::attrs_, RoutingEntry::attrs_, BROADCAST_ADDR, CopyAttrs(), CopyMessage(), DATA, DEBUG_ALWAYS, DEBUG_NO_DETAILS, deleteGradient(), DiffPrint(), DIFFUSION_VERSION, DiffApp::dr_, EXPLORATORY_DATA, filter_handle_, findMatchingSubscription(), findReinforcedGradient(), findReinforcedGradients(), findRoutingEntry(), forwardData(), forwardExploratoryData(), forwardPushExploratoryData(), getHash(), NRAttribute::getOp(), GetRand(), GetTime(), NRSimpleAttribute< T >::getVal(), NRAttribute::GLOBAL_SCOPE, RoutingEntry::gradients_, INTEREST, NRAttribute::INTEREST_CLASS, NRAttribute::IS, HashEntry::last_hop_, Message::last_hop_, LOCALHOST_ADDR, matchRoutingEntry(), Message::msg_attr_vec_, Message::msg_type_, NEGATIVE_REINFORCEMENT, Message::next_hop_, NRAttribute::NODE_LOCAL_SCOPE, NRClassAttr, NRScopeAttr, pkt_count_, ReinforcementBlob::pkt_num_, POSITIVE_REINFORCEMENT, PUSH_EXPLORATORY_DATA, RAND_MAX, random_id_, ReinforcementBlob::rdm_id_, ReinforcementAttr, routing_list_, sendInterest(), Message::source_port_, SUBSCRIPTION_DELAY, AttributeEntry::tv_, updateAgent(), and updateGradient().

Referenced by recv().

00972 {
00973   NRSimpleAttribute<void *> *reinforcement_attr = NULL;
00974   DataForwardingHistory *forwarding_history;
00975   NRSimpleAttribute<int> *nrclass = NULL;
00976   NRSimpleAttribute<int> *nrscope = NULL;
00977   ReinforcementBlob *reinforcement_blob;
00978   RoutingTable::iterator routing_itr;
00979   RoutingEntry *routing_entry;
00980   GradientList::iterator gradient_itr;
00981   GradientEntry *gradient_entry;
00982   NRAttrVec::iterator place;
00983   HashEntry *hash_entry;
00984   AttributeEntry *attribute_entry;
00985   Message *my_msg;
00986   TimerCallback *interest_timer, *subscription_timer;
00987   unsigned int key[2];
00988   bool new_data_type = false;
00989 
00990   switch (msg->msg_type_){
00991 
00992   case INTEREST:
00993 
00994     DiffPrint(DEBUG_NO_DETAILS, "Received Interest !\n");
00995 
00996     nrclass = NRClassAttr.find(msg->msg_attr_vec_);
00997     nrscope = NRScopeAttr.find(msg->msg_attr_vec_);
00998 
00999     if (!nrclass || !nrscope){
01000       DiffPrint(DEBUG_ALWAYS,
01001                 "Warning: Can't find CLASS/SCOPE attributes in the message !\n");
01002       return;
01003     }
01004 
01005     // Step 1: Look for the same data type
01006     routing_entry = findRoutingEntry(msg->msg_attr_vec_);
01007 
01008     if (!routing_entry){
01009       // Create a new routing entry for this data type
01010       routing_entry = new RoutingEntry;
01011       routing_entry->attrs_ = CopyAttrs(msg->msg_attr_vec_);
01012       routing_list_.push_back(routing_entry);
01013       new_data_type = true;
01014     }
01015 
01016     if (msg->last_hop_ == LOCALHOST_ADDR){
01017       // From local agent
01018       updateAgent(routing_entry, msg->source_port_);
01019     }
01020     else{
01021       // From outside, we just add the new gradient
01022       updateGradient(routing_entry, msg->last_hop_, false);
01023     }
01024 
01025     if ((nrclass->getVal() == NRAttribute::INTEREST_CLASS) &&
01026         (nrclass->getOp() == NRAttribute::IS)){
01027 
01028       // Global interest messages should always be forwarded
01029       if (nrscope->getVal() == NRAttribute::GLOBAL_SCOPE){
01030 
01031         interest_timer = new InterestForwardTimer(this, CopyMessage(msg));
01032 
01033         ((DiffusionRouting *)dr_)->addTimer(INTEREST_FORWARD_DELAY +
01034                                             (int) ((INTEREST_FORWARD_JITTER * (GetRand() * 1.0 / RAND_MAX) - (INTEREST_FORWARD_JITTER / 2))),
01035                                             interest_timer);
01036       }
01037     }
01038     else{
01039       if ((nrclass->getOp() != NRAttribute::IS) &&
01040           (nrscope->getVal() == NRAttribute::NODE_LOCAL_SCOPE) &&
01041           (new_data_type)){
01042 
01043         subscription_timer = new SubscriptionExpirationTimer(this,
01044                                                              CopyAttrs(msg->msg_attr_vec_));
01045         
01046         ((DiffusionRouting *)dr_)->addTimer(SUBSCRIPTION_DELAY +
01047                                             (int) (SUBSCRIPTION_DELAY * (GetRand() * 1.0 / RAND_MAX)),
01048                                             subscription_timer);
01049       }
01050 
01051       // Subscriptions don't have to match other subscriptions
01052       break;
01053     }
01054 
01055     // Step 2: Match other routing tables
01056     routing_itr = routing_list_.begin();
01057     routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,
01058                                       &routing_itr);
01059 
01060     while (routing_entry){
01061       // Got a match
01062       attribute_entry = findMatchingSubscription(routing_entry,
01063                                                  msg->msg_attr_vec_);
01064 
01065       // Do we already have this subscription
01066       if (attribute_entry){
01067         GetTime(&(attribute_entry->tv_));
01068       }
01069       else{
01070         // Create a new attribute entry, add it to the attribute list
01071         // and send an interest message to the local agent
01072         attribute_entry = new AttributeEntry(CopyAttrs(msg->msg_attr_vec_));
01073         routing_entry->attr_list_.push_back(attribute_entry);
01074         sendInterest(attribute_entry->attrs_, routing_entry);
01075       }
01076       // Move to the next RoutingEntry
01077       routing_itr++;
01078       routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,
01079                                         &routing_itr);
01080     }
01081 
01082       break;
01083 
01084   case DATA:
01085 
01086     DiffPrint(DEBUG_NO_DETAILS, "Received Data !\n");
01087 
01088     // Create data message forwarding cache
01089     forwarding_history = new DataForwardingHistory;
01090 
01091     // Find the correct routing entry
01092     routing_itr = routing_list_.begin();
01093     routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,
01094                                       &routing_itr);
01095 
01096     while (routing_entry){
01097       forwardData(msg, routing_entry, forwarding_history);
01098       routing_itr++;
01099       routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,
01100                                         &routing_itr);
01101     }
01102 
01103     delete forwarding_history;
01104 
01105     break;
01106 
01107   case EXPLORATORY_DATA:
01108 
01109     DiffPrint(DEBUG_NO_DETAILS, "Received Exploratory Data !\n");
01110 
01111     // Create data message forwarding cache
01112     forwarding_history = new DataForwardingHistory;
01113 
01114     // Find the correct routing entry
01115     routing_itr = routing_list_.begin();
01116     routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,
01117                                       &routing_itr);
01118 
01119     while (routing_entry){
01120       forwardExploratoryData(msg, routing_entry, forwarding_history);
01121       routing_itr++;
01122       routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,
01123                                         &routing_itr);
01124     }
01125 
01126     // Delete data forwarding cache
01127     delete forwarding_history;
01128 
01129     break;
01130 
01131   case PUSH_EXPLORATORY_DATA:
01132 
01133     DiffPrint(DEBUG_NO_DETAILS, "Received Push Exploratory Data !\n");
01134 
01135     // Create data message forwarding cache
01136     forwarding_history = new DataForwardingHistory;
01137 
01138     // Forward data message
01139     forwardPushExploratoryData(msg, forwarding_history);
01140 
01141     // Delete data forwarding cache
01142     delete forwarding_history;
01143 
01144     break;
01145 
01146   case POSITIVE_REINFORCEMENT:
01147 
01148     DiffPrint(DEBUG_NO_DETAILS, "Received a Positive Reinforcement !\n");
01149 
01150     // Step 0: Look for reinforcement attribute
01151     place = msg->msg_attr_vec_->begin();
01152     reinforcement_attr = ReinforcementAttr.find_from(msg->msg_attr_vec_,
01153                                                      place, &place);
01154     if (!reinforcement_attr){
01155       DiffPrint(DEBUG_ALWAYS,
01156                 "Error: Received an invalid Positive Reinforcement message !\n");
01157       return;
01158     }
01159 
01160     // Step 1: Extract reinforcement blob from message and look for an
01161     // entry in our hash table
01162     reinforcement_blob = (ReinforcementBlob *) reinforcement_attr->getVal();
01163 
01164     key[0] = reinforcement_blob->pkt_num_;
01165     key[1] = reinforcement_blob->rdm_id_;
01166 
01167     hash_entry = getHash(key[0], key[1]);
01168 
01169     // Step 2: Remove the reinforcement attribute from the message
01170     msg->msg_attr_vec_->erase(place);
01171 
01172     // Step 3: Find a routing entry that matches this message
01173     routing_entry = findRoutingEntry(msg->msg_attr_vec_);
01174 
01175     if (!routing_entry){
01176       // So, if we do not know about this data type, this must be a
01177       // reinforcement message to a PUSHED_EXPLORATORY_DATA message
01178 
01179       // Check for class/scope (all interest message should have it)
01180       nrclass = NRClassAttr.find(msg->msg_attr_vec_);
01181       nrscope = NRScopeAttr.find(msg->msg_attr_vec_);
01182 
01183       if (!nrclass || !nrscope){
01184         DiffPrint(DEBUG_ALWAYS,
01185                   "Warning: Can't find CLASS/SCOPE attributes in the message !\n");
01186         return;
01187       }
01188 
01189       // Create new Routing Entry
01190       routing_entry = new RoutingEntry;
01191       routing_entry->attrs_ = CopyAttrs(msg->msg_attr_vec_);
01192       routing_list_.push_back(routing_entry);
01193     }
01194 
01195     // Add reinforced gradient to last_hop
01196     updateGradient(routing_entry, msg->last_hop_, true);
01197 
01198     // Add the reinforcement attribute back to the message
01199     msg->msg_attr_vec_->push_back(reinforcement_attr);
01200 
01201     // If we have no record of this message it is either because we
01202     // originated the message (in which case, no further action is
01203     // required) or because we dropped it a long time ago because of
01204     // our hashing configuration parameters (in this case, we can't do
01205     // anything)
01206     if (hash_entry){
01207       msg->next_hop_ = hash_entry->last_hop_;
01208 
01209       DiffPrint(DEBUG_NO_DETAILS,
01210                 "Forwarding Positive Reinforcement to node %d !\n",
01211                 hash_entry->last_hop_);
01212 
01213       ((DiffusionRouting *)dr_)->sendMessage(msg, filter_handle_);
01214     }
01215 
01216     break;
01217 
01218   case NEGATIVE_REINFORCEMENT:
01219 
01220     DiffPrint(DEBUG_NO_DETAILS, "Received a Negative Reinforcement !\n");
01221 
01222     routing_entry = findRoutingEntry(msg->msg_attr_vec_);
01223 
01224     if (routing_entry){
01225       gradient_entry = findReinforcedGradient(msg->last_hop_, routing_entry);
01226 
01227       if (gradient_entry){
01228         // Remove reinforced gradient to last_hop
01229         deleteGradient(routing_entry, gradient_entry);
01230 
01231         gradient_entry = findReinforcedGradients(&routing_entry->gradients_,
01232                                                  routing_entry->gradients_.begin(),
01233                                                  &gradient_itr);
01234 
01235         // If there are no other reinforced outgoing gradients
01236         // we need to send our own negative reinforcement
01237         if (!gradient_entry){
01238           my_msg = new Message(DIFFUSION_VERSION, NEGATIVE_REINFORCEMENT,
01239                                0, 0, routing_entry->attrs_->size(), pkt_count_,
01240                                random_id_, BROADCAST_ADDR, LOCALHOST_ADDR);
01241           my_msg->msg_attr_vec_ = CopyAttrs(routing_entry->attrs_);
01242 
01243           DiffPrint(DEBUG_NO_DETAILS,
01244                     "Forwarding Negative Reinforcement to ALL !\n");
01245 
01246           ((DiffusionRouting *)dr_)->sendMessage(my_msg, filter_handle_);
01247 
01248           pkt_count_++;
01249           delete my_msg;
01250         }
01251       }
01252     }
01253 
01254     break;
01255 
01256   default:
01257 
01258     break;
01259   }
01260 }

Here is the call graph for this function:

void GradientFilter::processOldMessage Message msg  )  [protected]
 

Definition at line 884 of file gradient.cc.

References DATA, DEBUG_ALWAYS, DEBUG_IMPORTANT, DEBUG_NO_DETAILS, DiffPrint(), EXPLORATORY_DATA, findRoutingEntry(), INTEREST, Message::last_hop_, LOCALHOST_ADDR, matchRoutingEntry(), Message::msg_attr_vec_, Message::msg_type_, NEGATIVE_REINFORCEMENT, OLD_MESSAGE, Message::pkt_num_, POSITIVE_REINFORCEMENT, PUSH_EXPLORATORY_DATA, Message::rdm_id_, routing_list_, setReinforcementFlags(), and updateGradient().

Referenced by recv().

00885 {
00886   RoutingEntry *routing_entry;
00887   RoutingTable::iterator routing_itr;
00888 
00889   switch (msg->msg_type_){
00890 
00891   case INTEREST:
00892 
00893     DiffPrint(DEBUG_NO_DETAILS, "Received Old Interest !\n");
00894 
00895     if (msg->last_hop_ == LOCALHOST_ADDR){
00896       // Old interest should not come from local agent
00897       DiffPrint(DEBUG_ALWAYS, "Warning: Old Interest from local agent !\n");
00898       break;
00899     }
00900 
00901     // Get the routing entry for these attrs      
00902     routing_entry = findRoutingEntry(msg->msg_attr_vec_);
00903     if (routing_entry)
00904       updateGradient(routing_entry, msg->last_hop_, false);
00905 
00906     break;
00907 
00908   case DATA: 
00909 
00910     DiffPrint(DEBUG_NO_DETAILS, "Received an old Data message !\n");
00911 
00912     // Find the correct routing entry
00913     routing_itr = routing_list_.begin();
00914     routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,
00915                                       &routing_itr);
00916 
00917     while (routing_entry){
00918       DiffPrint(DEBUG_NO_DETAILS,
00919                 "Set flags to %d to OLD_MESSAGE !\n", msg->last_hop_);
00920 
00921       // Set reinforcement flags
00922       if (msg->last_hop_ != LOCALHOST_ADDR){
00923         setReinforcementFlags(routing_entry, msg->last_hop_, OLD_MESSAGE);
00924       }
00925 
00926       // Continue going through other data types
00927       routing_itr++;
00928       routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,
00929                                         &routing_itr);
00930     }
00931 
00932     break;
00933 
00934   case PUSH_EXPLORATORY_DATA:
00935 
00936     // Just drop it
00937     DiffPrint(DEBUG_NO_DETAILS,
00938               "Received an old Push Exploratory Data. Loop detected !\n");
00939     
00940     break;
00941 
00942   case EXPLORATORY_DATA:
00943     
00944     // Just drop it
00945     DiffPrint(DEBUG_NO_DETAILS,
00946               "Received an old Exploratory Data. Loop detected !\n");
00947 
00948     break;
00949 
00950   case POSITIVE_REINFORCEMENT:
00951 
00952     DiffPrint(DEBUG_IMPORTANT, "Received an old Positive Reinforcement !\n");
00953 
00954     break;
00955 
00956   case NEGATIVE_REINFORCEMENT:
00957 
00958     DiffPrint(DEBUG_IMPORTANT, "Received an old Negative Reinforcement !\n");
00959 
00960     DiffPrint(DEBUG_IMPORTANT, "pkt_num = %d, rdm_id = %d !\n",
00961               msg->pkt_num_, msg->rdm_id_);
00962 
00963     break;
00964 
00965   default: 
00966 
00967     break;
00968   }
00969 }

Here is the call graph for this function:

void GradientFilter::putHash HashEntry new_hash_entry,
unsigned int  pkt_num,
unsigned int  rdm_id
[protected]
 

Definition at line 1278 of file gradient.cc.

References DEBUG_IMPORTANT, DiffPrint(), hash_list_, HASH_TABLE_DATA_MAX_SIZE, HASH_TABLE_DATA_REMOVE_AT_ONCE, and htable_.

Referenced by forwardExploratoryData(), and forwardPushExploratoryData().

01281 {
01282    Tcl_HashEntry *tcl_hash_entry;
01283    HashEntry *hash_entry;
01284    HashList::iterator hash_itr;
01285    unsigned int key[2];
01286    int new_hash_key;
01287  
01288    if (hash_list_.size() == HASH_TABLE_DATA_MAX_SIZE){
01289       // Hash table reached maximum size
01290       
01291       for (int i = 0; ((i < HASH_TABLE_DATA_REMOVE_AT_ONCE)
01292                        && (hash_list_.size() > 0)); i++){
01293          hash_itr = hash_list_.begin();
01294          tcl_hash_entry = *hash_itr;
01295          hash_entry = (HashEntry *) Tcl_GetHashValue(tcl_hash_entry);
01296          delete hash_entry;
01297          hash_list_.erase(hash_itr);
01298          Tcl_DeleteHashEntry(tcl_hash_entry);
01299       }
01300    }
01301   
01302    key[0] = pkt_num;
01303    key[1] = rdm_id;
01304    
01305    tcl_hash_entry = Tcl_CreateHashEntry(&htable_, (char *) key, &new_hash_key);
01306 
01307    if (new_hash_key == 0){
01308       DiffPrint(DEBUG_IMPORTANT, "Key already exists in hash !\n");
01309       return;
01310    }
01311 
01312    Tcl_SetHashValue(tcl_hash_entry, new_hash_entry);
01313    hash_list_.push_back(tcl_hash_entry);
01314 }

Here is the call graph for this function:

void GradientFilter::recv Message msg,
handle  h
 

Definition at line 869 of file gradient.cc.

References DEBUG_ALWAYS, DiffPrint(), filter_handle_, Message::new_message_, processNewMessage(), and processOldMessage().

Referenced by GradientFilterReceive::recv().

00870 {
00871   if (h != filter_handle_){
00872     DiffPrint(DEBUG_ALWAYS,
00873               "Error: received msg for handle %d, subscribed to handle %d !\n",
00874               h, filter_handle_);
00875     return;
00876   }
00877 
00878   if (msg->new_message_ == 1)
00879     processNewMessage(msg);
00880   else
00881     processOldMessage(msg);
00882 }

Here is the call graph for this function:

void GradientFilter::reinforcementTimeout  ) 
 

Definition at line 193 of file gradient.cc.

References RoutingEntry::attrs_, CopyAttrs(), DataNeighborEntry::data_flag_, RoutingEntry::data_neighbors_, DEBUG_MORE_DETAILS, DEBUG_NO_DETAILS, DiffPrint(), DIFFUSION_VERSION, DiffApp::dr_, filter_handle_, LOCALHOST_ADDR, Message::msg_attr_vec_, NEGATIVE_REINFORCEMENT, DataNeighborEntry::neighbor_id_, NEW_MESSAGE, OLD_MESSAGE, pkt_count_, random_id_, and routing_list_.

Referenced by ReinforcementCheckTimer::expire().

00194 {
00195   DataNeighborList::iterator data_neighbor_itr;
00196   DataNeighborEntry *data_neighbor_entry;
00197   RoutingTable::iterator routing_itr;
00198   RoutingEntry *routing_entry;
00199   Message *my_message;
00200 
00201   DiffPrint(DEBUG_MORE_DETAILS, "Reinforcement Timeout !\n");
00202 
00203   routing_itr = routing_list_.begin();
00204 
00205   while (routing_itr != routing_list_.end()){
00206     routing_entry = *routing_itr;
00207 
00208     // Step 1: Delete expired gradients
00209     data_neighbor_itr = routing_entry->data_neighbors_.begin();
00210 
00211     while (data_neighbor_itr != routing_entry->data_neighbors_.end()){
00212       data_neighbor_entry = *data_neighbor_itr;
00213 
00214       if (data_neighbor_entry->data_flag_ == OLD_MESSAGE){
00215         my_message = new Message(DIFFUSION_VERSION, NEGATIVE_REINFORCEMENT,
00216                                  0, 0, routing_entry->attrs_->size(), pkt_count_,
00217                                  random_id_, data_neighbor_entry->neighbor_id_,
00218                                  LOCALHOST_ADDR);
00219         my_message->msg_attr_vec_ = CopyAttrs(routing_entry->attrs_);
00220 
00221         DiffPrint(DEBUG_NO_DETAILS,
00222                   "Sending Negative Reinforcement to node %d !\n",
00223                   data_neighbor_entry->neighbor_id_);
00224 
00225         ((DiffusionRouting *)dr_)->sendMessage(my_message, filter_handle_);
00226 
00227         pkt_count_++;
00228         delete my_message;
00229 
00230         // Done. Delete entry
00231         data_neighbor_itr = routing_entry->data_neighbors_.erase(data_neighbor_itr);
00232         delete data_neighbor_entry;
00233       }
00234       else{
00235         data_neighbor_itr++;
00236       }
00237     }
00238 
00239     // Step 2: Delete data neighbors with no activity, zero flags
00240     data_neighbor_itr = routing_entry->data_neighbors_.begin();
00241     while (data_neighbor_itr != routing_entry->data_neighbors_.end()){
00242       data_neighbor_entry = *data_neighbor_itr;
00243       if (data_neighbor_entry->data_flag_ == NEW_MESSAGE){
00244         data_neighbor_entry->data_flag_ = 0;
00245         data_neighbor_itr++;
00246       }
00247       else{
00248         // Delete entry
00249         data_neighbor_itr = routing_entry->data_neighbors_.erase(data_neighbor_itr);
00250         delete data_neighbor_entry;
00251       }
00252     }
00253 
00254     // Advance to the next routing entry
00255     routing_itr++;
00256   }
00257 }

Here is the call graph for this function:

void GradientFilter::run  )  [virtual]
 

Implements DiffApp.

Definition at line 1335 of file gradient.cc.

Referenced by main().

01336 {
01337   // Doesn't do anything
01338   while (1){
01339     sleep(1000);
01340   }
01341 }

void GradientFilter::sendDisinterest NRAttrVec attrs,
RoutingEntry routing_entry
[protected]
 

Definition at line 843 of file gradient.cc.

References ClearAttrs(), CopyAttrs(), DEBUG_ALWAYS, DiffPrint(), NRAttribute::DISINTEREST_CLASS, NRClassAttr, sendInterest(), and NRSimpleAttribute< T >::setVal().

Referenced by subscriptionTimeout().

00845 {
00846   NRAttrVec *newAttrs;
00847   NRSimpleAttribute<int> *nrclass = NULL;
00848 
00849   newAttrs = CopyAttrs(attrs);
00850 
00851   nrclass = NRClassAttr.find(newAttrs);
00852   if (!nrclass){
00853     DiffPrint(DEBUG_ALWAYS,
00854               "Error: sendDisinterest couldn't find the class attribute !\n");
00855     ClearAttrs(newAttrs);
00856     delete newAttrs;
00857     return;
00858   }
00859 
00860   // Change the class_key value
00861   nrclass->setVal(NRAttribute::DISINTEREST_CLASS);
00862 
00863   sendInterest(newAttrs, routing_entry);
00864    
00865   ClearAttrs(newAttrs);
00866   delete newAttrs;
00867 }

Here is the call graph for this function:

void GradientFilter::sendInterest NRAttrVec attrs,
RoutingEntry routing_entry
[protected]
 

Definition at line 821 of file gradient.cc.

References RoutingEntry::agents_, CopyAttrs(), DIFFUSION_VERSION, DiffApp::dr_, filter_handle_, INTEREST, LOCALHOST_ADDR, Message::msg_attr_vec_, Message::next_port_, and AgentEntry::port_.

Referenced by processNewMessage(), and sendDisinterest().

00822 {
00823   AgentList::iterator agent_itr;
00824   AgentEntry *agent_entry;
00825 
00826   Message *msg = new Message(DIFFUSION_VERSION, INTEREST, 0, 0,
00827                              attrs->size(), 0, 0, LOCALHOST_ADDR,
00828                              LOCALHOST_ADDR);
00829 
00830   msg->msg_attr_vec_ = CopyAttrs(attrs);
00831 
00832   for (agent_itr = routing_entry->agents_.begin(); agent_itr != routing_entry->agents_.end(); ++agent_itr){
00833     agent_entry = *agent_itr;
00834 
00835     msg->next_port_ = agent_entry->port_;
00836 
00837     ((DiffusionRouting *)dr_)->sendMessage(msg, filter_handle_);
00838   }
00839 
00840   delete msg;
00841 }

Here is the call graph for this function:

void GradientFilter::sendPositiveReinforcement NRAttrVec reinf_attrs,
int32_t  data_rdm_id,
int32_t  data_pkt_num,
int32_t  destination
[protected]
 

Definition at line 695 of file gradient.cc.

References ClearAttrs(), CopyAttrs(), DEBUG_NO_DETAILS, DiffPrint(), DIFFUSION_VERSION, DiffApp::dr_, GetRand(), NRAttribute::IS, LOCALHOST_ADDR, Message::msg_attr_vec_, pkt_count_, POS_REINFORCEMENT_JITTER, POS_REINFORCEMENT_SEND_DELAY, POSITIVE_REINFORCEMENT, RAND_MAX, random_id_, and ReinforcementAttr.

Referenced by forwardExploratoryData(), and forwardPushExploratoryData().

00699 {
00700   ReinforcementBlob *reinforcement_blob;
00701   NRAttribute *reinforcement_attr;
00702   TimerCallback *reinforcement_timer;
00703   Message *pos_reinf_message;
00704   NRAttrVec *attrs;
00705 
00706   reinforcement_blob = new ReinforcementBlob(data_rdm_id, data_pkt_num);
00707 
00708   reinforcement_attr = ReinforcementAttr.make(NRAttribute::IS,
00709                                               (void *) reinforcement_blob,
00710                                               sizeof(ReinforcementBlob));
00711 
00712   attrs = CopyAttrs(reinf_attrs);
00713   attrs->push_back(reinforcement_attr);
00714 
00715   pos_reinf_message = new Message(DIFFUSION_VERSION, POSITIVE_REINFORCEMENT,
00716                                   0, 0, attrs->size(), pkt_count_,
00717                                   random_id_, destination, LOCALHOST_ADDR);
00718   pos_reinf_message->msg_attr_vec_ = CopyAttrs(attrs);
00719 
00720   DiffPrint(DEBUG_NO_DETAILS, "Sending Positive Reinforcement to node %d !\n",
00721             destination);
00722 
00723   // Create timer for sending this message
00724   reinforcement_timer = new MessageSendTimer(this, pos_reinf_message);
00725 
00726   // Add timer to the event queue
00727   ((DiffusionRouting *)dr_)->addTimer(POS_REINFORCEMENT_SEND_DELAY +
00728                                       (int) ((POS_REINFORCEMENT_JITTER * (GetRand() * 1.0 / RAND_MAX) - (POS_REINFORCEMENT_JITTER / 2))),
00729                                       reinforcement_timer);
00730   pkt_count_++;
00731   ClearAttrs(attrs);
00732   delete reinforcement_blob;
00733 }

Here is the call graph for this function:

void GradientFilter::setReinforcementFlags RoutingEntry routing_entry,
int32_t  last_hop,
int  new_message
[protected]
 

Definition at line 797 of file gradient.cc.

References DataNeighborEntry::data_flag_, RoutingEntry::data_neighbors_, and DataNeighborEntry::neighbor_id_.

Referenced by forwardData(), forwardExploratoryData(), and processOldMessage().

00799 {
00800   DataNeighborList::iterator data_neighbor_itr;
00801   DataNeighborEntry *data_neighbor_entry;
00802 
00803   for (data_neighbor_itr = routing_entry->data_neighbors_.begin();
00804        data_neighbor_itr != routing_entry->data_neighbors_.end();
00805        ++data_neighbor_itr){
00806     data_neighbor_entry = *data_neighbor_itr;
00807     if (data_neighbor_entry->neighbor_id_ == last_hop){
00808       if (data_neighbor_entry->data_flag_ > 0)
00809         return;
00810       data_neighbor_entry->data_flag_ = new_message;
00811       return;
00812     }
00813   }
00814 
00815   // We need to add a new data neighbor
00816   data_neighbor_entry = new DataNeighborEntry(last_hop, new_message);
00817 
00818   routing_entry->data_neighbors_.push_back(data_neighbor_entry);
00819 }

handle GradientFilter::setupFilter  )  [protected]
 

Definition at line 1316 of file gradient.cc.

References ClearAttrs(), DiffApp::dr_, filter_callback_, GRADIENT_FILTER_PRIORITY, handle, NRAttribute::INTEREST_CLASS, NRAttribute::IS, and NRClassAttr.

Referenced by GradientFilter().

01317 {
01318   NRAttrVec attrs;
01319   handle h;
01320 
01321   // For the gradient filter, we use a single attribute with an "IS"
01322   // operator. This causes this filter to match every single packet
01323   // getting to diffusion
01324   attrs.push_back(NRClassAttr.make(NRAttribute::IS,
01325                                    NRAttribute::INTEREST_CLASS));
01326 
01327   h = ((DiffusionRouting *)dr_)->addFilter(&attrs,
01328                                            GRADIENT_FILTER_PRIORITY, filter_callback_);
01329 
01330   ClearAttrs(&attrs);
01331   return h;
01332 }

Here is the call graph for this function:

int GradientFilter::subscriptionTimeout NRAttrVec attrs  ) 
 

Definition at line 259 of file gradient.cc.

References RoutingEntry::attr_list_, AttributeEntry::attrs_, DEBUG_DETAILS, DEBUG_MORE_DETAILS, DiffPrint(), findRoutingEntry(), GetTime(), sendDisinterest(), SUBSCRIPTION_TIMEOUT, and AttributeEntry::tv_.

Referenced by SubscriptionExpirationTimer::expire().

00260 {
00261   AttributeList::iterator attribute_itr;
00262   AttributeEntry *attribute_entry;
00263   RoutingEntry *routing_entry;
00264   struct timeval tmv;
00265 
00266   DiffPrint(DEBUG_MORE_DETAILS, "Subscription Timeout !\n");
00267 
00268   GetTime(&tmv);
00269 
00270   // Find the correct Routing Entry
00271   routing_entry = findRoutingEntry(attrs);
00272 
00273   if (routing_entry){
00274     // Step 1: Check Timeouts
00275 
00276     attribute_itr = routing_entry->attr_list_.begin();
00277 
00278     while (attribute_itr != routing_entry->attr_list_.end()){
00279       attribute_entry = *attribute_itr;
00280       if (tmv.tv_sec > (attribute_entry->tv_.tv_sec + SUBSCRIPTION_TIMEOUT)){
00281         sendDisinterest(attribute_entry->attrs_, routing_entry);
00282         attribute_itr = routing_entry->attr_list_.erase(attribute_itr);
00283         delete attribute_entry;
00284       }
00285       else{
00286         attribute_itr++;
00287       }
00288     }
00289   }
00290   else{
00291     DiffPrint(DEBUG_DETAILS, "Warning: SubscriptionTimeout could't find RE - maybe deleted by GradientTimeout ?\n");
00292 
00293     // Cancel Timer
00294     return -1;
00295   }
00296 
00297   // Keep Timer
00298   return 0;
00299 }

Here is the call graph for this function:

void GradientFilter::updateAgent RoutingEntry routing_entry,
u_int16_t  source_port
[protected]
 

Definition at line 384 of file gradient.cc.

References RoutingEntry::agents_, GetTime(), AgentEntry::port_, and AgentEntry::tv_.

Referenced by processNewMessage().

00386 {
00387   AgentList::iterator agent_itr;
00388   AgentEntry *agent_entry;
00389 
00390   for (agent_itr = routing_entry->agents_.begin(); agent_itr != routing_entry->agents_.end(); ++agent_itr){
00391     agent_entry = *agent_itr;
00392     if (agent_entry->port_ == source_port){
00393       // We already have this guy
00394       GetTime(&(agent_entry->tv_));
00395       return;
00396     }
00397   }
00398 
00399   // This is a new agent, so we create a new entry and add it to the
00400   // list of known agents
00401   agent_entry = new AgentEntry(source_port);
00402   routing_entry->agents_.push_back(agent_entry);
00403 }

Here is the call graph for this function:

void GradientFilter::updateGradient RoutingEntry routing_entry,
int32_t  last_hop,
bool  reinforced
[protected]
 

Definition at line 359 of file gradient.cc.

References GetTime(), RoutingEntry::gradients_, GradientEntry::node_addr_, GradientEntry::reinforced_, and GradientEntry::tv_.

Referenced by processNewMessage(), and processOldMessage().

00361 {
00362   GradientList::iterator gradient_itr;
00363   GradientEntry *gradient_entry;
00364 
00365   for (gradient_itr = routing_entry->gradients_.begin();
00366        gradient_itr != routing_entry->gradients_.end(); ++gradient_itr){
00367     gradient_entry = *gradient_itr;
00368     if (gradient_entry->node_addr_ == last_hop){
00369       GetTime(&(gradient_entry->tv_));
00370       if (reinforced)
00371         gradient_entry->reinforced_ = true;
00372       return;
00373     }
00374   }
00375 
00376   // We need to add a new gradient
00377   gradient_entry = new GradientEntry(last_hop);
00378   if (reinforced)
00379     gradient_entry->reinforced_ = true;
00380 
00381   routing_entry->gradients_.push_back(gradient_entry);
00382 }

Here is the call graph for this function:

void DiffApp::usage char *  s  )  [protected, inherited]
 

Definition at line 49 of file diffapp.cc.

References DEBUG_ALWAYS, and DiffPrint().

Referenced by DiffApp::parseCommandLine().

00049                           {
00050   DiffPrint(DEBUG_ALWAYS, "Usage: %s [-d debug] [-p port] [-f file] [-h]\n\n", s);
00051   DiffPrint(DEBUG_ALWAYS, "\t-d - Sets debug level (0-10)\n");
00052   DiffPrint(DEBUG_ALWAYS, "\t-p - Uses port 'port' to talk to diffusion\n");
00053   DiffPrint(DEBUG_ALWAYS, "\t-f - Specifies a config file\n");
00054   DiffPrint(DEBUG_ALWAYS, "\t-h - Prints this information\n");
00055   DiffPrint(DEBUG_ALWAYS, "\n");
00056   exit(0);
00057 }

Here is the call graph for this function:


Member Data Documentation

char* DiffApp::config_file_ [protected, inherited]
 

Definition at line 57 of file diffapp.hh.

Referenced by DiffApp::parseCommandLine().

u_int16_t DiffApp::diffusion_port_ [protected, inherited]
 

Definition at line 56 of file diffapp.hh.

Referenced by GeoRoutingFilter::GeoRoutingFilter(), GradientFilter(), DiffApp::parseCommandLine(), PingSenderApp::PingSenderApp(), PushSenderApp::PushSenderApp(), and SrcRtFilter::SrcRtFilter().

NR* DiffApp::dr_ [protected, inherited]
 

Definition at line 55 of file diffapp.hh.

Referenced by GeoRoutingFilter::beaconTimeout(), GeoRoutingFilter::broadcastHeuristicValue(), forwardData(), forwardExploratoryData(), forwardPushExploratoryData(), GeoRoutingFilter::GeoRoutingFilter(), GeoRoutingFilter::getNodeLocation(), GradientFilter(), interestTimeout(), messageTimeout(), GeoRoutingFilter::messageTimeout(), PingSenderApp::PingSenderApp(), GeoRoutingFilter::postProcessFilter(), GeoRoutingFilter::preProcessFilter(), SrcRtFilter::ProcessMessage(), processNewMessage(), PushSenderApp::PushSenderApp(), TagFilter::recv(), SrcRtFilter::recv(), LogFilter::recv(), reinforcementTimeout(), PushSenderApp::run(), PingSenderApp::run(), GeoRoutingFilter::run(), sendInterest(), GeoRoutingFilter::sendNeighborRequest(), sendPositiveReinforcement(), TagFilter::setupFilter(), SrcRtFilter::setupFilter(), LogFilter::setupFilter(), setupFilter(), GeoRoutingFilter::setupPostFilter(), GeoRoutingFilter::setupPreFilter(), PushSenderApp::setupPublication(), PingSenderApp::setupPublication(), PushReceiverApp::setupSubscription(), PingSenderApp::setupSubscription(), PingReceiverApp::setupSubscription(), and SrcRtFilter::SrcRtFilter().

GradientFilterReceive* GradientFilter::filter_callback_ [protected]
 

Definition at line 277 of file gradient.hh.

Referenced by GradientFilter(), and setupFilter().

handle GradientFilter::filter_handle_ [protected]
 

Definition at line 268 of file gradient.hh.

Referenced by forwardData(), forwardExploratoryData(), forwardPushExploratoryData(), GradientFilter(), interestTimeout(), messageTimeout(), processNewMessage(), recv(), reinforcementTimeout(), and sendInterest().

HashList GradientFilter::hash_list_ [protected]
 

Definition at line 273 of file gradient.hh.

Referenced by putHash().

Tcl_HashTable GradientFilter::htable_ [protected]
 

Definition at line 274 of file gradient.hh.

Referenced by getHash(), GradientFilter(), and putHash().

int GradientFilter::pkt_count_ [protected]
 

Definition at line 269 of file gradient.hh.

Referenced by forwardData(), GradientFilter(), processNewMessage(), reinforcementTimeout(), and sendPositiveReinforcement().

int GradientFilter::random_id_ [protected]
 

Definition at line 270 of file gradient.hh.

Referenced by forwardData(), GradientFilter(), processNewMessage(), reinforcementTimeout(), and sendPositiveReinforcement().

RoutingTable GradientFilter::routing_list_ [protected]
 

Definition at line 280 of file gradient.hh.

Referenced by deleteRoutingEntry(), findRoutingEntry(), forwardPushExploratoryData(), gradientTimeout(), matchRoutingEntry(), processNewMessage(), processOldMessage(), and reinforcementTimeout().


The documentation for this class was generated from the following files:
Generated on Tue Apr 20 12:49:59 2004 for NS2.26SourcesOriginal by doxygen 1.3.3