#include <gradient.hh>
Inheritance diagram for GradientFilter:


Public Member Functions | |
| GradientFilter (int argc, char **argv) | |
| void | run () |
| virtual | ~GradientFilter () |
| void | recv (Message *msg, handle h) |
| void | messageTimeout (Message *msg) |
| void | interestTimeout (Message *msg) |
| void | gradientTimeout () |
| void | reinforcementTimeout () |
| int | subscriptionTimeout (NRAttrVec *attrs) |
Protected Member Functions | |
| handle | setupFilter () |
| RoutingEntry * | findRoutingEntry (NRAttrVec *attrs) |
| void | deleteRoutingEntry (RoutingEntry *routing_entry) |
| RoutingEntry * | matchRoutingEntry (NRAttrVec *attrs, RoutingTable::iterator start, RoutingTable::iterator *place) |
| AttributeEntry * | findMatchingSubscription (RoutingEntry *routing_entry, NRAttrVec *attrs) |
| void | updateGradient (RoutingEntry *routing_entry, int32_t last_hop, bool reinforced) |
| void | updateAgent (RoutingEntry *routing_entry, u_int16_t source_port) |
| GradientEntry * | findReinforcedGradients (GradientList *agents, GradientList::iterator start, GradientList::iterator *place) |
| GradientEntry * | findReinforcedGradient (int32_t node_addr, RoutingEntry *routing_entry) |
| void | deleteGradient (RoutingEntry *routing_entry, GradientEntry *gradient_entry) |
| void | setReinforcementFlags (RoutingEntry *routing_entry, int32_t last_hop, int new_message) |
| void | sendInterest (NRAttrVec *attrs, RoutingEntry *routing_entry) |
| void | sendDisinterest (NRAttrVec *attrs, RoutingEntry *routing_entry) |
| void | sendPositiveReinforcement (NRAttrVec *reinf_attrs, int32_t data_rdm_id, int32_t data_pkt_num, int32_t destination) |
| void | forwardData (Message *msg, RoutingEntry *routing_entry, DataForwardingHistory *forwarding_history) |
| void | forwardExploratoryData (Message *msg, RoutingEntry *routing_entry, DataForwardingHistory *forwarding_history) |
| void | forwardPushExploratoryData (Message *msg, DataForwardingHistory *forwarding_history) |
| void | processOldMessage (Message *msg) |
| void | processNewMessage (Message *msg) |
| HashEntry * | getHash (unsigned int pkt_num, unsigned int rdm_id) |
| void | putHash (HashEntry *new_hash_entry, unsigned int pkt_num, unsigned int rdm_id) |
| void | usage (char *s) |
| void | parseCommandLine (int argc, char **argv) |
Protected Attributes | |
| handle | filter_handle_ |
| int | pkt_count_ |
| int | random_id_ |
| HashList | hash_list_ |
| Tcl_HashTable | htable_ |
| GradientFilterReceive * | filter_callback_ |
| RoutingTable | routing_list_ |
| NR * | dr_ |
| u_int16_t | diffusion_port_ |
| char * | config_file_ |
|
||||||||||||
|
Definition at line 1349 of file gradient.cc. References agent, NR::createNR(), DEBUG_ALWAYS, DEBUG_IMPORTANT, DiffPrint(), DiffApp::diffusion_port_, DiffApp::dr_, filter_callback_, filter_handle_, GetRand(), GetTime(), GRADIENT_DELAY, htable_, DiffApp::parseCommandLine(), pkt_count_, random_id_, REINFORCEMENT_DELAY, SetSeed(), and setupFilter().
01350 {
01351 #endif // NS_DIFFUSION
01352 struct timeval tv;
01353 TimerCallback *reinforcement_timer, *gradient_timer;
01354
01355 GetTime(&tv);
01356 SetSeed(&tv);
01357 pkt_count_ = GetRand();
01358 random_id_ = GetRand();
01359
01360 // Create Diffusion Routing class
01361 #ifdef NS_DIFFUSION
01362 agent = (DiffAppAgent *)TclObject::lookup(diffrtg);
01363 dr_ = agent->dr();
01364 #else
01365 parseCommandLine(argc, argv);
01366 dr_ = NR::createNR(diffusion_port_);
01367 #endif // NS_DIFFUSION
01368
01369 // Create callback classes and set up pointers
01370 filter_callback_ = new GradientFilterReceive(this);
01371
01372 // Initialize Hashing structures
01373 Tcl_InitHashTable(&htable_, 2);
01374
01375 // Set up the filter
01376 filter_handle_ = setupFilter();
01377
01378 // Print filter information
01379 DiffPrint(DEBUG_IMPORTANT,
01380 "Gradient filter subscribed to *, received handle %d\n",
01381 filter_handle_);
01382
01383 // Add timers for keeping state up-to-date
01384 gradient_timer = new GradientExpirationCheckTimer(this);
01385 ((DiffusionRouting *)dr_)->addTimer(GRADIENT_DELAY, gradient_timer);
01386
01387 reinforcement_timer = new ReinforcementCheckTimer(this);
01388 ((DiffusionRouting *)dr_)->addTimer(REINFORCEMENT_DELAY, reinforcement_timer);
01389
01390 GetTime(&tv);
01391
01392 DiffPrint(DEBUG_ALWAYS, "Gradient filter initialized at time %ld:%ld!\n",
01393 tv.tv_sec, tv.tv_usec);
01394 }
|
Here is the call graph for this function:

|
|
Definition at line 251 of file gradient.hh.
00252 {
00253 // Nothing to do
00254 };
|
|
||||||||||||
|
Definition at line 778 of file gradient.cc. References DEBUG_ALWAYS, DiffPrint(), and RoutingEntry::gradients_. Referenced by processNewMessage().
00780 {
00781 GradientList::iterator gradient_itr;
00782 GradientEntry *current_entry;
00783
00784 for (gradient_itr = routing_entry->gradients_.begin();
00785 gradient_itr != routing_entry->gradients_.end(); ++gradient_itr){
00786 current_entry = *gradient_itr;
00787 if (current_entry == gradient_entry){
00788 gradient_itr = routing_entry->gradients_.erase(gradient_itr);
00789 delete gradient_entry;
00790 return;
00791 }
00792 }
00793 DiffPrint(DEBUG_ALWAYS,
00794 "Error: deleteGradient could not find gradient to delete !\n");
00795 }
|
Here is the call graph for this function:

|
|
Definition at line 301 of file gradient.cc. References DEBUG_ALWAYS, DiffPrint(), and routing_list_.
00302 {
00303 RoutingTable::iterator routing_itr;
00304 RoutingEntry *current_entry;
00305
00306 for (routing_itr = routing_list_.begin(); routing_itr != routing_list_.end(); ++routing_itr){
00307 current_entry = *routing_itr;
00308 if (current_entry == routing_entry){
00309 routing_itr = routing_list_.erase(routing_itr);
00310 delete routing_entry;
00311 return;
00312 }
00313 }
00314 DiffPrint(DEBUG_ALWAYS, "Error: deleteRoutingEntry could not find entry to delete !\n");
00315 }
|
Here is the call graph for this function:

|
||||||||||||
|
Definition at line 345 of file gradient.cc. References RoutingEntry::attr_list_, AttributeEntry::attrs_, and PerfectMatch(). Referenced by processNewMessage().
00347 {
00348 AttributeList::iterator attribute_itr;
00349 AttributeEntry *attribute_entry;
00350
00351 for (attribute_itr = routing_entry->attr_list_.begin(); attribute_itr != routing_entry->attr_list_.end(); ++attribute_itr){
00352 attribute_entry = *attribute_itr;
00353 if (PerfectMatch(attribute_entry->attrs_, attrs))
00354 return attribute_entry;
00355 }
00356 return NULL;
00357 }
|
Here is the call graph for this function:

|
||||||||||||
|
Definition at line 753 of file gradient.cc. References findReinforcedGradients(), RoutingEntry::gradients_, and GradientEntry::node_addr_. Referenced by processNewMessage().
00755 {
00756 GradientList::iterator gradient_itr;
00757 GradientEntry *gradient_entry;
00758
00759 gradient_itr = routing_entry->gradients_.begin();
00760 gradient_entry = findReinforcedGradients(&routing_entry->gradients_,
00761 gradient_itr, &gradient_itr);
00762
00763 if (gradient_entry){
00764 while(gradient_entry){
00765 if (gradient_entry->node_addr_ == node_addr)
00766 return gradient_entry;
00767
00768 // This is not the gradient we are looking for, keep looking
00769 gradient_itr++;
00770 gradient_entry = findReinforcedGradients(&routing_entry->gradients_,
00771 gradient_itr, &gradient_itr);
00772 }
00773 }
00774
00775 return NULL;
00776 }
|
Here is the call graph for this function:

|
||||||||||||||||
|
Definition at line 735 of file gradient.cc. References GradientEntry::reinforced_. Referenced by findReinforcedGradient(), forwardData(), and processNewMessage().
00738 {
00739 GradientList::iterator gradient_itr;
00740 GradientEntry *gradient_entry;
00741
00742 for (gradient_itr = start; gradient_itr != gradients->end(); ++gradient_itr){
00743 gradient_entry = *gradient_itr;
00744 if (gradient_entry->reinforced_){
00745 *place = gradient_itr;
00746 return gradient_entry;
00747 }
00748 }
00749
00750 return NULL;
00751 }
|
|
|
Definition at line 332 of file gradient.cc. References RoutingEntry::attrs_, PerfectMatch(), and routing_list_. Referenced by processNewMessage(), processOldMessage(), and subscriptionTimeout().
00333 {
00334 RoutingTable::iterator routing_itr;
00335 RoutingEntry *routing_entry;
00336
00337 for (routing_itr = routing_list_.begin(); routing_itr != routing_list_.end(); ++routing_itr){
00338 routing_entry = *routing_itr;
00339 if (PerfectMatch(routing_entry->attrs_, attrs))
00340 return routing_entry;
00341 }
00342 return NULL;
00343 }
|
Here is the call graph for this function:

|
||||||||||||||||
|
Definition at line 597 of file gradient.cc. References RoutingEntry::agents_, DataForwardingHistory::alreadyForwardedToLibrary(), DataForwardingHistory::alreadyForwardedToNetwork(), RoutingEntry::attrs_, CopyAttrs(), CopyMessage(), DEBUG_NO_DETAILS, DiffPrint(), DIFFUSION_VERSION, DiffApp::dr_, filter_handle_, findReinforcedGradients(), DataForwardingHistory::forwardingToLibrary(), DataForwardingHistory::forwardingToNetwork(), RoutingEntry::gradients_, Message::last_hop_, LOCALHOST_ADDR, Message::msg_attr_vec_, NEGATIVE_REINFORCEMENT, NEW_MESSAGE, Message::next_hop_, Message::next_port_, GradientEntry::node_addr_, pkt_count_, AgentEntry::port_, random_id_, and setReinforcementFlags(). Referenced by processNewMessage().
00599 {
00600 GradientList::iterator gradient_itr;
00601 AgentList::iterator agent_itr;
00602 GradientEntry *gradient_entry;
00603 AgentEntry *agent_entry;
00604 Message *sink_message, *negative_reinforcement_msg;
00605 bool has_sink = false;
00606
00607 sink_message = CopyMessage(msg);
00608
00609 // Step 1: Sink Processing
00610 for (agent_itr = routing_entry->agents_.begin(); agent_itr != routing_entry->agents_.end(); ++agent_itr){
00611 agent_entry = *agent_itr;
00612
00613 has_sink = true;
00614
00615 if (!forwarding_history->alreadyForwardedToLibrary(agent_entry->port_)){
00616 // Forward DATA to local sinks
00617 sink_message->next_hop_ = LOCALHOST_ADDR;
00618 sink_message->next_port_ = agent_entry->port_;
00619
00620 // Add agent to the forwarding list
00621 forwarding_history->forwardingToLibrary(agent_entry->port_);
00622
00623 ((DiffusionRouting *)dr_)->sendMessage(sink_message, filter_handle_);
00624 }
00625 }
00626
00627 delete sink_message;
00628
00629 // Step 2: Intermediate Processing
00630
00631 // Set reinforcement flags
00632 if (msg->last_hop_ != LOCALHOST_ADDR){
00633 setReinforcementFlags(routing_entry, msg->last_hop_, NEW_MESSAGE);
00634 }
00635
00636 // Forward DATA only to reinforced gradients
00637 gradient_itr = routing_entry->gradients_.begin();
00638 gradient_entry = findReinforcedGradients(&routing_entry->gradients_,
00639 gradient_itr, &gradient_itr);
00640
00641 if (gradient_entry){
00642 while (gradient_entry){
00643
00644 // Found reinforced gradient, forward data message to this
00645 // neighbor only if the messages comes from a different neighbor
00646 if (gradient_entry->node_addr_ != msg->last_hop_){
00647 msg->next_hop_ = gradient_entry->node_addr_;
00648
00649 // Check if we have forwarded the message to this neighbor already
00650 if (!forwarding_history->alreadyForwardedToNetwork(msg->next_hop_)){
00651 DiffPrint(DEBUG_NO_DETAILS,
00652 "Forwarding data using Reinforced Gradient to node %d !\n",
00653 gradient_entry->node_addr_);
00654
00655 ((DiffusionRouting *)dr_)->sendMessage(msg, filter_handle_);
00656
00657 // Add the node to the forwarding history
00658 forwarding_history->forwardingToNetwork(msg->next_hop_);
00659 }
00660 }
00661
00662 // Move to the next one
00663 gradient_itr++;
00664 gradient_entry = findReinforcedGradients(&routing_entry->gradients_,
00665 gradient_itr, &gradient_itr);
00666 }
00667 }
00668 else{
00669 // We could not find a reinforced path, so we send a negative
00670 // reinforcement to last_hop
00671 if ((!has_sink) && (msg->last_hop_ != LOCALHOST_ADDR)){
00672 negative_reinforcement_msg = new Message(DIFFUSION_VERSION,
00673 NEGATIVE_REINFORCEMENT,
00674 0, 0,
00675 routing_entry->attrs_->size(),
00676 pkt_count_,
00677 random_id_,
00678 msg->last_hop_,
00679 LOCALHOST_ADDR);
00680 negative_reinforcement_msg->msg_attr_vec_ = CopyAttrs(routing_entry->attrs_);
00681
00682 DiffPrint(DEBUG_NO_DETAILS,
00683 "Sending Negative Reinforcement to node %d !\n",
00684 msg->last_hop_);
00685
00686 ((DiffusionRouting *)dr_)->sendMessage(negative_reinforcement_msg,
00687 filter_handle_);
00688
00689 pkt_count_++;
00690 delete negative_reinforcement_msg;
00691 }
00692 }
00693 }
|
Here is the call graph for this function:

|
||||||||||||||||
|
Definition at line 492 of file gradient.cc. References RoutingEntry::agents_, DataForwardingHistory::alreadyForwardedToLibrary(), DataForwardingHistory::alreadyForwardedToNetwork(), DataForwardingHistory::alreadyReinforced(), RoutingEntry::attrs_, BROADCAST_ADDR, CopyMessage(), DiffApp::dr_, filter_handle_, DataForwardingHistory::forwardingToLibrary(), DataForwardingHistory::forwardingToNetwork(), GetRand(), RoutingEntry::gradients_, Message::last_hop_, LOCALHOST_ADDR, NEW_MESSAGE, Message::next_hop_, Message::next_port_, GradientEntry::node_addr_, Message::pkt_num_, AgentEntry::port_, putHash(), RAND_MAX, Message::rdm_id_, DataForwardingHistory::sendingReinforcement(), sendPositiveReinforcement(), and setReinforcementFlags(). Referenced by processNewMessage().
00495 {
00496 #ifdef USE_BROADCAST_TO_MULTIPLE_RECEIPTENTS
00497 Message *data_msg;
00498 TimerCallback *data_timer;
00499 #else
00500 GradientList::iterator gradient_itr;
00501 GradientEntry *gradient_entry;
00502 #endif // USE_BROADCAST_TO_MULTIPLE_RECEIPTENTS
00503 AgentList::iterator agent_itr;
00504 AgentEntry *agent_entry;
00505 Message *sink_message;
00506 unsigned int key[2];
00507 HashEntry *hash_entry;
00508
00509 sink_message = CopyMessage(msg);
00510
00511 // Step 1: Sink Processing
00512 for (agent_itr = routing_entry->agents_.begin();
00513 agent_itr != routing_entry->agents_.end(); ++agent_itr){
00514 agent_entry = *agent_itr;
00515
00516 if (!forwarding_history->alreadyForwardedToLibrary(agent_entry->port_)){
00517 // Forward the data message to local sinks
00518 sink_message->next_hop_ = LOCALHOST_ADDR;
00519 sink_message->next_port_ = agent_entry->port_;
00520
00521 // Add agent to the forwarding list
00522 forwarding_history->forwardingToLibrary(agent_entry->port_);
00523
00524 ((DiffusionRouting *)dr_)->sendMessage(sink_message, filter_handle_);
00525 }
00526 }
00527
00528 delete sink_message;
00529
00530 // Step 1A: Reinforcement Processing
00531 if ((!forwarding_history->alreadyReinforced()) &&
00532 (routing_entry->agents_.size() > 0) &&
00533 (msg->last_hop_ != LOCALHOST_ADDR)){
00534 // Send reinforcement to 'last_hop'
00535 sendPositiveReinforcement(routing_entry->attrs_, msg->rdm_id_,
00536 msg->pkt_num_, msg->last_hop_);
00537 // Record reinforcement in the forwarding history so we do it only
00538 // once per received data message
00539 forwarding_history->sendingReinforcement();
00540 }
00541
00542 // Step 2: Intermediate Processing
00543
00544 // Set reinforcement flags
00545 if (msg->last_hop_ != LOCALHOST_ADDR){
00546 setReinforcementFlags(routing_entry, msg->last_hop_, NEW_MESSAGE);
00547 }
00548
00549 // Add message information to the hash table
00550 if (msg->last_hop_ != LOCALHOST_ADDR){
00551 key[0] = msg->pkt_num_;
00552 key[1] = msg->rdm_id_;
00553
00554 hash_entry = new HashEntry(msg->last_hop_);
00555
00556 putHash(hash_entry, key[0], key[1]);
00557 }
00558
00559 // Forward the EXPLORATORY message
00560 #ifdef USE_BROADCAST_TO_MULTIPLE_RECEIPTENTS
00561 if (!forwarding_history->alreadyForwardedToNetwork(BROADCAST_ADDR)){
00562 if (routing_entry->gradients_.size() > 0){
00563 // Broadcast DATA message
00564 data_msg = CopyMessage(msg);
00565 data_msg->next_hop_ = BROADCAST_ADDR;
00566
00567 // Add to the forwarding history
00568 forwarding_history->forwardingToNetwork(BROADCAST_ADDR);
00569
00570 data_timer = new MessageSendTimer(this, data_msg);
00571
00572 // Add timer for forwarding the data packet
00573 ((DiffusionRouting *)dr_)->addTimer(DATA_FORWARD_DELAY +
00574 (int) ((DATA_FORWARD_JITTER * (GetRand() * 1.0 / RAND_MAX) - (DATA_FORWARD_JITTER / 2))),
00575 data_timer);
00576 }
00577 }
00578 #else
00579 // Forward DATA to all output gradients
00580 for (gradient_itr = routing_entry->gradients_.begin();
00581 gradient_itr != routing_entry->gradients_.end(); ++gradient_itr){
00582
00583 gradient_entry = *gradient_itr;
00584
00585 // Check forwarding history
00586 if (!forwarding_history->alreadyForwardedToNetwork(gradient_entry->node_addr_)){
00587 msg->next_hop_ = gradient_entry->node_addr_;
00588 ((DiffusionRouting *)dr_)->sendMessage(msg, filter_handle_);
00589
00590 // Add to the forwarding history
00591 forwarding_history->forwardingToNetwork(gradient_entry->node_addr_);
00592 }
00593 }
00594 #endif // USE_BROADCAST_TO_MULTIPLE_RECEIPTENTS
00595 }
|
Here is the call graph for this function:

|
||||||||||||
|
Definition at line 405 of file gradient.cc. References RoutingEntry::agents_, DataForwardingHistory::alreadyForwardedToLibrary(), DataForwardingHistory::alreadyForwardedToNetwork(), DataForwardingHistory::alreadyReinforced(), RoutingEntry::attrs_, BROADCAST_ADDR, CopyMessage(), DiffApp::dr_, filter_handle_, DataForwardingHistory::forwardingToLibrary(), DataForwardingHistory::forwardingToNetwork(), GetRand(), Message::last_hop_, LOCALHOST_ADDR, matchRoutingEntry(), Message::msg_attr_vec_, Message::next_hop_, Message::next_port_, Message::pkt_num_, AgentEntry::port_, putHash(), RAND_MAX, Message::rdm_id_, routing_list_, DataForwardingHistory::sendingReinforcement(), and sendPositiveReinforcement(). Referenced by processNewMessage().
00407 {
00408 RoutingTable::iterator routing_itr;
00409 RoutingEntry *routing_entry;
00410 AgentList::iterator agent_itr;
00411 AgentEntry *agent_entry;
00412 Message *data_msg, *sink_message;
00413 TimerCallback *data_timer;
00414 unsigned int key[2];
00415 HashEntry *hash_entry;
00416
00417 // Sink processing
00418 routing_itr = routing_list_.begin();
00419 routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,
00420 &routing_itr);
00421
00422 sink_message = CopyMessage(msg);
00423
00424 while (routing_entry){
00425
00426 // Forward message to all local sinks
00427 for (agent_itr = routing_entry->agents_.begin();
00428 agent_itr != routing_entry->agents_.end(); ++agent_itr){
00429 agent_entry = *agent_itr;
00430
00431 if (!forwarding_history->alreadyForwardedToLibrary(agent_entry->port_)){
00432 // Send DATA message to local sinks
00433 sink_message->next_hop_ = LOCALHOST_ADDR;
00434 sink_message->next_port_ = agent_entry->port_;
00435
00436 ((DiffusionRouting *)dr_)->sendMessage(sink_message, filter_handle_);
00437
00438 // Add agent to the forwarding history
00439 forwarding_history->forwardingToLibrary(agent_entry->port_);
00440 }
00441 }
00442
00443 if ((!forwarding_history->alreadyReinforced()) &&
00444 (routing_entry->agents_.size() > 0) &&
00445 (msg->last_hop_ != LOCALHOST_ADDR)){
00446 // Send a positive reinforcement if we have sinks
00447 sendPositiveReinforcement(routing_entry->attrs_, msg->rdm_id_,
00448 msg->pkt_num_, msg->last_hop_);
00449 // Record reinforcement in the forwarding history so we do it
00450 // only once per received data message
00451 forwarding_history->sendingReinforcement();
00452 }
00453
00454 // Look for other matching data types
00455 routing_itr++;
00456 routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,
00457 &routing_itr);
00458 }
00459
00460 // Delete sink_message after sink processing
00461 delete sink_message;
00462
00463 // Intermediate node processing
00464
00465 // Add message information to the hash table
00466 if (msg->last_hop_ != LOCALHOST_ADDR){
00467 key[0] = msg->pkt_num_;
00468 key[1] = msg->rdm_id_;
00469
00470 hash_entry = new HashEntry(msg->last_hop_);
00471
00472 putHash(hash_entry, key[0], key[1]);
00473 }
00474
00475 // Rebroadcast the exploratory push data message
00476 if (!forwarding_history->alreadyForwardedToNetwork(BROADCAST_ADDR)){
00477 data_msg = CopyMessage(msg);
00478 data_msg->next_hop_ = BROADCAST_ADDR;
00479
00480 data_timer = new MessageSendTimer(this, data_msg);
00481
00482 // Add data timer to the queue
00483 ((DiffusionRouting *)dr_)->addTimer(PUSH_DATA_FORWARD_DELAY +
00484 (int) ((PUSH_DATA_FORWARD_JITTER * (GetRand() * 1.0 / RAND_MAX) - (PUSH_DATA_FORWARD_JITTER / 2))),
00485 data_timer);
00486
00487 // Add broadcast information to forwarding history
00488 forwarding_history->forwardingToNetwork(BROADCAST_ADDR);
00489 }
00490 }
|
Here is the call graph for this function:

|
||||||||||||
|
Definition at line 1262 of file gradient.cc. References htable_. Referenced by processNewMessage().
01264 {
01265 unsigned int key[2];
01266
01267 key[0] = pkt_num;
01268 key[1] = rdm_id;
01269
01270 Tcl_HashEntry *entryPtr = Tcl_FindHashEntry(&htable_, (char *)key);
01271
01272 if (entryPtr == NULL)
01273 return NULL;
01274
01275 return ((HashEntry *) Tcl_GetHashValue(entryPtr));
01276 }
|
|
|
Definition at line 125 of file gradient.cc. References RoutingEntry::agents_, DEBUG_DETAILS, DEBUG_MORE_DETAILS, DEBUG_NO_DETAILS, DiffPrint(), GetTime(), GRADIENT_TIMEOUT, RoutingEntry::gradients_, GradientEntry::node_addr_, AgentEntry::port_, routing_list_, AgentEntry::tv_, and GradientEntry::tv_. Referenced by GradientExpirationCheckTimer::expire().
00126 {
00127 RoutingTable::iterator routing_itr;
00128 GradientList::iterator grad_itr;
00129 AgentList::iterator agent_itr;
00130 RoutingEntry *routing_entry;
00131 GradientEntry *gradient_entry;
00132 AgentEntry *agent_entry;
00133 struct timeval tmv;
00134
00135 DiffPrint(DEBUG_MORE_DETAILS, "Gradient Timeout !\n");
00136
00137 GetTime(&tmv);
00138
00139 routing_itr = routing_list_.begin();
00140
00141 while (routing_itr != routing_list_.end()){
00142 routing_entry = *routing_itr;
00143
00144 // Step 1: Delete expired gradients
00145 grad_itr = routing_entry->gradients_.begin();
00146 while (grad_itr != routing_entry->gradients_.end()){
00147 gradient_entry = *grad_itr;
00148 if (tmv.tv_sec > (gradient_entry->tv_.tv_sec + GRADIENT_TIMEOUT)){
00149
00150 DiffPrint(DEBUG_NO_DETAILS, "Deleting Gradient to node %d !\n",
00151 gradient_entry->node_addr_);
00152
00153 grad_itr = routing_entry->gradients_.erase(grad_itr);
00154 delete gradient_entry;
00155 }
00156 else{
00157 grad_itr++;
00158 }
00159 }
00160
00161 // Step 2: Remove non-active agents
00162 agent_itr = routing_entry->agents_.begin();
00163 while (agent_itr != routing_entry->agents_.end()){
00164 agent_entry = *agent_itr;
00165 if (tmv.tv_sec > (agent_entry->tv_.tv_sec + GRADIENT_TIMEOUT)){
00166
00167 DiffPrint(DEBUG_NO_DETAILS,
00168 "Deleting Gradient to agent %d !\n", agent_entry->port_);
00169
00170 agent_itr = routing_entry->agents_.erase(agent_itr);
00171 delete agent_entry;
00172 }
00173 else{
00174 agent_itr++;
00175 }
00176 }
00177
00178 // Remove the Routing Entry if no gradients and no agents
00179 if ((routing_entry->gradients_.size() == 0) &&
00180 (routing_entry->agents_.size() == 0)){
00181 // Deleting Routing Entry
00182 DiffPrint(DEBUG_DETAILS,
00183 "Nothing left for this data type, cleaning up !\n");
00184 routing_itr = routing_list_.erase(routing_itr);
00185 delete routing_entry;
00186 }
00187 else{
00188 routing_itr++;
00189 }
00190 }
00191 }
|
Here is the call graph for this function:

|
|
Definition at line 108 of file gradient.cc. References BROADCAST_ADDR, DEBUG_MORE_DETAILS, DiffPrint(), DiffApp::dr_, filter_handle_, Message::last_hop_, LOCALHOST_ADDR, and Message::next_hop_. Referenced by InterestForwardTimer::expire().
00109 {
00110 DiffPrint(DEBUG_MORE_DETAILS, "Interest Timeout !\n");
00111
00112 msg->last_hop_ = LOCALHOST_ADDR;
00113 msg->next_hop_ = BROADCAST_ADDR;
00114
00115 ((DiffusionRouting *)dr_)->sendMessage(msg, filter_handle_);
00116 }
|
Here is the call graph for this function:

|
||||||||||||||||
|
Definition at line 317 of file gradient.cc. References RoutingEntry::attrs_, MatchAttrs(), and routing_list_. Referenced by forwardPushExploratoryData(), processNewMessage(), and processOldMessage().
00318 {
00319 RoutingTable::iterator routing_itr;
00320 RoutingEntry *routing_entry;
00321
00322 for (routing_itr = start; routing_itr != routing_list_.end(); ++routing_itr){
00323 routing_entry = *routing_itr;
00324 if (MatchAttrs(routing_entry->attrs_, attrs)){
00325 *place = routing_itr;
00326 return routing_entry;
00327 }
00328 }
00329 return NULL;
00330 }
|
Here is the call graph for this function:

|
|
Definition at line 118 of file gradient.cc. References DEBUG_MORE_DETAILS, DiffPrint(), DiffApp::dr_, and filter_handle_. Referenced by MessageSendTimer::expire().
00119 {
00120 DiffPrint(DEBUG_MORE_DETAILS, "Message Timeout !\n");
00121
00122 ((DiffusionRouting *)dr_)->sendMessage(msg, filter_handle_);
00123 }
|
Here is the call graph for this function:

|
||||||||||||
|
Definition at line 59 of file diffapp.cc. References DiffApp::config_file_, DEBUG_ALWAYS, DEFAULT_DIFFUSION_PORT, DiffPrint(), DiffApp::diffusion_port_, global_debug_level, optarg, u_int16_t, and DiffApp::usage(). Referenced by GeoRoutingFilter::GeoRoutingFilter(), GradientFilter(), PingSenderApp::PingSenderApp(), PushSenderApp::PushSenderApp(), and SrcRtFilter::SrcRtFilter().
00060 {
00061 u_int16_t diff_port = DEFAULT_DIFFUSION_PORT;
00062 int debug_level;
00063 int opt;
00064
00065 config_file_ = NULL;
00066 opterr = 0;
00067
00068 while (1){
00069 opt = getopt(argc, argv, "f:hd:p:");
00070 switch (opt){
00071
00072 case 'p':
00073
00074 diff_port = (u_int16_t) atoi(optarg);
00075 if ((diff_port < 1024) || (diff_port >= 65535)){
00076 DiffPrint(DEBUG_ALWAYS, "Error: Diffusion port must be between 1024 and 65535 !\n");
00077 exit(-1);
00078 }
00079
00080 break;
00081
00082 case 'h':
00083
00084 usage(argv[0]);
00085
00086 break;
00087
00088 case 'd':
00089
00090 debug_level = atoi(optarg);
00091
00092 if (debug_level < 1 || debug_level > 10){
00093 DiffPrint(DEBUG_ALWAYS, "Error: Debug level outside range or missing !\n");
00094 usage(argv[0]);
00095 }
00096
00097 global_debug_level = debug_level;
00098
00099 break;
00100
00101 case 'f':
00102
00103 if (!strncasecmp(optarg, "-", 1)){
00104 DiffPrint(DEBUG_ALWAYS, "Error: Parameter missing !\n");
00105 usage(argv[0]);
00106 }
00107
00108 config_file_ = strdup(optarg);
00109
00110 break;
00111
00112 case '?':
00113
00114 DiffPrint(DEBUG_ALWAYS,
00115 "Error: %c isn't a valid option or its parameter is missing !\n", optopt);
00116 usage(argv[0]);
00117
00118 break;
00119
00120 case ':':
00121
00122 DiffPrint(DEBUG_ALWAYS, "Parameter missing !\n");
00123 usage(argv[0]);
00124
00125 break;
00126
00127 }
00128
00129 if (opt == -1)
00130 break;
00131 }
00132
00133 diffusion_port_ = diff_port;
00134 }
|
Here is the call graph for this function:

|
|
Definition at line 971 of file gradient.cc. References RoutingEntry::attr_list_, AttributeEntry::attrs_, RoutingEntry::attrs_, BROADCAST_ADDR, CopyAttrs(), CopyMessage(), DATA, DEBUG_ALWAYS, DEBUG_NO_DETAILS, deleteGradient(), DiffPrint(), DIFFUSION_VERSION, DiffApp::dr_, EXPLORATORY_DATA, filter_handle_, findMatchingSubscription(), findReinforcedGradient(), findReinforcedGradients(), findRoutingEntry(), forwardData(), forwardExploratoryData(), forwardPushExploratoryData(), getHash(), NRAttribute::getOp(), GetRand(), GetTime(), NRSimpleAttribute< T >::getVal(), NRAttribute::GLOBAL_SCOPE, RoutingEntry::gradients_, INTEREST, NRAttribute::INTEREST_CLASS, NRAttribute::IS, HashEntry::last_hop_, Message::last_hop_, LOCALHOST_ADDR, matchRoutingEntry(), Message::msg_attr_vec_, Message::msg_type_, NEGATIVE_REINFORCEMENT, Message::next_hop_, NRAttribute::NODE_LOCAL_SCOPE, NRClassAttr, NRScopeAttr, pkt_count_, ReinforcementBlob::pkt_num_, POSITIVE_REINFORCEMENT, PUSH_EXPLORATORY_DATA, RAND_MAX, random_id_, ReinforcementBlob::rdm_id_, ReinforcementAttr, routing_list_, sendInterest(), Message::source_port_, SUBSCRIPTION_DELAY, AttributeEntry::tv_, updateAgent(), and updateGradient(). Referenced by recv().
00972 {
00973 NRSimpleAttribute<void *> *reinforcement_attr = NULL;
00974 DataForwardingHistory *forwarding_history;
00975 NRSimpleAttribute<int> *nrclass = NULL;
00976 NRSimpleAttribute<int> *nrscope = NULL;
00977 ReinforcementBlob *reinforcement_blob;
00978 RoutingTable::iterator routing_itr;
00979 RoutingEntry *routing_entry;
00980 GradientList::iterator gradient_itr;
00981 GradientEntry *gradient_entry;
00982 NRAttrVec::iterator place;
00983 HashEntry *hash_entry;
00984 AttributeEntry *attribute_entry;
00985 Message *my_msg;
00986 TimerCallback *interest_timer, *subscription_timer;
00987 unsigned int key[2];
00988 bool new_data_type = false;
00989
00990 switch (msg->msg_type_){
00991
00992 case INTEREST:
00993
00994 DiffPrint(DEBUG_NO_DETAILS, "Received Interest !\n");
00995
00996 nrclass = NRClassAttr.find(msg->msg_attr_vec_);
00997 nrscope = NRScopeAttr.find(msg->msg_attr_vec_);
00998
00999 if (!nrclass || !nrscope){
01000 DiffPrint(DEBUG_ALWAYS,
01001 "Warning: Can't find CLASS/SCOPE attributes in the message !\n");
01002 return;
01003 }
01004
01005 // Step 1: Look for the same data type
01006 routing_entry = findRoutingEntry(msg->msg_attr_vec_);
01007
01008 if (!routing_entry){
01009 // Create a new routing entry for this data type
01010 routing_entry = new RoutingEntry;
01011 routing_entry->attrs_ = CopyAttrs(msg->msg_attr_vec_);
01012 routing_list_.push_back(routing_entry);
01013 new_data_type = true;
01014 }
01015
01016 if (msg->last_hop_ == LOCALHOST_ADDR){
01017 // From local agent
01018 updateAgent(routing_entry, msg->source_port_);
01019 }
01020 else{
01021 // From outside, we just add the new gradient
01022 updateGradient(routing_entry, msg->last_hop_, false);
01023 }
01024
01025 if ((nrclass->getVal() == NRAttribute::INTEREST_CLASS) &&
01026 (nrclass->getOp() == NRAttribute::IS)){
01027
01028 // Global interest messages should always be forwarded
01029 if (nrscope->getVal() == NRAttribute::GLOBAL_SCOPE){
01030
01031 interest_timer = new InterestForwardTimer(this, CopyMessage(msg));
01032
01033 ((DiffusionRouting *)dr_)->addTimer(INTEREST_FORWARD_DELAY +
01034 (int) ((INTEREST_FORWARD_JITTER * (GetRand() * 1.0 / RAND_MAX) - (INTEREST_FORWARD_JITTER / 2))),
01035 interest_timer);
01036 }
01037 }
01038 else{
01039 if ((nrclass->getOp() != NRAttribute::IS) &&
01040 (nrscope->getVal() == NRAttribute::NODE_LOCAL_SCOPE) &&
01041 (new_data_type)){
01042
01043 subscription_timer = new SubscriptionExpirationTimer(this,
01044 CopyAttrs(msg->msg_attr_vec_));
01045
01046 ((DiffusionRouting *)dr_)->addTimer(SUBSCRIPTION_DELAY +
01047 (int) (SUBSCRIPTION_DELAY * (GetRand() * 1.0 / RAND_MAX)),
01048 subscription_timer);
01049 }
01050
01051 // Subscriptions don't have to match other subscriptions
01052 break;
01053 }
01054
01055 // Step 2: Match other routing tables
01056 routing_itr = routing_list_.begin();
01057 routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,
01058 &routing_itr);
01059
01060 while (routing_entry){
01061 // Got a match
01062 attribute_entry = findMatchingSubscription(routing_entry,
01063 msg->msg_attr_vec_);
01064
01065 // Do we already have this subscription
01066 if (attribute_entry){
01067 GetTime(&(attribute_entry->tv_));
01068 }
01069 else{
01070 // Create a new attribute entry, add it to the attribute list
01071 // and send an interest message to the local agent
01072 attribute_entry = new AttributeEntry(CopyAttrs(msg->msg_attr_vec_));
01073 routing_entry->attr_list_.push_back(attribute_entry);
01074 sendInterest(attribute_entry->attrs_, routing_entry);
01075 }
01076 // Move to the next RoutingEntry
01077 routing_itr++;
01078 routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,
01079 &routing_itr);
01080 }
01081
01082 break;
01083
01084 case DATA:
01085
01086 DiffPrint(DEBUG_NO_DETAILS, "Received Data !\n");
01087
01088 // Create data message forwarding cache
01089 forwarding_history = new DataForwardingHistory;
01090
01091 // Find the correct routing entry
01092 routing_itr = routing_list_.begin();
01093 routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,
01094 &routing_itr);
01095
01096 while (routing_entry){
01097 forwardData(msg, routing_entry, forwarding_history);
01098 routing_itr++;
01099 routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,
01100 &routing_itr);
01101 }
01102
01103 delete forwarding_history;
01104
01105 break;
01106
01107 case EXPLORATORY_DATA:
01108
01109 DiffPrint(DEBUG_NO_DETAILS, "Received Exploratory Data !\n");
01110
01111 // Create data message forwarding cache
01112 forwarding_history = new DataForwardingHistory;
01113
01114 // Find the correct routing entry
01115 routing_itr = routing_list_.begin();
01116 routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,
01117 &routing_itr);
01118
01119 while (routing_entry){
01120 forwardExploratoryData(msg, routing_entry, forwarding_history);
01121 routing_itr++;
01122 routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,
01123 &routing_itr);
01124 }
01125
01126 // Delete data forwarding cache
01127 delete forwarding_history;
01128
01129 break;
01130
01131 case PUSH_EXPLORATORY_DATA:
01132
01133 DiffPrint(DEBUG_NO_DETAILS, "Received Push Exploratory Data !\n");
01134
01135 // Create data message forwarding cache
01136 forwarding_history = new DataForwardingHistory;
01137
01138 // Forward data message
01139 forwardPushExploratoryData(msg, forwarding_history);
01140
01141 // Delete data forwarding cache
01142 delete forwarding_history;
01143
01144 break;
01145
01146 case POSITIVE_REINFORCEMENT:
01147
01148 DiffPrint(DEBUG_NO_DETAILS, "Received a Positive Reinforcement !\n");
01149
01150 // Step 0: Look for reinforcement attribute
01151 place = msg->msg_attr_vec_->begin();
01152 reinforcement_attr = ReinforcementAttr.find_from(msg->msg_attr_vec_,
01153 place, &place);
01154 if (!reinforcement_attr){
01155 DiffPrint(DEBUG_ALWAYS,
01156 "Error: Received an invalid Positive Reinforcement message !\n");
01157 return;
01158 }
01159
01160 // Step 1: Extract reinforcement blob from message and look for an
01161 // entry in our hash table
01162 reinforcement_blob = (ReinforcementBlob *) reinforcement_attr->getVal();
01163
01164 key[0] = reinforcement_blob->pkt_num_;
01165 key[1] = reinforcement_blob->rdm_id_;
01166
01167 hash_entry = getHash(key[0], key[1]);
01168
01169 // Step 2: Remove the reinforcement attribute from the message
01170 msg->msg_attr_vec_->erase(place);
01171
01172 // Step 3: Find a routing entry that matches this message
01173 routing_entry = findRoutingEntry(msg->msg_attr_vec_);
01174
01175 if (!routing_entry){
01176 // So, if we do not know about this data type, this must be a
01177 // reinforcement message to a PUSHED_EXPLORATORY_DATA message
01178
01179 // Check for class/scope (all interest message should have it)
01180 nrclass = NRClassAttr.find(msg->msg_attr_vec_);
01181 nrscope = NRScopeAttr.find(msg->msg_attr_vec_);
01182
01183 if (!nrclass || !nrscope){
01184 DiffPrint(DEBUG_ALWAYS,
01185 "Warning: Can't find CLASS/SCOPE attributes in the message !\n");
01186 return;
01187 }
01188
01189 // Create new Routing Entry
01190 routing_entry = new RoutingEntry;
01191 routing_entry->attrs_ = CopyAttrs(msg->msg_attr_vec_);
01192 routing_list_.push_back(routing_entry);
01193 }
01194
01195 // Add reinforced gradient to last_hop
01196 updateGradient(routing_entry, msg->last_hop_, true);
01197
01198 // Add the reinforcement attribute back to the message
01199 msg->msg_attr_vec_->push_back(reinforcement_attr);
01200
01201 // If we have no record of this message it is either because we
01202 // originated the message (in which case, no further action is
01203 // required) or because we dropped it a long time ago because of
01204 // our hashing configuration parameters (in this case, we can't do
01205 // anything)
01206 if (hash_entry){
01207 msg->next_hop_ = hash_entry->last_hop_;
01208
01209 DiffPrint(DEBUG_NO_DETAILS,
01210 "Forwarding Positive Reinforcement to node %d !\n",
01211 hash_entry->last_hop_);
01212
01213 ((DiffusionRouting *)dr_)->sendMessage(msg, filter_handle_);
01214 }
01215
01216 break;
01217
01218 case NEGATIVE_REINFORCEMENT:
01219
01220 DiffPrint(DEBUG_NO_DETAILS, "Received a Negative Reinforcement !\n");
01221
01222 routing_entry = findRoutingEntry(msg->msg_attr_vec_);
01223
01224 if (routing_entry){
01225 gradient_entry = findReinforcedGradient(msg->last_hop_, routing_entry);
01226
01227 if (gradient_entry){
01228 // Remove reinforced gradient to last_hop
01229 deleteGradient(routing_entry, gradient_entry);
01230
01231 gradient_entry = findReinforcedGradients(&routing_entry->gradients_,
01232 routing_entry->gradients_.begin(),
01233 &gradient_itr);
01234
01235 // If there are no other reinforced outgoing gradients
01236 // we need to send our own negative reinforcement
01237 if (!gradient_entry){
01238 my_msg = new Message(DIFFUSION_VERSION, NEGATIVE_REINFORCEMENT,
01239 0, 0, routing_entry->attrs_->size(), pkt_count_,
01240 random_id_, BROADCAST_ADDR, LOCALHOST_ADDR);
01241 my_msg->msg_attr_vec_ = CopyAttrs(routing_entry->attrs_);
01242
01243 DiffPrint(DEBUG_NO_DETAILS,
01244 "Forwarding Negative Reinforcement to ALL !\n");
01245
01246 ((DiffusionRouting *)dr_)->sendMessage(my_msg, filter_handle_);
01247
01248 pkt_count_++;
01249 delete my_msg;
01250 }
01251 }
01252 }
01253
01254 break;
01255
01256 default:
01257
01258 break;
01259 }
01260 }
|
Here is the call graph for this function:

|
|
Definition at line 884 of file gradient.cc. References DATA, DEBUG_ALWAYS, DEBUG_IMPORTANT, DEBUG_NO_DETAILS, DiffPrint(), EXPLORATORY_DATA, findRoutingEntry(), INTEREST, Message::last_hop_, LOCALHOST_ADDR, matchRoutingEntry(), Message::msg_attr_vec_, Message::msg_type_, NEGATIVE_REINFORCEMENT, OLD_MESSAGE, Message::pkt_num_, POSITIVE_REINFORCEMENT, PUSH_EXPLORATORY_DATA, Message::rdm_id_, routing_list_, setReinforcementFlags(), and updateGradient(). Referenced by recv().
00885 {
00886 RoutingEntry *routing_entry;
00887 RoutingTable::iterator routing_itr;
00888
00889 switch (msg->msg_type_){
00890
00891 case INTEREST:
00892
00893 DiffPrint(DEBUG_NO_DETAILS, "Received Old Interest !\n");
00894
00895 if (msg->last_hop_ == LOCALHOST_ADDR){
00896 // Old interest should not come from local agent
00897 DiffPrint(DEBUG_ALWAYS, "Warning: Old Interest from local agent !\n");
00898 break;
00899 }
00900
00901 // Get the routing entry for these attrs
00902 routing_entry = findRoutingEntry(msg->msg_attr_vec_);
00903 if (routing_entry)
00904 updateGradient(routing_entry, msg->last_hop_, false);
00905
00906 break;
00907
00908 case DATA:
00909
00910 DiffPrint(DEBUG_NO_DETAILS, "Received an old Data message !\n");
00911
00912 // Find the correct routing entry
00913 routing_itr = routing_list_.begin();
00914 routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,
00915 &routing_itr);
00916
00917 while (routing_entry){
00918 DiffPrint(DEBUG_NO_DETAILS,
00919 "Set flags to %d to OLD_MESSAGE !\n", msg->last_hop_);
00920
00921 // Set reinforcement flags
00922 if (msg->last_hop_ != LOCALHOST_ADDR){
00923 setReinforcementFlags(routing_entry, msg->last_hop_, OLD_MESSAGE);
00924 }
00925
00926 // Continue going through other data types
00927 routing_itr++;
00928 routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,
00929 &routing_itr);
00930 }
00931
00932 break;
00933
00934 case PUSH_EXPLORATORY_DATA:
00935
00936 // Just drop it
00937 DiffPrint(DEBUG_NO_DETAILS,
00938 "Received an old Push Exploratory Data. Loop detected !\n");
00939
00940 break;
00941
00942 case EXPLORATORY_DATA:
00943
00944 // Just drop it
00945 DiffPrint(DEBUG_NO_DETAILS,
00946 "Received an old Exploratory Data. Loop detected !\n");
00947
00948 break;
00949
00950 case POSITIVE_REINFORCEMENT:
00951
00952 DiffPrint(DEBUG_IMPORTANT, "Received an old Positive Reinforcement !\n");
00953
00954 break;
00955
00956 case NEGATIVE_REINFORCEMENT:
00957
00958 DiffPrint(DEBUG_IMPORTANT, "Received an old Negative Reinforcement !\n");
00959
00960 DiffPrint(DEBUG_IMPORTANT, "pkt_num = %d, rdm_id = %d !\n",
00961 msg->pkt_num_, msg->rdm_id_);
00962
00963 break;
00964
00965 default:
00966
00967 break;
00968 }
00969 }
|
Here is the call graph for this function:

|
||||||||||||||||
|
Definition at line 1278 of file gradient.cc. References DEBUG_IMPORTANT, DiffPrint(), hash_list_, HASH_TABLE_DATA_MAX_SIZE, HASH_TABLE_DATA_REMOVE_AT_ONCE, and htable_. Referenced by forwardExploratoryData(), and forwardPushExploratoryData().
01281 {
01282 Tcl_HashEntry *tcl_hash_entry;
01283 HashEntry *hash_entry;
01284 HashList::iterator hash_itr;
01285 unsigned int key[2];
01286 int new_hash_key;
01287
01288 if (hash_list_.size() == HASH_TABLE_DATA_MAX_SIZE){
01289 // Hash table reached maximum size
01290
01291 for (int i = 0; ((i < HASH_TABLE_DATA_REMOVE_AT_ONCE)
01292 && (hash_list_.size() > 0)); i++){
01293 hash_itr = hash_list_.begin();
01294 tcl_hash_entry = *hash_itr;
01295 hash_entry = (HashEntry *) Tcl_GetHashValue(tcl_hash_entry);
01296 delete hash_entry;
01297 hash_list_.erase(hash_itr);
01298 Tcl_DeleteHashEntry(tcl_hash_entry);
01299 }
01300 }
01301
01302 key[0] = pkt_num;
01303 key[1] = rdm_id;
01304
01305 tcl_hash_entry = Tcl_CreateHashEntry(&htable_, (char *) key, &new_hash_key);
01306
01307 if (new_hash_key == 0){
01308 DiffPrint(DEBUG_IMPORTANT, "Key already exists in hash !\n");
01309 return;
01310 }
01311
01312 Tcl_SetHashValue(tcl_hash_entry, new_hash_entry);
01313 hash_list_.push_back(tcl_hash_entry);
01314 }
|
Here is the call graph for this function:

|
||||||||||||
|
Definition at line 869 of file gradient.cc. References DEBUG_ALWAYS, DiffPrint(), filter_handle_, Message::new_message_, processNewMessage(), and processOldMessage(). Referenced by GradientFilterReceive::recv().
00870 {
00871 if (h != filter_handle_){
00872 DiffPrint(DEBUG_ALWAYS,
00873 "Error: received msg for handle %d, subscribed to handle %d !\n",
00874 h, filter_handle_);
00875 return;
00876 }
00877
00878 if (msg->new_message_ == 1)
00879 processNewMessage(msg);
00880 else
00881 processOldMessage(msg);
00882 }
|
Here is the call graph for this function:

|
|
Definition at line 193 of file gradient.cc. References RoutingEntry::attrs_, CopyAttrs(), DataNeighborEntry::data_flag_, RoutingEntry::data_neighbors_, DEBUG_MORE_DETAILS, DEBUG_NO_DETAILS, DiffPrint(), DIFFUSION_VERSION, DiffApp::dr_, filter_handle_, LOCALHOST_ADDR, Message::msg_attr_vec_, NEGATIVE_REINFORCEMENT, DataNeighborEntry::neighbor_id_, NEW_MESSAGE, OLD_MESSAGE, pkt_count_, random_id_, and routing_list_. Referenced by ReinforcementCheckTimer::expire().
00194 {
00195 DataNeighborList::iterator data_neighbor_itr;
00196 DataNeighborEntry *data_neighbor_entry;
00197 RoutingTable::iterator routing_itr;
00198 RoutingEntry *routing_entry;
00199 Message *my_message;
00200
00201 DiffPrint(DEBUG_MORE_DETAILS, "Reinforcement Timeout !\n");
00202
00203 routing_itr = routing_list_.begin();
00204
00205 while (routing_itr != routing_list_.end()){
00206 routing_entry = *routing_itr;
00207
00208 // Step 1: Delete expired gradients
00209 data_neighbor_itr = routing_entry->data_neighbors_.begin();
00210
00211 while (data_neighbor_itr != routing_entry->data_neighbors_.end()){
00212 data_neighbor_entry = *data_neighbor_itr;
00213
00214 if (data_neighbor_entry->data_flag_ == OLD_MESSAGE){
00215 my_message = new Message(DIFFUSION_VERSION, NEGATIVE_REINFORCEMENT,
00216 0, 0, routing_entry->attrs_->size(), pkt_count_,
00217 random_id_, data_neighbor_entry->neighbor_id_,
00218 LOCALHOST_ADDR);
00219 my_message->msg_attr_vec_ = CopyAttrs(routing_entry->attrs_);
00220
00221 DiffPrint(DEBUG_NO_DETAILS,
00222 "Sending Negative Reinforcement to node %d !\n",
00223 data_neighbor_entry->neighbor_id_);
00224
00225 ((DiffusionRouting *)dr_)->sendMessage(my_message, filter_handle_);
00226
00227 pkt_count_++;
00228 delete my_message;
00229
00230 // Done. Delete entry
00231 data_neighbor_itr = routing_entry->data_neighbors_.erase(data_neighbor_itr);
00232 delete data_neighbor_entry;
00233 }
00234 else{
00235 data_neighbor_itr++;
00236 }
00237 }
00238
00239 // Step 2: Delete data neighbors with no activity, zero flags
00240 data_neighbor_itr = routing_entry->data_neighbors_.begin();
00241 while (data_neighbor_itr != routing_entry->data_neighbors_.end()){
00242 data_neighbor_entry = *data_neighbor_itr;
00243 if (data_neighbor_entry->data_flag_ == NEW_MESSAGE){
00244 data_neighbor_entry->data_flag_ = 0;
00245 data_neighbor_itr++;
00246 }
00247 else{
00248 // Delete entry
00249 data_neighbor_itr = routing_entry->data_neighbors_.erase(data_neighbor_itr);
00250 delete data_neighbor_entry;
00251 }
00252 }
00253
00254 // Advance to the next routing entry
00255 routing_itr++;
00256 }
00257 }
|
Here is the call graph for this function:

|
|
Implements DiffApp. Definition at line 1335 of file gradient.cc. Referenced by main().
01336 {
01337 // Doesn't do anything
01338 while (1){
01339 sleep(1000);
01340 }
01341 }
|
|
||||||||||||
|
Definition at line 843 of file gradient.cc. References ClearAttrs(), CopyAttrs(), DEBUG_ALWAYS, DiffPrint(), NRAttribute::DISINTEREST_CLASS, NRClassAttr, sendInterest(), and NRSimpleAttribute< T >::setVal(). Referenced by subscriptionTimeout().
00845 {
00846 NRAttrVec *newAttrs;
00847 NRSimpleAttribute<int> *nrclass = NULL;
00848
00849 newAttrs = CopyAttrs(attrs);
00850
00851 nrclass = NRClassAttr.find(newAttrs);
00852 if (!nrclass){
00853 DiffPrint(DEBUG_ALWAYS,
00854 "Error: sendDisinterest couldn't find the class attribute !\n");
00855 ClearAttrs(newAttrs);
00856 delete newAttrs;
00857 return;
00858 }
00859
00860 // Change the class_key value
00861 nrclass->setVal(NRAttribute::DISINTEREST_CLASS);
00862
00863 sendInterest(newAttrs, routing_entry);
00864
00865 ClearAttrs(newAttrs);
00866 delete newAttrs;
00867 }
|
Here is the call graph for this function:

|
||||||||||||
|
Definition at line 821 of file gradient.cc. References RoutingEntry::agents_, CopyAttrs(), DIFFUSION_VERSION, DiffApp::dr_, filter_handle_, INTEREST, LOCALHOST_ADDR, Message::msg_attr_vec_, Message::next_port_, and AgentEntry::port_. Referenced by processNewMessage(), and sendDisinterest().
00822 {
00823 AgentList::iterator agent_itr;
00824 AgentEntry *agent_entry;
00825
00826 Message *msg = new Message(DIFFUSION_VERSION, INTEREST, 0, 0,
00827 attrs->size(), 0, 0, LOCALHOST_ADDR,
00828 LOCALHOST_ADDR);
00829
00830 msg->msg_attr_vec_ = CopyAttrs(attrs);
00831
00832 for (agent_itr = routing_entry->agents_.begin(); agent_itr != routing_entry->agents_.end(); ++agent_itr){
00833 agent_entry = *agent_itr;
00834
00835 msg->next_port_ = agent_entry->port_;
00836
00837 ((DiffusionRouting *)dr_)->sendMessage(msg, filter_handle_);
00838 }
00839
00840 delete msg;
00841 }
|
Here is the call graph for this function:

|
||||||||||||||||||||
|
Definition at line 695 of file gradient.cc. References ClearAttrs(), CopyAttrs(), DEBUG_NO_DETAILS, DiffPrint(), DIFFUSION_VERSION, DiffApp::dr_, GetRand(), NRAttribute::IS, LOCALHOST_ADDR, Message::msg_attr_vec_, pkt_count_, POS_REINFORCEMENT_JITTER, POS_REINFORCEMENT_SEND_DELAY, POSITIVE_REINFORCEMENT, RAND_MAX, random_id_, and ReinforcementAttr. Referenced by forwardExploratoryData(), and forwardPushExploratoryData().
00699 {
00700 ReinforcementBlob *reinforcement_blob;
00701 NRAttribute *reinforcement_attr;
00702 TimerCallback *reinforcement_timer;
00703 Message *pos_reinf_message;
00704 NRAttrVec *attrs;
00705
00706 reinforcement_blob = new ReinforcementBlob(data_rdm_id, data_pkt_num);
00707
00708 reinforcement_attr = ReinforcementAttr.make(NRAttribute::IS,
00709 (void *) reinforcement_blob,
00710 sizeof(ReinforcementBlob));
00711
00712 attrs = CopyAttrs(reinf_attrs);
00713 attrs->push_back(reinforcement_attr);
00714
00715 pos_reinf_message = new Message(DIFFUSION_VERSION, POSITIVE_REINFORCEMENT,
00716 0, 0, attrs->size(), pkt_count_,
00717 random_id_, destination, LOCALHOST_ADDR);
00718 pos_reinf_message->msg_attr_vec_ = CopyAttrs(attrs);
00719
00720 DiffPrint(DEBUG_NO_DETAILS, "Sending Positive Reinforcement to node %d !\n",
00721 destination);
00722
00723 // Create timer for sending this message
00724 reinforcement_timer = new MessageSendTimer(this, pos_reinf_message);
00725
00726 // Add timer to the event queue
00727 ((DiffusionRouting *)dr_)->addTimer(POS_REINFORCEMENT_SEND_DELAY +
00728 (int) ((POS_REINFORCEMENT_JITTER * (GetRand() * 1.0 / RAND_MAX) - (POS_REINFORCEMENT_JITTER / 2))),
00729 reinforcement_timer);
00730 pkt_count_++;
00731 ClearAttrs(attrs);
00732 delete reinforcement_blob;
00733 }
|
Here is the call graph for this function:

|
||||||||||||||||
|
Definition at line 797 of file gradient.cc. References DataNeighborEntry::data_flag_, RoutingEntry::data_neighbors_, and DataNeighborEntry::neighbor_id_. Referenced by forwardData(), forwardExploratoryData(), and processOldMessage().
00799 {
00800 DataNeighborList::iterator data_neighbor_itr;
00801 DataNeighborEntry *data_neighbor_entry;
00802
00803 for (data_neighbor_itr = routing_entry->data_neighbors_.begin();
00804 data_neighbor_itr != routing_entry->data_neighbors_.end();
00805 ++data_neighbor_itr){
00806 data_neighbor_entry = *data_neighbor_itr;
00807 if (data_neighbor_entry->neighbor_id_ == last_hop){
00808 if (data_neighbor_entry->data_flag_ > 0)
00809 return;
00810 data_neighbor_entry->data_flag_ = new_message;
00811 return;
00812 }
00813 }
00814
00815 // We need to add a new data neighbor
00816 data_neighbor_entry = new DataNeighborEntry(last_hop, new_message);
00817
00818 routing_entry->data_neighbors_.push_back(data_neighbor_entry);
00819 }
|
|
|
Definition at line 1316 of file gradient.cc. References ClearAttrs(), DiffApp::dr_, filter_callback_, GRADIENT_FILTER_PRIORITY, handle, NRAttribute::INTEREST_CLASS, NRAttribute::IS, and NRClassAttr. Referenced by GradientFilter().
01317 {
01318 NRAttrVec attrs;
01319 handle h;
01320
01321 // For the gradient filter, we use a single attribute with an "IS"
01322 // operator. This causes this filter to match every single packet
01323 // getting to diffusion
01324 attrs.push_back(NRClassAttr.make(NRAttribute::IS,
01325 NRAttribute::INTEREST_CLASS));
01326
01327 h = ((DiffusionRouting *)dr_)->addFilter(&attrs,
01328 GRADIENT_FILTER_PRIORITY, filter_callback_);
01329
01330 ClearAttrs(&attrs);
01331 return h;
01332 }
|
Here is the call graph for this function:

|
|
Definition at line 259 of file gradient.cc. References RoutingEntry::attr_list_, AttributeEntry::attrs_, DEBUG_DETAILS, DEBUG_MORE_DETAILS, DiffPrint(), findRoutingEntry(), GetTime(), sendDisinterest(), SUBSCRIPTION_TIMEOUT, and AttributeEntry::tv_. Referenced by SubscriptionExpirationTimer::expire().
00260 {
00261 AttributeList::iterator attribute_itr;
00262 AttributeEntry *attribute_entry;
00263 RoutingEntry *routing_entry;
00264 struct timeval tmv;
00265
00266 DiffPrint(DEBUG_MORE_DETAILS, "Subscription Timeout !\n");
00267
00268 GetTime(&tmv);
00269
00270 // Find the correct Routing Entry
00271 routing_entry = findRoutingEntry(attrs);
00272
00273 if (routing_entry){
00274 // Step 1: Check Timeouts
00275
00276 attribute_itr = routing_entry->attr_list_.begin();
00277
00278 while (attribute_itr != routing_entry->attr_list_.end()){
00279 attribute_entry = *attribute_itr;
00280 if (tmv.tv_sec > (attribute_entry->tv_.tv_sec + SUBSCRIPTION_TIMEOUT)){
00281 sendDisinterest(attribute_entry->attrs_, routing_entry);
00282 attribute_itr = routing_entry->attr_list_.erase(attribute_itr);
00283 delete attribute_entry;
00284 }
00285 else{
00286 attribute_itr++;
00287 }
00288 }
00289 }
00290 else{
00291 DiffPrint(DEBUG_DETAILS, "Warning: SubscriptionTimeout could't find RE - maybe deleted by GradientTimeout ?\n");
00292
00293 // Cancel Timer
00294 return -1;
00295 }
00296
00297 // Keep Timer
00298 return 0;
00299 }
|
Here is the call graph for this function:

|
||||||||||||
|
Definition at line 384 of file gradient.cc. References RoutingEntry::agents_, GetTime(), AgentEntry::port_, and AgentEntry::tv_. Referenced by processNewMessage().
00386 {
00387 AgentList::iterator agent_itr;
00388 AgentEntry *agent_entry;
00389
00390 for (agent_itr = routing_entry->agents_.begin(); agent_itr != routing_entry->agents_.end(); ++agent_itr){
00391 agent_entry = *agent_itr;
00392 if (agent_entry->port_ == source_port){
00393 // We already have this guy
00394 GetTime(&(agent_entry->tv_));
00395 return;
00396 }
00397 }
00398
00399 // This is a new agent, so we create a new entry and add it to the
00400 // list of known agents
00401 agent_entry = new AgentEntry(source_port);
00402 routing_entry->agents_.push_back(agent_entry);
00403 }
|
Here is the call graph for this function:

|
||||||||||||||||
|
Definition at line 359 of file gradient.cc. References GetTime(), RoutingEntry::gradients_, GradientEntry::node_addr_, GradientEntry::reinforced_, and GradientEntry::tv_. Referenced by processNewMessage(), and processOldMessage().
00361 {
00362 GradientList::iterator gradient_itr;
00363 GradientEntry *gradient_entry;
00364
00365 for (gradient_itr = routing_entry->gradients_.begin();
00366 gradient_itr != routing_entry->gradients_.end(); ++gradient_itr){
00367 gradient_entry = *gradient_itr;
00368 if (gradient_entry->node_addr_ == last_hop){
00369 GetTime(&(gradient_entry->tv_));
00370 if (reinforced)
00371 gradient_entry->reinforced_ = true;
00372 return;
00373 }
00374 }
00375
00376 // We need to add a new gradient
00377 gradient_entry = new GradientEntry(last_hop);
00378 if (reinforced)
00379 gradient_entry->reinforced_ = true;
00380
00381 routing_entry->gradients_.push_back(gradient_entry);
00382 }
|
Here is the call graph for this function:

|
|
Definition at line 49 of file diffapp.cc. References DEBUG_ALWAYS, and DiffPrint(). Referenced by DiffApp::parseCommandLine().
00049 {
00050 DiffPrint(DEBUG_ALWAYS, "Usage: %s [-d debug] [-p port] [-f file] [-h]\n\n", s);
00051 DiffPrint(DEBUG_ALWAYS, "\t-d - Sets debug level (0-10)\n");
00052 DiffPrint(DEBUG_ALWAYS, "\t-p - Uses port 'port' to talk to diffusion\n");
00053 DiffPrint(DEBUG_ALWAYS, "\t-f - Specifies a config file\n");
00054 DiffPrint(DEBUG_ALWAYS, "\t-h - Prints this information\n");
00055 DiffPrint(DEBUG_ALWAYS, "\n");
00056 exit(0);
00057 }
|
Here is the call graph for this function:

|
|
Definition at line 57 of file diffapp.hh. Referenced by DiffApp::parseCommandLine(). |
|
|
Definition at line 56 of file diffapp.hh. Referenced by GeoRoutingFilter::GeoRoutingFilter(), GradientFilter(), DiffApp::parseCommandLine(), PingSenderApp::PingSenderApp(), PushSenderApp::PushSenderApp(), and SrcRtFilter::SrcRtFilter(). |
|
|
|
Definition at line 277 of file gradient.hh. Referenced by GradientFilter(), and setupFilter(). |
|
|
Definition at line 268 of file gradient.hh. Referenced by forwardData(), forwardExploratoryData(), forwardPushExploratoryData(), GradientFilter(), interestTimeout(), messageTimeout(), processNewMessage(), recv(), reinforcementTimeout(), and sendInterest(). |
|
|
Definition at line 273 of file gradient.hh. Referenced by putHash(). |
|
|
Definition at line 274 of file gradient.hh. Referenced by getHash(), GradientFilter(), and putHash(). |
|
|
Definition at line 269 of file gradient.hh. Referenced by forwardData(), GradientFilter(), processNewMessage(), reinforcementTimeout(), and sendPositiveReinforcement(). |
|
|
Definition at line 270 of file gradient.hh. Referenced by forwardData(), GradientFilter(), processNewMessage(), reinforcementTimeout(), and sendPositiveReinforcement(). |
|
|
Definition at line 280 of file gradient.hh. Referenced by deleteRoutingEntry(), findRoutingEntry(), forwardPushExploratoryData(), gradientTimeout(), matchRoutingEntry(), processNewMessage(), processOldMessage(), and reinforcementTimeout(). |
1.3.3