00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032 #include <assert.h>
00033 #include <math.h>
00034 #include <stdio.h>
00035 #include <signal.h>
00036 #include <float.h>
00037
00038 #include <tcl.h>
00039 #include <stdlib.h>
00040
00041 #include "diff_header.h"
00042 #include "agent.h"
00043 #include "tclcl.h"
00044 #include "ip.h"
00045 #include "config.h"
00046 #include "packet.h"
00047 #include "trace.h"
00048 #include "random.h"
00049 #include "classifier.h"
00050 #include "node.h"
00051 #include "diffusion.h"
00052 #include "iflist.h"
00053 #include "hash_table.h"
00054 #include "arp.h"
00055 #include "mac.h"
00056 #include "ll.h"
00057 #include "dsr/path.h"
00058 #include "god.h"
00059 #include "routing_table.h"
00060 #include "diff_rate.h"
00061
00062 extern char *MsgStr[];
00063
00064 static class DiffusionRateClass : public TclClass {
00065 public:
00066 DiffusionRateClass() : TclClass("Agent/Diffusion/RateGradient") {}
00067 TclObject* create(int , const char*const* ) {
00068 return(new DiffusionRate());
00069 }
00070 } class_diffusion_rate;
00071
00072
00073 void GradientTimer::expire(Event *)
00074 {
00075 a_->GradientTimeOut();
00076 }
00077
00078
00079 void NegativeReinforceTimer::expire(Event *)
00080 {
00081 a_->NegReinfTimeOut();
00082 }
00083
00084
00085 DiffusionRate::DiffusionRate() : DiffusionAgent()
00086 {
00087 DUP_SUP_ = true;
00088
00089 sub_type_ = BCAST_SUB;
00090 org_type_ = UNICAST_ORG;
00091 pos_type_ = POS_ALL;
00092 pos_node_type_ = INTM_POS;
00093 neg_win_type_ = NEG_TIMER;
00094 neg_thr_type_ = NEG_ABSOLUTE;
00095 neg_max_type_ = NEG_FIXED_MAX;
00096
00097 num_not_send_bcast_data = 0;
00098 num_data_bcast_send = 0;
00099 num_data_bcast_rcv = 0;
00100 num_neg_bcast_send = 0;
00101 num_neg_bcast_rcv = 0;
00102 }
00103
00104
00105 void DiffusionRate::recv(Packet* packet, Handler*)
00106 {
00107 hdr_cdiff* dfh = HDR_CDIFF(packet);
00108
00109
00110
00111 Pkt_Hash_Entry *hashPtr= PktTable.GetHash(dfh->sender_id, dfh->pk_num);
00112
00113
00114 #ifdef DEBUG_RATE
00115 printf("DF node %x recv %s (%x, %x, %d)\n",
00116 THIS_NODE, MsgStr[dfh->mess_type], (dfh->sender_id).addr_,
00117 (dfh->sender_id).port_, dfh->pk_num);
00118 #endif
00119
00120
00121
00122
00123 if (hashPtr != NULL) {
00124 consider_old(packet);
00125 return;
00126 }
00127
00128
00129
00130 PktTable.put_in_hash(dfh);
00131
00132
00133
00134
00135 if (DUP_SUP_ == true) {
00136
00137 if (dfh->mess_type == DATA) {
00138 if (DataTable.GetHash(dfh->attr) != NULL) {
00139 consider_old(packet);
00140 return;
00141 } else {
00142 DataTable.PutInHash(dfh->attr);
00143 }
00144
00145 }
00146 }
00147
00148 consider_new(packet);
00149 }
00150
00151
00152 void DiffusionRate::consider_old(Packet *pkt)
00153 {
00154 hdr_cdiff* dfh = HDR_CDIFF(pkt);
00155 hdr_cmn* cmh = HDR_CMN(pkt);
00156 unsigned char msg_type = dfh->mess_type;
00157 unsigned int dtype = dfh->data_type;
00158
00159 switch (msg_type) {
00160 case INTEREST :
00161 InterestHandle(pkt);
00162 return;
00163
00164 case DATA:
00165
00166 if (cmh->next_hop_ == (nsaddr_t)MAC_BROADCAST) {
00167 num_data_bcast_rcv++;
00168 }
00169
00170 if (dfh->report_rate == ORIGINAL) {
00171 routing_table[dtype].CntOldOrg(dfh->forward_agent_id);
00172 }
00173 Packet::free(pkt);
00174 return;
00175
00176 case NEG_REINFORCE:
00177 if (cmh->next_hop_ == (nsaddr_t)MAC_BROADCAST) {
00178 num_neg_bcast_rcv++;
00179 }
00180 break;
00181
00182 default :
00183 Packet::free(pkt);
00184 break;
00185 }
00186 }
00187
00188
00189 void DiffusionRate::consider_new(Packet *pkt)
00190 {
00191 hdr_cdiff* dfh = HDR_CDIFF(pkt);
00192 hdr_cmn * cmh = HDR_CMN(pkt);
00193 unsigned char msg_type = dfh->mess_type;
00194 unsigned int dtype = dfh->data_type;
00195
00196 Agent_List *agentPtr;
00197 Packet *gen_pkt;
00198 hdr_cdiff *gen_dfh;
00199
00200 switch (msg_type) {
00201 case INTEREST :
00202 InterestHandle(pkt);
00203 return;
00204
00205 case POS_REINFORCE :
00206 if ( POS_REINF_ == false ) {
00207 printf("Hey, we are not in pos_reinf mode.\n");
00208 Packet::free(pkt);
00209 exit(-1);
00210 }
00211
00212 ProcessPosReinf(pkt);
00213 return;
00214
00215 case NEG_REINFORCE :
00216 if (cmh->next_hop_ == (nsaddr_t)MAC_BROADCAST) {
00217 num_neg_bcast_rcv++;
00218 } else {
00219 routing_table[dtype].CntNeg(dfh->forward_agent_id);
00220 }
00221
00222 if (NEG_REINF_ == false) {
00223 printf("Hey, we are not in neg_reinf mode.\n");
00224 Packet::free(pkt);
00225 exit(-1);
00226 }
00227
00228 ProcessNegReinf(pkt);
00229 return;
00230
00231 case DATA_READY :
00232
00233
00234
00235 agentPtr = new Agent_List;
00236 AGT_ADDR(agentPtr) = dfh->sender_id;
00237 agentPtr->next = routing_table[dtype].source;
00238 routing_table[dtype].source = agentPtr;
00239
00240 God::instance()->AddSource(dtype, (dfh->sender_id).addr_);
00241
00242
00243
00244
00245
00246
00247 if (routing_table[dtype].active != NULL ||
00248 routing_table[dtype].sink != NULL) {
00249 gen_pkt = prepare_message(dtype, dfh->sender_id, DATA_REQUEST);
00250 gen_dfh = HDR_CDIFF(gen_pkt);
00251 gen_dfh->report_rate = SUB_SAMPLED;
00252 send_to_dmux(gen_pkt, 0);
00253
00254
00255
00256
00257
00258
00259 }
00260
00261 Packet::free(pkt);
00262 return;
00263
00264 case DATA :
00265
00266 if (cmh->next_hop_ == (nsaddr_t)MAC_BROADCAST) {
00267 num_data_bcast_rcv++;
00268 }
00269
00270 DataForSink(pkt);
00271
00272 if (dfh->report_rate == SUB_SAMPLED) {
00273 routing_table[dtype].CntNewSub(dfh->forward_agent_id);
00274 FwdData(pkt);
00275 return;
00276 }
00277
00278 if (dfh->report_rate == ORIGINAL) {
00279 routing_table[dtype].new_org_counter++;
00280 routing_table[dtype].CntNewOrg(dfh->forward_agent_id);
00281 FwdData(pkt);
00282
00283 if (neg_win_type_ == NEG_COUNTER) {
00284 CheckNegCounter(dtype);
00285 return;
00286 }
00287 }
00288 return;
00289
00290 default :
00291 Packet::free(pkt);
00292 break;
00293 }
00294 }
00295
00296
00297 void DiffusionRate::CheckNegCounter(int dtype)
00298 {
00299 if (neg_max_type_ == NEG_FIXED_MAX) {
00300 if (routing_table[dtype].new_org_counter >= MAX_NEG_COUNTER
00301 && NEG_REINF_ == true) {
00302 GenNeg(dtype);
00303 routing_table[dtype].new_org_counter = 0;
00304 routing_table[dtype].ClrAllNewOrg();
00305 routing_table[dtype].ClrAllOldOrg();
00306 }
00307 return;
00308 }
00309
00310 if (neg_max_type_ == NEG_SCALE_MAX) {
00311 if (routing_table[dtype].new_org_counter >=
00312 PER_IIF * routing_table[dtype].num_iif
00313 && NEG_REINF_ == true) {
00314 GenNeg(dtype);
00315 routing_table[dtype].new_org_counter = 0;
00316 routing_table[dtype].ClrAllNewOrg();
00317 routing_table[dtype].ClrAllOldOrg();
00318 }
00319 return;
00320 }
00321 }
00322
00323
00324 void DiffusionRate::InterestHandle(Packet *pkt)
00325 {
00326 hdr_cdiff *dfh = HDR_CDIFF(pkt);
00327 unsigned int dtype = dfh->data_type;
00328 Agent_List *agentPtr;
00329
00330 nsaddr_t from_nodeID;
00331 PrvCurPtr RetVal;
00332 Out_List *OutPtr;
00333
00334
00335 if (dfh->ts_ + INTEREST_TIMEOUT < NOW) {
00336 Packet::free(pkt);
00337 return;
00338 }
00339
00340
00341
00342
00343 from_nodeID = (dfh->sender_id).addr_;
00344
00345 if (THIS_NODE == from_nodeID) {
00346
00347
00348
00349
00350 RetVal = INTF_FIND(routing_table[dtype].sink, dfh->sender_id);
00351
00352 if (RetVal.cur == NULL) {
00353
00354 agentPtr = new Agent_List;
00355 AGT_ADDR(agentPtr) = dfh->sender_id;
00356 INTF_INSERT(routing_table[dtype].sink, agentPtr);
00357
00358 God::instance()->AddSink(dtype, THIS_NODE);
00359 }
00360 } else {
00361
00362
00363
00364 RetVal = INTF_FIND(routing_table[dtype].active, dfh->forward_agent_id);
00365 if (RetVal.cur == NULL) {
00366 OutPtr = new Out_List;
00367 AGT_ADDR(OutPtr) = dfh->forward_agent_id;
00368 GRADIENT(OutPtr) = dfh->report_rate;
00369 GRAD_TMOUT(OutPtr) = dfh->ts_ + INTEREST_TIMEOUT;
00370 INTF_INSERT(routing_table[dtype].active, OutPtr);
00371 routing_table[dtype].num_active ++;
00372 } else {
00373 GRAD_TMOUT(RetVal.cur) = max(GRAD_TMOUT(RetVal.cur),
00374 dfh->ts_ + INTEREST_TIMEOUT);
00375 }
00376
00377 }
00378
00379 if (NOW > routing_table[dtype].last_fwd_time + INTEREST_PERIODIC) {
00380 if (routing_table[dtype].ExistOriginalGradient() == true) {
00381 DataReqAll(dtype, ORIGINAL);
00382 } else {
00383 DataReqAll(dtype, SUB_SAMPLED);
00384 }
00385 routing_table[dtype].last_fwd_time = NOW;
00386 MACprepare(pkt, MAC_BROADCAST, NS_AF_ILINK, 0);
00387 MACsend(pkt, JITTER*Random::uniform(1.0));
00388 overhead++;
00389
00390 #ifdef DEBUG_RATE
00391 hdr_cmn *cmh = HDR_CMN(pkt);
00392 printf("DF node %x will send %s (%x, %x, %d) to %x\n",
00393 THIS_NODE, MsgStr[dfh->mess_type], (dfh->sender_id).addr_,
00394 (dfh->sender_id).port_, dfh->pk_num, cmh->next_hop());
00395 #endif
00396
00397
00398 return;
00399 }
00400
00401 Packet::free(pkt);
00402 return;
00403 }
00404
00405
00406 void DiffusionRate::NegReinfTimeOut()
00407 {
00408 for (int i=0; i<MAX_DATA_TYPE; i++) {
00409 GenNeg(i);
00410 routing_table[i].new_org_counter = 0;
00411 routing_table[i].ClrAllNewOrg();
00412 routing_table[i].ClrAllOldOrg();
00413 }
00414
00415 neg_reinf_timer->resched(NEG_CHECK);
00416 }
00417
00418
00419 void DiffusionRate::GradientTimeOut()
00420 {
00421 int i;
00422 Agent_List *cur_out, **prv_out;
00423
00424 for (i=0; i<MAX_DATA_TYPE; i++) {
00425 for (cur_out = routing_table[i].active,
00426 prv_out = (Agent_List **)&routing_table[i].active;
00427 cur_out != NULL; ) {
00428
00429 if (NOW > GRAD_TMOUT(cur_out)) {
00430 INTF_REMOVE(prv_out, cur_out);
00431 routing_table[i].num_active -- ;
00432 cur_out = *prv_out;
00433 }
00434 else {
00435 prv_out = &(cur_out->next);
00436 cur_out = cur_out->next;
00437 }
00438
00439 }
00440 }
00441
00442 gradient_timer->resched(INTEREST_TIMEOUT);
00443 }
00444
00445
00446 bool DiffusionRate::FwdSubsample(Packet *pkt)
00447 {
00448 hdr_cdiff *dfh = HDR_CDIFF(pkt);
00449 Out_List *cur_out;
00450 Packet *cur_pkt;
00451 hdr_cdiff *cur_dfh;
00452 hdr_ip *cur_iph;
00453 unsigned int dtype = dfh->data_type;
00454
00455 if (routing_table[dtype].num_active <= 0) {
00456 num_not_send_bcast_data++;
00457 return false;
00458 }
00459
00460
00461
00462 num_data_bcast_send++;
00463
00464 if (sub_type_ == BCAST_SUB) {
00465 MACprepare(pkt, MAC_BROADCAST, NS_AF_ILINK, 0);
00466 MACsend(pkt, JITTER*Random::uniform(1.0));
00467
00468 #ifdef DEBUG_RATE
00469 hdr_cmn *cmh = HDR_CMN(pkt);
00470 printf("DF node %x will send %s (%x, %x, %d) to %x\n",
00471 THIS_NODE, MsgStr[dfh->mess_type], (dfh->sender_id).addr_,
00472 (dfh->sender_id).port_, dfh->pk_num, cmh->next_hop());
00473 #endif // DEBUG_RATE
00474
00475 return true;
00476 }
00477
00478 if (sub_type_ == UNICAST_SUB) {
00479 for (cur_out = routing_table[dtype].active; cur_out!= NULL;
00480 cur_out = OUT_NEXT(cur_out)) {
00481
00482 cur_pkt = pkt->copy();
00483 cur_iph = HDR_IP(cur_pkt);
00484 cur_iph->dst_ = AGT_ADDR(cur_out);
00485
00486 cur_dfh = HDR_CDIFF(cur_pkt);
00487 cur_dfh->forward_agent_id = here_;
00488 cur_dfh->num_next = 1;
00489 cur_dfh->next_nodes[0] = NODE_ADDR(cur_out);
00490
00491 MACprepare(cur_pkt, NODE_ADDR(cur_out), NS_AF_INET,
00492 MAC_RETRY_);
00493 MACsend(cur_pkt, 0);
00494
00495 #ifdef DEBUG_RATE
00496 cur_cmh = HDR_CMN(cur_pkt);
00497 printf("DF node %x will send %s (%x, %x, %d) to %x\n",
00498 THIS_NODE, MsgStr[cur_dfh->mess_type],
00499 (cur_dfh->sender_id).addr_, (cur_dfh->sender_id).port_,
00500 cur_dfh->pk_num, cur_cmh->next_hop());
00501 #endif // DEBUG_RATE
00502
00503 }
00504
00505 return true;
00506 }
00507
00508 return false;
00509 }
00510
00511
00512 void DiffusionRate::TriggerPosReinf(Packet *pkt, ns_addr_t forward_agent)
00513 {
00514 hdr_cdiff *dfh = HDR_CDIFF(pkt);
00515 unsigned int dtype = dfh->data_type;
00516 nsaddr_t forwarder_node = forward_agent.addr_;
00517
00518 if (pos_node_type_ == INTM_POS) {
00519 if (routing_table[dtype].sink != NULL ||
00520 routing_table[dtype].ExistOriginalGradient() == true) {
00521 DataReqAll(dtype, ORIGINAL);
00522 if (THIS_NODE != forwarder_node) {
00523 PosReinf(dtype, forwarder_node, dfh->sender_id,
00524 dfh->pk_num);
00525 routing_table[dtype].CntPosSend(forward_agent);
00526 routing_table[dtype].ClrNewSub(forward_agent);
00527 }
00528 }
00529 return;
00530 }
00531
00532
00533 if (pos_node_type_ == END_POS) {
00534 if (routing_table[dtype].sink != NULL) {
00535 DataReqAll(dtype, ORIGINAL);
00536 if (THIS_NODE != forwarder_node) {
00537 PosReinf(dtype, forwarder_node, dfh->sender_id,
00538 dfh->pk_num);
00539 routing_table[dtype].CntPosSend(forward_agent);
00540 routing_table[dtype].ClrNewSub(forward_agent);
00541 }
00542 }
00543
00544 return;
00545 }
00546 }
00547
00548
00549 void DiffusionRate::FwdOriginal(Packet *pkt)
00550 {
00551 hdr_cdiff *dfh = HDR_CDIFF(pkt);
00552 unsigned int dtype = dfh->data_type;
00553 Out_List *cur_out;
00554 Packet *cur_pkt;
00555 hdr_cdiff *cur_dfh;
00556 hdr_ip *cur_iph;
00557
00558 if (org_type_ == BCAST_ORG) {
00559 MACprepare(pkt, MAC_BROADCAST, NS_AF_ILINK, 0);
00560 MACsend(pkt, JITTER*Random::uniform(1.0));
00561
00562 #ifdef DEBUG_RATE
00563 hdr_cmn *cmh = HDR_CMN(pkt);
00564 printf("DF node %x will send %s (%x, %x, %d) to %x\n",
00565 THIS_NODE, MsgStr[dfh->mess_type], (dfh->sender_id).addr_,
00566 (dfh->sender_id).port_, dfh->pk_num, cmh->next_hop());
00567 #endif // DEBUG_RATE
00568
00569 return;
00570 }
00571
00572 if (org_type_ == UNICAST_ORG) {
00573 for (cur_out = routing_table[dtype].active; cur_out!= NULL;
00574 cur_out = OUT_NEXT(cur_out)) {
00575 if (GRADIENT(cur_out) == ORIGINAL) {
00576
00577 cur_pkt = pkt->copy();
00578 cur_iph = HDR_IP(cur_pkt);
00579 cur_iph->dst_ = AGT_ADDR(cur_out);
00580
00581 cur_dfh = HDR_CDIFF(cur_pkt);
00582 cur_dfh->forward_agent_id = here_;
00583 cur_dfh->num_next = 1;
00584 cur_dfh->next_nodes[0] = NODE_ADDR(cur_out);
00585
00586 cur_out->num_data_send++;
00587
00588 MACprepare(cur_pkt, NODE_ADDR(cur_out), NS_AF_INET,
00589 MAC_RETRY_);
00590 MACsend(cur_pkt, 0);
00591
00592 #ifdef DEBUG_RATE
00593 cur_cmh = HDR_CMN(cur_pkt);
00594 printf("DF node %x will send %s (%x, %x, %d) to %x\n",
00595 THIS_NODE, MsgStr[cur_dfh->mess_type],
00596 (cur_dfh->sender_id).addr_, (cur_dfh->sender_id).port_,
00597 cur_dfh->pk_num, cur_cmh->next_hop());
00598 #endif // DEBUG_RATE
00599
00600 }
00601 }
00602
00603 Packet::free(pkt);
00604 return;
00605 }
00606 }
00607
00608
00609 void DiffusionRate::FwdData(Packet *pkt)
00610 {
00611 hdr_cdiff *dfh = HDR_CDIFF(pkt);
00612 unsigned int dtype = dfh->data_type;
00613 nsaddr_t forwarder_node;
00614 ns_addr_t forward_agent;
00615 bool forward_flag;
00616
00617 forwarder_node = (dfh->forward_agent_id).addr_;
00618 forward_agent = dfh->forward_agent_id;
00619
00620 if (dfh->report_rate == SUB_SAMPLED) {
00621 forward_flag = FwdSubsample(pkt);
00622 TriggerPosReinf(pkt, forward_agent);
00623
00624 if (forward_flag == false) {
00625 Packet::free(pkt);
00626 }
00627 return;
00628 }
00629
00630
00631
00632 if (routing_table[dtype].ExistOriginalGradient() == false
00633 && routing_table[dtype].sink == NULL) {
00634
00635 if (THIS_NODE != forwarder_node && NEG_REINF_ == true) {
00636 BcastNeg(dtype);
00637 routing_table[dtype].new_org_counter = 0;
00638 routing_table[dtype].ClrAllNewOrg();
00639 routing_table[dtype].ClrAllOldOrg();
00640 }
00641 Packet::free(pkt);
00642 return;
00643 }
00644
00645 if (routing_table[dtype].ExistOriginalGradient() == false) {
00646 Packet::free(pkt);
00647 return;
00648 }
00649
00650 FwdOriginal(pkt);
00651 }
00652
00653
00654 void DiffusionRate::DataReqAll(unsigned int dtype, int report_rate)
00655 {
00656 Agent_List *cur_agent;
00657 Packet *pkt;
00658 hdr_cdiff *dfh;
00659
00660 for (cur_agent=routing_table[dtype].source; cur_agent != NULL;
00661 cur_agent = AGENT_NEXT(cur_agent) ) {
00662 pkt = prepare_message(dtype, AGT_ADDR(cur_agent), DATA_REQUEST);
00663 dfh = HDR_CDIFF(pkt);
00664 dfh->report_rate = report_rate;
00665 send_to_dmux(pkt, 0);
00666 }
00667 }
00668
00669
00670 void DiffusionRate::GenNeg(int dtype)
00671 {
00672 In_List *cur;
00673
00674 if (neg_thr_type_ == NEG_ABSOLUTE) {
00675 for (cur= routing_table[dtype].iif; cur != NULL; cur= IN_NEXT(cur)) {
00676 if (NEW_ORG_RECV(cur) <= 0 && OLD_ORG_RECV(cur) > MAX_DUP_DATA) {
00677 UcastNeg(dtype, AGT_ADDR(cur));
00678 cur->num_neg_send++;
00679 }
00680 }
00681 return;
00682 }
00683
00684 if (neg_thr_type_ == NEG_RELATIVE) {
00685 int most= routing_table[dtype].MostRecvOrg();
00686
00687 for (cur= routing_table[dtype].iif; cur != NULL; cur= IN_NEXT(cur)) {
00688 if (OLD_ORG_RECV(cur) > MAX_DUP_DATA &&
00689 NEW_ORG_RECV(cur) <= NEG_MIN_RATIO*most) {
00690 UcastNeg(dtype, AGT_ADDR(cur));
00691 cur->num_neg_send++;
00692 }
00693 }
00694 return;
00695 }
00696 }
00697
00698
00699 void DiffusionRate::BcastNeg(int dtype)
00700 {
00701 ns_addr_t bcast_addr;
00702 bcast_addr.addr_ = MAC_BROADCAST;
00703 bcast_addr.port_ = ROUTING_PORT;
00704
00705 Packet *pkt=prepare_message(dtype, bcast_addr, NEG_REINFORCE);
00706
00707 MACprepare(pkt, MAC_BROADCAST, NS_AF_ILINK, 0);
00708 MACsend(pkt, 0);
00709 overhead++;
00710 num_neg_bcast_send++;
00711
00712 #ifdef DEBUG_RATE
00713 hdr_cdiff *dfh = HDR_CDIFF(pkt);
00714 hdr_cmn *cmh = HDR_CMN(pkt);
00715 printf("DF node %d will send %s (%x, %x, %d) to %x\n",
00716 THIS_NODE, MsgStr[dfh->mess_type], (dfh->sender_id).addr_,
00717 (dfh->sender_id).port_, dfh->pk_num, cmh->next_hop());
00718 #endif // DEBUG_RATE
00719
00720 }
00721
00722
00723 void DiffusionRate::UcastNeg(int dtype, ns_addr_t to)
00724 {
00725 Packet *pkt=prepare_message(dtype, to, NEG_REINFORCE);
00726 MACprepare(pkt, to.addr_, NS_AF_INET, 0);
00727 MACsend(pkt, 0);
00728 overhead++;
00729
00730 #ifdef DEBUG_RATE
00731 hdr_cdiff *dfh = HDR_CDIFF(pkt);
00732 hdr_cmn *cmh = HDR_CMN(pkt);
00733 printf("DF node %d will send %s (%x, %x, %d) to %x\n",
00734 THIS_NODE, MsgStr[dfh->mess_type], (dfh->sender_id).addr_,
00735 (dfh->sender_id).port_, dfh->pk_num, cmh->next_hop());
00736 #endif
00737
00738 }
00739
00740
00741 void DiffusionRate::ProcessNegReinf(Packet *pkt)
00742 {
00743 hdr_cdiff *dfh = HDR_CDIFF(pkt);
00744 unsigned int dtype = dfh->data_type;
00745 Out_List *cur_out;
00746 PrvCurPtr RetVal;
00747
00748 RetVal=INTF_FIND(routing_table[dtype].active, dfh->forward_agent_id);
00749
00750 if (RetVal.cur == NULL) {
00751 Packet::free(pkt);
00752 return;
00753 }
00754
00755 cur_out = (Out_List *)(RetVal.cur);
00756 if (GRADIENT(cur_out) == SUB_SAMPLED) {
00757 Packet::free(pkt);
00758 return;
00759 }
00760
00761 GRADIENT(cur_out) = SUB_SAMPLED;
00762
00763 if (routing_table[dtype].ExistOriginalGradient() == false &&
00764 routing_table[dtype].sink == NULL) {
00765
00766 DataReqAll(dtype, SUB_SAMPLED);
00767
00768 if (NEG_REINF_ == true) {
00769 BcastNeg(dtype);
00770 routing_table[dtype].new_org_counter = 0;
00771 routing_table[dtype].ClrAllNewOrg();
00772 routing_table[dtype].ClrAllOldOrg();
00773 }
00774 }
00775
00776 Packet::free(pkt);
00777 }
00778
00779
00780 void DiffusionRate::ProcessPosReinf(Packet *pkt)
00781 {
00782 hdr_cdiff *dfh= HDR_CDIFF(pkt);
00783 unsigned int dtype = dfh->data_type;
00784 Out_List *cur_out, *OutPtr;
00785 PrvCurPtr RetVal;
00786
00787 RetVal=INTF_FIND(routing_table[dtype].active, dfh->forward_agent_id);
00788
00789 if (RetVal.cur != NULL) {
00790 cur_out = (Out_List *)(RetVal.cur);
00791 GRADIENT(cur_out) = ORIGINAL;
00792 GRAD_TMOUT(RetVal.cur) = max(GRAD_TMOUT(RetVal.cur),
00793 dfh->ts_ + INTEREST_TIMEOUT);
00794 NUM_POS_RECV(cur_out)++;
00795 } else {
00796
00797 OutPtr = new Out_List;
00798 AGT_ADDR(OutPtr) = dfh->forward_agent_id;
00799 GRADIENT(OutPtr) = dfh->report_rate;
00800 GRAD_TMOUT(OutPtr) = dfh->ts_ + INTEREST_TIMEOUT;
00801 INTF_INSERT(routing_table[dtype].active, OutPtr);
00802 routing_table[dtype].num_active ++;
00803 NUM_POS_RECV(OutPtr)++;
00804 }
00805
00806 DataReqAll(dtype, ORIGINAL);
00807
00808
00809 Pkt_Hash_Entry *hashPtr;
00810 nsaddr_t next_node;
00811 In_List *recent_in;
00812 In_List *cur;
00813
00814 switch(pos_type_) {
00815
00816 case POS_HASH:
00817 hashPtr=PktTable.GetHash(dfh->info.sender, dfh->info.seq);
00818 if (hashPtr == NULL) {
00819 perror("Hey! I've never seen that packet before.\n");
00820 Packet::free(pkt);
00821 exit(-1);
00822 }
00823
00824 next_node = (hashPtr->forwarder_id).addr_;
00825 if (next_node == THIS_NODE) {
00826 Packet::free(pkt);
00827 return;
00828 }
00829
00830 PosReinf(dtype, hashPtr->forwarder_id.addr_, dfh->info.sender,
00831 dfh->info.seq);
00832 routing_table[dtype].CntPosSend(hashPtr->forwarder_id);
00833 routing_table[dtype].ClrNewSub(hashPtr->forwarder_id);
00834
00835 #ifdef DEBUG_RATE
00836 printf("DF node %d will send %s to %x\n",
00837 THIS_NODE, MsgStr[dfh->mess_type], hashPtr->forwarder_id.addr_);
00838 #endif // DEBUG_RATE
00839
00840 Packet::free(pkt);
00841 return;
00842
00843
00844 case POS_LAST:
00845 recent_in = routing_table[dtype].MostRecentIn();
00846 if (recent_in == NULL) {
00847 Packet::free(pkt);
00848 return;
00849 }
00850
00851 next_node = NODE_ADDR(recent_in);
00852 if (next_node == THIS_NODE) {
00853 Packet::free(pkt);
00854 return;
00855 }
00856
00857 PosReinf(dtype, NODE_ADDR(recent_in), dfh->info.sender, dfh->info.seq);
00858 routing_table[dtype].CntPosSend(AGT_ADDR(recent_in));
00859 routing_table[dtype].ClrNewSub(AGT_ADDR(recent_in));
00860
00861 #ifdef DEBUG_RATE
00862 printf("DF node %d will send %s to %x\n",
00863 THIS_NODE, MsgStr[dfh->mess_type], NODE_ADDR(recent_in));
00864 #endif // DEBUG_RATE
00865
00866 Packet::free(pkt);
00867 return;
00868
00869
00870 case POS_ALL:
00871 for (cur = routing_table[dtype].iif; cur!=NULL; cur = IN_NEXT(cur)) {
00872
00873 if (NEW_SUB_RECV(cur) <= 0) {
00874 continue;
00875 }
00876
00877 next_node = NODE_ADDR(cur);
00878
00879 if (next_node == THIS_NODE) {
00880 continue;
00881 }
00882
00883 PosReinf(dtype, NODE_ADDR(cur), dfh->info.sender, dfh->info.seq);
00884 routing_table[dtype].CntPosSend(AGT_ADDR(cur));
00885 routing_table[dtype].ClrNewSub(AGT_ADDR(cur));
00886
00887 #ifdef DEBUG_RATE
00888 printf("DF node %d will send %s to %x\n",
00889 THIS_NODE, MsgStr[dfh->mess_type], NODE_ADDR(cur));
00890 #endif // DEBUG_RATE
00891
00892 }
00893 Packet::free(pkt);
00894 return;
00895
00896 default:
00897 Packet::free(pkt);
00898 return;
00899 }
00900 }
00901
00902
00903 void DiffusionRate::PosReinf(int dtype, nsaddr_t to_node,
00904 ns_addr_t info_sender, unsigned int info_seq)
00905 {
00906 ns_addr_t to_agent_addr;
00907 to_agent_addr.addr_ = to_node;
00908 to_agent_addr.port_ = ROUTING_PORT;
00909
00910 Packet *pkt=prepare_message(dtype, to_agent_addr, POS_REINFORCE);
00911 hdr_cdiff *dfh = HDR_CDIFF(pkt);
00912
00913 dfh->report_rate = ORIGINAL;
00914 dfh->info.sender = info_sender;
00915 dfh->info.seq = info_seq;
00916
00917 MACprepare(pkt, to_node, NS_AF_INET, 1);
00918 MACsend(pkt, 0);
00919 overhead++;
00920
00921 #ifdef DEBUG_RATE
00922 hdr_cmn *cmh = HDR_CMN(pkt);
00923 printf("DF node %d will send %s (%x, %x, %d) to %x\n",
00924 THIS_NODE, MsgStr[dfh->mess_type], (dfh->sender_id).addr_,
00925 (dfh->sender_id).port_, dfh->pk_num, cmh->next_hop());
00926 #endif
00927
00928 }
00929
00930
00931 void DiffusionRate::Start()
00932 {
00933 DiffusionAgent::Start();
00934
00935 gradient_timer = new GradientTimer(this);
00936 gradient_timer->resched(INTEREST_TIMEOUT);
00937
00938 if ( neg_win_type_ == NEG_TIMER && NEG_REINF_ == true) {
00939 neg_reinf_timer = new NegativeReinforceTimer(this);
00940 neg_reinf_timer->resched(NEG_CHECK);
00941 }
00942 }
00943
00944
00945 void DiffusionRate::reset()
00946 {
00947 DiffusionAgent::reset();
00948 DataTable.reset();
00949 }
00950
00951
00952 void DiffusionRate::Print_IOlist()
00953 {
00954 Out_List *cur_out;
00955 In_List *cur_in;
00956 int i;
00957
00958 for (i=0; i<1; i++) {
00959 printf("Node %d DATA TYPE %d: send bcast data %d, not send %d, rcv %d\n",
00960 THIS_NODE, i, num_data_bcast_send, num_not_send_bcast_data,
00961 num_data_bcast_rcv);
00962 printf("Node %d neg bcast send %d, neg bcast rcv %d\n",
00963 THIS_NODE, num_neg_bcast_send, num_neg_bcast_rcv);
00964 for (cur_out = routing_table[i].active; cur_out != NULL;
00965 cur_out = OUT_NEXT(cur_out) ) {
00966 printf("DF node %d has oif %d (%f,%d) send data %d recv neg %d pos %d\n",
00967 THIS_NODE, NODE_ADDR(cur_out), GRADIENT(cur_out),
00968 routing_table[i].num_active, NUM_DATA_SEND(cur_out),
00969 NUM_NEG_RECV(cur_out), NUM_POS_RECV(cur_out));
00970 }
00971
00972 for (cur_in = routing_table[i].iif; cur_in != NULL;
00973 cur_in = IN_NEXT(cur_in) ) {
00974 printf("Diffusion node %d has iif for %d\n",
00975 THIS_NODE, NODE_ADDR(cur_in));
00976 printf("Node %d recv new sub %d,new org %d,old org %d:send neg %d pos %d\n",
00977 THIS_NODE, TOTAL_NEW_SUB_RECV(cur_in), TOTAL_NEW_ORG_RECV(cur_in),
00978 TOTAL_OLD_ORG_RECV(cur_in), NUM_NEG_SEND(cur_in),
00979 NUM_POS_SEND(cur_in));
00980 }
00981
00982 }
00983 }
00984
00985
00986 int DiffusionRate::command(int argc, const char*const*argv)
00987 {
00988 if (argc == 2) {
00989 if (strcasecmp(argv[1], "enable-suppression") == 0) {
00990 DUP_SUP_ = true;
00991 return TCL_OK;
00992 }
00993
00994 if (strcasecmp(argv[1], "disable-suppression") == 0) {
00995 DUP_SUP_ = false;
00996 return TCL_OK;
00997 }
00998
00999 }
01000
01001 else if (argc == 3) {
01002
01003 if (strcasecmp(argv[1], "set-sub-tx-type") == 0 ) {
01004 sub_type_ = ParseSubType(argv[2]);
01005 return TCL_OK;
01006 }
01007
01008 if (strcasecmp(argv[1], "set-org-tx-type") == 0 ) {
01009 org_type_ = ParseOrgType(argv[2]);
01010 return TCL_OK;
01011 }
01012
01013 if (strcasecmp(argv[1], "set-pos-type") == 0 ) {
01014 pos_type_ = ParsePosType(argv[2]);
01015 return TCL_OK;
01016 }
01017
01018 if (strcasecmp(argv[1], "set-pos-node-type") == 0 ) {
01019 pos_node_type_ = ParsePosNodeType(argv[2]);
01020 return TCL_OK;
01021 }
01022
01023 if (strcasecmp(argv[1], "set-neg-win-type") == 0 ) {
01024 neg_win_type_ = ParseNegWinType(argv[2]);
01025 return TCL_OK;
01026 }
01027
01028 if (strcasecmp(argv[1], "set-neg-thr-type") == 0 ) {
01029 neg_thr_type_ = ParseNegThrType(argv[2]);
01030 return TCL_OK;
01031 }
01032
01033 if (strcasecmp(argv[1], "set-neg-max-type") == 0 ) {
01034 neg_max_type_ = ParseNegMaxType(argv[2]);
01035 return TCL_OK;
01036 }
01037 }
01038
01039
01040 return DiffusionAgent::command(argc, argv);
01041 }
01042
01043
01044
01045 sub_t ParseSubType(const char* str)
01046 {
01047 if (strcasecmp(str, "BROADCAST") == 0) {
01048 return BCAST_SUB;
01049 }
01050
01051 if (strcasecmp(str, "UNICAST") == 0) {
01052 return UNICAST_SUB;
01053 }
01054
01055 fprintf(stderr,"ParseSubType Error -- Only BROADCAST or UNICAST\n");
01056 exit(-1);
01057 }
01058
01059
01060
01061 org_t ParseOrgType(const char* str)
01062 {
01063 if (strcasecmp(str, "BROADCAST") == 0) {
01064 return BCAST_ORG;
01065 }
01066
01067 if (strcasecmp(str, "UNICAST") == 0) {
01068 return UNICAST_ORG;
01069 }
01070
01071 fprintf(stderr,"ParseOrgType Error -- Only BROADCAST or UNICAST\n");
01072 exit(-1);
01073 }
01074
01075
01076 pos_t ParsePosType(const char* str)
01077 {
01078 if (strcasecmp(str, "HASH") == 0) {
01079 return POS_HASH;
01080 }
01081
01082 if (strcasecmp(str, "LAST") == 0) {
01083 return POS_LAST;
01084 }
01085
01086 if (strcasecmp(str, "ALL") == 0) {
01087 return POS_ALL;
01088 }
01089
01090 fprintf(stderr,"ParsePosType Error -- Only HASH, LAST, or ALL\n");
01091 exit(-1);
01092 }
01093
01094
01095 pos_ndt ParsePosNodeType(const char* str)
01096 {
01097 if (strcasecmp(str, "END") == 0) {
01098 return END_POS;
01099 }
01100
01101 if (strcasecmp(str, "INTM") == 0) {
01102 return INTM_POS;
01103 }
01104
01105 fprintf(stderr,"ParsePosNodeType Error -- Only END or INTM\n");
01106 exit(-1);
01107 }
01108
01109
01110 neg_wint ParseNegWinType(const char* str)
01111 {
01112 if (strcasecmp(str, "COUNTER") == 0) {
01113 return NEG_COUNTER;
01114 }
01115
01116 if (strcasecmp(str, "TIMER") == 0) {
01117 return NEG_TIMER;
01118 }
01119
01120 fprintf(stderr,"ParseNegWinType Error -- Only COUNTER or TIMER\n");
01121 exit(-1);
01122 }
01123
01124
01125 neg_tht ParseNegThrType(const char* str)
01126 {
01127 if (strcasecmp(str, "ABSOLUTE") == 0) {
01128 return NEG_ABSOLUTE;
01129 }
01130
01131 if (strcasecmp(str, "RELATIVE") == 0) {
01132 return NEG_RELATIVE;
01133 }
01134
01135 fprintf(stderr,"ParseNegThrType Error -- Only ABSOLUTE or RELATIVE\n");
01136 exit(-1);
01137 }
01138
01139
01140 neg_mxt ParseNegMaxType(const char* str)
01141 {
01142 if (strcasecmp(str, "FIXED") == 0) {
01143 return NEG_FIXED_MAX;
01144 }
01145
01146 if (strcasecmp(str, "SCALE") == 0) {
01147 return NEG_SCALE_MAX;
01148 }
01149
01150 fprintf(stderr,"ParseNegMaxType Error -- Only FIXED or SCALE\n");
01151 exit(-1);
01152 }
01153
01154
01155
01156
01157