00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033 #include <assert.h>
00034 #include <math.h>
00035 #include <stdio.h>
00036 #include <signal.h>
00037 #include <float.h>
00038 #include <stdlib.h>
00039
00040 #include <tcl.h>
00041
00042
00043 #include "diff_header.h"
00044 #include "agent.h"
00045 #include "tclcl.h"
00046 #include "ip.h"
00047 #include "config.h"
00048 #include "packet.h"
00049 #include "trace.h"
00050 #include "random.h"
00051 #include "classifier.h"
00052 #include "node.h"
00053 #include "omni_mcast.h"
00054 #include "iflist.h"
00055 #include "hash_table.h"
00056 #include "arp.h"
00057 #include "mac.h"
00058 #include "ll.h"
00059 #include "dsr/path.h"
00060 #include "god.h"
00061
00062 static class OmniMcastClass : public TclClass {
00063 public:
00064 OmniMcastClass() : TclClass("Agent/OmniMcast") {}
00065 TclObject* create(int argc, const char*const* argv) {
00066 return(new OmniMcastAgent());
00067 }
00068 } class_omni_mcast;
00069
00070
00071 void OmniMcastArpBufferTimer::expire(Event *e)
00072 {
00073 a_->ArpBufferCheck();
00074 resched(ARP_BUFFER_CHECK + ARP_BUFFER_CHECK *
00075 (double) ((int) e>>5 & 0xff) /256.0);
00076 }
00077
00078 void OmniMcastSendBufTimer::expire(Event *e)
00079 {
00080 a_->SendBufferCheck();
00081 resched(SEND_BUFFER_CHECK + SEND_BUFFER_CHECK * (double) ((int) e>>5 & 0xff)/256.0);
00082 }
00083
00084
00085 void OmniMcastAgent::DataForSink(Packet *pkt)
00086 {
00087 hdr_cdiff *dfh = HDR_CDIFF(pkt);
00088 unsigned int dtype = dfh->data_type;
00089 Agent_List *cur_agent;
00090 Packet *cur_pkt;
00091 hdr_cdiff *cur_dfh;
00092 hdr_ip *cur_iph;
00093
00094
00095 for (cur_agent= (routing_table[dtype]).sink; cur_agent != NULL;
00096 cur_agent= AGENT_NEXT(cur_agent) ) {
00097
00098 cur_pkt = pkt->copy();
00099 cur_iph = HDR_IP(cur_pkt);
00100 cur_iph->dst_ = AGT_ADDR(cur_agent);
00101
00102 cur_dfh = HDR_CDIFF(cur_pkt);
00103 cur_dfh->forward_agent_id = here_;
00104 cur_dfh->num_next = 1;
00105 cur_dfh->next_nodes[0] = NODE_ADDR(cur_agent);
00106
00107 send_to_dmux(cur_pkt, 0);
00108 }
00109 }
00110
00111
00112 void OmniMcastAgent::GodForwardData(Packet *pkt)
00113 {
00114 hdr_cdiff *dfh = HDR_CDIFF(pkt);
00115 unsigned int dtype = dfh->data_type;
00116 Packet *cur_pkt;
00117 hdr_cdiff *cur_dfh;
00118 hdr_ip *cur_iph;
00119 nsaddr_t src_node = (dfh->sender_id).addr_;
00120 int ret_num_oif;
00121 int *next_oif= God::instance()->NextOIFs(dtype, src_node, THIS_NODE,
00122 &ret_num_oif);
00123
00124 if (ret_num_oif == 0) {
00125 Packet::free(pkt);
00126 return;
00127 }
00128
00129 assert(next_oif != NULL);
00130
00131 for (int i=0; i<ret_num_oif; i++) {
00132 cur_pkt = pkt->copy();
00133 cur_iph = HDR_IP(cur_pkt);
00134
00135 (cur_iph->dst_).addr_ = next_oif[i];
00136 (cur_iph->dst_).port_ = ROUTING_PORT;
00137
00138 cur_dfh = HDR_CDIFF(cur_pkt);
00139 cur_dfh->forward_agent_id = here_;
00140 cur_dfh->num_next = 1;
00141 cur_dfh->next_nodes[0] = next_oif[i];
00142
00143 MACprepare(cur_pkt, next_oif[i], NS_AF_INET, MAC_RETRY_);
00144 MACsend(cur_pkt, 0);
00145 }
00146
00147 delete []next_oif;
00148 Packet::free(pkt);
00149 }
00150
00151
00152 Packet *OmniMcastAgent::prepare_message(unsigned int dtype, ns_addr_t to_addr,
00153 int msg_type)
00154 {
00155 Packet *pkt;
00156 hdr_cdiff *dfh;
00157 hdr_ip *iph;
00158
00159 pkt = create_packet();
00160 dfh = HDR_CDIFF(pkt);
00161 iph = HDR_IP(pkt);
00162
00163 dfh->mess_type = msg_type;
00164 dfh->pk_num = pk_count;
00165 pk_count++;
00166 dfh->sender_id = here_;
00167 dfh->data_type = dtype;
00168 dfh->forward_agent_id = here_;
00169
00170 dfh->ts_ = NOW;
00171 dfh->num_next = 1;
00172 dfh->next_nodes[0] = to_addr.addr_;
00173
00174 iph->src_ = here_;
00175 iph->dst_ = to_addr;
00176
00177 return pkt;
00178 }
00179
00180
00181 OmniMcastAgent::OmniMcastAgent() : Agent(PT_DIFF), arp_buf_timer(this),
00182 send_buf_timer(this)
00183 {
00184 pk_count = 0;
00185 target_ = 0;
00186 node = NULL;
00187 tracetarget = NULL;
00188 }
00189
00190
00191 void OmniMcastAgent::recv(Packet* packet, Handler*)
00192 {
00193 hdr_cdiff* dfh = HDR_CDIFF(packet);
00194
00195
00196
00197 Pkt_Hash_Entry *hashPtr= PktTable.GetHash(dfh->sender_id, dfh->pk_num);
00198
00199
00200
00201 if (hashPtr != NULL) {
00202 Packet::free(packet);
00203 return;
00204 }
00205
00206
00207
00208 PktTable.put_in_hash(dfh);
00209
00210
00211
00212 ConsiderNew(packet);
00213 }
00214
00215
00216 void OmniMcastAgent::ConsiderNew(Packet *pkt)
00217 {
00218 hdr_cdiff* dfh = HDR_CDIFF(pkt);
00219 unsigned char msg_type = dfh->mess_type;
00220 unsigned int dtype = dfh->data_type;
00221
00222 Pkt_Hash_Entry *hashPtr;
00223 Agent_List *agentPtr;
00224 PrvCurPtr RetVal;
00225 nsaddr_t from_nodeID, forward_nodeID;
00226
00227 Packet *gen_pkt;
00228 hdr_cdiff *gen_dfh;
00229
00230 switch (msg_type) {
00231 case INTEREST :
00232
00233 hashPtr = PktTable.GetHash(dfh->sender_id, dfh->pk_num);
00234
00235
00236
00237
00238 from_nodeID = (dfh->sender_id).addr_;
00239 forward_nodeID = (dfh->forward_agent_id).addr_;
00240
00241
00242 if (THIS_NODE == from_nodeID) {
00243
00244
00245
00246
00247 RetVal = INTF_FIND(routing_table[dtype].sink, dfh->sender_id);
00248
00249 if (RetVal.cur == NULL) {
00250
00251 agentPtr = new Agent_List;
00252 AGT_ADDR(agentPtr) = dfh->sender_id;
00253 INTF_INSERT(routing_table[dtype].sink, agentPtr);
00254
00255 God::instance()->AddSink(dtype, THIS_NODE);
00256 }
00257
00258 }
00259
00260 Packet::free(pkt);
00261 return;
00262
00263
00264 case DATA_READY :
00265
00266
00267
00268 agentPtr = new Agent_List;
00269 AGT_ADDR(agentPtr) = dfh->sender_id;
00270 agentPtr->next = routing_table[dtype].source;
00271 routing_table[dtype].source = agentPtr;
00272
00273 God::instance()->AddSource(dtype, (dfh->sender_id).addr_);
00274 gen_pkt = prepare_message(dtype, dfh->sender_id, DATA_REQUEST);
00275 gen_dfh = HDR_CDIFF(gen_pkt);
00276 gen_dfh->report_rate = ORIGINAL;
00277 send_to_dmux(gen_pkt, 0);
00278 Packet::free(pkt);
00279 return;
00280
00281
00282 case DATA :
00283
00284 DataForSink(pkt);
00285 GodForwardData(pkt);
00286 return;
00287
00288
00289 default :
00290
00291 Packet::free(pkt);
00292 break;
00293 }
00294 }
00295
00296 void OmniMcastAgent::Terminate()
00297 {
00298 #ifdef DEBUG_OUTPUT
00299 printf("node %d: remaining energy %f, initial energy %f\n", THIS_NODE,
00300 node->energy_model()->energy(),
00301 node->energy_model()->initialenergy() );
00302 #endif
00303 }
00304
00305 void OmniMcastAgent::Start()
00306 {
00307 arp_buf_timer.sched(ARP_BUFFER_CHECK + ARP_BUFFER_CHECK *
00308 Random::uniform(1.0));
00309 send_buf_timer.sched(SEND_BUFFER_CHECK + SEND_BUFFER_CHECK *
00310 Random::uniform(1.0));
00311 }
00312
00313
00314 void OmniMcastAgent::StopSource()
00315 {
00316 Agent_List *cur;
00317
00318 for (int i=0; i<MAX_DATA_TYPE; i++) {
00319 for (cur=routing_table[i].source; cur!=NULL; cur=AGENT_NEXT(cur) ) {
00320 SEND_MESSAGE(i, AGT_ADDR(cur), DATA_STOP);
00321 }
00322 }
00323 }
00324
00325
00326 Packet * OmniMcastAgent:: create_packet()
00327 {
00328 Packet *pkt = allocpkt();
00329
00330 if (pkt==NULL) return NULL;
00331
00332 hdr_cmn* cmh = HDR_CMN(pkt);
00333 cmh->size() = 36;
00334
00335 hdr_cdiff* dfh = HDR_CDIFF(pkt);
00336 dfh->ts_ = NOW;
00337 return pkt;
00338 }
00339
00340
00341 void OmniMcastAgent::MACprepare(Packet *pkt, nsaddr_t next_hop,
00342 unsigned int type, bool lk_dtct)
00343 {
00344 hdr_cdiff* dfh = HDR_CDIFF(pkt);
00345 hdr_cmn* cmh = HDR_CMN(pkt);
00346 hdr_ip* iph = HDR_IP(pkt);
00347
00348 dfh->forward_agent_id = here_;
00349 if (type == NS_AF_ILINK && next_hop == (nsaddr_t)MAC_BROADCAST) {
00350 cmh->xmit_failure_ = 0;
00351 cmh->next_hop() = MAC_BROADCAST;
00352 cmh->addr_type() = NS_AF_ILINK;
00353 cmh->direction() = hdr_cmn::DOWN;
00354
00355
00356 iph->src_ = here_;
00357 iph->dst_.addr_ = next_hop;
00358 iph->dst_.port_ = ROUTING_PORT;
00359
00360 dfh->num_next = 1;
00361 dfh->next_nodes[0] = next_hop;
00362
00363 return;
00364 }
00365
00366 if (lk_dtct != 0) {
00367 cmh->xmit_failure_ = OmniMcastXmitFailedCallback;
00368 cmh->xmit_failure_data_ = (void *) this;
00369 }
00370 else {
00371 cmh->xmit_failure_ = 0;
00372 }
00373
00374 cmh->direction() = hdr_cmn::DOWN;
00375 cmh->next_hop() = next_hop;
00376 cmh->addr_type() = type;
00377
00378 iph->src_ = here_;
00379 iph->dst_.addr_ = next_hop;
00380 iph->dst_.port_ = ROUTING_PORT;
00381
00382 dfh->num_next = 1;
00383 dfh->next_nodes[0] = next_hop;
00384 }
00385
00386
00387 void OmniMcastAgent::MACsend(Packet *pkt, Time delay)
00388 {
00389 hdr_cmn* cmh = HDR_CMN(pkt);
00390 hdr_cdiff* dfh = HDR_CDIFF(pkt);
00391
00392 if (dfh->mess_type == DATA)
00393 cmh->size() = (God::instance()->data_pkt_size) + 4*(dfh->num_next - 1);
00394 else
00395 cmh->size() = 36 + 4*(dfh->num_next -1);
00396
00397 Scheduler::instance().schedule(ll, pkt, delay);
00398 }
00399
00400
00401 void OmniMcastXmitFailedCallback(Packet *pkt, void *data)
00402 {
00403 OmniMcastAgent *agent = (OmniMcastAgent *)data;
00404 agent->xmitFailed(pkt);
00405 }
00406
00407
00408 void OmniMcastAgent::xmitFailed(Packet *pkt)
00409 {
00410
00411 }
00412
00413
00414 void OmniMcastAgent::StickPacketInArpBuffer(Packet *pkt)
00415 {
00416 Time min = DBL_MAX;
00417 int min_index = 0;
00418 int c;
00419
00420 for (c=0; c < ARP_BUF_SIZE; c++) {
00421 if (arp_buf[c].p == NULL) {
00422 arp_buf[c].t = NOW;
00423 arp_buf[c].attempt = 1;
00424 arp_buf[c].p = pkt;
00425 return;
00426 }
00427 else if (arp_buf[c].t < min) {
00428 min = arp_buf[c].t;
00429 min_index = c;
00430 }
00431 }
00432
00433
00434
00435 ARPEntry *llinfo;
00436 hdr_cmn* cmh = HDR_CMN(arp_buf[min_index].p);
00437
00438 llinfo= arp_table->arplookup(cmh->next_hop());
00439
00440 if (llinfo == 0) {
00441
00442 xmitFailed(arp_buf[min_index].p);
00443 }
00444 else
00445 MACsend(arp_buf[min_index].p, 0);
00446
00447
00448
00449
00450 arp_buf[min_index].t = NOW;
00451 arp_buf[min_index].attempt = 1;
00452 arp_buf[min_index].p = pkt;
00453 }
00454
00455
00456 void OmniMcastAgent::ArpBufferCheck()
00457 {
00458 int c;
00459 ARPEntry *llinfo;
00460 hdr_cmn* cmh;
00461
00462 for (c = 0; c < ARP_BUF_SIZE; c++) {
00463 if (arp_buf[c].p == NULL)
00464 continue;
00465
00466 cmh = HDR_CMN(arp_buf[c].p);
00467 llinfo= arp_table->arplookup(cmh->next_hop());
00468 if (llinfo != 0) {
00469 MACsend(arp_buf[c].p, 0);
00470 arp_buf[c].p = NULL;
00471 continue;}
00472
00473 if (arp_buf[c].attempt > ARP_MAX_ATTEMPT) {
00474
00475 xmitFailed(arp_buf[c].p);
00476 arp_buf[c].p = NULL;
00477 continue;
00478 }
00479
00480 arp_table->arprequest(THIS_NODE, cmh->next_hop(), (LL *)ll);
00481 arp_buf[c].attempt ++;
00482 }
00483 }
00484
00485
00486 void OmniMcastAgent::StickPacketInSendBuffer(Packet *p)
00487 {
00488 Time min = DBL_MAX;
00489 int min_index = 0;
00490 int c;
00491
00492 for (c = 0 ; c < SEND_BUF_SIZE ; c ++) {
00493 if (send_buf[c].p == NULL)
00494 {
00495 send_buf[c].t = NOW;
00496 send_buf[c].p = p;
00497 return;
00498 }
00499 else if (send_buf[c].t < min)
00500 {
00501 min = send_buf[c].t;
00502 min_index = c;
00503 }
00504 }
00505
00506
00507
00508 if (send_buf[min_index].p != NULL) {
00509 MACsend(send_buf[min_index].p, 0);
00510 }
00511
00512
00513
00514 send_buf[min_index].t = Scheduler::instance().clock();
00515 send_buf[min_index].p = p;
00516 }
00517
00518
00519 void OmniMcastAgent::SendBufferCheck()
00520 {
00521 for (int c = 0; c < SEND_BUF_SIZE; c++) {
00522 if (send_buf[c].p != NULL) {
00523 MACsend(send_buf[c].p, 0);
00524 send_buf[c].p = NULL;
00525 }
00526 }
00527 }
00528
00529
00530 void OmniMcastAgent::trace (char *fmt,...)
00531 {
00532 va_list ap;
00533
00534 if (!tracetarget)
00535 return;
00536
00537 va_start (ap, fmt);
00538 vsprintf (tracetarget->pt_->buffer (), fmt, ap);
00539 tracetarget->pt_->dump ();
00540 va_end (ap);
00541 }
00542
00543
00544
00545 int OmniMcastAgent::command(int argc, const char*const* argv)
00546 {
00547 Tcl& tcl = Tcl::instance();
00548
00549 if (argc == 2) {
00550
00551 if (strcasecmp(argv[1], "reset-state")==0) {
00552 reset();
00553 return TCL_OK;
00554 }
00555
00556 if (strcasecmp(argv[1], "reset")==0) {
00557 return Agent::command(argc, argv);
00558 }
00559
00560 if (strcasecmp(argv[1], "start")==0) {
00561 Start();
00562 return TCL_OK;
00563 }
00564
00565 if (strcasecmp(argv[1], "stop")==0) {
00566 return TCL_OK;
00567 }
00568
00569 if (strcasecmp(argv[1], "terminate")==0) {
00570 Terminate();
00571 return TCL_OK;
00572 }
00573
00574 if (strcasecmp(argv[1], "stop-source")==0) {
00575 StopSource();
00576 return TCL_OK;
00577 }
00578
00579 } else if (argc == 3) {
00580
00581 if (strcasecmp(argv[1], "on-node")==0) {
00582 node = (Node *)tcl.lookup(argv[2]);
00583 return TCL_OK;
00584 }
00585
00586 if (strcasecmp(argv[1], "add-ll") == 0) {
00587
00588 TclObject *obj;
00589
00590 if ( (obj = TclObject::lookup(argv[2])) == 0) {
00591 fprintf(stderr, "OmniMcast Node: %d lookup of %s failed\n", THIS_NODE,
00592 argv[2]);
00593 return TCL_ERROR;
00594 }
00595 ll = (NsObject *) obj;
00596
00597
00598 arp_table = ((LL *)ll)->arp_table();
00599 if (arp_table == NULL)
00600 return TCL_ERROR;
00601
00602 return TCL_OK;
00603 }
00604
00605 if (strcasecmp (argv[1], "tracetarget") == 0) {
00606 TclObject *obj;
00607 if ((obj = TclObject::lookup (argv[2])) == 0) {
00608 fprintf (stderr, "%s: %s lookup of %s failed\n", __FILE__, argv[1],
00609 argv[2]);
00610 return TCL_ERROR;
00611 }
00612
00613 tracetarget = (Trace *) obj;
00614 return TCL_OK;
00615 }
00616
00617 if (strcasecmp(argv[1], "port-dmux") == 0) {
00618
00619 TclObject *obj;
00620
00621 if ( (obj = TclObject::lookup(argv[2])) == 0) {
00622 fprintf(stderr, "OmniMcast Node: %d lookup of %s failed\n", THIS_NODE,
00623 argv[2]);
00624 return TCL_ERROR;
00625 }
00626 port_dmux = (NsObject *) obj;
00627 return TCL_OK;
00628 }
00629
00630 }
00631
00632 return Agent::command(argc, argv);
00633 }
00634
00635
00636 void OmniMcastAgent::reset()
00637 {
00638 PktTable.reset();
00639
00640 for (int i=0; i<MAX_DATA_TYPE; i++) {
00641 routing_table[i].reset();
00642 }
00643 clear_arp_buf();
00644 clear_send_buf();
00645 }
00646
00647
00648 void OmniMcastAgent::clear_arp_buf()
00649 {
00650 for (int i=0; i<ARP_BUF_SIZE; i++) {
00651 arp_buf[i].t = 0;
00652 arp_buf[i].attempt = 0;
00653 if (arp_buf[i].p != NULL)
00654 Packet::free(arp_buf[i].p);
00655 arp_buf[i].p = NULL;
00656 }
00657 }
00658
00659
00660 void OmniMcastAgent::clear_send_buf()
00661 {
00662 for (int i=0; i<SEND_BUF_SIZE; i++) {
00663 send_buf[i].t = 0;
00664 if (send_buf[i].p != NULL)
00665 Packet::free(send_buf[i].p);
00666 send_buf[i].p = NULL;
00667 }
00668 }
00669
00670
00671 void OmniMcast_Entry::reset()
00672 {
00673 clear_agentlist(source);
00674 clear_agentlist(sink);
00675 source = NULL;
00676 sink = NULL;
00677 }
00678
00679
00680 void OmniMcast_Entry::clear_agentlist(Agent_List *list)
00681 {
00682 Agent_List *cur=list;
00683 Agent_List *temp = NULL;
00684
00685 while (cur != NULL) {
00686 temp = AGENT_NEXT(cur);
00687 delete cur;
00688 cur = temp;
00689 }
00690 }
00691
00692
00693