00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023 #include "gradient.hh"
00024
00025 NRSimpleAttributeFactory<void *> ReinforcementAttr(NRAttribute::REINFORCEMENT_KEY, NRAttribute::BLOB_TYPE);
00026
00027 #ifdef NS_DIFFUSION
00028 static class GradientFilterClass : public TclClass {
00029 public:
00030 GradientFilterClass() : TclClass("Application/GradientFilter") {}
00031 TclObject* create(int argc, const char*const* argv) {
00032 if (argc == 5)
00033 return(new GradientFilter(argv[4]));
00034 else
00035 fprintf(stderr, "Insufficient number of args for creating GradientFilter");
00036 return (NULL);
00037 }
00038 } class_gradient_filter;
00039
00040 int GradientFilter::command(int argc, const char*const* argv) {
00041 if (argc == 3) {
00042 if (strcasecmp(argv[1], "dr") == 0) {
00043 DiffAppAgent *agent;
00044 agent = (DiffAppAgent *) TclObject::lookup(argv[2]);
00045 dr_ = agent->dr();
00046 start();
00047 return TCL_OK;
00048 }
00049 if (strcasecmp(argv[1], "debug") == 0) {
00050 global_debug_level = atoi(argv[2]);
00051 if (global_debug_level < 1 || global_debug_level > 10) {
00052 global_debug_level = DEBUG_DEFAULT;
00053 printf("Error: Debug level outside range(1-10) or missing !\n");
00054 }
00055 }
00056 }
00057 return Application::command(argc, argv);
00058 }
00059
00060 #endif // NS_DIFFUSION
00061
00062 void GradientFilterReceive::recv(Message *msg, handle h)
00063 {
00064 app_->recv(msg, h);
00065 }
00066
00067 int MessageSendTimer::expire()
00068 {
00069
00070 agent_->messageTimeout(msg_);
00071
00072
00073 return -1;
00074 }
00075
00076 int InterestForwardTimer::expire()
00077 {
00078
00079 agent_->interestTimeout(msg_);
00080
00081
00082 return -1;
00083 }
00084
00085 int SubscriptionExpirationTimer::expire()
00086 {
00087 return(agent_->subscriptionTimeout(attrs_));
00088 }
00089
00090 int GradientExpirationCheckTimer::expire()
00091 {
00092
00093 agent_->gradientTimeout();
00094
00095
00096 return 0;
00097 }
00098
00099 int ReinforcementCheckTimer::expire()
00100 {
00101
00102 agent_->reinforcementTimeout();
00103
00104
00105 return 0;
00106 }
00107
00108 void GradientFilter::interestTimeout(Message *msg)
00109 {
00110 DiffPrint(DEBUG_MORE_DETAILS, "Interest Timeout !\n");
00111
00112 msg->last_hop_ = LOCALHOST_ADDR;
00113 msg->next_hop_ = BROADCAST_ADDR;
00114
00115 ((DiffusionRouting *)dr_)->sendMessage(msg, filter_handle_);
00116 }
00117
00118 void GradientFilter::messageTimeout(Message *msg)
00119 {
00120 DiffPrint(DEBUG_MORE_DETAILS, "Message Timeout !\n");
00121
00122 ((DiffusionRouting *)dr_)->sendMessage(msg, filter_handle_);
00123 }
00124
00125 void GradientFilter::gradientTimeout()
00126 {
00127 RoutingTable::iterator routing_itr;
00128 GradientList::iterator grad_itr;
00129 AgentList::iterator agent_itr;
00130 RoutingEntry *routing_entry;
00131 GradientEntry *gradient_entry;
00132 AgentEntry *agent_entry;
00133 struct timeval tmv;
00134
00135 DiffPrint(DEBUG_MORE_DETAILS, "Gradient Timeout !\n");
00136
00137 GetTime(&tmv);
00138
00139 routing_itr = routing_list_.begin();
00140
00141 while (routing_itr != routing_list_.end()){
00142 routing_entry = *routing_itr;
00143
00144
00145 grad_itr = routing_entry->gradients_.begin();
00146 while (grad_itr != routing_entry->gradients_.end()){
00147 gradient_entry = *grad_itr;
00148 if (tmv.tv_sec > (gradient_entry->tv_.tv_sec + GRADIENT_TIMEOUT)){
00149
00150 DiffPrint(DEBUG_NO_DETAILS, "Deleting Gradient to node %d !\n",
00151 gradient_entry->node_addr_);
00152
00153 grad_itr = routing_entry->gradients_.erase(grad_itr);
00154 delete gradient_entry;
00155 }
00156 else{
00157 grad_itr++;
00158 }
00159 }
00160
00161
00162 agent_itr = routing_entry->agents_.begin();
00163 while (agent_itr != routing_entry->agents_.end()){
00164 agent_entry = *agent_itr;
00165 if (tmv.tv_sec > (agent_entry->tv_.tv_sec + GRADIENT_TIMEOUT)){
00166
00167 DiffPrint(DEBUG_NO_DETAILS,
00168 "Deleting Gradient to agent %d !\n", agent_entry->port_);
00169
00170 agent_itr = routing_entry->agents_.erase(agent_itr);
00171 delete agent_entry;
00172 }
00173 else{
00174 agent_itr++;
00175 }
00176 }
00177
00178
00179 if ((routing_entry->gradients_.size() == 0) &&
00180 (routing_entry->agents_.size() == 0)){
00181
00182 DiffPrint(DEBUG_DETAILS,
00183 "Nothing left for this data type, cleaning up !\n");
00184 routing_itr = routing_list_.erase(routing_itr);
00185 delete routing_entry;
00186 }
00187 else{
00188 routing_itr++;
00189 }
00190 }
00191 }
00192
00193 void GradientFilter::reinforcementTimeout()
00194 {
00195 DataNeighborList::iterator data_neighbor_itr;
00196 DataNeighborEntry *data_neighbor_entry;
00197 RoutingTable::iterator routing_itr;
00198 RoutingEntry *routing_entry;
00199 Message *my_message;
00200
00201 DiffPrint(DEBUG_MORE_DETAILS, "Reinforcement Timeout !\n");
00202
00203 routing_itr = routing_list_.begin();
00204
00205 while (routing_itr != routing_list_.end()){
00206 routing_entry = *routing_itr;
00207
00208
00209 data_neighbor_itr = routing_entry->data_neighbors_.begin();
00210
00211 while (data_neighbor_itr != routing_entry->data_neighbors_.end()){
00212 data_neighbor_entry = *data_neighbor_itr;
00213
00214 if (data_neighbor_entry->data_flag_ == OLD_MESSAGE){
00215 my_message = new Message(DIFFUSION_VERSION, NEGATIVE_REINFORCEMENT,
00216 0, 0, routing_entry->attrs_->size(), pkt_count_,
00217 random_id_, data_neighbor_entry->neighbor_id_,
00218 LOCALHOST_ADDR);
00219 my_message->msg_attr_vec_ = CopyAttrs(routing_entry->attrs_);
00220
00221 DiffPrint(DEBUG_NO_DETAILS,
00222 "Sending Negative Reinforcement to node %d !\n",
00223 data_neighbor_entry->neighbor_id_);
00224
00225 ((DiffusionRouting *)dr_)->sendMessage(my_message, filter_handle_);
00226
00227 pkt_count_++;
00228 delete my_message;
00229
00230
00231 data_neighbor_itr = routing_entry->data_neighbors_.erase(data_neighbor_itr);
00232 delete data_neighbor_entry;
00233 }
00234 else{
00235 data_neighbor_itr++;
00236 }
00237 }
00238
00239
00240 data_neighbor_itr = routing_entry->data_neighbors_.begin();
00241 while (data_neighbor_itr != routing_entry->data_neighbors_.end()){
00242 data_neighbor_entry = *data_neighbor_itr;
00243 if (data_neighbor_entry->data_flag_ == NEW_MESSAGE){
00244 data_neighbor_entry->data_flag_ = 0;
00245 data_neighbor_itr++;
00246 }
00247 else{
00248
00249 data_neighbor_itr = routing_entry->data_neighbors_.erase(data_neighbor_itr);
00250 delete data_neighbor_entry;
00251 }
00252 }
00253
00254
00255 routing_itr++;
00256 }
00257 }
00258
00259 int GradientFilter::subscriptionTimeout(NRAttrVec *attrs)
00260 {
00261 AttributeList::iterator attribute_itr;
00262 AttributeEntry *attribute_entry;
00263 RoutingEntry *routing_entry;
00264 struct timeval tmv;
00265
00266 DiffPrint(DEBUG_MORE_DETAILS, "Subscription Timeout !\n");
00267
00268 GetTime(&tmv);
00269
00270
00271 routing_entry = findRoutingEntry(attrs);
00272
00273 if (routing_entry){
00274
00275
00276 attribute_itr = routing_entry->attr_list_.begin();
00277
00278 while (attribute_itr != routing_entry->attr_list_.end()){
00279 attribute_entry = *attribute_itr;
00280 if (tmv.tv_sec > (attribute_entry->tv_.tv_sec + SUBSCRIPTION_TIMEOUT)){
00281 sendDisinterest(attribute_entry->attrs_, routing_entry);
00282 attribute_itr = routing_entry->attr_list_.erase(attribute_itr);
00283 delete attribute_entry;
00284 }
00285 else{
00286 attribute_itr++;
00287 }
00288 }
00289 }
00290 else{
00291 DiffPrint(DEBUG_DETAILS, "Warning: SubscriptionTimeout could't find RE - maybe deleted by GradientTimeout ?\n");
00292
00293
00294 return -1;
00295 }
00296
00297
00298 return 0;
00299 }
00300
00301 void GradientFilter::deleteRoutingEntry(RoutingEntry *routing_entry)
00302 {
00303 RoutingTable::iterator routing_itr;
00304 RoutingEntry *current_entry;
00305
00306 for (routing_itr = routing_list_.begin(); routing_itr != routing_list_.end(); ++routing_itr){
00307 current_entry = *routing_itr;
00308 if (current_entry == routing_entry){
00309 routing_itr = routing_list_.erase(routing_itr);
00310 delete routing_entry;
00311 return;
00312 }
00313 }
00314 DiffPrint(DEBUG_ALWAYS, "Error: deleteRoutingEntry could not find entry to delete !\n");
00315 }
00316
00317 RoutingEntry * GradientFilter::matchRoutingEntry(NRAttrVec *attrs, RoutingTable::iterator start, RoutingTable::iterator *place)
00318 {
00319 RoutingTable::iterator routing_itr;
00320 RoutingEntry *routing_entry;
00321
00322 for (routing_itr = start; routing_itr != routing_list_.end(); ++routing_itr){
00323 routing_entry = *routing_itr;
00324 if (MatchAttrs(routing_entry->attrs_, attrs)){
00325 *place = routing_itr;
00326 return routing_entry;
00327 }
00328 }
00329 return NULL;
00330 }
00331
00332 RoutingEntry * GradientFilter::findRoutingEntry(NRAttrVec *attrs)
00333 {
00334 RoutingTable::iterator routing_itr;
00335 RoutingEntry *routing_entry;
00336
00337 for (routing_itr = routing_list_.begin(); routing_itr != routing_list_.end(); ++routing_itr){
00338 routing_entry = *routing_itr;
00339 if (PerfectMatch(routing_entry->attrs_, attrs))
00340 return routing_entry;
00341 }
00342 return NULL;
00343 }
00344
00345 AttributeEntry * GradientFilter::findMatchingSubscription(RoutingEntry *routing_entry,
00346 NRAttrVec *attrs)
00347 {
00348 AttributeList::iterator attribute_itr;
00349 AttributeEntry *attribute_entry;
00350
00351 for (attribute_itr = routing_entry->attr_list_.begin(); attribute_itr != routing_entry->attr_list_.end(); ++attribute_itr){
00352 attribute_entry = *attribute_itr;
00353 if (PerfectMatch(attribute_entry->attrs_, attrs))
00354 return attribute_entry;
00355 }
00356 return NULL;
00357 }
00358
00359 void GradientFilter::updateGradient(RoutingEntry *routing_entry,
00360 int32_t last_hop, bool reinforced)
00361 {
00362 GradientList::iterator gradient_itr;
00363 GradientEntry *gradient_entry;
00364
00365 for (gradient_itr = routing_entry->gradients_.begin();
00366 gradient_itr != routing_entry->gradients_.end(); ++gradient_itr){
00367 gradient_entry = *gradient_itr;
00368 if (gradient_entry->node_addr_ == last_hop){
00369 GetTime(&(gradient_entry->tv_));
00370 if (reinforced)
00371 gradient_entry->reinforced_ = true;
00372 return;
00373 }
00374 }
00375
00376
00377 gradient_entry = new GradientEntry(last_hop);
00378 if (reinforced)
00379 gradient_entry->reinforced_ = true;
00380
00381 routing_entry->gradients_.push_back(gradient_entry);
00382 }
00383
00384 void GradientFilter::updateAgent(RoutingEntry *routing_entry,
00385 u_int16_t source_port)
00386 {
00387 AgentList::iterator agent_itr;
00388 AgentEntry *agent_entry;
00389
00390 for (agent_itr = routing_entry->agents_.begin(); agent_itr != routing_entry->agents_.end(); ++agent_itr){
00391 agent_entry = *agent_itr;
00392 if (agent_entry->port_ == source_port){
00393
00394 GetTime(&(agent_entry->tv_));
00395 return;
00396 }
00397 }
00398
00399
00400
00401 agent_entry = new AgentEntry(source_port);
00402 routing_entry->agents_.push_back(agent_entry);
00403 }
00404
00405 void GradientFilter::forwardPushExploratoryData(Message *msg,
00406 DataForwardingHistory *forwarding_history)
00407 {
00408 RoutingTable::iterator routing_itr;
00409 RoutingEntry *routing_entry;
00410 AgentList::iterator agent_itr;
00411 AgentEntry *agent_entry;
00412 Message *data_msg, *sink_message;
00413 TimerCallback *data_timer;
00414 unsigned int key[2];
00415 HashEntry *hash_entry;
00416
00417
00418 routing_itr = routing_list_.begin();
00419 routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,
00420 &routing_itr);
00421
00422 sink_message = CopyMessage(msg);
00423
00424 while (routing_entry){
00425
00426
00427 for (agent_itr = routing_entry->agents_.begin();
00428 agent_itr != routing_entry->agents_.end(); ++agent_itr){
00429 agent_entry = *agent_itr;
00430
00431 if (!forwarding_history->alreadyForwardedToLibrary(agent_entry->port_)){
00432
00433 sink_message->next_hop_ = LOCALHOST_ADDR;
00434 sink_message->next_port_ = agent_entry->port_;
00435
00436 ((DiffusionRouting *)dr_)->sendMessage(sink_message, filter_handle_);
00437
00438
00439 forwarding_history->forwardingToLibrary(agent_entry->port_);
00440 }
00441 }
00442
00443 if ((!forwarding_history->alreadyReinforced()) &&
00444 (routing_entry->agents_.size() > 0) &&
00445 (msg->last_hop_ != LOCALHOST_ADDR)){
00446
00447 sendPositiveReinforcement(routing_entry->attrs_, msg->rdm_id_,
00448 msg->pkt_num_, msg->last_hop_);
00449
00450
00451 forwarding_history->sendingReinforcement();
00452 }
00453
00454
00455 routing_itr++;
00456 routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,
00457 &routing_itr);
00458 }
00459
00460
00461 delete sink_message;
00462
00463
00464
00465
00466 if (msg->last_hop_ != LOCALHOST_ADDR){
00467 key[0] = msg->pkt_num_;
00468 key[1] = msg->rdm_id_;
00469
00470 hash_entry = new HashEntry(msg->last_hop_);
00471
00472 putHash(hash_entry, key[0], key[1]);
00473 }
00474
00475
00476 if (!forwarding_history->alreadyForwardedToNetwork(BROADCAST_ADDR)){
00477 data_msg = CopyMessage(msg);
00478 data_msg->next_hop_ = BROADCAST_ADDR;
00479
00480 data_timer = new MessageSendTimer(this, data_msg);
00481
00482
00483 ((DiffusionRouting *)dr_)->addTimer(PUSH_DATA_FORWARD_DELAY +
00484 (int) ((PUSH_DATA_FORWARD_JITTER * (GetRand() * 1.0 / RAND_MAX) - (PUSH_DATA_FORWARD_JITTER / 2))),
00485 data_timer);
00486
00487
00488 forwarding_history->forwardingToNetwork(BROADCAST_ADDR);
00489 }
00490 }
00491
00492 void GradientFilter::forwardExploratoryData(Message *msg,
00493 RoutingEntry *routing_entry,
00494 DataForwardingHistory *forwarding_history)
00495 {
00496 #ifdef USE_BROADCAST_TO_MULTIPLE_RECEIPTENTS
00497 Message *data_msg;
00498 TimerCallback *data_timer;
00499 #else
00500 GradientList::iterator gradient_itr;
00501 GradientEntry *gradient_entry;
00502 #endif // USE_BROADCAST_TO_MULTIPLE_RECEIPTENTS
00503 AgentList::iterator agent_itr;
00504 AgentEntry *agent_entry;
00505 Message *sink_message;
00506 unsigned int key[2];
00507 HashEntry *hash_entry;
00508
00509 sink_message = CopyMessage(msg);
00510
00511
00512 for (agent_itr = routing_entry->agents_.begin();
00513 agent_itr != routing_entry->agents_.end(); ++agent_itr){
00514 agent_entry = *agent_itr;
00515
00516 if (!forwarding_history->alreadyForwardedToLibrary(agent_entry->port_)){
00517
00518 sink_message->next_hop_ = LOCALHOST_ADDR;
00519 sink_message->next_port_ = agent_entry->port_;
00520
00521
00522 forwarding_history->forwardingToLibrary(agent_entry->port_);
00523
00524 ((DiffusionRouting *)dr_)->sendMessage(sink_message, filter_handle_);
00525 }
00526 }
00527
00528 delete sink_message;
00529
00530
00531 if ((!forwarding_history->alreadyReinforced()) &&
00532 (routing_entry->agents_.size() > 0) &&
00533 (msg->last_hop_ != LOCALHOST_ADDR)){
00534
00535 sendPositiveReinforcement(routing_entry->attrs_, msg->rdm_id_,
00536 msg->pkt_num_, msg->last_hop_);
00537
00538
00539 forwarding_history->sendingReinforcement();
00540 }
00541
00542
00543
00544
00545 if (msg->last_hop_ != LOCALHOST_ADDR){
00546 setReinforcementFlags(routing_entry, msg->last_hop_, NEW_MESSAGE);
00547 }
00548
00549
00550 if (msg->last_hop_ != LOCALHOST_ADDR){
00551 key[0] = msg->pkt_num_;
00552 key[1] = msg->rdm_id_;
00553
00554 hash_entry = new HashEntry(msg->last_hop_);
00555
00556 putHash(hash_entry, key[0], key[1]);
00557 }
00558
00559
00560 #ifdef USE_BROADCAST_TO_MULTIPLE_RECEIPTENTS
00561 if (!forwarding_history->alreadyForwardedToNetwork(BROADCAST_ADDR)){
00562 if (routing_entry->gradients_.size() > 0){
00563
00564 data_msg = CopyMessage(msg);
00565 data_msg->next_hop_ = BROADCAST_ADDR;
00566
00567
00568 forwarding_history->forwardingToNetwork(BROADCAST_ADDR);
00569
00570 data_timer = new MessageSendTimer(this, data_msg);
00571
00572
00573 ((DiffusionRouting *)dr_)->addTimer(DATA_FORWARD_DELAY +
00574 (int) ((DATA_FORWARD_JITTER * (GetRand() * 1.0 / RAND_MAX) - (DATA_FORWARD_JITTER / 2))),
00575 data_timer);
00576 }
00577 }
00578 #else
00579
00580 for (gradient_itr = routing_entry->gradients_.begin();
00581 gradient_itr != routing_entry->gradients_.end(); ++gradient_itr){
00582
00583 gradient_entry = *gradient_itr;
00584
00585
00586 if (!forwarding_history->alreadyForwardedToNetwork(gradient_entry->node_addr_)){
00587 msg->next_hop_ = gradient_entry->node_addr_;
00588 ((DiffusionRouting *)dr_)->sendMessage(msg, filter_handle_);
00589
00590
00591 forwarding_history->forwardingToNetwork(gradient_entry->node_addr_);
00592 }
00593 }
00594 #endif // USE_BROADCAST_TO_MULTIPLE_RECEIPTENTS
00595 }
00596
00597 void GradientFilter::forwardData(Message *msg, RoutingEntry *routing_entry,
00598 DataForwardingHistory *forwarding_history)
00599 {
00600 GradientList::iterator gradient_itr;
00601 AgentList::iterator agent_itr;
00602 GradientEntry *gradient_entry;
00603 AgentEntry *agent_entry;
00604 Message *sink_message, *negative_reinforcement_msg;
00605 bool has_sink = false;
00606
00607 sink_message = CopyMessage(msg);
00608
00609
00610 for (agent_itr = routing_entry->agents_.begin(); agent_itr != routing_entry->agents_.end(); ++agent_itr){
00611 agent_entry = *agent_itr;
00612
00613 has_sink = true;
00614
00615 if (!forwarding_history->alreadyForwardedToLibrary(agent_entry->port_)){
00616
00617 sink_message->next_hop_ = LOCALHOST_ADDR;
00618 sink_message->next_port_ = agent_entry->port_;
00619
00620
00621 forwarding_history->forwardingToLibrary(agent_entry->port_);
00622
00623 ((DiffusionRouting *)dr_)->sendMessage(sink_message, filter_handle_);
00624 }
00625 }
00626
00627 delete sink_message;
00628
00629
00630
00631
00632 if (msg->last_hop_ != LOCALHOST_ADDR){
00633 setReinforcementFlags(routing_entry, msg->last_hop_, NEW_MESSAGE);
00634 }
00635
00636
00637 gradient_itr = routing_entry->gradients_.begin();
00638 gradient_entry = findReinforcedGradients(&routing_entry->gradients_,
00639 gradient_itr, &gradient_itr);
00640
00641 if (gradient_entry){
00642 while (gradient_entry){
00643
00644
00645
00646 if (gradient_entry->node_addr_ != msg->last_hop_){
00647 msg->next_hop_ = gradient_entry->node_addr_;
00648
00649
00650 if (!forwarding_history->alreadyForwardedToNetwork(msg->next_hop_)){
00651 DiffPrint(DEBUG_NO_DETAILS,
00652 "Forwarding data using Reinforced Gradient to node %d !\n",
00653 gradient_entry->node_addr_);
00654
00655 ((DiffusionRouting *)dr_)->sendMessage(msg, filter_handle_);
00656
00657
00658 forwarding_history->forwardingToNetwork(msg->next_hop_);
00659 }
00660 }
00661
00662
00663 gradient_itr++;
00664 gradient_entry = findReinforcedGradients(&routing_entry->gradients_,
00665 gradient_itr, &gradient_itr);
00666 }
00667 }
00668 else{
00669
00670
00671 if ((!has_sink) && (msg->last_hop_ != LOCALHOST_ADDR)){
00672 negative_reinforcement_msg = new Message(DIFFUSION_VERSION,
00673 NEGATIVE_REINFORCEMENT,
00674 0, 0,
00675 routing_entry->attrs_->size(),
00676 pkt_count_,
00677 random_id_,
00678 msg->last_hop_,
00679 LOCALHOST_ADDR);
00680 negative_reinforcement_msg->msg_attr_vec_ = CopyAttrs(routing_entry->attrs_);
00681
00682 DiffPrint(DEBUG_NO_DETAILS,
00683 "Sending Negative Reinforcement to node %d !\n",
00684 msg->last_hop_);
00685
00686 ((DiffusionRouting *)dr_)->sendMessage(negative_reinforcement_msg,
00687 filter_handle_);
00688
00689 pkt_count_++;
00690 delete negative_reinforcement_msg;
00691 }
00692 }
00693 }
00694
00695 void GradientFilter::sendPositiveReinforcement(NRAttrVec *reinf_attrs,
00696 int32_t data_rdm_id,
00697 int32_t data_pkt_num,
00698 int32_t destination)
00699 {
00700 ReinforcementBlob *reinforcement_blob;
00701 NRAttribute *reinforcement_attr;
00702 TimerCallback *reinforcement_timer;
00703 Message *pos_reinf_message;
00704 NRAttrVec *attrs;
00705
00706 reinforcement_blob = new ReinforcementBlob(data_rdm_id, data_pkt_num);
00707
00708 reinforcement_attr = ReinforcementAttr.make(NRAttribute::IS,
00709 (void *) reinforcement_blob,
00710 sizeof(ReinforcementBlob));
00711
00712 attrs = CopyAttrs(reinf_attrs);
00713 attrs->push_back(reinforcement_attr);
00714
00715 pos_reinf_message = new Message(DIFFUSION_VERSION, POSITIVE_REINFORCEMENT,
00716 0, 0, attrs->size(), pkt_count_,
00717 random_id_, destination, LOCALHOST_ADDR);
00718 pos_reinf_message->msg_attr_vec_ = CopyAttrs(attrs);
00719
00720 DiffPrint(DEBUG_NO_DETAILS, "Sending Positive Reinforcement to node %d !\n",
00721 destination);
00722
00723
00724 reinforcement_timer = new MessageSendTimer(this, pos_reinf_message);
00725
00726
00727 ((DiffusionRouting *)dr_)->addTimer(POS_REINFORCEMENT_SEND_DELAY +
00728 (int) ((POS_REINFORCEMENT_JITTER * (GetRand() * 1.0 / RAND_MAX) - (POS_REINFORCEMENT_JITTER / 2))),
00729 reinforcement_timer);
00730 pkt_count_++;
00731 ClearAttrs(attrs);
00732 delete reinforcement_blob;
00733 }
00734
00735 GradientEntry * GradientFilter::findReinforcedGradients(GradientList *gradients,
00736 GradientList::iterator start,
00737 GradientList::iterator *place)
00738 {
00739 GradientList::iterator gradient_itr;
00740 GradientEntry *gradient_entry;
00741
00742 for (gradient_itr = start; gradient_itr != gradients->end(); ++gradient_itr){
00743 gradient_entry = *gradient_itr;
00744 if (gradient_entry->reinforced_){
00745 *place = gradient_itr;
00746 return gradient_entry;
00747 }
00748 }
00749
00750 return NULL;
00751 }
00752
00753 GradientEntry * GradientFilter::findReinforcedGradient(int32_t node_addr,
00754 RoutingEntry *routing_entry)
00755 {
00756 GradientList::iterator gradient_itr;
00757 GradientEntry *gradient_entry;
00758
00759 gradient_itr = routing_entry->gradients_.begin();
00760 gradient_entry = findReinforcedGradients(&routing_entry->gradients_,
00761 gradient_itr, &gradient_itr);
00762
00763 if (gradient_entry){
00764 while(gradient_entry){
00765 if (gradient_entry->node_addr_ == node_addr)
00766 return gradient_entry;
00767
00768
00769 gradient_itr++;
00770 gradient_entry = findReinforcedGradients(&routing_entry->gradients_,
00771 gradient_itr, &gradient_itr);
00772 }
00773 }
00774
00775 return NULL;
00776 }
00777
00778 void GradientFilter::deleteGradient(RoutingEntry *routing_entry,
00779 GradientEntry *gradient_entry)
00780 {
00781 GradientList::iterator gradient_itr;
00782 GradientEntry *current_entry;
00783
00784 for (gradient_itr = routing_entry->gradients_.begin();
00785 gradient_itr != routing_entry->gradients_.end(); ++gradient_itr){
00786 current_entry = *gradient_itr;
00787 if (current_entry == gradient_entry){
00788 gradient_itr = routing_entry->gradients_.erase(gradient_itr);
00789 delete gradient_entry;
00790 return;
00791 }
00792 }
00793 DiffPrint(DEBUG_ALWAYS,
00794 "Error: deleteGradient could not find gradient to delete !\n");
00795 }
00796
00797 void GradientFilter::setReinforcementFlags(RoutingEntry *routing_entry,
00798 int32_t last_hop, int new_message)
00799 {
00800 DataNeighborList::iterator data_neighbor_itr;
00801 DataNeighborEntry *data_neighbor_entry;
00802
00803 for (data_neighbor_itr = routing_entry->data_neighbors_.begin();
00804 data_neighbor_itr != routing_entry->data_neighbors_.end();
00805 ++data_neighbor_itr){
00806 data_neighbor_entry = *data_neighbor_itr;
00807 if (data_neighbor_entry->neighbor_id_ == last_hop){
00808 if (data_neighbor_entry->data_flag_ > 0)
00809 return;
00810 data_neighbor_entry->data_flag_ = new_message;
00811 return;
00812 }
00813 }
00814
00815
00816 data_neighbor_entry = new DataNeighborEntry(last_hop, new_message);
00817
00818 routing_entry->data_neighbors_.push_back(data_neighbor_entry);
00819 }
00820
00821 void GradientFilter::sendInterest(NRAttrVec *attrs, RoutingEntry *routing_entry)
00822 {
00823 AgentList::iterator agent_itr;
00824 AgentEntry *agent_entry;
00825
00826 Message *msg = new Message(DIFFUSION_VERSION, INTEREST, 0, 0,
00827 attrs->size(), 0, 0, LOCALHOST_ADDR,
00828 LOCALHOST_ADDR);
00829
00830 msg->msg_attr_vec_ = CopyAttrs(attrs);
00831
00832 for (agent_itr = routing_entry->agents_.begin(); agent_itr != routing_entry->agents_.end(); ++agent_itr){
00833 agent_entry = *agent_itr;
00834
00835 msg->next_port_ = agent_entry->port_;
00836
00837 ((DiffusionRouting *)dr_)->sendMessage(msg, filter_handle_);
00838 }
00839
00840 delete msg;
00841 }
00842
00843 void GradientFilter::sendDisinterest(NRAttrVec *attrs,
00844 RoutingEntry *routing_entry)
00845 {
00846 NRAttrVec *newAttrs;
00847 NRSimpleAttribute<int> *nrclass = NULL;
00848
00849 newAttrs = CopyAttrs(attrs);
00850
00851 nrclass = NRClassAttr.find(newAttrs);
00852 if (!nrclass){
00853 DiffPrint(DEBUG_ALWAYS,
00854 "Error: sendDisinterest couldn't find the class attribute !\n");
00855 ClearAttrs(newAttrs);
00856 delete newAttrs;
00857 return;
00858 }
00859
00860
00861 nrclass->setVal(NRAttribute::DISINTEREST_CLASS);
00862
00863 sendInterest(newAttrs, routing_entry);
00864
00865 ClearAttrs(newAttrs);
00866 delete newAttrs;
00867 }
00868
00869 void GradientFilter::recv(Message *msg, handle h)
00870 {
00871 if (h != filter_handle_){
00872 DiffPrint(DEBUG_ALWAYS,
00873 "Error: received msg for handle %d, subscribed to handle %d !\n",
00874 h, filter_handle_);
00875 return;
00876 }
00877
00878 if (msg->new_message_ == 1)
00879 processNewMessage(msg);
00880 else
00881 processOldMessage(msg);
00882 }
00883
00884 void GradientFilter::processOldMessage(Message *msg)
00885 {
00886 RoutingEntry *routing_entry;
00887 RoutingTable::iterator routing_itr;
00888
00889 switch (msg->msg_type_){
00890
00891 case INTEREST:
00892
00893 DiffPrint(DEBUG_NO_DETAILS, "Received Old Interest !\n");
00894
00895 if (msg->last_hop_ == LOCALHOST_ADDR){
00896
00897 DiffPrint(DEBUG_ALWAYS, "Warning: Old Interest from local agent !\n");
00898 break;
00899 }
00900
00901
00902 routing_entry = findRoutingEntry(msg->msg_attr_vec_);
00903 if (routing_entry)
00904 updateGradient(routing_entry, msg->last_hop_, false);
00905
00906 break;
00907
00908 case DATA:
00909
00910 DiffPrint(DEBUG_NO_DETAILS, "Received an old Data message !\n");
00911
00912
00913 routing_itr = routing_list_.begin();
00914 routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,
00915 &routing_itr);
00916
00917 while (routing_entry){
00918 DiffPrint(DEBUG_NO_DETAILS,
00919 "Set flags to %d to OLD_MESSAGE !\n", msg->last_hop_);
00920
00921
00922 if (msg->last_hop_ != LOCALHOST_ADDR){
00923 setReinforcementFlags(routing_entry, msg->last_hop_, OLD_MESSAGE);
00924 }
00925
00926
00927 routing_itr++;
00928 routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,
00929 &routing_itr);
00930 }
00931
00932 break;
00933
00934 case PUSH_EXPLORATORY_DATA:
00935
00936
00937 DiffPrint(DEBUG_NO_DETAILS,
00938 "Received an old Push Exploratory Data. Loop detected !\n");
00939
00940 break;
00941
00942 case EXPLORATORY_DATA:
00943
00944
00945 DiffPrint(DEBUG_NO_DETAILS,
00946 "Received an old Exploratory Data. Loop detected !\n");
00947
00948 break;
00949
00950 case POSITIVE_REINFORCEMENT:
00951
00952 DiffPrint(DEBUG_IMPORTANT, "Received an old Positive Reinforcement !\n");
00953
00954 break;
00955
00956 case NEGATIVE_REINFORCEMENT:
00957
00958 DiffPrint(DEBUG_IMPORTANT, "Received an old Negative Reinforcement !\n");
00959
00960 DiffPrint(DEBUG_IMPORTANT, "pkt_num = %d, rdm_id = %d !\n",
00961 msg->pkt_num_, msg->rdm_id_);
00962
00963 break;
00964
00965 default:
00966
00967 break;
00968 }
00969 }
00970
00971 void GradientFilter::processNewMessage(Message *msg)
00972 {
00973 NRSimpleAttribute<void *> *reinforcement_attr = NULL;
00974 DataForwardingHistory *forwarding_history;
00975 NRSimpleAttribute<int> *nrclass = NULL;
00976 NRSimpleAttribute<int> *nrscope = NULL;
00977 ReinforcementBlob *reinforcement_blob;
00978 RoutingTable::iterator routing_itr;
00979 RoutingEntry *routing_entry;
00980 GradientList::iterator gradient_itr;
00981 GradientEntry *gradient_entry;
00982 NRAttrVec::iterator place;
00983 HashEntry *hash_entry;
00984 AttributeEntry *attribute_entry;
00985 Message *my_msg;
00986 TimerCallback *interest_timer, *subscription_timer;
00987 unsigned int key[2];
00988 bool new_data_type = false;
00989
00990 switch (msg->msg_type_){
00991
00992 case INTEREST:
00993
00994 DiffPrint(DEBUG_NO_DETAILS, "Received Interest !\n");
00995
00996 nrclass = NRClassAttr.find(msg->msg_attr_vec_);
00997 nrscope = NRScopeAttr.find(msg->msg_attr_vec_);
00998
00999 if (!nrclass || !nrscope){
01000 DiffPrint(DEBUG_ALWAYS,
01001 "Warning: Can't find CLASS/SCOPE attributes in the message !\n");
01002 return;
01003 }
01004
01005
01006 routing_entry = findRoutingEntry(msg->msg_attr_vec_);
01007
01008 if (!routing_entry){
01009
01010 routing_entry = new RoutingEntry;
01011 routing_entry->attrs_ = CopyAttrs(msg->msg_attr_vec_);
01012 routing_list_.push_back(routing_entry);
01013 new_data_type = true;
01014 }
01015
01016 if (msg->last_hop_ == LOCALHOST_ADDR){
01017
01018 updateAgent(routing_entry, msg->source_port_);
01019 }
01020 else{
01021
01022 updateGradient(routing_entry, msg->last_hop_, false);
01023 }
01024
01025 if ((nrclass->getVal() == NRAttribute::INTEREST_CLASS) &&
01026 (nrclass->getOp() == NRAttribute::IS)){
01027
01028
01029 if (nrscope->getVal() == NRAttribute::GLOBAL_SCOPE){
01030
01031 interest_timer = new InterestForwardTimer(this, CopyMessage(msg));
01032
01033 ((DiffusionRouting *)dr_)->addTimer(INTEREST_FORWARD_DELAY +
01034 (int) ((INTEREST_FORWARD_JITTER * (GetRand() * 1.0 / RAND_MAX) - (INTEREST_FORWARD_JITTER / 2))),
01035 interest_timer);
01036 }
01037 }
01038 else{
01039 if ((nrclass->getOp() != NRAttribute::IS) &&
01040 (nrscope->getVal() == NRAttribute::NODE_LOCAL_SCOPE) &&
01041 (new_data_type)){
01042
01043 subscription_timer = new SubscriptionExpirationTimer(this,
01044 CopyAttrs(msg->msg_attr_vec_));
01045
01046 ((DiffusionRouting *)dr_)->addTimer(SUBSCRIPTION_DELAY +
01047 (int) (SUBSCRIPTION_DELAY * (GetRand() * 1.0 / RAND_MAX)),
01048 subscription_timer);
01049 }
01050
01051
01052 break;
01053 }
01054
01055
01056 routing_itr = routing_list_.begin();
01057 routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,
01058 &routing_itr);
01059
01060 while (routing_entry){
01061
01062 attribute_entry = findMatchingSubscription(routing_entry,
01063 msg->msg_attr_vec_);
01064
01065
01066 if (attribute_entry){
01067 GetTime(&(attribute_entry->tv_));
01068 }
01069 else{
01070
01071
01072 attribute_entry = new AttributeEntry(CopyAttrs(msg->msg_attr_vec_));
01073 routing_entry->attr_list_.push_back(attribute_entry);
01074 sendInterest(attribute_entry->attrs_, routing_entry);
01075 }
01076
01077 routing_itr++;
01078 routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,
01079 &routing_itr);
01080 }
01081
01082 break;
01083
01084 case DATA:
01085
01086 DiffPrint(DEBUG_NO_DETAILS, "Received Data !\n");
01087
01088
01089 forwarding_history = new DataForwardingHistory;
01090
01091
01092 routing_itr = routing_list_.begin();
01093 routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,
01094 &routing_itr);
01095
01096 while (routing_entry){
01097 forwardData(msg, routing_entry, forwarding_history);
01098 routing_itr++;
01099 routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,
01100 &routing_itr);
01101 }
01102
01103 delete forwarding_history;
01104
01105 break;
01106
01107 case EXPLORATORY_DATA:
01108
01109 DiffPrint(DEBUG_NO_DETAILS, "Received Exploratory Data !\n");
01110
01111
01112 forwarding_history = new DataForwardingHistory;
01113
01114
01115 routing_itr = routing_list_.begin();
01116 routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,
01117 &routing_itr);
01118
01119 while (routing_entry){
01120 forwardExploratoryData(msg, routing_entry, forwarding_history);
01121 routing_itr++;
01122 routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,
01123 &routing_itr);
01124 }
01125
01126
01127 delete forwarding_history;
01128
01129 break;
01130
01131 case PUSH_EXPLORATORY_DATA:
01132
01133 DiffPrint(DEBUG_NO_DETAILS, "Received Push Exploratory Data !\n");
01134
01135
01136 forwarding_history = new DataForwardingHistory;
01137
01138
01139 forwardPushExploratoryData(msg, forwarding_history);
01140
01141
01142 delete forwarding_history;
01143
01144 break;
01145
01146 case POSITIVE_REINFORCEMENT:
01147
01148 DiffPrint(DEBUG_NO_DETAILS, "Received a Positive Reinforcement !\n");
01149
01150
01151 place = msg->msg_attr_vec_->begin();
01152 reinforcement_attr = ReinforcementAttr.find_from(msg->msg_attr_vec_,
01153 place, &place);
01154 if (!reinforcement_attr){
01155 DiffPrint(DEBUG_ALWAYS,
01156 "Error: Received an invalid Positive Reinforcement message !\n");
01157 return;
01158 }
01159
01160
01161
01162 reinforcement_blob = (ReinforcementBlob *) reinforcement_attr->getVal();
01163
01164 key[0] = reinforcement_blob->pkt_num_;
01165 key[1] = reinforcement_blob->rdm_id_;
01166
01167 hash_entry = getHash(key[0], key[1]);
01168
01169
01170 msg->msg_attr_vec_->erase(place);
01171
01172
01173 routing_entry = findRoutingEntry(msg->msg_attr_vec_);
01174
01175 if (!routing_entry){
01176
01177
01178
01179
01180 nrclass = NRClassAttr.find(msg->msg_attr_vec_);
01181 nrscope = NRScopeAttr.find(msg->msg_attr_vec_);
01182
01183 if (!nrclass || !nrscope){
01184 DiffPrint(DEBUG_ALWAYS,
01185 "Warning: Can't find CLASS/SCOPE attributes in the message !\n");
01186 return;
01187 }
01188
01189
01190 routing_entry = new RoutingEntry;
01191 routing_entry->attrs_ = CopyAttrs(msg->msg_attr_vec_);
01192 routing_list_.push_back(routing_entry);
01193 }
01194
01195
01196 updateGradient(routing_entry, msg->last_hop_, true);
01197
01198
01199 msg->msg_attr_vec_->push_back(reinforcement_attr);
01200
01201
01202
01203
01204
01205
01206 if (hash_entry){
01207 msg->next_hop_ = hash_entry->last_hop_;
01208
01209 DiffPrint(DEBUG_NO_DETAILS,
01210 "Forwarding Positive Reinforcement to node %d !\n",
01211 hash_entry->last_hop_);
01212
01213 ((DiffusionRouting *)dr_)->sendMessage(msg, filter_handle_);
01214 }
01215
01216 break;
01217
01218 case NEGATIVE_REINFORCEMENT:
01219
01220 DiffPrint(DEBUG_NO_DETAILS, "Received a Negative Reinforcement !\n");
01221
01222 routing_entry = findRoutingEntry(msg->msg_attr_vec_);
01223
01224 if (routing_entry){
01225 gradient_entry = findReinforcedGradient(msg->last_hop_, routing_entry);
01226
01227 if (gradient_entry){
01228
01229 deleteGradient(routing_entry, gradient_entry);
01230
01231 gradient_entry = findReinforcedGradients(&routing_entry->gradients_,
01232 routing_entry->gradients_.begin(),
01233 &gradient_itr);
01234
01235
01236
01237 if (!gradient_entry){
01238 my_msg = new Message(DIFFUSION_VERSION, NEGATIVE_REINFORCEMENT,
01239 0, 0, routing_entry->attrs_->size(), pkt_count_,
01240 random_id_, BROADCAST_ADDR, LOCALHOST_ADDR);
01241 my_msg->msg_attr_vec_ = CopyAttrs(routing_entry->attrs_);
01242
01243 DiffPrint(DEBUG_NO_DETAILS,
01244 "Forwarding Negative Reinforcement to ALL !\n");
01245
01246 ((DiffusionRouting *)dr_)->sendMessage(my_msg, filter_handle_);
01247
01248 pkt_count_++;
01249 delete my_msg;
01250 }
01251 }
01252 }
01253
01254 break;
01255
01256 default:
01257
01258 break;
01259 }
01260 }
01261
01262 HashEntry * GradientFilter::getHash(unsigned int pkt_num,
01263 unsigned int rdm_id)
01264 {
01265 unsigned int key[2];
01266
01267 key[0] = pkt_num;
01268 key[1] = rdm_id;
01269
01270 Tcl_HashEntry *entryPtr = Tcl_FindHashEntry(&htable_, (char *)key);
01271
01272 if (entryPtr == NULL)
01273 return NULL;
01274
01275 return ((HashEntry *) Tcl_GetHashValue(entryPtr));
01276 }
01277
01278 void GradientFilter::putHash(HashEntry *new_hash_entry,
01279 unsigned int pkt_num,
01280 unsigned int rdm_id)
01281 {
01282 Tcl_HashEntry *tcl_hash_entry;
01283 HashEntry *hash_entry;
01284 HashList::iterator hash_itr;
01285 unsigned int key[2];
01286 int new_hash_key;
01287
01288 if (hash_list_.size() == HASH_TABLE_DATA_MAX_SIZE){
01289
01290
01291 for (int i = 0; ((i < HASH_TABLE_DATA_REMOVE_AT_ONCE)
01292 && (hash_list_.size() > 0)); i++){
01293 hash_itr = hash_list_.begin();
01294 tcl_hash_entry = *hash_itr;
01295 hash_entry = (HashEntry *) Tcl_GetHashValue(tcl_hash_entry);
01296 delete hash_entry;
01297 hash_list_.erase(hash_itr);
01298 Tcl_DeleteHashEntry(tcl_hash_entry);
01299 }
01300 }
01301
01302 key[0] = pkt_num;
01303 key[1] = rdm_id;
01304
01305 tcl_hash_entry = Tcl_CreateHashEntry(&htable_, (char *) key, &new_hash_key);
01306
01307 if (new_hash_key == 0){
01308 DiffPrint(DEBUG_IMPORTANT, "Key already exists in hash !\n");
01309 return;
01310 }
01311
01312 Tcl_SetHashValue(tcl_hash_entry, new_hash_entry);
01313 hash_list_.push_back(tcl_hash_entry);
01314 }
01315
01316 handle GradientFilter::setupFilter()
01317 {
01318 NRAttrVec attrs;
01319 handle h;
01320
01321
01322
01323
01324 attrs.push_back(NRClassAttr.make(NRAttribute::IS,
01325 NRAttribute::INTEREST_CLASS));
01326
01327 h = ((DiffusionRouting *)dr_)->addFilter(&attrs,
01328 GRADIENT_FILTER_PRIORITY, filter_callback_);
01329
01330 ClearAttrs(&attrs);
01331 return h;
01332 }
01333
01334 #ifndef NS_DIFFUSION
01335 void GradientFilter::run()
01336 {
01337
01338 while (1){
01339 sleep(1000);
01340 }
01341 }
01342 #endif // !NS_DIFFUSION
01343
01344 #ifdef NS_DIFFUSION
01345 GradientFilter::GradientFilter(const char *diffrtg)
01346 {
01347 DiffAppAgent *agent;
01348 #else
01349 GradientFilter::GradientFilter(int argc, char **argv)
01350 {
01351 #endif // NS_DIFFUSION
01352 struct timeval tv;
01353 TimerCallback *reinforcement_timer, *gradient_timer;
01354
01355 GetTime(&tv);
01356 SetSeed(&tv);
01357 pkt_count_ = GetRand();
01358 random_id_ = GetRand();
01359
01360
01361 #ifdef NS_DIFFUSION
01362 agent = (DiffAppAgent *)TclObject::lookup(diffrtg);
01363 dr_ = agent->dr();
01364 #else
01365 parseCommandLine(argc, argv);
01366 dr_ = NR::createNR(diffusion_port_);
01367 #endif // NS_DIFFUSION
01368
01369
01370 filter_callback_ = new GradientFilterReceive(this);
01371
01372
01373 Tcl_InitHashTable(&htable_, 2);
01374
01375
01376 filter_handle_ = setupFilter();
01377
01378
01379 DiffPrint(DEBUG_IMPORTANT,
01380 "Gradient filter subscribed to *, received handle %d\n",
01381 filter_handle_);
01382
01383
01384 gradient_timer = new GradientExpirationCheckTimer(this);
01385 ((DiffusionRouting *)dr_)->addTimer(GRADIENT_DELAY, gradient_timer);
01386
01387 reinforcement_timer = new ReinforcementCheckTimer(this);
01388 ((DiffusionRouting *)dr_)->addTimer(REINFORCEMENT_DELAY, reinforcement_timer);
01389
01390 GetTime(&tv);
01391
01392 DiffPrint(DEBUG_ALWAYS, "Gradient filter initialized at time %ld:%ld!\n",
01393 tv.tv_sec, tv.tv_usec);
01394 }
01395
01396 #ifndef USE_SINGLE_ADDRESS_SPACE
01397 int main(int argc, char **argv)
01398 {
01399 GradientFilter *app;
01400
01401
01402 app = new GradientFilter(argc, argv);
01403 app->run();
01404
01405 return 0;
01406 }
01407 #endif // !USE_SINGLE_ADDRESS_SPACE