00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023 #include "diffusion.hh"
00024
00025 #ifndef NS_DIFFUSION
00026 DiffusionCoreAgent *agent;
00027 #endif // !NS_DIFFUSION
00028
// Placeholder value attached to every entry of the packet hash table
// (see putHash()). It only carries a single flag, initialised to false.
class HashEntry {
public:
  bool dummy;

  HashEntry() : dummy(false) {}
};
00037
00038 class NeighborEntry {
00039 public:
00040 int32_t id;
00041 struct timeval tmv;
00042
00043 NeighborEntry(int _id) : id(_id)
00044 {
00045 GetTime(&tmv);
00046 }
00047 };
00048
00049 int NeighborsTimeoutTimer::expire()
00050 {
00051 agent_->neighborsTimeout();
00052
00053 return 0;
00054 }
00055
00056 int FilterTimeoutTimer::expire()
00057 {
00058 agent_->filterTimeout();
00059
00060 return 0;
00061 }
00062
00063 int DiffusionStopTimer::expire()
00064 {
00065 agent_->timeToStop();
00066 #ifndef NS_DIFFUSION
00067 exit(0);
00068 #endif // !NS_DIFFUSION
00069
00070
00071 return 0;
00072 }
00073
00074 void DiffusionCoreAgent::timeToStop()
00075 {
00076 FILE *outfile = NULL;
00077
00078 if (global_debug_level > DEBUG_SOME_DETAILS){
00079 #ifdef NS_DIFFUSION
00080 outfile = fopen("/tmp/diffusion.out", "a");
00081 #else
00082 outfile = fopen("/tmp/diffusion.out", "w");
00083 #endif // NS_DIFFUSION
00084
00085 if (outfile == NULL){
00086 DiffPrint(DEBUG_ALWAYS ,"Diffusion Error: Can't create /tmp/diffusion.out\n");
00087 return;
00088 }
00089 }
00090
00091 #ifdef STATS
00092 stats_->printStats(stdout);
00093 if (outfile)
00094 stats_->printStats(outfile);
00095 # ifndef WIRED
00096 # ifdef USE_RPC
00097 rpcstats_->printStats(stdout);
00098 if (outfile)
00099 rpcstats_->printStats(outfile);
00100 # endif // USE_RPC
00101 # endif // WIRED
00102 #endif // STATS
00103
00104 if (outfile)
00105 fclose(outfile);
00106 }
00107
00108 #ifndef NS_DIFFUSION
00109
00110 void signal_handler(int p)
00111 {
00112 agent->timeToStop();
00113 exit(0);
00114 }
00115
00116 void DiffusionCoreAgent::usage()
00117 {
00118 #ifdef IO_LOG
00119 DiffPrint(DEBUG_ALWAYS, "Usage: diffusion [-d debug] [-f filename] [-t stoptime] [-v] [-h] [-l] [-p port]\n\n");
00120 #else
00121 DiffPrint(DEBUG_ALWAYS, "Usage: diffusion [-d debug] [-f filename] [-t stoptime] [-v] [-h] [-p port]\n\n");
00122 #endif // IO_LOG
00123 DiffPrint(DEBUG_ALWAYS, "\t-d - Sets debug level (0-10)\n");
00124 DiffPrint(DEBUG_ALWAYS, "\t-t - Schedule diffusion to exit after stoptime seconds\n");
00125 DiffPrint(DEBUG_ALWAYS, "\t-f - Uses filename as the config file\n");
00126 DiffPrint(DEBUG_ALWAYS, "\t-v - Prints diffusion version\n");
00127 DiffPrint(DEBUG_ALWAYS, "\t-h - Prints this information\n");
00128 DiffPrint(DEBUG_ALWAYS, "\t-p - Sets diffusion port to port\n");
00129 #ifdef IO_LOG
00130 DiffPrint(DEBUG_ALWAYS, "\t-l - Turns on i/o logging\n");
00131 #endif // IO_LOG
00132
00133 DiffPrint(DEBUG_ALWAYS, "\n");
00134
00135 exit(0);
00136 }
00137
00138 void DiffusionCoreAgent::run()
00139 {
00140 DeviceList::iterator device_itr;
00141 DiffPacket in_pkt;
00142 fd_set fds;
00143
00144 bool flag;
00145 int status, max_sock, fd;
00146 struct timeval tv;
00147
00148
00149 while (1){
00150
00151
00152 FD_ZERO(&fds);
00153 max_sock = 0;
00154
00155
00156 timers_manager_->nextTimerTime(&tv);
00157 if (tv.tv_sec == 0 && tv.tv_usec == 0){
00158
00159 timers_manager_->executeAllExpiredTimers();
00160 continue;
00161 }
00162
00163 for (device_itr = in_devices_.begin();
00164 device_itr != in_devices_.end(); ++device_itr){
00165 (*device_itr)->addInFDS(&fds, &max_sock);
00166 }
00167
00168 status = select(max_sock+1, &fds, NULL, NULL, &tv);
00169
00170 if (status == 0){
00171
00172 timers_manager_->executeAllExpiredTimers();
00173 }
00174
00175
00176 if (status > 0){
00177 do{
00178 flag = false;
00179 for (device_itr = in_devices_.begin();
00180 device_itr != in_devices_.end(); ++device_itr){
00181 fd = (*device_itr)->checkInFDS(&fds);
00182 if (fd != 0){
00183
00184 in_pkt = (*device_itr)->recvPacket(fd);
00185
00186 if (in_pkt)
00187 recvPacket(in_pkt);
00188
00189
00190 FD_CLR(fd, &fds);
00191 status--;
00192 flag = true;
00193 }
00194 }
00195 } while ((status > 0) && (flag == true));
00196 }
00197
00198
00199 if (status < 0){
00200 DiffPrint(DEBUG_IMPORTANT, "Select returned %d\n", status);
00201 }
00202 }
00203 }
00204 #endif // !NS_DIFFUSION
00205
00206 void DiffusionCoreAgent::neighborsTimeout()
00207 {
00208 struct timeval tmv;
00209 NeighborEntry *neighbor_entry;
00210 NeighborList::iterator neighbor_itr;
00211
00212 DiffPrint(DEBUG_MORE_DETAILS, "Neighbors Timeout !\n");
00213
00214 GetTime(&tmv);
00215
00216 neighbor_itr = neighbor_list_.begin();
00217
00218 while(neighbor_itr != neighbor_list_.end()){
00219 neighbor_entry = *neighbor_itr;
00220 if (tmv.tv_sec > neighbor_entry->tmv.tv_sec + NEIGHBORS_TIMEOUT){
00221
00222 neighbor_itr = neighbor_list_.erase(neighbor_itr);
00223 delete neighbor_entry;
00224 }
00225 else{
00226 neighbor_itr++;
00227 }
00228 }
00229 }
00230
00231 void DiffusionCoreAgent::filterTimeout()
00232 {
00233 struct timeval tmv;
00234 FilterEntry *filter_entry;
00235 FilterList::iterator filter_itr;
00236
00237 DiffPrint(DEBUG_MORE_DETAILS, "Filter Timeout !\n");
00238
00239 GetTime(&tmv);
00240
00241 filter_itr = filter_list_.begin();
00242
00243 while(filter_itr != filter_list_.end()){
00244 filter_entry = *filter_itr;
00245 if (tmv.tv_sec > filter_entry->tmv_.tv_sec + FILTER_TIMEOUT){
00246
00247
00248 DiffPrint(DEBUG_NO_DETAILS, "Filter %d, %d, %d timed out !\n",
00249 filter_entry->agent_, filter_entry->handle_,
00250 filter_entry->priority_);
00251 filter_itr = filter_list_.erase(filter_itr);
00252 delete filter_entry;
00253 }
00254 else{
00255 filter_itr++;
00256 }
00257 }
00258 }
00259
00260 void DiffusionCoreAgent::sendMessage(Message *msg)
00261 {
00262 Tcl_HashEntry *tcl_hash_entry;
00263 unsigned int key[2];
00264 Message *send_message;
00265
00266 send_message = new Message(DIFFUSION_VERSION, msg->msg_type_, diffusion_port_,
00267 0, 0, msg->pkt_num_, msg->rdm_id_,
00268 msg->next_hop_, 0);
00269
00270 send_message->msg_attr_vec_ = CopyAttrs(msg->msg_attr_vec_);
00271 send_message->num_attr_ = send_message->msg_attr_vec_->size();
00272 send_message->data_len_ = CalculateSize(send_message->msg_attr_vec_);
00273
00274
00275 key[0] = msg->pkt_num_;
00276 key[1] = msg->rdm_id_;
00277 tcl_hash_entry = Tcl_FindHashEntry(&htable_, (char *) key);
00278 if (tcl_hash_entry)
00279 msg->new_message_ = 0;
00280 else
00281 msg->new_message_ = 1;
00282
00283 send_message->new_message_ = msg->new_message_;
00284
00285
00286 if (msg->next_port_){
00287
00288 send_message->last_hop_ = LOCALHOST_ADDR;
00289
00290
00291 if (send_message->next_hop_ != LOCALHOST_ADDR){
00292 DiffPrint(DEBUG_ALWAYS, "Error: Message destination is a local agent but next_hop != LOCALHOST_ADDR !\n");
00293 delete send_message;
00294 return;
00295 }
00296
00297
00298 sendMessageToLibrary(send_message, msg->next_port_);
00299 }
00300 else{
00301
00302 send_message->last_hop_ = my_id_;
00303
00304 #ifdef STATS
00305 stats_->logOutgoingMessage(send_message);
00306 #endif // STATS
00307
00308
00309 if (tcl_hash_entry == NULL)
00310 putHash(key[0], key[1]);
00311 else
00312 DiffPrint(DEBUG_DETAILS, "Message being sent is an old message !\n");
00313
00314
00315 sendMessageToNetwork(send_message);
00316 }
00317
00318 delete send_message;
00319 }
00320
00321 void DiffusionCoreAgent::forwardMessage(Message *msg, FilterEntry *filter_entry)
00322 {
00323 RedirectMessage *original_hdr;
00324 NRAttribute *original_header_attr;
00325 Message *send_message;
00326
00327
00328 original_hdr = new RedirectMessage(msg->new_message_, msg->msg_type_,
00329 msg->source_port_, msg->data_len_,
00330 msg->num_attr_, msg->rdm_id_,
00331 msg->pkt_num_, msg->next_hop_,
00332 msg->last_hop_, filter_entry->handle_,
00333 msg->next_port_);
00334
00335 original_header_attr = OriginalHdrAttr.make(NRAttribute::IS,
00336 (void *)original_hdr,
00337 sizeof(RedirectMessage));
00338
00339 send_message = new Message(DIFFUSION_VERSION, REDIRECT, diffusion_port_, 0,
00340 0, pkt_count_, random_id_, LOCALHOST_ADDR, my_id_);
00341
00342
00343 pkt_count_++;
00344
00345
00346 send_message->msg_attr_vec_ = CopyAttrs(msg->msg_attr_vec_);
00347
00348
00349 send_message->msg_attr_vec_->push_back(original_header_attr);
00350 send_message->num_attr_ = send_message->msg_attr_vec_->size();
00351 send_message->data_len_ = CalculateSize(send_message->msg_attr_vec_);
00352
00353 sendMessageToLibrary(send_message, filter_entry->agent_);
00354
00355 delete send_message;
00356 delete original_hdr;
00357 }
00358
00359 #ifndef NS_DIFFUSION
00360 void DiffusionCoreAgent::sendMessageToLibrary(Message *msg, u_int16_t agent_id)
00361 {
00362 DiffPacket out_pkt = NULL;
00363 struct hdr_diff *dfh;
00364 int len;
00365 char *pos;
00366
00367 out_pkt = AllocateBuffer(msg->msg_attr_vec_);
00368 dfh = HDR_DIFF(out_pkt);
00369
00370 pos = (char *) out_pkt;
00371 pos = pos + sizeof(struct hdr_diff);
00372
00373 len = PackAttrs(msg->msg_attr_vec_, pos);
00374
00375 LAST_HOP(dfh) = htonl(msg->last_hop_);
00376 NEXT_HOP(dfh) = htonl(msg->next_hop_);
00377 DIFF_VER(dfh) = msg->version_;
00378 MSG_TYPE(dfh) = msg->msg_type_;
00379 DATA_LEN(dfh) = htons(len);
00380 PKT_NUM(dfh) = htonl(msg->pkt_num_);
00381 RDM_ID(dfh) = htonl(msg->rdm_id_);
00382 NUM_ATTR(dfh) = htons(msg->num_attr_);
00383 SRC_PORT(dfh) = htons(msg->source_port_);
00384
00385 sendPacketToLibrary(out_pkt, sizeof(struct hdr_diff) + len, agent_id);
00386
00387 delete [] out_pkt;
00388 }
00389 #else
00390 void DiffusionCoreAgent::sendMessageToLibrary(Message *msg, u_int16_t agent_id)
00391 {
00392 Message *send_message;
00393 DeviceList::iterator device_itr;
00394 int len;
00395
00396 send_message = CopyMessage(msg);
00397 len = CalculateSize(send_message->msg_attr_vec_);
00398 len = len + sizeof(struct hdr_diff);
00399
00400 for (device_itr = local_out_devices_.begin();
00401 device_itr != local_out_devices_.end(); ++device_itr){
00402 (*device_itr)->sendPacket((DiffPacket) send_message, len, agent_id);
00403 }
00404 }
00405 #endif // !NS_DIFFUSION
00406
00407 #ifndef NS_DIFFUSION
00408 void DiffusionCoreAgent::sendMessageToNetwork(Message *msg)
00409 {
00410 DiffPacket out_pkt = NULL;
00411 struct hdr_diff *dfh;
00412 int len;
00413 char *pos;
00414
00415 out_pkt = AllocateBuffer(msg->msg_attr_vec_);
00416 dfh = HDR_DIFF(out_pkt);
00417
00418 pos = (char *) out_pkt;
00419 pos = pos + sizeof(struct hdr_diff);
00420
00421 len = PackAttrs(msg->msg_attr_vec_, pos);
00422
00423 LAST_HOP(dfh) = htonl(msg->last_hop_);
00424 NEXT_HOP(dfh) = htonl(msg->next_hop_);
00425 DIFF_VER(dfh) = msg->version_;
00426 MSG_TYPE(dfh) = msg->msg_type_;
00427 DATA_LEN(dfh) = htons(len);
00428 PKT_NUM(dfh) = htonl(msg->pkt_num_);
00429 RDM_ID(dfh) = htonl(msg->rdm_id_);
00430 NUM_ATTR(dfh) = htons(msg->num_attr_);
00431 SRC_PORT(dfh) = htons(msg->source_port_);
00432
00433 sendPacketToNetwork(out_pkt, sizeof(struct hdr_diff) + len, msg->next_hop_);
00434
00435 delete [] out_pkt;
00436 }
00437 #else
00438 void DiffusionCoreAgent::sendMessageToNetwork(Message *msg)
00439 {
00440 Message *send_message;
00441 int len;
00442 int32_t dst;
00443 DeviceList::iterator device_itr;
00444
00445 send_message = CopyMessage(msg);
00446 len = CalculateSize(send_message->msg_attr_vec_);
00447 len = len + sizeof(struct hdr_diff);
00448 dst = send_message->next_hop_;
00449
00450 for (device_itr = out_devices_.begin();
00451 device_itr != out_devices_.end(); ++device_itr){
00452 (*device_itr)->sendPacket((DiffPacket) send_message, len, dst);
00453 }
00454 }
00455 #endif // !NS_DIFFUSION
00456
00457 void DiffusionCoreAgent::sendPacketToLibrary(DiffPacket pkt, int len,
00458 u_int16_t dst)
00459 {
00460 DeviceList::iterator device_itr;
00461
00462 for (device_itr = local_out_devices_.begin();
00463 device_itr != local_out_devices_.end(); ++device_itr){
00464 (*device_itr)->sendPacket(pkt, len, dst);
00465 }
00466 }
00467
00468 void DiffusionCoreAgent::sendPacketToNetwork(DiffPacket pkt, int len, int dst)
00469 {
00470 DeviceList::iterator device_itr;
00471
00472 for (device_itr = out_devices_.begin();
00473 device_itr != out_devices_.end(); ++device_itr){
00474 (*device_itr)->sendPacket(pkt, len, dst);
00475 }
00476 }
00477
00478 void DiffusionCoreAgent::updateNeighbors(int id)
00479 {
00480 NeighborList::iterator neighbor_itr;
00481 NeighborEntry *neighbor_entry;
00482
00483 if (id == LOCALHOST_ADDR || id == my_id_)
00484 return;
00485
00486 for (neighbor_itr = neighbor_list_.begin();
00487 neighbor_itr != neighbor_list_.end(); ++neighbor_itr){
00488 if ((*neighbor_itr)->id == id)
00489 break;
00490 }
00491
00492 if (neighbor_itr == neighbor_list_.end()){
00493
00494 neighbor_entry = new NeighborEntry(id);
00495 neighbor_list_.push_front(neighbor_entry);
00496 }
00497 else{
00498
00499 GetTime(&((*neighbor_itr)->tmv));
00500 }
00501 }
00502
00503 FilterEntry * DiffusionCoreAgent::findFilter(int16_t handle, u_int16_t agent)
00504 {
00505 FilterList::iterator filter_itr;
00506 FilterEntry *filter_entry;
00507
00508 for (filter_itr = filter_list_.begin();
00509 filter_itr != filter_list_.end(); ++filter_itr){
00510 filter_entry = *filter_itr;
00511 if (handle != filter_entry->handle_ || agent != filter_entry->agent_)
00512 continue;
00513
00514
00515 return filter_entry;
00516 }
00517 return NULL;
00518 }
00519
00520 FilterEntry * DiffusionCoreAgent::deleteFilter(int16_t handle, u_int16_t agent)
00521 {
00522 FilterList::iterator filter_itr = filter_list_.begin();
00523 FilterEntry *filter_entry = NULL;
00524
00525 while (filter_itr != filter_list_.end()){
00526 filter_entry = *filter_itr;
00527 if (handle == filter_entry->handle_ && agent == filter_entry->agent_){
00528 filter_list_.erase(filter_itr);
00529 break;
00530 }
00531 filter_entry = NULL;
00532 filter_itr++;
00533 }
00534 return filter_entry;
00535 }
00536
00537 bool DiffusionCoreAgent::addFilter(NRAttrVec *attrs, u_int16_t agent,
00538 int16_t handle, u_int16_t priority)
00539 {
00540 FilterList::iterator filter_itr;
00541 FilterEntry *filter_entry;
00542
00543 filter_itr = filter_list_.begin();
00544 while (filter_itr != filter_list_.end()){
00545 filter_entry = *filter_itr;
00546 if (filter_entry->priority_ == priority)
00547 return false;
00548 filter_itr++;
00549 }
00550
00551 filter_entry = new FilterEntry(handle, priority, agent);
00552
00553
00554 filter_entry->filter_attrs_ = CopyAttrs(attrs);
00555
00556
00557 filter_list_.push_back(filter_entry);
00558
00559 return true;
00560 }
00561
00562 FilterList::iterator DiffusionCoreAgent::findMatchingFilter(NRAttrVec *attrs,
00563 FilterList::iterator filter_itr)
00564 {
00565 FilterEntry *filter_entry;
00566
00567 for (;filter_itr != filter_list_.end(); ++filter_itr){
00568 filter_entry = *filter_itr;
00569
00570 if (OneWayMatch(filter_entry->filter_attrs_, attrs)){
00571
00572 break;
00573 }
00574 }
00575 return filter_itr;
00576 }
00577
00578 bool DiffusionCoreAgent::restoreOriginalHeader(Message *msg)
00579 {
00580 NRAttrVec::iterator attr_itr = msg->msg_attr_vec_->begin();
00581 NRSimpleAttribute<void *> *original_header_attr = NULL;
00582 RedirectMessage *original_hdr;
00583
00584
00585 original_header_attr = OriginalHdrAttr.find_from(msg->msg_attr_vec_,
00586 attr_itr, &attr_itr);
00587 if (!original_header_attr){
00588 DiffPrint(DEBUG_ALWAYS, "Error: DiffusionCoreAgent::ProcessControlMessage couldn't find the OriginalHdrAttr !\n");
00589 return false;
00590 }
00591
00592
00593 original_hdr = (RedirectMessage *) original_header_attr->getVal();
00594
00595 msg->msg_type_ = original_hdr->msg_type_;
00596 msg->source_port_ = original_hdr->source_port_;
00597 msg->pkt_num_ = original_hdr->pkt_num_;
00598 msg->rdm_id_ = original_hdr->rdm_id_;
00599 msg->next_hop_ = original_hdr->next_hop_;
00600 msg->last_hop_ = original_hdr->last_hop_;
00601 msg->new_message_ = original_hdr->new_message_;
00602 msg->num_attr_ = original_hdr->num_attr_;
00603 msg->data_len_ = original_hdr->data_len_;
00604 msg->next_port_ = original_hdr->next_port_;
00605
00606
00607 msg->msg_attr_vec_->erase(attr_itr);
00608 delete original_header_attr;
00609
00610 return true;
00611 }
00612
00613 FilterList * DiffusionCoreAgent::getFilterList(NRAttrVec *attrs)
00614 {
00615 FilterList *matching_filter_list = new FilterList;
00616 FilterList::iterator known_filters_itr, filter_list_itr;
00617 FilterEntry *matching_filter_entry, *filter_entry;
00618
00619
00620
00621
00622 known_filters_itr = findMatchingFilter(attrs, filter_list_.begin());
00623
00624 while (known_filters_itr != filter_list_.end()){
00625
00626 matching_filter_entry = *known_filters_itr;
00627
00628 for (filter_list_itr = matching_filter_list->begin();
00629 filter_list_itr != matching_filter_list->end(); ++filter_list_itr){
00630 filter_entry = *filter_list_itr;
00631
00632
00633 if (matching_filter_entry->priority_ > filter_entry->priority_)
00634 break;
00635 }
00636
00637
00638 matching_filter_list->insert(filter_list_itr, matching_filter_entry);
00639
00640
00641 known_filters_itr++;
00642 known_filters_itr = findMatchingFilter(attrs, known_filters_itr);
00643 }
00644 return matching_filter_list;
00645 }
00646
00647 u_int16_t DiffusionCoreAgent::getNextFilterPriority(int16_t handle,
00648 u_int16_t priority,
00649 u_int16_t agent)
00650 {
00651 FilterList::iterator filter_itr;
00652 FilterEntry *filter_entry;
00653
00654 if ((priority < FILTER_MIN_PRIORITY) ||
00655 (priority > FILTER_KEEP_PRIORITY))
00656 return FILTER_INVALID_PRIORITY;
00657
00658 if (priority < FILTER_KEEP_PRIORITY)
00659 return (priority - 1);
00660
00661 filter_itr = filter_list_.begin();
00662
00663 while (filter_itr != filter_list_.end()){
00664 filter_entry = *filter_itr;
00665
00666 if ((filter_entry->handle_ == handle) && (filter_entry->agent_ == agent)){
00667
00668 return (filter_entry->priority_ - 1);
00669 }
00670
00671 filter_itr++;
00672 }
00673
00674 return FILTER_INVALID_PRIORITY;
00675 }
00676
00677 void DiffusionCoreAgent::processMessage(Message *msg)
00678 {
00679 FilterList *filter_list;
00680 FilterList::iterator filter_list_itr;
00681 FilterEntry *filter_entry;
00682
00683 filter_list = getFilterList(msg->msg_attr_vec_);
00684
00685
00686
00687 if (filter_list->size() > 0){
00688 filter_list_itr = filter_list->begin();
00689 filter_entry = *filter_list_itr;
00690
00691 forwardMessage(msg, filter_entry);
00692 filter_list->clear();
00693 }
00694 delete filter_list;
00695 }
00696
00697 void DiffusionCoreAgent::processControlMessage(Message *msg)
00698 {
00699 NRSimpleAttribute<void *> *ctrl_msg_attr = NULL;
00700 NRAttrVec::iterator attr_itr;
00701 ControlMessage *control_blob = NULL;
00702 FilterList *filter_list;
00703 FilterList::iterator filter_list_itr;
00704 FilterEntry *filter_entry;
00705 int command, param1, param2;
00706 u_int16_t priority, source_port, new_priority;
00707 int16_t handle;
00708 bool filter_is_last = false;
00709
00710
00711 if (msg->last_hop_ != LOCALHOST_ADDR){
00712 DiffPrint(DEBUG_ALWAYS,
00713 "Error: Received control message from another node !\n");
00714 return;
00715 }
00716
00717
00718 attr_itr = msg->msg_attr_vec_->begin();
00719 ctrl_msg_attr = ControlMsgAttr.find_from(msg->msg_attr_vec_,
00720 attr_itr, &attr_itr);
00721
00722 if (!ctrl_msg_attr){
00723
00724 DiffPrint(DEBUG_ALWAYS, "Error: Control message received is invalid !\n");
00725 return;
00726 }
00727
00728
00729 control_blob = (ControlMessage *) ctrl_msg_attr->getVal();
00730 command = control_blob->command_;
00731 param1 = control_blob->param1_;
00732 param2 = control_blob->param2_;
00733
00734
00735
00736
00737
00738
00739
00740
00741
00742
00743
00744
00745
00746
00747
00748
00749
00750
00751
00752
00753
00754
00755
00756
00757
00758
00759
00760
00761
00762
00763
00764
00765
00766
00767
00768
00769
00770
00771
00772 logControlMessage(msg, command, param1, param2);
00773
00774
00775 msg->msg_attr_vec_->erase(attr_itr);
00776 delete ctrl_msg_attr;
00777
00778 switch(command){
00779 case ADD_UPDATE_FILTER:
00780
00781 priority = param1;
00782 handle = param2;
00783
00784 filter_entry = findFilter(handle, msg->source_port_);
00785
00786 if (filter_entry){
00787
00788 if (PerfectMatch(filter_entry->filter_attrs_, msg->msg_attr_vec_)){
00789
00790 GetTime(&(filter_entry->tmv_));
00791
00792
00793 if (priority == filter_entry->priority_){
00794
00795 DiffPrint(DEBUG_SOME_DETAILS, "Filter %d, %d, %d refreshed.\n",
00796 filter_entry->agent_, filter_entry->handle_,
00797 filter_entry->priority_);
00798 }
00799 else{
00800
00801 DiffPrint(DEBUG_NO_DETAILS,
00802 "Updated priority of filter %d, %d, %d to %d\n",
00803 msg->source_port_, handle, filter_entry->priority_, priority);
00804 filter_entry->priority_ = priority;
00805 }
00806
00807 break;
00808 }
00809 else{
00810
00811 DiffPrint(DEBUG_ALWAYS,
00812 "Filter attributes cannot change during an update !\n");
00813 break;
00814 }
00815 }
00816 else{
00817
00818 if (!addFilter(msg->msg_attr_vec_, msg->source_port_, handle, priority)){
00819 DiffPrint(DEBUG_ALWAYS, "Failed to add filter %d, %d, %d\n",
00820 msg->source_port_, handle, priority);
00821 }
00822 else{
00823 DiffPrint(DEBUG_NO_DETAILS, "Adding filter %d, %d, %d\n",
00824 msg->source_port_, handle, priority);
00825 }
00826 }
00827
00828 break;
00829
00830 case REMOVE_FILTER:
00831
00832 handle = param1;
00833 filter_entry = deleteFilter(handle, msg->source_port_);
00834 if (filter_entry){
00835
00836 DiffPrint(DEBUG_NO_DETAILS, "Filter %d, %d, %d deleted.\n",
00837 filter_entry->agent_, filter_entry->handle_,
00838 filter_entry->priority_);
00839
00840 delete filter_entry;
00841 }
00842 else{
00843 DiffPrint(DEBUG_ALWAYS, "Couldn't find filter to delete !\n");
00844 }
00845
00846 break;
00847
00848 case SEND_MESSAGE:
00849
00850 handle = param1;
00851 priority = param2;
00852 source_port = msg->source_port_;
00853
00854 if (!restoreOriginalHeader(msg))
00855 break;
00856
00857 new_priority = getNextFilterPriority(handle, priority, source_port);
00858
00859 if (new_priority == FILTER_INVALID_PRIORITY)
00860 break;
00861
00862
00863 filter_list = getFilterList(msg->msg_attr_vec_);
00864
00865
00866 if (filter_list->size() > 0){
00867 for (filter_list_itr = filter_list->begin();
00868 filter_list_itr != filter_list->end(); ++filter_list_itr){
00869 filter_entry = *filter_list_itr;
00870 if (filter_entry->priority_ <= new_priority){
00871 forwardMessage(msg, filter_entry);
00872 break;
00873 }
00874 }
00875
00876 if (filter_list_itr == filter_list->end())
00877 filter_is_last = true;
00878
00879 }
00880 else{
00881 filter_is_last = true;
00882 }
00883
00884 if (filter_is_last){
00885
00886 sendMessage(msg);
00887 }
00888
00889 filter_list->clear();
00890
00891 delete filter_list;
00892
00893 break;
00894
00895 default:
00896
00897 DiffPrint(DEBUG_ALWAYS, "Error: Unknown control message received !\n");
00898
00899 break;
00900 }
00901 }
00902
00903 void DiffusionCoreAgent::logControlMessage(Message *msg, int command,
00904 int param1, int param2)
00905 {
00906
00907 }
00908
00909 #ifdef NS_DIFFUSION
00910 DiffusionCoreAgent::DiffusionCoreAgent(DiffRoutingAgent *diffrtg, int nodeid)
00911 {
00912 #else
00913 DiffusionCoreAgent::DiffusionCoreAgent(int argc, char **argv)
00914 {
00915 int opt;
00916 int debug_level;
00917 #endif // NS_DIFFUSION
00918 DeviceList *in_devices, *out_devices, *local_out_devices;
00919 DiffusionIO *device;
00920 TimerCallback *callback;
00921 char *scadds_env;
00922 long stop_time;
00923 struct timeval tv;
00924 #ifdef IO_LOG
00925 IOLog *pseudo_io_device;
00926 bool use_io_log = false;
00927 #endif // IO_LOG
00928
00929 opterr = 0;
00930 config_file_ = NULL;
00931 stop_time = 0;
00932
00933 scadds_env = getenv("scadds_addr");
00934 diffusion_port_ = DEFAULT_DIFFUSION_PORT;
00935
00936 #ifndef NS_DIFFUSION
00937
00938 while (1){
00939 opt = getopt(argc, argv, COMMAND_LINE_ARGS);
00940
00941 switch(opt){
00942
00943 case 'p':
00944
00945 diffusion_port_ = (u_int16_t) atoi(optarg);
00946 if ((diffusion_port_ < 1024) || (diffusion_port_ >= 65535)){
00947 DiffPrint(DEBUG_ALWAYS,
00948 "Diffusion Error: Port must be between 1024 and 65535 !\n");
00949 exit(-1);
00950 }
00951
00952 break;
00953
00954 case 't':
00955
00956 stop_time = atol(optarg);
00957 if (stop_time <= 0){
00958 DiffPrint(DEBUG_ALWAYS, "Diffusion Error: stop time must be > 0\n");
00959 exit(-1);
00960 }
00961 else{
00962 DiffPrint(DEBUG_ALWAYS, "%s will stop after %ld seconds\n",
00963 PROGRAM, stop_time);
00964 }
00965
00966 break;
00967
00968 #ifdef IO_LOG
00969 case 'l':
00970
00971 use_io_log = true;
00972
00973 break;
00974
00975 #endif // IO_LOG
00976
00977 case 'h':
00978
00979 usage();
00980
00981 break;
00982
00983 case 'v':
00984
00985 DiffPrint(DEBUG_ALWAYS, "\n%s %s\n", PROGRAM, RELEASE);
00986 exit(0);
00987
00988 break;
00989
00990 case 'd':
00991
00992 debug_level = atoi(optarg);
00993
00994 if (debug_level < 1 || debug_level > 10){
00995 DiffPrint(DEBUG_ALWAYS,
00996 "Error: Debug level outside range or missing !\n");
00997 usage();
00998 }
00999
01000 global_debug_level = debug_level;
01001
01002 break;
01003
01004 case 'f':
01005
01006 if (!strncasecmp(optarg, "-", 1)){
01007 DiffPrint(DEBUG_ALWAYS, "Error: Parameter is missing !\n");
01008 usage();
01009 }
01010
01011 config_file_ = strdup(optarg);
01012
01013 break;
01014
01015 case '?':
01016
01017 DiffPrint(DEBUG_ALWAYS,
01018 "Error: %c isn't a valid option or its parameter is missing !\n",
01019 optopt);
01020 usage();
01021
01022 break;
01023
01024 case ':':
01025
01026 DiffPrint(DEBUG_ALWAYS, "Parameter missing !\n");
01027 usage();
01028
01029 break;
01030 }
01031
01032 if (opt == -1)
01033 break;
01034 }
01035
01036 if (!config_file_)
01037 config_file_ = strdup(DEFAULT_CONFIG_FILE);
01038
01039
01040 if (scadds_env != NULL){
01041 my_id_ = atoi(scadds_env);
01042 }
01043 else{
01044 DiffPrint(DEBUG_ALWAYS,
01045 "Diffusion : scadds_addr not set. Using random id.\n");
01046
01047
01048 do{
01049 GetTime(&tv);
01050 SetSeed(&tv);
01051 my_id_ = GetRand();
01052 }
01053 while(my_id_ == LOCALHOST_ADDR || my_id_ == BROADCAST_ADDR);
01054 }
01055 #else
01056 my_id_ = nodeid;
01057 #endif // !NS_DIFFUSION
01058
01059
01060 lon_ = 0.0;
01061 lat_ = 0.0;
01062
01063 #ifdef STATS
01064 stats_ = new DiffusionStats(my_id_);
01065 # ifndef WIRED
01066 # ifdef USE_RPC
01067 rpcstats_ = new RPCStats(my_id_);
01068 # endif // USE_RPC
01069 # endif // !WIRED
01070 #endif // STATS
01071
01072 GetTime(&tv);
01073 SetSeed(&tv);
01074 pkt_count_ = GetRand();
01075 random_id_ = GetRand();
01076
01077 Tcl_InitHashTable(&htable_, 2);
01078
01079
01080 timers_manager_ = new TimerManager;
01081
01082
01083 callback = new NeighborsTimeoutTimer(this);
01084 timers_manager_->addTimer(NEIGHBORS_DELAY, callback);
01085
01086 callback = new FilterTimeoutTimer(this);
01087 timers_manager_->addTimer(FILTER_DELAY, callback);
01088
01089 if (stop_time > 0){
01090 callback = new DiffusionStopTimer(this);
01091 timers_manager_->addTimer((stop_time * 1000), callback);
01092 }
01093
01094 GetTime(&tv);
01095
01096
01097 DiffPrint(DEBUG_ALWAYS, "Diffusion : starting at time %ld:%ld\n",
01098 tv.tv_sec, tv.tv_usec);
01099 DiffPrint(DEBUG_ALWAYS, "Diffusion : Node id = %d\n", my_id_);
01100
01101
01102 #ifdef IO_LOG
01103 if (use_io_log){
01104 pseudo_io_device = new IOLog(my_id_);
01105 in_devices_.push_back(pseudo_io_device);
01106 out_devices_.push_back(pseudo_io_device);
01107
01108 in_devices = &(pseudo_io_device->in_devices_);
01109 out_devices = &(pseudo_io_device->out_devices_);
01110 local_out_devices = &(local_out_devices_);
01111 }
01112 else{
01113 in_devices = &(in_devices_);
01114 out_devices = &(out_devices_);
01115 local_out_devices = &(local_out_devices_);
01116 }
01117 #else
01118 in_devices = &(in_devices_);
01119 out_devices = &(out_devices_);
01120 local_out_devices = &(local_out_devices_);
01121 #endif // IO_LOG
01122
01123 #ifdef NS_DIFFUSION
01124 device = new LocalApp(diffrtg);
01125 local_out_devices->push_back(device);
01126
01127 device = new LinkLayerAbs(diffrtg);
01128 out_devices->push_back(device);
01129 #endif // NS_DIFFUSION
01130
01131 #ifdef UDP
01132 device = new UDPLocal(&diffusion_port_);
01133 in_devices->push_back(device);
01134 local_out_devices->push_back(device);
01135
01136 #ifdef WIRED
01137 device = new UDPWired(config_file_);
01138 out_devices->push_back(device);
01139 #endif // WIRED
01140 #endif // UDP
01141
01142 #ifdef USE_RPC
01143 device = new RPCIO();
01144 in_devices->push_back(device);
01145 out_devices->push_back(device);
01146 #endif // USE_RPC
01147
01148 #ifdef USE_MOTE_NIC
01149 device = new MOTEIO();
01150 in_devices->push_back(device);
01151 out_devices->push_back(device);
01152 #endif // USE_MOTE_NIC
01153
01154 #ifdef USE_WINSNG2
01155 device = new WINSNG2();
01156 in_devices->push_back(device);
01157 out_devices->push_back(device);
01158 #endif // USE_WINSNG2
01159 }
01160
01161 HashEntry * DiffusionCoreAgent::getHash(unsigned int pkt_num,
01162 unsigned int rdm_id)
01163 {
01164 unsigned int key[2];
01165
01166 key[0] = pkt_num;
01167 key[1] = rdm_id;
01168
01169 Tcl_HashEntry *entryPtr = Tcl_FindHashEntry(&htable_, (char *)key);
01170
01171 if (entryPtr == NULL)
01172 return NULL;
01173
01174 return (HashEntry *)Tcl_GetHashValue(entryPtr);
01175 }
01176
01177 void DiffusionCoreAgent::putHash(unsigned int pkt_num,
01178 unsigned int rdm_id)
01179 {
01180 Tcl_HashEntry *tcl_hash_entry;
01181 HashEntry *hash_entry;
01182 HashList::iterator hash_itr;
01183 unsigned int key[2];
01184 int new_hash_key;
01185
01186 if (hash_list_.size() == HASH_TABLE_MAX_SIZE){
01187
01188
01189 for (int i = 0; ((i < HASH_TABLE_REMOVE_AT_ONCE)
01190 && (hash_list_.size() > 0)); i++){
01191 hash_itr = hash_list_.begin();
01192 tcl_hash_entry = *hash_itr;
01193 hash_entry = (HashEntry *) Tcl_GetHashValue(tcl_hash_entry);
01194 delete hash_entry;
01195 hash_list_.erase(hash_itr);
01196 Tcl_DeleteHashEntry(tcl_hash_entry);
01197 }
01198 }
01199
01200 key[0] = pkt_num;
01201 key[1] = rdm_id;
01202
01203 tcl_hash_entry = Tcl_CreateHashEntry(&htable_, (char *)key, &new_hash_key);
01204
01205 if (new_hash_key == 0){
01206 DiffPrint(DEBUG_IMPORTANT, "Key already exists in hash !\n");
01207 return;
01208 }
01209
01210 hash_entry = new HashEntry;
01211
01212 Tcl_SetHashValue(tcl_hash_entry, hash_entry);
01213 hash_list_.push_back(tcl_hash_entry);
01214 }
01215
01216 #ifndef NS_DIFFUSION
01217 void DiffusionCoreAgent::recvPacket(DiffPacket pkt)
01218 {
01219 struct hdr_diff *dfh = HDR_DIFF(pkt);
01220 Message *rcv_message = NULL;
01221 int8_t version, msg_type;
01222 u_int16_t data_len, num_attr, source_port;
01223 int32_t rdm_id, pkt_num, next_hop, last_hop;
01224
01225
01226 version = DIFF_VER(dfh);
01227 msg_type = MSG_TYPE(dfh);
01228 source_port = ntohs(SRC_PORT(dfh));
01229 pkt_num = ntohl(PKT_NUM(dfh));
01230 rdm_id = ntohl(RDM_ID(dfh));
01231 num_attr = ntohs(NUM_ATTR(dfh));
01232 next_hop = ntohl(NEXT_HOP(dfh));
01233 last_hop = ntohl(LAST_HOP(dfh));
01234 data_len = ntohs(DATA_LEN(dfh));
01235
01236
01237 rcv_message = new Message(version, msg_type, source_port, data_len,
01238 num_attr, pkt_num, rdm_id, next_hop, last_hop);
01239
01240
01241 rcv_message->msg_attr_vec_ = UnpackAttrs(pkt, num_attr);
01242
01243
01244 recvMessage(rcv_message);
01245
01246
01247 delete rcv_message;
01248
01249 delete [] pkt;
01250 }
01251 #endif // !NS_DIFFUSION
01252
01253 void DiffusionCoreAgent::recvMessage(Message *msg)
01254 {
01255 Tcl_HashEntry *tcl_hash_entry;
01256 unsigned int key[2];
01257
01258
01259 if (msg->version_ != DIFFUSION_VERSION)
01260 return;
01261
01262
01263 if (msg->last_hop_ == my_id_){
01264 DiffPrint(DEBUG_ALWAYS, "Error: A diffusion ID conflict has been detected !\n");
01265 exit(-1);
01266 }
01267
01268
01269 if ((msg->next_hop_ != BROADCAST_ADDR) &&
01270 (msg->next_hop_ != LOCALHOST_ADDR) &&
01271 (msg->next_hop_ != my_id_))
01272 return;
01273
01274
01275 if (msg->msg_type_ != CONTROL){
01276
01277
01278 key[0] = msg->pkt_num_;
01279 key[1] = msg->rdm_id_;
01280 tcl_hash_entry = Tcl_FindHashEntry(&htable_, (char *) key);
01281
01282 if (tcl_hash_entry != NULL){
01283 DiffPrint(DEBUG_DETAILS, "Received old message !\n");
01284 msg->new_message_ = 0;
01285 }
01286 else{
01287
01288 putHash(key[0], key[1]);
01289 msg->new_message_ = 1;
01290 }
01291 }
01292
01293 #ifdef STATS
01294 stats_->logIncomingMessage(msg);
01295 #endif // STATS
01296
01297
01298 if (msg->msg_type_ == CONTROL)
01299 processControlMessage(msg);
01300 else
01301 processMessage(msg);
01302 }
01303
01304 #ifndef USE_SINGLE_ADDRESS_SPACE
01305 int main(int argc, char **argv)
01306 {
01307 agent = new DiffusionCoreAgent(argc, argv);
01308
01309 signal(SIGINT, signal_handler);
01310
01311 agent->run();
01312
01313 return 0;
01314 }
01315 #endif // !USE_SINGLE_ADDRESS_SPACE