00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030 #include <stdlib.h>
00031 #include "diff_sink.h"
00032 #include "diffusion.h"
00033 #include "diff_rate.h"
00034 #include "hash_table.h"
00035 #include "agent.h"
00036 #include "packet.h"
00037 #include "tclcl.h"
00038 #include "random.h"
00039 #include "god.h"
00040
// Delay value handed to report_timer_.resched() whenever the report timer
// is armed (see SinkAgent::report() and the "announce" command).
00041 #define REPORT_PERIOD 1.0
00042
// Table of message-name strings defined in another translation unit.
// NOTE(review): not referenced anywhere in the visible part of this file.
00043 extern char* MsgStr[];
00044
00045 void Report_Timer::expire(Event *) {
00046 a_->report();
00047 }
00048
00049
00050 void Sink_Timer::expire(Event *) {
00051 a_->timeout(0);
00052 }
00053
00054 void Periodic_Timer::expire(Event *) {
00055 a_->bcast_interest();
00056 }
00057
00058 static class SinkClass : public TclClass {
00059 public:
00060 SinkClass() : TclClass("Agent/Diff_Sink") {}
00061 TclObject* create(int , const char*const* ) {
00062 return(new SinkAgent());
00063 }
00064 } class_sink;
00065
00066
00067 SinkAgent::SinkAgent() : Agent(PT_DIFF), data_type_(0),
00068 running_(0), random_(0), sink_timer_(this), periodic_timer_(this),
00069 report_timer_(this)
00070 {
00071
00072
00073 APP_DUP_ = true;
00074
00075 periodic_ = true;
00076 always_max_rate_ = false;
00077
00078
00079
00080 bind("data_type_", &data_type_);
00081 bind_time("interval_", &interval_);
00082 bind("packetSize_", &size_);
00083 bind("random_", &random_);
00084 bind("maxpkts_", &maxpkts_);
00085
00086
00087
00088
00089 pk_count=0;
00090 num_recv=0;
00091 num_send=0;
00092 RecvPerSec=0;
00093
00094 cum_delay=0.0;
00095
00096 data_counter = 0;
00097 simple_report_rate = ORIGINAL;
00098
00099 last_arrival_time = -1.0;
00100 }
00101
00102 void SinkAgent::start()
00103 {
00104 running_ = 1;
00105 sendpkt();
00106 sink_timer_.resched(interval_);
00107 }
00108
00109 void SinkAgent::stop()
00110 {
00111 if (running_) {
00112 running_ = 0;
00113 }
00114
00115 if (periodic_ == true) {
00116 periodic_ = false;
00117 periodic_timer_.force_cancel();
00118 }
00119 }
00120
00121
00122 void SinkAgent::report()
00123 {
00124
00125 report_timer_.resched(REPORT_PERIOD);
00126 RecvPerSec = 0;
00127 }
00128
00129
00130 void SinkAgent::timeout(int)
00131 {
00132 if (running_) {
00133 sendpkt();
00134 double t = interval_;
00135 if (random_)
00136
00137 t += interval_ * Random::uniform(-0.5, 0.5);
00138 sink_timer_.resched(t);
00139 }
00140 }
00141
00142
00143 void SinkAgent::sendpkt()
00144 {
00145 if (pk_count >= maxpkts_) {
00146 running_ = 0;
00147 return;
00148 }
00149
00150 Packet* pkt = create_packet();
00151 hdr_cdiff* dfh = HDR_CDIFF(pkt);
00152 hdr_ip* iph = HDR_IP(pkt);
00153 hdr_cmn* cmh = HDR_CMN(pkt);
00154
00155
00156 data_counter = (data_counter + SUB_SAMPLED)% ORIGINAL;
00157 if (data_counter == SUB_SAMPLED) {
00158 dfh->report_rate = SUB_SAMPLED;
00159 } else {
00160 dfh->report_rate = ORIGINAL;
00161 }
00162 if (simple_report_rate < dfh->report_rate) {
00163 Packet::free(pkt);
00164 return;
00165 }
00166
00167 cmh->size() = size_;
00168 dfh->mess_type = DATA;
00169 dfh->pk_num = pk_count;
00170
00171 pk_count++;
00172
00173 dfh->sender_id = here_;
00174 dfh->data_type = data_type_;
00175 dfh->forward_agent_id = here_;
00176
00177 dfh->num_next = 1;
00178 dfh->next_nodes[0] = here_.addr_;
00179
00180 iph->src_ = here_;
00181 iph->dst_.addr_ = here_.addr_;
00182 iph->dst_.port_ = ROUTING_PORT;
00183
00184
00185
00186
00187
00188
00189 num_send++;
00190 dfh->attr[0] = data_type_;
00191
00192 if (APP_DUP_ == true)
00193 dfh->attr[1] = 0;
00194 else
00195 dfh->attr[1] = here_.addr_;
00196
00197 dfh->attr[2] = num_send;
00198 God::instance()->CountNewData(dfh->attr);
00199 send(pkt, 0);
00200 }
00201
00202
00203 void SinkAgent::bcast_interest()
00204 {
00205 Packet* pkt = create_packet();
00206 hdr_cdiff* dfh = HDR_CDIFF(pkt);
00207 hdr_ip* iph = HDR_IP(pkt);
00208
00209
00210 dfh->mess_type = INTEREST;
00211 dfh->pk_num = pk_count;
00212 pk_count++;
00213 dfh->sender_id = here_;
00214 dfh->data_type = data_type_;
00215 dfh->forward_agent_id = here_;
00216
00217 dfh->report_rate = SUB_SAMPLED;
00218 dfh->num_next = 1;
00219 dfh->next_nodes[0] = here_.addr_;
00220
00221 iph->src_ = here_;
00222 iph->dst_.addr_ = here_.addr_;
00223 iph->dst_.port_ = ROUTING_PORT;
00224
00225
00226
00227
00228
00229
00230
00231
00232
00233
00234
00235 send(pkt, 0);
00236 if (periodic_ == true)
00237 periodic_timer_.resched(INTEREST_PERIODIC);
00238 }
00239
00240
00241 void SinkAgent::data_ready()
00242 {
00243
00244 Packet* pkt = create_packet();
00245
00246
00247 hdr_cdiff* dfh = HDR_CDIFF(pkt);
00248 hdr_ip* iph = HDR_IP(pkt);
00249
00250
00251 dfh->mess_type = DATA_READY;
00252 dfh->pk_num = pk_count;
00253 pk_count++;
00254 dfh->sender_id = here_;
00255 dfh->data_type = data_type_;
00256 dfh->forward_agent_id = here_;
00257
00258 dfh->num_next = 1;
00259 dfh->next_nodes[0] = here_.addr_;
00260
00261 iph->src_ = here_;
00262 iph->dst_.addr_ = here_.addr_;
00263 iph->dst_.port_ = ROUTING_PORT;
00264
00265 send(pkt, 0);
00266 }
00267
00268
00269 void SinkAgent::Terminate()
00270 {
00271 #ifdef DEBUG_OUTPUT
00272 printf("SINK %d : TYPE %d : terminates (send %d, recv %d, cum_delay %f)\n",
00273 here_.addr_, data_type_, num_send, num_recv, cum_delay);
00274 #endif
00275 }
00276
00277
00278 int SinkAgent::command(int argc, const char*const* argv)
00279 {
00280 if (argc == 2) {
00281
00282 if (strcmp(argv[1], "enable-duplicate") == 0) {
00283 APP_DUP_ = true;
00284 return TCL_OK;
00285 }
00286
00287 if (strcmp(argv[1], "disable-duplicate") == 0) {
00288 APP_DUP_ = false;
00289 return TCL_OK;
00290 }
00291
00292 if (strcmp(argv[1], "always-max-rate") == 0) {
00293 always_max_rate_ = true;
00294 return TCL_OK;
00295 }
00296
00297 if (strcmp(argv[1], "terminate") == 0) {
00298 Terminate();
00299 return TCL_OK;
00300 }
00301
00302 if (strcmp(argv[1], "announce") == 0) {
00303 bcast_interest();
00304 report_timer_.resched(REPORT_PERIOD);
00305
00306 return (TCL_OK);
00307 }
00308
00309 if (strcmp(argv[1], "ready") == 0) {
00310 God::instance()->data_pkt_size = size_;
00311 data_ready();
00312 return (TCL_OK);
00313 }
00314
00315 if (strcmp(argv[1], "send") == 0) {
00316 sendpkt();
00317 return (TCL_OK);
00318 }
00319
00320 if (strcmp(argv[1], "cbr-start") == 0) {
00321 start();
00322 return (TCL_OK);
00323 }
00324
00325 if (strcmp(argv[1], "stop") == 0) {
00326 stop();
00327 report_timer_.force_cancel();
00328 return (TCL_OK);
00329 }
00330
00331 }
00332
00333 if (argc == 3) {
00334 if (strcmp(argv[1], "data-type") == 0) {
00335 data_type_ = atoi(argv[2]);
00336 return (TCL_OK);
00337 }
00338 }
00339
00340 return (Agent::command(argc, argv));
00341 }
00342
00343
00344 void SinkAgent::recv(Packet* pkt, Handler*)
00345 {
00346 hdr_cdiff* dfh = HDR_CDIFF(pkt);
00347
00348
00349
00350
00351
00352
00353
00354 if (data_type_ != dfh->data_type) {
00355 printf("Hey, What are you doing? I am not a sink for %d. I'm a sink for %d. \n", dfh->data_type, data_type_);
00356 Packet::free(pkt);
00357 return;
00358 }
00359
00360
00361 switch(dfh->mess_type) {
00362 case DATA_REQUEST :
00363
00364 if (always_max_rate_ == false)
00365 simple_report_rate = dfh->report_rate;
00366
00367 if (!running_) start();
00368
00369
00370
00371
00372 break;
00373
00374 case DATA_STOP :
00375
00376 if (running_) stop();
00377 break;
00378
00379 case DATA :
00380
00381 if (APP_DUP_ == true) {
00382 if (DataTable.GetHash(dfh->attr) != NULL) {
00383 Packet::free(pkt);
00384 return;
00385 } else {
00386 DataTable.PutInHash(dfh->attr);
00387 }
00388 }
00389
00390 cum_delay = cum_delay + (NOW - dfh->ts_);
00391 num_recv++;
00392 RecvPerSec++;
00393 God::instance()->IncrRecv();
00394
00395
00396
00397
00398
00399
00400
00401 last_arrival_time = NOW;
00402
00403 break;
00404
00405 default:
00406
00407 break;
00408 }
00409
00410 Packet::free(pkt);
00411 }
00412
00413
00414 void SinkAgent::reset()
00415 {
00416 }
00417
00418
00419 void SinkAgent:: set_addr(ns_addr_t address)
00420 {
00421 here_=address;
00422 }
00423
00424
00425 int SinkAgent:: get_pk_count()
00426 {
00427 return pk_count;
00428 }
00429
00430
00431 void SinkAgent:: incr_pk_count()
00432 {
00433 pk_count++;
00434 }
00435
00436
00437 Packet * SinkAgent:: create_packet()
00438 {
00439 Packet *pkt = allocpkt();
00440
00441 if (pkt==NULL) return NULL;
00442
00443 hdr_cmn* cmh = HDR_CMN(pkt);
00444
00445 cmh->size() = 36;
00446
00447 hdr_cdiff* dfh = HDR_CDIFF(pkt);
00448 dfh->ts_ = NOW;
00449
00450 return pkt;
00451 }
00452
00453
00454
00455
00456