Main Page | Namespace List | Class Hierarchy | Alphabetical List | Compound List | File List | Compound Members | File Members

gradient.cc

Go to the documentation of this file.
00001 //
00002 // gradient.cc    : Gradient Filter
00003 // author         : Fabio Silva and Chalermek Intanagonwiwat
00004 //
00005 // Copyright (C) 2000-2002 by the University of Southern California
00006 // $Id: gradient.cc,v 1.6 2002/11/26 22:45:38 haldar Exp $
00007 //
00008 // This program is free software; you can redistribute it and/or
00009 // modify it under the terms of the GNU General Public License,
00010 // version 2, as published by the Free Software Foundation.
00011 //
00012 // This program is distributed in the hope that it will be useful,
00013 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00014 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00015 // GNU General Public License for more details.
00016 //
00017 // You should have received a copy of the GNU General Public License along
00018 // with this program; if not, write to the Free Software Foundation, Inc.,
00019 // 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
00020 //
00021 //
00022 
00023 #include "gradient.hh"
00024 
00025 NRSimpleAttributeFactory<void *> ReinforcementAttr(NRAttribute::REINFORCEMENT_KEY, NRAttribute::BLOB_TYPE);
00026 
00027 #ifdef NS_DIFFUSION
00028 static class GradientFilterClass : public TclClass {
00029 public:
00030   GradientFilterClass() : TclClass("Application/GradientFilter") {}
00031   TclObject* create(int argc, const char*const* argv) {
00032     if (argc == 5)
00033       return(new GradientFilter(argv[4]));
00034     else
00035       fprintf(stderr, "Insufficient number of args for creating GradientFilter");
00036     return (NULL);
00037   }
00038 } class_gradient_filter;
00039 
00040 int GradientFilter::command(int argc, const char*const* argv) {
00041   if (argc == 3) {
00042     if (strcasecmp(argv[1], "dr") == 0) {
00043       DiffAppAgent *agent;
00044       agent = (DiffAppAgent *) TclObject::lookup(argv[2]);
00045       dr_ = agent->dr();
00046       start();
00047       return TCL_OK;
00048     }
00049     if (strcasecmp(argv[1], "debug") == 0) {
00050       global_debug_level = atoi(argv[2]);
00051       if (global_debug_level < 1 || global_debug_level > 10) {
00052         global_debug_level = DEBUG_DEFAULT;
00053         printf("Error: Debug level outside range(1-10) or missing !\n");
00054       }
00055     }
00056   }
00057   return Application::command(argc, argv);
00058 }
00059 
00060 #endif // NS_DIFFUSION
00061 
00062 void GradientFilterReceive::recv(Message *msg, handle h)
00063 {
00064   app_->recv(msg, h);
00065 }
00066 
00067 int MessageSendTimer::expire()
00068 {
00069   // Call timeout function
00070   agent_->messageTimeout(msg_);
00071 
00072   // Do not reschedule this timer
00073   return -1;
00074 }
00075 
00076 int InterestForwardTimer::expire()
00077 {
00078   // Call timeout function
00079   agent_->interestTimeout(msg_);
00080 
00081   // Do not reschedule this timer
00082   return -1;
00083 }
00084 
00085 int SubscriptionExpirationTimer::expire()
00086 {
00087   return(agent_->subscriptionTimeout(attrs_));
00088 }
00089 
00090 int GradientExpirationCheckTimer::expire()
00091 {
00092   // Call the callback function
00093   agent_->gradientTimeout();
00094 
00095   // Reschedule this timer
00096   return 0;
00097 }
00098 
00099 int ReinforcementCheckTimer::expire()
00100 {
00101   // Call the callback function
00102   agent_->reinforcementTimeout();
00103 
00104   // Reschedule this timer
00105   return 0;
00106 }
00107 
00108 void GradientFilter::interestTimeout(Message *msg)
00109 {
00110   DiffPrint(DEBUG_MORE_DETAILS, "Interest Timeout !\n");
00111 
00112   msg->last_hop_ = LOCALHOST_ADDR;
00113   msg->next_hop_ = BROADCAST_ADDR;
00114  
00115   ((DiffusionRouting *)dr_)->sendMessage(msg, filter_handle_);
00116 }
00117 
00118 void GradientFilter::messageTimeout(Message *msg)
00119 {
00120   DiffPrint(DEBUG_MORE_DETAILS, "Message Timeout !\n");
00121 
00122   ((DiffusionRouting *)dr_)->sendMessage(msg, filter_handle_);
00123 }
00124 
00125 void GradientFilter::gradientTimeout()
00126 {
00127   RoutingTable::iterator routing_itr;
00128   GradientList::iterator grad_itr;
00129   AgentList::iterator agent_itr;
00130   RoutingEntry *routing_entry;
00131   GradientEntry *gradient_entry;
00132   AgentEntry *agent_entry;
00133   struct timeval tmv;
00134 
00135   DiffPrint(DEBUG_MORE_DETAILS, "Gradient Timeout !\n");
00136 
00137   GetTime(&tmv);
00138 
00139   routing_itr = routing_list_.begin();
00140 
00141   while (routing_itr != routing_list_.end()){
00142     routing_entry = *routing_itr;
00143 
00144     // Step 1: Delete expired gradients
00145     grad_itr = routing_entry->gradients_.begin();
00146     while (grad_itr != routing_entry->gradients_.end()){
00147       gradient_entry = *grad_itr;
00148       if (tmv.tv_sec > (gradient_entry->tv_.tv_sec + GRADIENT_TIMEOUT)){
00149 
00150         DiffPrint(DEBUG_NO_DETAILS, "Deleting Gradient to node %d !\n",
00151                   gradient_entry->node_addr_);
00152 
00153         grad_itr = routing_entry->gradients_.erase(grad_itr);
00154         delete gradient_entry;
00155       }
00156       else{
00157         grad_itr++;
00158       }
00159     }
00160 
00161     // Step 2: Remove non-active agents
00162     agent_itr = routing_entry->agents_.begin();
00163     while (agent_itr != routing_entry->agents_.end()){
00164       agent_entry = *agent_itr;
00165       if (tmv.tv_sec > (agent_entry->tv_.tv_sec + GRADIENT_TIMEOUT)){
00166 
00167         DiffPrint(DEBUG_NO_DETAILS,
00168                   "Deleting Gradient to agent %d !\n", agent_entry->port_);
00169 
00170         agent_itr = routing_entry->agents_.erase(agent_itr);
00171         delete agent_entry;
00172       }
00173       else{
00174         agent_itr++;
00175       }
00176     }
00177 
00178     // Remove the Routing Entry if no gradients and no agents
00179     if ((routing_entry->gradients_.size() == 0) &&
00180         (routing_entry->agents_.size() == 0)){
00181       // Deleting Routing Entry
00182       DiffPrint(DEBUG_DETAILS,
00183                 "Nothing left for this data type, cleaning up !\n");
00184       routing_itr = routing_list_.erase(routing_itr);
00185       delete routing_entry;
00186     }
00187     else{
00188       routing_itr++;
00189     }
00190   }
00191 }
00192 
00193 void GradientFilter::reinforcementTimeout()
00194 {
00195   DataNeighborList::iterator data_neighbor_itr;
00196   DataNeighborEntry *data_neighbor_entry;
00197   RoutingTable::iterator routing_itr;
00198   RoutingEntry *routing_entry;
00199   Message *my_message;
00200 
00201   DiffPrint(DEBUG_MORE_DETAILS, "Reinforcement Timeout !\n");
00202 
00203   routing_itr = routing_list_.begin();
00204 
00205   while (routing_itr != routing_list_.end()){
00206     routing_entry = *routing_itr;
00207 
00208     // Step 1: Delete expired gradients
00209     data_neighbor_itr = routing_entry->data_neighbors_.begin();
00210 
00211     while (data_neighbor_itr != routing_entry->data_neighbors_.end()){
00212       data_neighbor_entry = *data_neighbor_itr;
00213 
00214       if (data_neighbor_entry->data_flag_ == OLD_MESSAGE){
00215         my_message = new Message(DIFFUSION_VERSION, NEGATIVE_REINFORCEMENT,
00216                                  0, 0, routing_entry->attrs_->size(), pkt_count_,
00217                                  random_id_, data_neighbor_entry->neighbor_id_,
00218                                  LOCALHOST_ADDR);
00219         my_message->msg_attr_vec_ = CopyAttrs(routing_entry->attrs_);
00220 
00221         DiffPrint(DEBUG_NO_DETAILS,
00222                   "Sending Negative Reinforcement to node %d !\n",
00223                   data_neighbor_entry->neighbor_id_);
00224 
00225         ((DiffusionRouting *)dr_)->sendMessage(my_message, filter_handle_);
00226 
00227         pkt_count_++;
00228         delete my_message;
00229 
00230         // Done. Delete entry
00231         data_neighbor_itr = routing_entry->data_neighbors_.erase(data_neighbor_itr);
00232         delete data_neighbor_entry;
00233       }
00234       else{
00235         data_neighbor_itr++;
00236       }
00237     }
00238 
00239     // Step 2: Delete data neighbors with no activity, zero flags
00240     data_neighbor_itr = routing_entry->data_neighbors_.begin();
00241     while (data_neighbor_itr != routing_entry->data_neighbors_.end()){
00242       data_neighbor_entry = *data_neighbor_itr;
00243       if (data_neighbor_entry->data_flag_ == NEW_MESSAGE){
00244         data_neighbor_entry->data_flag_ = 0;
00245         data_neighbor_itr++;
00246       }
00247       else{
00248         // Delete entry
00249         data_neighbor_itr = routing_entry->data_neighbors_.erase(data_neighbor_itr);
00250         delete data_neighbor_entry;
00251       }
00252     }
00253 
00254     // Advance to the next routing entry
00255     routing_itr++;
00256   }
00257 }
00258 
00259 int GradientFilter::subscriptionTimeout(NRAttrVec *attrs)
00260 {
00261   AttributeList::iterator attribute_itr;
00262   AttributeEntry *attribute_entry;
00263   RoutingEntry *routing_entry;
00264   struct timeval tmv;
00265 
00266   DiffPrint(DEBUG_MORE_DETAILS, "Subscription Timeout !\n");
00267 
00268   GetTime(&tmv);
00269 
00270   // Find the correct Routing Entry
00271   routing_entry = findRoutingEntry(attrs);
00272 
00273   if (routing_entry){
00274     // Step 1: Check Timeouts
00275 
00276     attribute_itr = routing_entry->attr_list_.begin();
00277 
00278     while (attribute_itr != routing_entry->attr_list_.end()){
00279       attribute_entry = *attribute_itr;
00280       if (tmv.tv_sec > (attribute_entry->tv_.tv_sec + SUBSCRIPTION_TIMEOUT)){
00281         sendDisinterest(attribute_entry->attrs_, routing_entry);
00282         attribute_itr = routing_entry->attr_list_.erase(attribute_itr);
00283         delete attribute_entry;
00284       }
00285       else{
00286         attribute_itr++;
00287       }
00288     }
00289   }
00290   else{
00291     DiffPrint(DEBUG_DETAILS, "Warning: SubscriptionTimeout could't find RE - maybe deleted by GradientTimeout ?\n");
00292 
00293     // Cancel Timer
00294     return -1;
00295   }
00296 
00297   // Keep Timer
00298   return 0;
00299 }
00300 
00301 void GradientFilter::deleteRoutingEntry(RoutingEntry *routing_entry)
00302 {
00303   RoutingTable::iterator routing_itr;
00304   RoutingEntry *current_entry;
00305 
00306   for (routing_itr = routing_list_.begin(); routing_itr != routing_list_.end(); ++routing_itr){
00307     current_entry = *routing_itr;
00308     if (current_entry == routing_entry){
00309       routing_itr = routing_list_.erase(routing_itr);
00310       delete routing_entry;
00311       return;
00312     }
00313   }
00314   DiffPrint(DEBUG_ALWAYS, "Error: deleteRoutingEntry could not find entry to delete !\n");
00315 }
00316 
00317 RoutingEntry * GradientFilter::matchRoutingEntry(NRAttrVec *attrs, RoutingTable::iterator start, RoutingTable::iterator *place)
00318 {
00319   RoutingTable::iterator routing_itr;
00320   RoutingEntry *routing_entry;
00321 
00322   for (routing_itr = start; routing_itr != routing_list_.end(); ++routing_itr){
00323     routing_entry = *routing_itr;
00324     if (MatchAttrs(routing_entry->attrs_, attrs)){
00325       *place = routing_itr;
00326       return routing_entry;
00327     }
00328   }
00329   return NULL;
00330 }
00331 
00332 RoutingEntry * GradientFilter::findRoutingEntry(NRAttrVec *attrs)
00333 {
00334   RoutingTable::iterator routing_itr;
00335   RoutingEntry *routing_entry;
00336 
00337   for (routing_itr = routing_list_.begin(); routing_itr != routing_list_.end(); ++routing_itr){
00338     routing_entry = *routing_itr;
00339     if (PerfectMatch(routing_entry->attrs_, attrs))
00340       return routing_entry;
00341   }
00342   return NULL;
00343 }
00344 
00345 AttributeEntry * GradientFilter::findMatchingSubscription(RoutingEntry *routing_entry,
00346                                                           NRAttrVec *attrs)
00347 {
00348   AttributeList::iterator attribute_itr;
00349   AttributeEntry *attribute_entry;
00350 
00351   for (attribute_itr = routing_entry->attr_list_.begin(); attribute_itr != routing_entry->attr_list_.end(); ++attribute_itr){
00352     attribute_entry = *attribute_itr;
00353     if (PerfectMatch(attribute_entry->attrs_, attrs))
00354       return attribute_entry;
00355   }
00356   return NULL;
00357 }
00358 
00359 void GradientFilter::updateGradient(RoutingEntry *routing_entry,
00360                                     int32_t last_hop, bool reinforced)
00361 {
00362   GradientList::iterator gradient_itr;
00363   GradientEntry *gradient_entry;
00364 
00365   for (gradient_itr = routing_entry->gradients_.begin();
00366        gradient_itr != routing_entry->gradients_.end(); ++gradient_itr){
00367     gradient_entry = *gradient_itr;
00368     if (gradient_entry->node_addr_ == last_hop){
00369       GetTime(&(gradient_entry->tv_));
00370       if (reinforced)
00371         gradient_entry->reinforced_ = true;
00372       return;
00373     }
00374   }
00375 
00376   // We need to add a new gradient
00377   gradient_entry = new GradientEntry(last_hop);
00378   if (reinforced)
00379     gradient_entry->reinforced_ = true;
00380 
00381   routing_entry->gradients_.push_back(gradient_entry);
00382 }
00383 
00384 void GradientFilter::updateAgent(RoutingEntry *routing_entry,
00385                                  u_int16_t source_port)
00386 {
00387   AgentList::iterator agent_itr;
00388   AgentEntry *agent_entry;
00389 
00390   for (agent_itr = routing_entry->agents_.begin(); agent_itr != routing_entry->agents_.end(); ++agent_itr){
00391     agent_entry = *agent_itr;
00392     if (agent_entry->port_ == source_port){
00393       // We already have this guy
00394       GetTime(&(agent_entry->tv_));
00395       return;
00396     }
00397   }
00398 
00399   // This is a new agent, so we create a new entry and add it to the
00400   // list of known agents
00401   agent_entry = new AgentEntry(source_port);
00402   routing_entry->agents_.push_back(agent_entry);
00403 }
00404 
00405 void GradientFilter::forwardPushExploratoryData(Message *msg,
00406                                                 DataForwardingHistory *forwarding_history)
00407 {
00408   RoutingTable::iterator routing_itr;
00409   RoutingEntry *routing_entry;
00410   AgentList::iterator agent_itr;
00411   AgentEntry *agent_entry;
00412   Message *data_msg, *sink_message;
00413   TimerCallback *data_timer;
00414   unsigned int key[2];
00415   HashEntry *hash_entry;
00416 
00417   // Sink processing
00418   routing_itr = routing_list_.begin();
00419   routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,
00420                                     &routing_itr);
00421 
00422   sink_message = CopyMessage(msg);
00423 
00424   while (routing_entry){
00425 
00426     // Forward message to all local sinks
00427     for (agent_itr = routing_entry->agents_.begin();
00428          agent_itr != routing_entry->agents_.end(); ++agent_itr){
00429       agent_entry = *agent_itr;
00430 
00431       if (!forwarding_history->alreadyForwardedToLibrary(agent_entry->port_)){
00432         // Send DATA message to local sinks
00433         sink_message->next_hop_ = LOCALHOST_ADDR;
00434         sink_message->next_port_ = agent_entry->port_;
00435 
00436         ((DiffusionRouting *)dr_)->sendMessage(sink_message, filter_handle_);
00437 
00438         // Add agent to the forwarding history
00439         forwarding_history->forwardingToLibrary(agent_entry->port_);
00440       }
00441     }
00442 
00443     if ((!forwarding_history->alreadyReinforced()) &&
00444         (routing_entry->agents_.size() > 0) &&
00445         (msg->last_hop_ != LOCALHOST_ADDR)){
00446       // Send a positive reinforcement if we have sinks
00447       sendPositiveReinforcement(routing_entry->attrs_, msg->rdm_id_,
00448                                 msg->pkt_num_, msg->last_hop_);
00449       // Record reinforcement in the forwarding history so we do it
00450       // only once per received data message
00451       forwarding_history->sendingReinforcement();
00452     }
00453 
00454     // Look for other matching data types
00455     routing_itr++;
00456     routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,
00457                                       &routing_itr);
00458   }
00459 
00460   // Delete sink_message after sink processing
00461   delete sink_message;
00462 
00463   // Intermediate node processing
00464 
00465   // Add message information to the hash table
00466   if (msg->last_hop_ != LOCALHOST_ADDR){
00467     key[0] = msg->pkt_num_;
00468     key[1] = msg->rdm_id_;
00469 
00470     hash_entry = new HashEntry(msg->last_hop_);
00471 
00472     putHash(hash_entry, key[0], key[1]);
00473   }
00474 
00475   // Rebroadcast the exploratory push data message
00476   if (!forwarding_history->alreadyForwardedToNetwork(BROADCAST_ADDR)){
00477     data_msg = CopyMessage(msg);
00478     data_msg->next_hop_ = BROADCAST_ADDR;
00479 
00480     data_timer = new MessageSendTimer(this, data_msg);
00481 
00482     // Add data timer to the queue
00483     ((DiffusionRouting *)dr_)->addTimer(PUSH_DATA_FORWARD_DELAY +
00484                                         (int) ((PUSH_DATA_FORWARD_JITTER * (GetRand() * 1.0 / RAND_MAX) - (PUSH_DATA_FORWARD_JITTER / 2))),
00485                                         data_timer);
00486 
00487     // Add broadcast information to forwarding history
00488     forwarding_history->forwardingToNetwork(BROADCAST_ADDR);
00489   }
00490 }
00491 
00492 void GradientFilter::forwardExploratoryData(Message *msg,
00493                                             RoutingEntry *routing_entry,
00494                                             DataForwardingHistory *forwarding_history)
00495 {
00496 #ifdef USE_BROADCAST_TO_MULTIPLE_RECEIPTENTS
00497   Message *data_msg;
00498   TimerCallback *data_timer;
00499 #else
00500   GradientList::iterator gradient_itr;
00501   GradientEntry *gradient_entry;
00502 #endif // USE_BROADCAST_TO_MULTIPLE_RECEIPTENTS
00503   AgentList::iterator agent_itr;
00504   AgentEntry *agent_entry;
00505   Message *sink_message;
00506   unsigned int key[2];
00507   HashEntry *hash_entry;
00508 
00509   sink_message = CopyMessage(msg);
00510 
00511   // Step 1: Sink Processing
00512   for (agent_itr = routing_entry->agents_.begin();
00513        agent_itr != routing_entry->agents_.end(); ++agent_itr){
00514     agent_entry = *agent_itr;
00515 
00516     if (!forwarding_history->alreadyForwardedToLibrary(agent_entry->port_)){
00517       // Forward the data message to local sinks
00518       sink_message->next_hop_ = LOCALHOST_ADDR;
00519       sink_message->next_port_ = agent_entry->port_;
00520 
00521       // Add agent to the forwarding list
00522       forwarding_history->forwardingToLibrary(agent_entry->port_);
00523 
00524       ((DiffusionRouting *)dr_)->sendMessage(sink_message, filter_handle_);
00525     }
00526   }
00527 
00528   delete sink_message;
00529 
00530   // Step 1A: Reinforcement Processing
00531   if ((!forwarding_history->alreadyReinforced()) &&
00532       (routing_entry->agents_.size() > 0) &&
00533       (msg->last_hop_ != LOCALHOST_ADDR)){
00534     // Send reinforcement to 'last_hop'
00535     sendPositiveReinforcement(routing_entry->attrs_, msg->rdm_id_,
00536                               msg->pkt_num_, msg->last_hop_);
00537     // Record reinforcement in the forwarding history so we do it only
00538     // once per received data message
00539     forwarding_history->sendingReinforcement();
00540   }
00541 
00542   // Step 2: Intermediate Processing
00543 
00544   // Set reinforcement flags
00545   if (msg->last_hop_ != LOCALHOST_ADDR){
00546     setReinforcementFlags(routing_entry, msg->last_hop_, NEW_MESSAGE);
00547   }
00548 
00549   // Add message information to the hash table
00550   if (msg->last_hop_ != LOCALHOST_ADDR){
00551     key[0] = msg->pkt_num_;
00552     key[1] = msg->rdm_id_;
00553 
00554     hash_entry = new HashEntry(msg->last_hop_);
00555 
00556     putHash(hash_entry, key[0], key[1]);
00557   }
00558 
00559   // Forward the EXPLORATORY message
00560 #ifdef USE_BROADCAST_TO_MULTIPLE_RECEIPTENTS
00561   if (!forwarding_history->alreadyForwardedToNetwork(BROADCAST_ADDR)){
00562     if (routing_entry->gradients_.size() > 0){
00563       // Broadcast DATA message
00564       data_msg = CopyMessage(msg);
00565       data_msg->next_hop_ = BROADCAST_ADDR;
00566 
00567       // Add to the forwarding history
00568       forwarding_history->forwardingToNetwork(BROADCAST_ADDR);
00569 
00570       data_timer = new MessageSendTimer(this, data_msg);
00571 
00572       // Add timer for forwarding the data packet
00573       ((DiffusionRouting *)dr_)->addTimer(DATA_FORWARD_DELAY +
00574                                           (int) ((DATA_FORWARD_JITTER * (GetRand() * 1.0 / RAND_MAX) - (DATA_FORWARD_JITTER / 2))),
00575                                           data_timer);
00576     }
00577   }
00578 #else
00579   // Forward DATA to all output gradients
00580   for (gradient_itr = routing_entry->gradients_.begin();
00581        gradient_itr != routing_entry->gradients_.end(); ++gradient_itr){
00582 
00583     gradient_entry = *gradient_itr;
00584 
00585     // Check forwarding history
00586     if (!forwarding_history->alreadyForwardedToNetwork(gradient_entry->node_addr_)){
00587       msg->next_hop_ = gradient_entry->node_addr_;
00588       ((DiffusionRouting *)dr_)->sendMessage(msg, filter_handle_);
00589 
00590       // Add to the forwarding history
00591       forwarding_history->forwardingToNetwork(gradient_entry->node_addr_);
00592     }
00593   }
00594 #endif // USE_BROADCAST_TO_MULTIPLE_RECEIPTENTS
00595 }
00596 
00597 void GradientFilter::forwardData(Message *msg, RoutingEntry *routing_entry,
00598                                  DataForwardingHistory *forwarding_history)
00599 {
00600   GradientList::iterator gradient_itr;
00601   AgentList::iterator agent_itr;
00602   GradientEntry *gradient_entry;
00603   AgentEntry *agent_entry;
00604   Message *sink_message, *negative_reinforcement_msg;
00605   bool has_sink = false;
00606 
00607   sink_message = CopyMessage(msg);
00608 
00609   // Step 1: Sink Processing
00610   for (agent_itr = routing_entry->agents_.begin(); agent_itr != routing_entry->agents_.end(); ++agent_itr){
00611     agent_entry = *agent_itr;
00612 
00613     has_sink = true;
00614 
00615     if (!forwarding_history->alreadyForwardedToLibrary(agent_entry->port_)){
00616       // Forward DATA to local sinks
00617       sink_message->next_hop_ = LOCALHOST_ADDR;
00618       sink_message->next_port_ = agent_entry->port_;
00619 
00620       // Add agent to the forwarding list
00621       forwarding_history->forwardingToLibrary(agent_entry->port_);
00622 
00623       ((DiffusionRouting *)dr_)->sendMessage(sink_message, filter_handle_);
00624     }
00625   }
00626 
00627   delete sink_message;
00628 
00629   // Step 2: Intermediate Processing
00630 
00631   // Set reinforcement flags
00632   if (msg->last_hop_ != LOCALHOST_ADDR){
00633     setReinforcementFlags(routing_entry, msg->last_hop_, NEW_MESSAGE);
00634   }
00635 
00636   // Forward DATA only to reinforced gradients
00637   gradient_itr = routing_entry->gradients_.begin();
00638   gradient_entry = findReinforcedGradients(&routing_entry->gradients_,
00639                                            gradient_itr, &gradient_itr);
00640 
00641   if (gradient_entry){
00642     while (gradient_entry){
00643 
00644       // Found reinforced gradient, forward data message to this
00645       // neighbor only if the messages comes from a different neighbor
00646       if (gradient_entry->node_addr_ != msg->last_hop_){
00647         msg->next_hop_ = gradient_entry->node_addr_;
00648 
00649         // Check if we have forwarded the message to this neighbor already
00650         if (!forwarding_history->alreadyForwardedToNetwork(msg->next_hop_)){
00651           DiffPrint(DEBUG_NO_DETAILS,
00652                     "Forwarding data using Reinforced Gradient to node %d !\n",
00653                     gradient_entry->node_addr_);
00654 
00655           ((DiffusionRouting *)dr_)->sendMessage(msg, filter_handle_);
00656 
00657           // Add the node to the forwarding history
00658           forwarding_history->forwardingToNetwork(msg->next_hop_);
00659         }
00660       }
00661 
00662       // Move to the next one
00663       gradient_itr++;
00664       gradient_entry = findReinforcedGradients(&routing_entry->gradients_,
00665                                                gradient_itr, &gradient_itr);
00666     }
00667   }
00668   else{
00669     // We could not find a reinforced path, so we send a negative
00670     // reinforcement to last_hop
00671     if ((!has_sink) && (msg->last_hop_ != LOCALHOST_ADDR)){
00672       negative_reinforcement_msg = new Message(DIFFUSION_VERSION,
00673                                                NEGATIVE_REINFORCEMENT,
00674                                                0, 0,
00675                                                routing_entry->attrs_->size(),
00676                                                pkt_count_,
00677                                                random_id_,
00678                                                msg->last_hop_,
00679                                                LOCALHOST_ADDR);
00680       negative_reinforcement_msg->msg_attr_vec_ = CopyAttrs(routing_entry->attrs_);
00681 
00682       DiffPrint(DEBUG_NO_DETAILS,
00683                 "Sending Negative Reinforcement to node %d !\n",
00684                 msg->last_hop_);
00685 
00686       ((DiffusionRouting *)dr_)->sendMessage(negative_reinforcement_msg,
00687                                              filter_handle_);
00688 
00689       pkt_count_++;
00690       delete negative_reinforcement_msg;
00691     }
00692   }
00693 }
00694 
00695 void GradientFilter::sendPositiveReinforcement(NRAttrVec *reinf_attrs,
00696                                                int32_t data_rdm_id,
00697                                                int32_t data_pkt_num,
00698                                                int32_t destination)
00699 {
00700   ReinforcementBlob *reinforcement_blob;
00701   NRAttribute *reinforcement_attr;
00702   TimerCallback *reinforcement_timer;
00703   Message *pos_reinf_message;
00704   NRAttrVec *attrs;
00705 
00706   reinforcement_blob = new ReinforcementBlob(data_rdm_id, data_pkt_num);
00707 
00708   reinforcement_attr = ReinforcementAttr.make(NRAttribute::IS,
00709                                               (void *) reinforcement_blob,
00710                                               sizeof(ReinforcementBlob));
00711 
00712   attrs = CopyAttrs(reinf_attrs);
00713   attrs->push_back(reinforcement_attr);
00714 
00715   pos_reinf_message = new Message(DIFFUSION_VERSION, POSITIVE_REINFORCEMENT,
00716                                   0, 0, attrs->size(), pkt_count_,
00717                                   random_id_, destination, LOCALHOST_ADDR);
00718   pos_reinf_message->msg_attr_vec_ = CopyAttrs(attrs);
00719 
00720   DiffPrint(DEBUG_NO_DETAILS, "Sending Positive Reinforcement to node %d !\n",
00721             destination);
00722 
00723   // Create timer for sending this message
00724   reinforcement_timer = new MessageSendTimer(this, pos_reinf_message);
00725 
00726   // Add timer to the event queue
00727   ((DiffusionRouting *)dr_)->addTimer(POS_REINFORCEMENT_SEND_DELAY +
00728                                       (int) ((POS_REINFORCEMENT_JITTER * (GetRand() * 1.0 / RAND_MAX) - (POS_REINFORCEMENT_JITTER / 2))),
00729                                       reinforcement_timer);
00730   pkt_count_++;
00731   ClearAttrs(attrs);
00732   delete reinforcement_blob;
00733 }
00734 
00735 GradientEntry * GradientFilter::findReinforcedGradients(GradientList *gradients,
00736                                                         GradientList::iterator start,
00737                                                         GradientList::iterator *place)
00738 {
00739   GradientList::iterator gradient_itr;
00740   GradientEntry *gradient_entry;
00741 
00742   for (gradient_itr = start; gradient_itr != gradients->end(); ++gradient_itr){
00743     gradient_entry = *gradient_itr;
00744     if (gradient_entry->reinforced_){
00745       *place = gradient_itr;
00746       return gradient_entry;
00747     }
00748   }
00749 
00750   return NULL;
00751 }
00752 
00753 GradientEntry * GradientFilter::findReinforcedGradient(int32_t node_addr,
00754                                                        RoutingEntry *routing_entry)
00755 {
00756   GradientList::iterator gradient_itr;
00757   GradientEntry *gradient_entry;
00758 
00759   gradient_itr = routing_entry->gradients_.begin();
00760   gradient_entry = findReinforcedGradients(&routing_entry->gradients_,
00761                                            gradient_itr, &gradient_itr);
00762 
00763   if (gradient_entry){
00764     while(gradient_entry){
00765       if (gradient_entry->node_addr_ == node_addr)
00766         return gradient_entry;
00767 
00768       // This is not the gradient we are looking for, keep looking
00769       gradient_itr++;
00770       gradient_entry = findReinforcedGradients(&routing_entry->gradients_,
00771                                                gradient_itr, &gradient_itr);
00772     }
00773   }
00774 
00775   return NULL;
00776 }
00777 
00778 void GradientFilter::deleteGradient(RoutingEntry *routing_entry,
00779                                     GradientEntry *gradient_entry)
00780 {
00781   GradientList::iterator gradient_itr;
00782   GradientEntry *current_entry;
00783 
00784   for (gradient_itr = routing_entry->gradients_.begin();
00785        gradient_itr != routing_entry->gradients_.end(); ++gradient_itr){
00786     current_entry = *gradient_itr;
00787     if (current_entry == gradient_entry){
00788       gradient_itr = routing_entry->gradients_.erase(gradient_itr);
00789       delete gradient_entry;
00790       return;
00791     }
00792   }
00793   DiffPrint(DEBUG_ALWAYS,
00794             "Error: deleteGradient could not find gradient to delete !\n");
00795 }
00796 
00797 void GradientFilter::setReinforcementFlags(RoutingEntry *routing_entry,
00798                                            int32_t last_hop, int new_message)
00799 {
00800   DataNeighborList::iterator data_neighbor_itr;
00801   DataNeighborEntry *data_neighbor_entry;
00802 
00803   for (data_neighbor_itr = routing_entry->data_neighbors_.begin();
00804        data_neighbor_itr != routing_entry->data_neighbors_.end();
00805        ++data_neighbor_itr){
00806     data_neighbor_entry = *data_neighbor_itr;
00807     if (data_neighbor_entry->neighbor_id_ == last_hop){
00808       if (data_neighbor_entry->data_flag_ > 0)
00809         return;
00810       data_neighbor_entry->data_flag_ = new_message;
00811       return;
00812     }
00813   }
00814 
00815   // We need to add a new data neighbor
00816   data_neighbor_entry = new DataNeighborEntry(last_hop, new_message);
00817 
00818   routing_entry->data_neighbors_.push_back(data_neighbor_entry);
00819 }
00820 
00821 void GradientFilter::sendInterest(NRAttrVec *attrs, RoutingEntry *routing_entry)
00822 {
00823   AgentList::iterator agent_itr;
00824   AgentEntry *agent_entry;
00825 
00826   Message *msg = new Message(DIFFUSION_VERSION, INTEREST, 0, 0,
00827                              attrs->size(), 0, 0, LOCALHOST_ADDR,
00828                              LOCALHOST_ADDR);
00829 
00830   msg->msg_attr_vec_ = CopyAttrs(attrs);
00831 
00832   for (agent_itr = routing_entry->agents_.begin(); agent_itr != routing_entry->agents_.end(); ++agent_itr){
00833     agent_entry = *agent_itr;
00834 
00835     msg->next_port_ = agent_entry->port_;
00836 
00837     ((DiffusionRouting *)dr_)->sendMessage(msg, filter_handle_);
00838   }
00839 
00840   delete msg;
00841 }
00842 
00843 void GradientFilter::sendDisinterest(NRAttrVec *attrs,
00844                                      RoutingEntry *routing_entry)
00845 {
00846   NRAttrVec *newAttrs;
00847   NRSimpleAttribute<int> *nrclass = NULL;
00848 
00849   newAttrs = CopyAttrs(attrs);
00850 
00851   nrclass = NRClassAttr.find(newAttrs);
00852   if (!nrclass){
00853     DiffPrint(DEBUG_ALWAYS,
00854               "Error: sendDisinterest couldn't find the class attribute !\n");
00855     ClearAttrs(newAttrs);
00856     delete newAttrs;
00857     return;
00858   }
00859 
00860   // Change the class_key value
00861   nrclass->setVal(NRAttribute::DISINTEREST_CLASS);
00862 
00863   sendInterest(newAttrs, routing_entry);
00864    
00865   ClearAttrs(newAttrs);
00866   delete newAttrs;
00867 }
00868 
00869 void GradientFilter::recv(Message *msg, handle h)
00870 {
00871   if (h != filter_handle_){
00872     DiffPrint(DEBUG_ALWAYS,
00873               "Error: received msg for handle %d, subscribed to handle %d !\n",
00874               h, filter_handle_);
00875     return;
00876   }
00877 
00878   if (msg->new_message_ == 1)
00879     processNewMessage(msg);
00880   else
00881     processOldMessage(msg);
00882 }
00883 
00884 void GradientFilter::processOldMessage(Message *msg)
00885 {
00886   RoutingEntry *routing_entry;
00887   RoutingTable::iterator routing_itr;
00888 
00889   switch (msg->msg_type_){
00890 
00891   case INTEREST:
00892 
00893     DiffPrint(DEBUG_NO_DETAILS, "Received Old Interest !\n");
00894 
00895     if (msg->last_hop_ == LOCALHOST_ADDR){
00896       // Old interest should not come from local agent
00897       DiffPrint(DEBUG_ALWAYS, "Warning: Old Interest from local agent !\n");
00898       break;
00899     }
00900 
00901     // Get the routing entry for these attrs      
00902     routing_entry = findRoutingEntry(msg->msg_attr_vec_);
00903     if (routing_entry)
00904       updateGradient(routing_entry, msg->last_hop_, false);
00905 
00906     break;
00907 
00908   case DATA: 
00909 
00910     DiffPrint(DEBUG_NO_DETAILS, "Received an old Data message !\n");
00911 
00912     // Find the correct routing entry
00913     routing_itr = routing_list_.begin();
00914     routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,
00915                                       &routing_itr);
00916 
00917     while (routing_entry){
00918       DiffPrint(DEBUG_NO_DETAILS,
00919                 "Set flags to %d to OLD_MESSAGE !\n", msg->last_hop_);
00920 
00921       // Set reinforcement flags
00922       if (msg->last_hop_ != LOCALHOST_ADDR){
00923         setReinforcementFlags(routing_entry, msg->last_hop_, OLD_MESSAGE);
00924       }
00925 
00926       // Continue going through other data types
00927       routing_itr++;
00928       routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,
00929                                         &routing_itr);
00930     }
00931 
00932     break;
00933 
00934   case PUSH_EXPLORATORY_DATA:
00935 
00936     // Just drop it
00937     DiffPrint(DEBUG_NO_DETAILS,
00938               "Received an old Push Exploratory Data. Loop detected !\n");
00939     
00940     break;
00941 
00942   case EXPLORATORY_DATA:
00943     
00944     // Just drop it
00945     DiffPrint(DEBUG_NO_DETAILS,
00946               "Received an old Exploratory Data. Loop detected !\n");
00947 
00948     break;
00949 
00950   case POSITIVE_REINFORCEMENT:
00951 
00952     DiffPrint(DEBUG_IMPORTANT, "Received an old Positive Reinforcement !\n");
00953 
00954     break;
00955 
00956   case NEGATIVE_REINFORCEMENT:
00957 
00958     DiffPrint(DEBUG_IMPORTANT, "Received an old Negative Reinforcement !\n");
00959 
00960     DiffPrint(DEBUG_IMPORTANT, "pkt_num = %d, rdm_id = %d !\n",
00961               msg->pkt_num_, msg->rdm_id_);
00962 
00963     break;
00964 
00965   default: 
00966 
00967     break;
00968   }
00969 }
00970 
00971 void GradientFilter::processNewMessage(Message *msg)
00972 {
00973   NRSimpleAttribute<void *> *reinforcement_attr = NULL;
00974   DataForwardingHistory *forwarding_history;
00975   NRSimpleAttribute<int> *nrclass = NULL;
00976   NRSimpleAttribute<int> *nrscope = NULL;
00977   ReinforcementBlob *reinforcement_blob;
00978   RoutingTable::iterator routing_itr;
00979   RoutingEntry *routing_entry;
00980   GradientList::iterator gradient_itr;
00981   GradientEntry *gradient_entry;
00982   NRAttrVec::iterator place;
00983   HashEntry *hash_entry;
00984   AttributeEntry *attribute_entry;
00985   Message *my_msg;
00986   TimerCallback *interest_timer, *subscription_timer;
00987   unsigned int key[2];
00988   bool new_data_type = false;
00989 
00990   switch (msg->msg_type_){
00991 
00992   case INTEREST:
00993 
00994     DiffPrint(DEBUG_NO_DETAILS, "Received Interest !\n");
00995 
00996     nrclass = NRClassAttr.find(msg->msg_attr_vec_);
00997     nrscope = NRScopeAttr.find(msg->msg_attr_vec_);
00998 
00999     if (!nrclass || !nrscope){
01000       DiffPrint(DEBUG_ALWAYS,
01001                 "Warning: Can't find CLASS/SCOPE attributes in the message !\n");
01002       return;
01003     }
01004 
01005     // Step 1: Look for the same data type
01006     routing_entry = findRoutingEntry(msg->msg_attr_vec_);
01007 
01008     if (!routing_entry){
01009       // Create a new routing entry for this data type
01010       routing_entry = new RoutingEntry;
01011       routing_entry->attrs_ = CopyAttrs(msg->msg_attr_vec_);
01012       routing_list_.push_back(routing_entry);
01013       new_data_type = true;
01014     }
01015 
01016     if (msg->last_hop_ == LOCALHOST_ADDR){
01017       // From local agent
01018       updateAgent(routing_entry, msg->source_port_);
01019     }
01020     else{
01021       // From outside, we just add the new gradient
01022       updateGradient(routing_entry, msg->last_hop_, false);
01023     }
01024 
01025     if ((nrclass->getVal() == NRAttribute::INTEREST_CLASS) &&
01026         (nrclass->getOp() == NRAttribute::IS)){
01027 
01028       // Global interest messages should always be forwarded
01029       if (nrscope->getVal() == NRAttribute::GLOBAL_SCOPE){
01030 
01031         interest_timer = new InterestForwardTimer(this, CopyMessage(msg));
01032 
01033         ((DiffusionRouting *)dr_)->addTimer(INTEREST_FORWARD_DELAY +
01034                                             (int) ((INTEREST_FORWARD_JITTER * (GetRand() * 1.0 / RAND_MAX) - (INTEREST_FORWARD_JITTER / 2))),
01035                                             interest_timer);
01036       }
01037     }
01038     else{
01039       if ((nrclass->getOp() != NRAttribute::IS) &&
01040           (nrscope->getVal() == NRAttribute::NODE_LOCAL_SCOPE) &&
01041           (new_data_type)){
01042 
01043         subscription_timer = new SubscriptionExpirationTimer(this,
01044                                                              CopyAttrs(msg->msg_attr_vec_));
01045         
01046         ((DiffusionRouting *)dr_)->addTimer(SUBSCRIPTION_DELAY +
01047                                             (int) (SUBSCRIPTION_DELAY * (GetRand() * 1.0 / RAND_MAX)),
01048                                             subscription_timer);
01049       }
01050 
01051       // Subscriptions don't have to match other subscriptions
01052       break;
01053     }
01054 
01055     // Step 2: Match other routing tables
01056     routing_itr = routing_list_.begin();
01057     routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,
01058                                       &routing_itr);
01059 
01060     while (routing_entry){
01061       // Got a match
01062       attribute_entry = findMatchingSubscription(routing_entry,
01063                                                  msg->msg_attr_vec_);
01064 
01065       // Do we already have this subscription
01066       if (attribute_entry){
01067         GetTime(&(attribute_entry->tv_));
01068       }
01069       else{
01070         // Create a new attribute entry, add it to the attribute list
01071         // and send an interest message to the local agent
01072         attribute_entry = new AttributeEntry(CopyAttrs(msg->msg_attr_vec_));
01073         routing_entry->attr_list_.push_back(attribute_entry);
01074         sendInterest(attribute_entry->attrs_, routing_entry);
01075       }
01076       // Move to the next RoutingEntry
01077       routing_itr++;
01078       routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,
01079                                         &routing_itr);
01080     }
01081 
01082       break;
01083 
01084   case DATA:
01085 
01086     DiffPrint(DEBUG_NO_DETAILS, "Received Data !\n");
01087 
01088     // Create data message forwarding cache
01089     forwarding_history = new DataForwardingHistory;
01090 
01091     // Find the correct routing entry
01092     routing_itr = routing_list_.begin();
01093     routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,
01094                                       &routing_itr);
01095 
01096     while (routing_entry){
01097       forwardData(msg, routing_entry, forwarding_history);
01098       routing_itr++;
01099       routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,
01100                                         &routing_itr);
01101     }
01102 
01103     delete forwarding_history;
01104 
01105     break;
01106 
01107   case EXPLORATORY_DATA:
01108 
01109     DiffPrint(DEBUG_NO_DETAILS, "Received Exploratory Data !\n");
01110 
01111     // Create data message forwarding cache
01112     forwarding_history = new DataForwardingHistory;
01113 
01114     // Find the correct routing entry
01115     routing_itr = routing_list_.begin();
01116     routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,
01117                                       &routing_itr);
01118 
01119     while (routing_entry){
01120       forwardExploratoryData(msg, routing_entry, forwarding_history);
01121       routing_itr++;
01122       routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,
01123                                         &routing_itr);
01124     }
01125 
01126     // Delete data forwarding cache
01127     delete forwarding_history;
01128 
01129     break;
01130 
01131   case PUSH_EXPLORATORY_DATA:
01132 
01133     DiffPrint(DEBUG_NO_DETAILS, "Received Push Exploratory Data !\n");
01134 
01135     // Create data message forwarding cache
01136     forwarding_history = new DataForwardingHistory;
01137 
01138     // Forward data message
01139     forwardPushExploratoryData(msg, forwarding_history);
01140 
01141     // Delete data forwarding cache
01142     delete forwarding_history;
01143 
01144     break;
01145 
01146   case POSITIVE_REINFORCEMENT:
01147 
01148     DiffPrint(DEBUG_NO_DETAILS, "Received a Positive Reinforcement !\n");
01149 
01150     // Step 0: Look for reinforcement attribute
01151     place = msg->msg_attr_vec_->begin();
01152     reinforcement_attr = ReinforcementAttr.find_from(msg->msg_attr_vec_,
01153                                                      place, &place);
01154     if (!reinforcement_attr){
01155       DiffPrint(DEBUG_ALWAYS,
01156                 "Error: Received an invalid Positive Reinforcement message !\n");
01157       return;
01158     }
01159 
01160     // Step 1: Extract reinforcement blob from message and look for an
01161     // entry in our hash table
01162     reinforcement_blob = (ReinforcementBlob *) reinforcement_attr->getVal();
01163 
01164     key[0] = reinforcement_blob->pkt_num_;
01165     key[1] = reinforcement_blob->rdm_id_;
01166 
01167     hash_entry = getHash(key[0], key[1]);
01168 
01169     // Step 2: Remove the reinforcement attribute from the message
01170     msg->msg_attr_vec_->erase(place);
01171 
01172     // Step 3: Find a routing entry that matches this message
01173     routing_entry = findRoutingEntry(msg->msg_attr_vec_);
01174 
01175     if (!routing_entry){
01176       // So, if we do not know about this data type, this must be a
01177       // reinforcement message to a PUSHED_EXPLORATORY_DATA message
01178 
01179       // Check for class/scope (all interest message should have it)
01180       nrclass = NRClassAttr.find(msg->msg_attr_vec_);
01181       nrscope = NRScopeAttr.find(msg->msg_attr_vec_);
01182 
01183       if (!nrclass || !nrscope){
01184         DiffPrint(DEBUG_ALWAYS,
01185                   "Warning: Can't find CLASS/SCOPE attributes in the message !\n");
01186         return;
01187       }
01188 
01189       // Create new Routing Entry
01190       routing_entry = new RoutingEntry;
01191       routing_entry->attrs_ = CopyAttrs(msg->msg_attr_vec_);
01192       routing_list_.push_back(routing_entry);
01193     }
01194 
01195     // Add reinforced gradient to last_hop
01196     updateGradient(routing_entry, msg->last_hop_, true);
01197 
01198     // Add the reinforcement attribute back to the message
01199     msg->msg_attr_vec_->push_back(reinforcement_attr);
01200 
01201     // If we have no record of this message it is either because we
01202     // originated the message (in which case, no further action is
01203     // required) or because we dropped it a long time ago because of
01204     // our hashing configuration parameters (in this case, we can't do
01205     // anything)
01206     if (hash_entry){
01207       msg->next_hop_ = hash_entry->last_hop_;
01208 
01209       DiffPrint(DEBUG_NO_DETAILS,
01210                 "Forwarding Positive Reinforcement to node %d !\n",
01211                 hash_entry->last_hop_);
01212 
01213       ((DiffusionRouting *)dr_)->sendMessage(msg, filter_handle_);
01214     }
01215 
01216     break;
01217 
01218   case NEGATIVE_REINFORCEMENT:
01219 
01220     DiffPrint(DEBUG_NO_DETAILS, "Received a Negative Reinforcement !\n");
01221 
01222     routing_entry = findRoutingEntry(msg->msg_attr_vec_);
01223 
01224     if (routing_entry){
01225       gradient_entry = findReinforcedGradient(msg->last_hop_, routing_entry);
01226 
01227       if (gradient_entry){
01228         // Remove reinforced gradient to last_hop
01229         deleteGradient(routing_entry, gradient_entry);
01230 
01231         gradient_entry = findReinforcedGradients(&routing_entry->gradients_,
01232                                                  routing_entry->gradients_.begin(),
01233                                                  &gradient_itr);
01234 
01235         // If there are no other reinforced outgoing gradients
01236         // we need to send our own negative reinforcement
01237         if (!gradient_entry){
01238           my_msg = new Message(DIFFUSION_VERSION, NEGATIVE_REINFORCEMENT,
01239                                0, 0, routing_entry->attrs_->size(), pkt_count_,
01240                                random_id_, BROADCAST_ADDR, LOCALHOST_ADDR);
01241           my_msg->msg_attr_vec_ = CopyAttrs(routing_entry->attrs_);
01242 
01243           DiffPrint(DEBUG_NO_DETAILS,
01244                     "Forwarding Negative Reinforcement to ALL !\n");
01245 
01246           ((DiffusionRouting *)dr_)->sendMessage(my_msg, filter_handle_);
01247 
01248           pkt_count_++;
01249           delete my_msg;
01250         }
01251       }
01252     }
01253 
01254     break;
01255 
01256   default:
01257 
01258     break;
01259   }
01260 }
01261 
01262 HashEntry * GradientFilter::getHash(unsigned int pkt_num,
01263                                     unsigned int rdm_id)
01264 {
01265    unsigned int key[2];
01266    
01267    key[0] = pkt_num;
01268    key[1] = rdm_id;
01269    
01270    Tcl_HashEntry *entryPtr = Tcl_FindHashEntry(&htable_, (char *)key);
01271    
01272    if (entryPtr == NULL)
01273       return NULL;
01274    
01275    return ((HashEntry *) Tcl_GetHashValue(entryPtr));
01276 }
01277 
01278 void GradientFilter::putHash(HashEntry *new_hash_entry,
01279                              unsigned int pkt_num,
01280                              unsigned int rdm_id)
01281 {
01282    Tcl_HashEntry *tcl_hash_entry;
01283    HashEntry *hash_entry;
01284    HashList::iterator hash_itr;
01285    unsigned int key[2];
01286    int new_hash_key;
01287  
01288    if (hash_list_.size() == HASH_TABLE_DATA_MAX_SIZE){
01289       // Hash table reached maximum size
01290       
01291       for (int i = 0; ((i < HASH_TABLE_DATA_REMOVE_AT_ONCE)
01292                        && (hash_list_.size() > 0)); i++){
01293          hash_itr = hash_list_.begin();
01294          tcl_hash_entry = *hash_itr;
01295          hash_entry = (HashEntry *) Tcl_GetHashValue(tcl_hash_entry);
01296          delete hash_entry;
01297          hash_list_.erase(hash_itr);
01298          Tcl_DeleteHashEntry(tcl_hash_entry);
01299       }
01300    }
01301   
01302    key[0] = pkt_num;
01303    key[1] = rdm_id;
01304    
01305    tcl_hash_entry = Tcl_CreateHashEntry(&htable_, (char *) key, &new_hash_key);
01306 
01307    if (new_hash_key == 0){
01308       DiffPrint(DEBUG_IMPORTANT, "Key already exists in hash !\n");
01309       return;
01310    }
01311 
01312    Tcl_SetHashValue(tcl_hash_entry, new_hash_entry);
01313    hash_list_.push_back(tcl_hash_entry);
01314 }
01315 
01316 handle GradientFilter::setupFilter()
01317 {
01318   NRAttrVec attrs;
01319   handle h;
01320 
01321   // For the gradient filter, we use a single attribute with an "IS"
01322   // operator. This causes this filter to match every single packet
01323   // getting to diffusion
01324   attrs.push_back(NRClassAttr.make(NRAttribute::IS,
01325                                    NRAttribute::INTEREST_CLASS));
01326 
01327   h = ((DiffusionRouting *)dr_)->addFilter(&attrs,
01328                                            GRADIENT_FILTER_PRIORITY, filter_callback_);
01329 
01330   ClearAttrs(&attrs);
01331   return h;
01332 }
01333 
01334 #ifndef NS_DIFFUSION
01335 void GradientFilter::run()
01336 {
01337   // Doesn't do anything
01338   while (1){
01339     sleep(1000);
01340   }
01341 }
01342 #endif // !NS_DIFFUSION
01343 
01344 #ifdef NS_DIFFUSION
01345 GradientFilter::GradientFilter(const char *diffrtg)
01346 {
01347   DiffAppAgent *agent;
01348 #else
01349 GradientFilter::GradientFilter(int argc, char **argv)
01350 {
01351 #endif // NS_DIFFUSION
01352   struct timeval tv;
01353   TimerCallback *reinforcement_timer, *gradient_timer;
01354 
01355   GetTime(&tv);
01356   SetSeed(&tv);
01357   pkt_count_ = GetRand();
01358   random_id_ = GetRand();
01359 
01360   // Create Diffusion Routing class
01361 #ifdef NS_DIFFUSION
01362   agent = (DiffAppAgent *)TclObject::lookup(diffrtg);
01363   dr_ = agent->dr();
01364 #else
01365   parseCommandLine(argc, argv);
01366   dr_ = NR::createNR(diffusion_port_);
01367 #endif // NS_DIFFUSION
01368 
01369   // Create callback classes and set up pointers
01370   filter_callback_ = new GradientFilterReceive(this);
01371 
01372   // Initialize Hashing structures
01373   Tcl_InitHashTable(&htable_, 2);
01374 
01375   // Set up the filter
01376   filter_handle_ = setupFilter();
01377 
01378   // Print filter information
01379   DiffPrint(DEBUG_IMPORTANT,
01380             "Gradient filter subscribed to *, received handle %d\n",
01381             filter_handle_);
01382 
01383   // Add timers for keeping state up-to-date
01384   gradient_timer = new GradientExpirationCheckTimer(this);
01385   ((DiffusionRouting *)dr_)->addTimer(GRADIENT_DELAY, gradient_timer);
01386 
01387   reinforcement_timer = new ReinforcementCheckTimer(this);
01388   ((DiffusionRouting *)dr_)->addTimer(REINFORCEMENT_DELAY, reinforcement_timer);
01389 
01390   GetTime(&tv);
01391 
01392   DiffPrint(DEBUG_ALWAYS, "Gradient filter initialized at time %ld:%ld!\n",
01393             tv.tv_sec, tv.tv_usec);
01394 }
01395 
01396 #ifndef USE_SINGLE_ADDRESS_SPACE
01397 int main(int argc, char **argv)
01398 {
01399   GradientFilter *app;
01400 
01401   // Initialize and run the Gradient Filter
01402   app = new GradientFilter(argc, argv);
01403   app->run();
01404 
01405   return 0;
01406 }
01407 #endif // !USE_SINGLE_ADDRESS_SPACE

Generated on Tue Apr 20 12:14:18 2004 for NS2.26SourcesOriginal by doxygen 1.3.3