00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023 #include <stdlib.h>
00024 #include <stdio.h>
00025
00026 #include "dr.hh"
00027
00028 class CallbackEntry {
00029 public:
00030 NR::Callback *cb_;
00031 NR::handle subscription_handle_;
00032
00033 CallbackEntry(NR::Callback *cb, NR::handle subscription_handle) :
00034 cb_(cb), subscription_handle_(subscription_handle) {};
00035 };
00036
00037 class HandleEntry {
00038 public:
00039 handle hdl_;
00040 bool valid_;
00041 NRAttrVec *attrs_;
00042 NR::Callback *cb_;
00043 struct timeval exploratory_time_;
00044
00045 HandleEntry()
00046 {
00047 GetTime(&exploratory_time_);
00048 valid_ = true;
00049 cb_ = NULL;
00050 };
00051
00052 ~HandleEntry(){
00053
00054 ClearAttrs(attrs_);
00055 delete attrs_;
00056 };
00057 };
00058
00059 int InterestCallback::expire()
00060 {
00061 int retval;
00062
00063
00064 retval = drt_->interestTimeout(handle_entry_);
00065
00066 if (retval < 0)
00067 delete this;
00068
00069 return retval;
00070 }
00071
00072 int FilterKeepaliveCallback::expire()
00073 {
00074 int retval;
00075
00076
00077 retval = drt_->filterKeepaliveTimeout(filter_entry_);
00078
00079 if (retval < 0)
00080 delete this;
00081
00082 return retval;
00083 }
00084
00085 int OldAPITimer::expire()
00086 {
00087 int retval;
00088
00089
00090 retval = cb_->expire(0, p_);
00091
00092 if (retval < 0)
00093 delete this;
00094
00095 return retval;
00096 }
00097
00098 #ifdef NS_DIFFUSION
00099 class DiffEventQueue;
00100
00101 int DiffusionRouting::getAgentId(int id) {
00102 if (id != -1)
00103 agent_id_ = id;
00104 return agent_id_;
00105 }
00106
00107 NR * NR::create_ns_NR(u_int16_t port, DiffAppAgent *da) {
00108 return(new DiffusionRouting(port, da));
00109 }
00110 #else
00111 NR *dr = NULL;
00112
00113 #ifdef USE_THREADS
00114 void * ReceiveThread(void *dr)
00115 {
00116
00117 ((DiffusionRouting *)dr)->run(true, WAIT_FOREVER);
00118
00119 return NULL;
00120 }
00121 #endif // USE_THREADS
00122
00123 NR * NR::createNR(u_int16_t port)
00124 {
00125
00126 if (dr)
00127 return dr;
00128
00129 dr = new DiffusionRouting(port);
00130
00131 #ifdef USE_THREADS
00132 int retval;
00133 pthread_t thread;
00134
00135
00136 retval = pthread_create(&thread, NULL, &ReceiveThread, (void *)dr);
00137
00138 if (retval){
00139 DiffPrint(DEBUG_ALWAYS, "Error creating receiving thread ! Aborting...\n");
00140 exit(-1);
00141 }
00142 #endif // USE_THREADS
00143
00144 return dr;
00145 }
00146 #endif // NS_DIFFUSION
00147
// Locks `mutex` when the file is built with USE_THREADS; otherwise does
// nothing and never touches the pointer.
void GetLock(pthread_mutex_t *mutex)
{
#ifndef USE_THREADS
  (void) mutex;
#else
  pthread_mutex_lock(mutex);
#endif // USE_THREADS
}
00154
// Unlocks `mutex` when the file is built with USE_THREADS; otherwise does
// nothing and never touches the pointer.
void ReleaseLock(pthread_mutex_t *mutex)
{
#ifndef USE_THREADS
  (void) mutex;
#else
  pthread_mutex_unlock(mutex);
#endif // USE_THREADS
}
00161
00162 #ifdef NS_DIFFUSION
00163 DiffusionRouting::DiffusionRouting(u_int16_t port, DiffAppAgent *da)
00164 #else
00165 DiffusionRouting::DiffusionRouting(u_int16_t port)
00166 #endif
00167 {
00168 struct timeval tv;
00169 DiffusionIO *device;
00170
00171
00172 next_handle_ = 1;
00173 GetTime(&tv);
00174 SetSeed(&tv);
00175 pkt_count_ = GetRand();
00176 random_id_ = GetRand();
00177 agent_id_ = 0;
00178
00179 if (port == 0)
00180 port = DEFAULT_DIFFUSION_PORT;
00181
00182 diffusion_port_ = port;
00183
00184
00185 timers_manager_ = new TimerManager;
00186
00187
00188 #ifdef NS_DIFFUSION
00189 device = new NsLocal(da);
00190 local_out_devices_.push_back(device);
00191 #endif // NS_DIFFUSION
00192
00193 #ifdef UDP
00194 device = new UDPLocal(&agent_id_);
00195 in_devices_.push_back(device);
00196 local_out_devices_.push_back(device);
00197 #endif // UDP
00198
00199
00200 DiffPrint(DEBUG_ALWAYS,
00201 "Diffusion Routing Agent initializing... Agent Id = %d\n",
00202 agent_id_);
00203
00204 #ifdef USE_THREADS
00205
00206 dr_mtx_ = new pthread_mutex_t;
00207 pthread_mutex_init(dr_mtx_, NULL);
00208 #endif // USE_THREADS
00209 }
00210
00211 DiffusionRouting::~DiffusionRouting()
00212 {
00213 HandleList::iterator itr;
00214 HandleEntry *current;
00215
00216
00217 for (itr = sub_list_.begin(); itr != sub_list_.end(); ++itr){
00218 current = *itr;
00219 delete current;
00220 }
00221
00222 for (itr = pub_list_.begin(); itr != pub_list_.end(); ++itr){
00223 current = *itr;
00224 delete current;
00225 }
00226 }
00227
00228 handle DiffusionRouting::subscribe(NRAttrVec *subscribe_attrs, NR::Callback *cb)
00229 {
00230 HandleEntry *my_handle;
00231 NRAttribute *scope_attr;
00232 TimerCallback *timer_callback;
00233
00234
00235 GetLock(dr_mtx_);
00236
00237
00238 if (!checkSubscription(subscribe_attrs)){
00239 DiffPrint(DEBUG_ALWAYS, "Error : Invalid class/scope attributes in the subscribe attributes !\n");
00240 ReleaseLock(dr_mtx_);
00241 return FAIL;
00242 }
00243
00244
00245 my_handle = new HandleEntry;
00246 my_handle->hdl_ = next_handle_;
00247 next_handle_++;
00248 my_handle->cb_ = (NR::Callback *) cb;
00249 sub_list_.push_back(my_handle);
00250
00251
00252 my_handle->attrs_ = CopyAttrs(subscribe_attrs);
00253
00254
00255 if (!hasScope(subscribe_attrs)){
00256 scope_attr = NRScopeAttr.make(NRAttribute::IS, NRAttribute::GLOBAL_SCOPE);
00257 my_handle->attrs_->push_back(scope_attr);
00258 }
00259
00260
00261 timer_callback = new InterestCallback(this, my_handle);
00262 timers_manager_->addTimer(SMALL_TIMEOUT, timer_callback);
00263
00264
00265 ReleaseLock(dr_mtx_);
00266
00267 return my_handle->hdl_;
00268 }
00269
00270 int DiffusionRouting::unsubscribe(handle subscription_handle)
00271 {
00272 HandleEntry *my_handle = NULL;
00273
00274
00275 GetLock(dr_mtx_);
00276
00277 my_handle = findHandle(subscription_handle, &sub_list_);
00278 if (!my_handle){
00279
00280 ReleaseLock(dr_mtx_);
00281 return FAIL;
00282 }
00283
00284
00285 my_handle->valid_ = false;
00286
00287
00288 ReleaseLock(dr_mtx_);
00289
00290 return OK;
00291 }
00292
00293 handle DiffusionRouting::publish(NRAttrVec *publish_attrs)
00294 {
00295 HandleEntry *my_handle;
00296 NRAttribute *scope_attr;
00297
00298
00299 GetLock(dr_mtx_);
00300
00301
00302 if (!checkPublication(publish_attrs)){
00303 DiffPrint(DEBUG_ALWAYS, "Error : Invalid class/scope attributes in the publish attributes !\n");
00304 ReleaseLock(dr_mtx_);
00305 return FAIL;
00306 }
00307
00308
00309 my_handle = new HandleEntry;
00310 my_handle->hdl_ = next_handle_;
00311 next_handle_++;
00312 pub_list_.push_back(my_handle);
00313
00314
00315 my_handle->attrs_ = CopyAttrs(publish_attrs);
00316
00317
00318 if (!hasScope(publish_attrs)){
00319 scope_attr = NRScopeAttr.make(NRAttribute::IS, NRAttribute::NODE_LOCAL_SCOPE);
00320 my_handle->attrs_->push_back(scope_attr);
00321 }
00322
00323
00324 ReleaseLock(dr_mtx_);
00325
00326 return my_handle->hdl_;
00327 }
00328
00329 int DiffusionRouting::unpublish(handle publication_handle)
00330 {
00331 HandleEntry *my_handle = NULL;
00332
00333
00334 GetLock(dr_mtx_);
00335
00336 my_handle = removeHandle(publication_handle, &pub_list_);
00337 if (!my_handle){
00338
00339 ReleaseLock(dr_mtx_);
00340 return FAIL;
00341 }
00342
00343
00344 delete my_handle;
00345
00346
00347 ReleaseLock(dr_mtx_);
00348
00349 return OK;
00350 }
00351
00352 int DiffusionRouting::send(handle publication_handle,
00353 NRAttrVec *send_attrs)
00354 {
00355 Message *my_message;
00356 HandleEntry *my_handle;
00357 int8_t send_message_type = DATA;
00358 struct timeval current_time;
00359
00360
00361 GetLock(dr_mtx_);
00362
00363
00364 my_handle = findHandle(publication_handle, &pub_list_);
00365 if (!my_handle){
00366 ReleaseLock(dr_mtx_);
00367 return FAIL;
00368 }
00369
00370
00371 if (!checkSend(send_attrs)){
00372 DiffPrint(DEBUG_ALWAYS, "Error : Invalid class/scope attributes in the send attributes !\n");
00373 ReleaseLock(dr_mtx_);
00374 return FAIL;
00375 }
00376
00377
00378 GetTime(¤t_time);
00379
00380 if (TimevalCmp(¤t_time, &(my_handle->exploratory_time_)) >= 0){
00381
00382
00383 if (isPushData(my_handle->attrs_)){
00384
00385
00386
00387 GetTime(&(my_handle->exploratory_time_));
00388 my_handle->exploratory_time_.tv_sec += PUSH_EXPLORATORY_DELAY;
00389
00390 send_message_type = PUSH_EXPLORATORY_DATA;
00391 }
00392 else{
00393
00394
00395
00396 GetTime(&(my_handle->exploratory_time_));
00397 my_handle->exploratory_time_.tv_sec += EXPLORATORY_DATA_DELAY;
00398
00399 send_message_type = EXPLORATORY_DATA;
00400 }
00401 }
00402
00403
00404 my_message = new Message(DIFFUSION_VERSION, send_message_type, agent_id_,
00405 0, 0, pkt_count_, random_id_, LOCALHOST_ADDR,
00406 LOCALHOST_ADDR);
00407
00408 pkt_count_++;
00409
00410
00411 my_message->msg_attr_vec_ = CopyAttrs(my_handle->attrs_);
00412
00413
00414 AddAttrs(my_message->msg_attr_vec_, send_attrs);
00415
00416
00417 my_message->num_attr_ = my_message->msg_attr_vec_->size();
00418 my_message->data_len_ = CalculateSize(my_message->msg_attr_vec_);
00419
00420
00421 ReleaseLock(dr_mtx_);
00422
00423
00424 sendMessageToDiffusion(my_message);
00425
00426 delete my_message;
00427
00428 return OK;
00429 }
00430
00431 handle DiffusionRouting::addFilter(NRAttrVec *filter_attrs, u_int16_t priority,
00432 FilterCallback *cb)
00433 {
00434 FilterEntry *filter_entry;
00435 NRAttrVec *attrs;
00436 NRAttribute *ctrl_msg_attr;
00437 ControlMessage *control_blob;
00438 Message *my_message;
00439 TimerCallback *timer_callback;
00440
00441
00442 if (!filter_attrs || !cb || priority < FILTER_MIN_PRIORITY || priority > FILTER_MAX_PRIORITY){
00443 DiffPrint(DEBUG_ALWAYS, "Received invalid parameters when adding filter !\n");
00444 return FAIL;
00445 }
00446
00447
00448 GetLock(dr_mtx_);
00449
00450
00451 filter_entry = new FilterEntry(next_handle_, priority, agent_id_);
00452 next_handle_++;
00453 filter_entry->cb_ = (FilterCallback *) cb;
00454 filter_list_.push_back(filter_entry);
00455
00456
00457 filter_entry->filter_attrs_ = CopyAttrs(filter_attrs);
00458
00459
00460 attrs = CopyAttrs(filter_attrs);
00461 control_blob = new ControlMessage(ADD_UPDATE_FILTER,
00462 priority, filter_entry->handle_);
00463
00464 ctrl_msg_attr = ControlMsgAttr.make(NRAttribute::IS,
00465 (void *)control_blob,
00466 sizeof(ControlMessage));
00467
00468 attrs->push_back(ctrl_msg_attr);
00469
00470
00471 my_message = new Message(DIFFUSION_VERSION, CONTROL, agent_id_, 0,
00472 0, pkt_count_, random_id_, LOCALHOST_ADDR,
00473 LOCALHOST_ADDR);
00474
00475
00476 pkt_count_++;
00477
00478
00479 my_message->msg_attr_vec_ = attrs;
00480 my_message->num_attr_ = attrs->size();
00481 my_message->data_len_ = CalculateSize(attrs);
00482
00483
00484 ReleaseLock(dr_mtx_);
00485
00486
00487 sendMessageToDiffusion(my_message);
00488
00489
00490 timer_callback = new FilterKeepaliveCallback(this, filter_entry);
00491 timers_manager_->addTimer(FILTER_KEEPALIVE_DELAY, timer_callback);
00492
00493
00494 delete my_message;
00495 delete control_blob;
00496
00497 return filter_entry->handle_;
00498 }
00499
00500 int DiffusionRouting::removeFilter(handle filter_handle)
00501 {
00502 FilterEntry *filter_entry = NULL;
00503 ControlMessage *control_blob;
00504 NRAttribute *ctrl_msg_attr;
00505 NRAttrVec *attrs;
00506 Message *my_message;
00507
00508
00509 GetLock(dr_mtx_);
00510
00511 filter_entry = findFilter(filter_handle);
00512 if (!filter_entry){
00513
00514 ReleaseLock(dr_mtx_);
00515 return FAIL;
00516 }
00517
00518 control_blob = new ControlMessage(REMOVE_FILTER, filter_entry->handle_, 0);
00519
00520 ctrl_msg_attr = ControlMsgAttr.make(NRAttribute::IS,
00521 (void *)control_blob,
00522 sizeof(ControlMessage));
00523
00524 attrs = new NRAttrVec;
00525 attrs->push_back(ctrl_msg_attr);
00526
00527 my_message = new Message(DIFFUSION_VERSION, CONTROL, agent_id_, 0,
00528 0, pkt_count_, random_id_, LOCALHOST_ADDR,
00529 LOCALHOST_ADDR);
00530
00531
00532 pkt_count_++;
00533
00534
00535 my_message->msg_attr_vec_ = attrs;
00536 my_message->num_attr_ = attrs->size();
00537 my_message->data_len_ = CalculateSize(attrs);
00538
00539
00540 filter_entry->valid_ = false;
00541
00542
00543 sendMessageToDiffusion(my_message);
00544
00545
00546 ReleaseLock(dr_mtx_);
00547
00548
00549 delete my_message;
00550 delete control_blob;
00551
00552 return OK;
00553 }
00554
00555 handle DiffusionRouting::addTimer(int timeout, TimerCallback *callback)
00556 {
00557 return (timers_manager_->addTimer(timeout, callback));
00558 }
00559
00560 handle DiffusionRouting::addTimer(int timeout, void *p, TimerCallbacks *cb)
00561 {
00562 TimerCallback *callback;
00563
00564 callback = new OldAPITimer(cb, p);
00565
00566 return (addTimer(timeout, callback));
00567 }
00568
00569 bool DiffusionRouting::removeTimer(handle hdl)
00570 {
00571 return (timers_manager_->removeTimer(hdl));
00572 }
00573
00574 int DiffusionRouting::filterKeepaliveTimeout(FilterEntry *filter_entry)
00575 {
00576 FilterEntry *my_entry = NULL;
00577 ControlMessage *control_blob;
00578 NRAttribute *ctrl_msg_attr;
00579 NRAttrVec *attrs;
00580 Message *my_message;
00581
00582
00583 GetLock(dr_mtx_);
00584
00585 if (filter_entry->valid_){
00586
00587 control_blob = new ControlMessage(ADD_UPDATE_FILTER,
00588 filter_entry->priority_,
00589 filter_entry->handle_);
00590
00591 ctrl_msg_attr = ControlMsgAttr.make(NRAttribute::IS,
00592 (void *)control_blob,
00593 sizeof(ControlMessage));
00594
00595 attrs = CopyAttrs(filter_entry->filter_attrs_);
00596 attrs->push_back(ctrl_msg_attr);
00597
00598 my_message = new Message(DIFFUSION_VERSION, CONTROL, agent_id_, 0,
00599 0, pkt_count_, random_id_, LOCALHOST_ADDR,
00600 LOCALHOST_ADDR);
00601
00602
00603 pkt_count_++;
00604
00605
00606 my_message->msg_attr_vec_ = attrs;
00607 my_message->num_attr_ = attrs->size();
00608 my_message->data_len_ = CalculateSize(attrs);
00609
00610
00611 sendMessageToDiffusion(my_message);
00612
00613 delete my_message;
00614 delete control_blob;
00615
00616
00617 ReleaseLock(dr_mtx_);
00618
00619
00620 return (FILTER_KEEPALIVE_DELAY);
00621 }
00622 else{
00623
00624 my_entry = deleteFilter(filter_entry->handle_);
00625
00626
00627 if (my_entry != filter_entry){
00628 DiffPrint(DEBUG_ALWAYS, "DiffusionRouting::KeepaliveTimeout: Handles should match !\n");
00629 exit(-1);
00630 }
00631
00632 delete my_entry;
00633
00634
00635 ReleaseLock(dr_mtx_);
00636
00637 return -1;
00638 }
00639 }
00640
00641 int DiffusionRouting::interestTimeout(HandleEntry *handle_entry)
00642 {
00643 HandleEntry *my_handle = NULL;
00644 Message *my_message;
00645
00646
00647 GetLock(dr_mtx_);
00648
00649 if (handle_entry->valid_){
00650
00651 my_message = new Message(DIFFUSION_VERSION, INTEREST, agent_id_, 0,
00652 0, pkt_count_, random_id_, LOCALHOST_ADDR,
00653 LOCALHOST_ADDR);
00654
00655
00656 pkt_count_++;
00657
00658
00659 my_message->msg_attr_vec_ = CopyAttrs(handle_entry->attrs_);
00660 my_message->num_attr_ = handle_entry->attrs_->size();
00661 my_message->data_len_ = CalculateSize(handle_entry->attrs_);
00662
00663
00664 sendMessageToDiffusion(my_message);
00665
00666 delete my_message;
00667
00668
00669 ReleaseLock(dr_mtx_);
00670
00671
00672 return (INTEREST_DELAY +
00673 (int) ((INTEREST_JITTER * (GetRand() * 1.0 / RAND_MAX)) -
00674 (INTEREST_JITTER / 2)));
00675 }
00676 else{
00677
00678 my_handle = removeHandle(handle_entry->hdl_, &sub_list_);
00679
00680
00681 if (my_handle != handle_entry){
00682 DiffPrint(DEBUG_ALWAYS,
00683 "Error: interestTimeout: Handles should match !\n");
00684 exit(-1);
00685 }
00686
00687 delete my_handle;
00688
00689
00690 ReleaseLock(dr_mtx_);
00691
00692
00693 return -1;
00694 }
00695 }
00696
00697 int DiffusionRouting::sendMessage(Message *msg, handle h,
00698 u_int16_t priority)
00699 {
00700 RedirectMessage *original_hdr;
00701 NRAttribute *original_attr, *ctrl_msg_attr;
00702 ControlMessage *control_blob;
00703 NRAttrVec *attrs;
00704 Message *my_message;
00705
00706 if ((priority < FILTER_MIN_PRIORITY) ||
00707 (priority > FILTER_KEEP_PRIORITY))
00708 return FAIL;
00709
00710
00711 original_hdr = new RedirectMessage(msg->new_message_, msg->msg_type_,
00712 msg->source_port_, msg->data_len_,
00713 msg->num_attr_, msg->rdm_id_,
00714 msg->pkt_num_, msg->next_hop_,
00715 msg->last_hop_, 0,
00716 msg->next_port_);
00717
00718 original_attr = OriginalHdrAttr.make(NRAttribute::IS, (void *)original_hdr,
00719 sizeof(RedirectMessage));
00720
00721
00722 control_blob = new ControlMessage(SEND_MESSAGE, h, priority);
00723
00724 ctrl_msg_attr = ControlMsgAttr.make(NRAttribute::IS, (void *)control_blob,
00725 sizeof(ControlMessage));
00726
00727
00728 attrs = CopyAttrs(msg->msg_attr_vec_);
00729 attrs->push_back(original_attr);
00730 attrs->push_back(ctrl_msg_attr);
00731
00732 my_message = new Message(DIFFUSION_VERSION, CONTROL, agent_id_, 0,
00733 0, pkt_count_, random_id_, LOCALHOST_ADDR,
00734 LOCALHOST_ADDR);
00735
00736
00737 pkt_count_++;
00738
00739
00740 my_message->msg_attr_vec_ = attrs;
00741 my_message->num_attr_ = attrs->size();
00742 my_message->data_len_ = CalculateSize(attrs);
00743
00744
00745 sendMessageToDiffusion(my_message);
00746
00747 delete my_message;
00748 delete control_blob;
00749 delete original_hdr;
00750
00751 return OK;
00752 }
00753
00754 #ifndef NS_DIFFUSION
00755 void DiffusionRouting::doIt()
00756 {
00757 run(true, WAIT_FOREVER);
00758 }
00759
00760 void DiffusionRouting::doOne(long timeout)
00761 {
00762 run(false, timeout);
00763 }
00764
00765 void DiffusionRouting::run(bool wait_condition, long max_timeout)
00766 {
00767 DeviceList::iterator itr;
00768 int status, max_sock, fd;
00769 bool flag;
00770 DiffPacket in_pkt;
00771 fd_set fds;
00772 struct timeval tv;
00773 struct timeval max_tv;
00774
00775 do{
00776 FD_ZERO(&fds);
00777 max_sock = 0;
00778
00779
00780 max_tv.tv_sec = (int) (max_timeout / 1000);
00781 max_tv.tv_usec = (int) ((max_timeout % 1000) * 1000);
00782
00783 for (itr = in_devices_.begin(); itr != in_devices_.end(); ++itr){
00784 (*itr)->addInFDS(&fds, &max_sock);
00785 }
00786
00787
00788 timers_manager_->nextTimerTime(&tv);
00789
00790 if (tv.tv_sec == MAXVALUE){
00791
00792 if (max_timeout == WAIT_FOREVER){
00793 tv.tv_sec = POLLING_INTERVAL;
00794 tv.tv_usec = 0;
00795 }
00796 else{
00797 tv = max_tv;
00798 }
00799 }
00800 else{
00801 if ((max_timeout != WAIT_FOREVER) && (TimevalCmp(&tv, &max_tv) > 0)){
00802
00803
00804 tv = max_tv;
00805 }
00806 }
00807
00808 status = select(max_sock+1, &fds, NULL, NULL, &tv);
00809
00810 if (status == 0){
00811
00812 timers_manager_->executeAllExpiredTimers();
00813 }
00814
00815 if (status > 0){
00816 do{
00817 flag = false;
00818 for (itr = in_devices_.begin(); itr != in_devices_.end(); ++itr){
00819 fd = (*itr)->checkInFDS(&fds);
00820 if (fd != 0){
00821
00822 in_pkt = (*itr)->recvPacket(fd);
00823 recvPacket(in_pkt);
00824
00825
00826 FD_CLR(fd, &fds);
00827 status--;
00828 flag = true;
00829 }
00830 }
00831 } while ((status > 0) && (flag == true));
00832 }
00833 else
00834 if (status < 0){
00835 DiffPrint(DEBUG_IMPORTANT, "Select returned %d\n", status);
00836 }
00837 } while (wait_condition);
00838 }
00839
00840 #endif // NS_DIFFUSION
00841
00842 #ifndef NS_DIFFUSION
00843 void DiffusionRouting::sendMessageToDiffusion(Message *msg)
00844 {
00845 DiffPacket out_pkt = NULL;
00846 struct hdr_diff *dfh;
00847 char *pos;
00848 int len;
00849
00850 out_pkt = AllocateBuffer(msg->msg_attr_vec_);
00851 dfh = HDR_DIFF(out_pkt);
00852
00853 pos = (char *) out_pkt;
00854 pos = pos + sizeof(struct hdr_diff);
00855
00856 len = PackAttrs(msg->msg_attr_vec_, pos);
00857
00858 LAST_HOP(dfh) = htonl(msg->last_hop_);
00859 NEXT_HOP(dfh) = htonl(msg->next_hop_);
00860 DIFF_VER(dfh) = msg->version_;
00861 MSG_TYPE(dfh) = msg->msg_type_;
00862 DATA_LEN(dfh) = htons(len);
00863 PKT_NUM(dfh) = htonl(msg->pkt_num_);
00864 RDM_ID(dfh) = htonl(msg->rdm_id_);
00865 NUM_ATTR(dfh) = htons(msg->num_attr_);
00866 SRC_PORT(dfh) = htons(msg->source_port_);
00867
00868 sendPacketToDiffusion(out_pkt, sizeof(struct hdr_diff) + len, diffusion_port_);
00869
00870 delete [] out_pkt;
00871 }
00872 #else
00873 void DiffusionRouting::sendMessageToDiffusion(Message *msg)
00874 {
00875 Message *my_msg;
00876 DeviceList::iterator itr;
00877 int len;
00878
00879 my_msg = CopyMessage(msg);
00880 len = CalculateSize(my_msg->msg_attr_vec_);
00881 len = len + sizeof(struct hdr_diff);
00882
00883 for (itr = local_out_devices_.begin(); itr != local_out_devices_.end(); ++itr){
00884 (*itr)->sendPacket((DiffPacket) my_msg, len, diffusion_port_);
00885 }
00886 }
00887 #endif // !NS_DIFFUSION
00888
00889 void DiffusionRouting::sendPacketToDiffusion(DiffPacket pkt, int len, int dst)
00890 {
00891 DeviceList::iterator itr;
00892
00893 for (itr = local_out_devices_.begin(); itr != local_out_devices_.end(); ++itr){
00894 (*itr)->sendPacket(pkt, len, dst);
00895 }
00896 }
00897
00898 #ifndef NS_DIFFUSION
00899 void DiffusionRouting::recvPacket(DiffPacket pkt)
00900 {
00901 struct hdr_diff *dfh = HDR_DIFF(pkt);
00902 Message *rcv_message = NULL;
00903 int8_t version, msg_type;
00904 u_int16_t data_len, num_attr, source_port;
00905 int32_t pkt_num, rdm_id, next_hop, last_hop;
00906
00907
00908 version = DIFF_VER(dfh);
00909 msg_type = MSG_TYPE(dfh);
00910 source_port = ntohs(SRC_PORT(dfh));
00911 pkt_num = ntohl(PKT_NUM(dfh));
00912 rdm_id = ntohl(RDM_ID(dfh));
00913 num_attr = ntohs(NUM_ATTR(dfh));
00914 next_hop = ntohl(NEXT_HOP(dfh));
00915 last_hop = ntohl(LAST_HOP(dfh));
00916 data_len = ntohs(DATA_LEN(dfh));
00917
00918
00919 rcv_message = new Message(version, msg_type, source_port, data_len,
00920 num_attr, pkt_num, rdm_id, next_hop, last_hop);
00921
00922
00923 rcv_message->msg_attr_vec_ = UnpackAttrs(pkt, num_attr);
00924
00925
00926 recvMessage(rcv_message);
00927
00928
00929 delete rcv_message;
00930 delete [] pkt;
00931 }
00932 #endif // !NS_DIFFUSION
00933
00934 void DiffusionRouting::recvMessage(Message *msg)
00935 {
00936
00937 if (msg->version_ != DIFFUSION_VERSION)
00938 return;
00939
00940
00941 if (msg->next_hop_ != LOCALHOST_ADDR)
00942 return;
00943
00944
00945 if (msg->msg_type_ == REDIRECT)
00946 processControlMessage(msg);
00947 else
00948 processMessage(msg);
00949 }
00950
00951 void DiffusionRouting::processControlMessage(Message *msg)
00952 {
00953 NRSimpleAttribute<void *> *original_header_attr = NULL;
00954 NRAttrVec::iterator place = msg->msg_attr_vec_->begin();
00955 RedirectMessage *original_header;
00956 FilterEntry *entry;
00957 handle my_handle;
00958
00959
00960 original_header_attr = OriginalHdrAttr.find_from(msg->msg_attr_vec_,
00961 place, &place);
00962 if (!original_header_attr){
00963 DiffPrint(DEBUG_ALWAYS, "Error: Received an invalid REDIRECT message !\n");
00964 return;
00965 }
00966
00967
00968 original_header = (RedirectMessage *) original_header_attr->getVal();
00969 my_handle = original_header->handle_;
00970 msg->msg_type_ = original_header->msg_type_;
00971 msg->source_port_ = original_header->source_port_;
00972 msg->pkt_num_ = original_header->pkt_num_;
00973 msg->rdm_id_ = original_header->rdm_id_;
00974 msg->next_hop_ = original_header->next_hop_;
00975 msg->last_hop_ = original_header->last_hop_;
00976 msg->num_attr_ = original_header->num_attr_;
00977 msg->new_message_ = original_header->new_message_;
00978 msg->next_port_ = original_header->next_port_;
00979
00980
00981 msg->msg_attr_vec_->erase(place);
00982 delete original_header_attr;
00983
00984
00985 GetLock(dr_mtx_);
00986
00987 entry = findFilter(my_handle);
00988 if (entry && entry->valid_){
00989
00990 if (OneWayMatch(entry->filter_attrs_, msg->msg_attr_vec_)){
00991 ReleaseLock(dr_mtx_);
00992 entry->cb_->recv(msg, my_handle);
00993 return;
00994 }
00995 else{
00996 DiffPrint(DEBUG_ALWAYS,
00997 "Warning: Filter doesn't match incoming message's attributes !\n");
00998 }
00999 }
01000 else{
01001 DiffPrint(DEBUG_IMPORTANT,
01002 "Report: Cannot find filter (possibly deleted ?)\n");
01003 }
01004
01005 ReleaseLock(dr_mtx_);
01006 }
01007
01008 void DiffusionRouting::processMessage(Message *msg)
01009 {
01010 CallbackList cbl;
01011 CallbackEntry *aux;
01012 HandleList::iterator sub_itr;
01013 CallbackList::iterator cbl_itr;
01014 HandleEntry *entry;
01015 NRAttrVec *callback_attrs;
01016
01017
01018 GetLock(dr_mtx_);
01019
01020 for (sub_itr = sub_list_.begin(); sub_itr != sub_list_.end(); ++sub_itr){
01021 entry = *sub_itr;
01022 if ((entry->valid_) && (MatchAttrs(msg->msg_attr_vec_, entry->attrs_)))
01023 if (entry->cb_){
01024 aux = new CallbackEntry(entry->cb_, entry->hdl_);
01025 cbl.push_back(aux);
01026 }
01027 }
01028
01029
01030 ReleaseLock(dr_mtx_);
01031
01032
01033 for (cbl_itr = cbl.begin(); cbl_itr != cbl.end(); ++cbl_itr){
01034
01035 callback_attrs = CopyAttrs(msg->msg_attr_vec_);
01036
01037
01038 aux = *cbl_itr;
01039 aux->cb_->recv(callback_attrs, aux->subscription_handle_);
01040 delete aux;
01041
01042
01043 ClearAttrs(callback_attrs);
01044 delete callback_attrs;
01045 }
01046
01047
01048 cbl.clear();
01049 }
01050
01051 HandleEntry * DiffusionRouting::removeHandle(handle my_handle, HandleList *hl)
01052 {
01053 HandleList::iterator itr;
01054 HandleEntry *entry;
01055
01056 for (itr = hl->begin(); itr != hl->end(); ++itr){
01057 entry = *itr;
01058 if (entry->hdl_ == my_handle){
01059 hl->erase(itr);
01060 return entry;
01061 }
01062 }
01063 return NULL;
01064 }
01065
01066 HandleEntry * DiffusionRouting::findHandle(handle my_handle, HandleList *hl)
01067 {
01068 HandleList::iterator itr;
01069 HandleEntry *entry;
01070
01071 for (itr = hl->begin(); itr != hl->end(); ++itr){
01072 entry = *itr;
01073 if (entry->hdl_ == my_handle)
01074 return entry;
01075 }
01076 return NULL;
01077 }
01078
01079 FilterEntry * DiffusionRouting::deleteFilter(handle my_handle)
01080 {
01081 FilterList::iterator itr;
01082 FilterEntry *entry;
01083
01084 for (itr = filter_list_.begin(); itr != filter_list_.end(); ++itr){
01085 entry = *itr;
01086 if (entry->handle_ == my_handle){
01087 filter_list_.erase(itr);
01088 return entry;
01089 }
01090 }
01091 return NULL;
01092 }
01093
01094 FilterEntry * DiffusionRouting::findFilter(handle my_handle)
01095 {
01096 FilterList::iterator itr;
01097 FilterEntry *entry;
01098
01099 for (itr = filter_list_.begin(); itr != filter_list_.end(); ++itr){
01100 entry = *itr;
01101 if (entry->handle_ == my_handle)
01102 return entry;
01103 }
01104 return NULL;
01105 }
01106
01107 bool DiffusionRouting::hasScope(NRAttrVec *attrs)
01108 {
01109 NRAttribute *temp = NULL;
01110
01111 temp = NRScopeAttr.find(attrs);
01112 if (temp)
01113 return true;
01114
01115 return false;
01116 }
01117
01118 bool DiffusionRouting::checkSubscription(NRAttrVec *attrs)
01119 {
01120 NRSimpleAttribute<int> *nrclass = NULL;
01121 NRSimpleAttribute<int> *nrscope = NULL;
01122
01123
01124 nrclass = NRClassAttr.find(attrs);
01125 nrscope = NRScopeAttr.find(attrs);
01126
01127
01128 if (!nrclass)
01129 return false;
01130
01131 if (nrscope){
01132
01133
01134
01135
01136
01137 if (nrscope->getOp() != NRAttribute::IS)
01138 return false;
01139
01140
01141 if ((nrscope->getVal() == NRAttribute::GLOBAL_SCOPE) &&
01142 (nrclass->getVal() == NRAttribute::INTEREST_CLASS) &&
01143 (nrclass->getOp() == NRAttribute::IS))
01144 return true;
01145
01146
01147 if (nrscope->getVal() == NRAttribute::NODE_LOCAL_SCOPE)
01148 return true;
01149
01150
01151 return false;
01152 }
01153
01154
01155
01156 if ((nrclass->getVal() == NRAttribute::INTEREST_CLASS) &&
01157 (nrclass->getOp() == NRAttribute::IS))
01158 return true;
01159
01160 return false;
01161 }
01162
01163 bool DiffusionRouting::checkPublication(NRAttrVec *attrs)
01164 {
01165 NRSimpleAttribute<int> *nrclass = NULL;
01166 NRSimpleAttribute<int> *nrscope = NULL;
01167
01168
01169 nrclass = NRClassAttr.find(attrs);
01170 nrscope = NRScopeAttr.find(attrs);
01171
01172
01173 if (!nrclass)
01174 return false;
01175
01176
01177
01178 if ((nrclass->getVal() != NRAttribute::DATA_CLASS) ||
01179 (nrclass->getOp() != NRAttribute::IS))
01180 return false;
01181
01182 if (nrscope){
01183
01184
01185
01186
01187
01188 if (nrscope->getOp() != NRAttribute::IS)
01189 return false;
01190
01191
01192 if ((nrscope->getVal() == NRAttribute::GLOBAL_SCOPE) ||
01193 (nrscope->getVal() == NRAttribute::NODE_LOCAL_SCOPE))
01194 return true;
01195
01196
01197 return false;
01198 }
01199
01200
01201
01202 return true;
01203 }
01204
01205 bool DiffusionRouting::checkSend(NRAttrVec *attrs)
01206 {
01207 NRSimpleAttribute<int> *nrclass = NULL;
01208 NRSimpleAttribute<int> *nrscope = NULL;
01209
01210
01211 nrclass = NRClassAttr.find(attrs);
01212 nrscope = NRScopeAttr.find(attrs);
01213
01214 if (nrclass || nrscope)
01215 return false;
01216
01217 return true;
01218 }
01219
01220 bool DiffusionRouting::isPushData(NRAttrVec *attrs)
01221 {
01222 NRSimpleAttribute<int> *nrclass = NULL;
01223 NRSimpleAttribute<int> *nrscope = NULL;
01224
01225
01226 nrclass = NRClassAttr.find(attrs);
01227 nrscope = NRScopeAttr.find(attrs);
01228
01229
01230 if (nrclass && nrscope){
01231 if (nrscope->getVal() == NRAttribute::NODE_LOCAL_SCOPE)
01232 return false;
01233 return true;
01234 }
01235 else{
01236 DiffPrint(DEBUG_ALWAYS, "Error: Cannot find class/scope attributes !\n");
01237 return false;
01238 }
01239 }