Main Page | Namespace List | Class Hierarchy | Alphabetical List | Compound List | File List | Compound Members | File Members

ew.cc

Go to the documentation of this file.
00001 // Copyright (c) 1999 by the University of Southern California
00002 // All rights reserved.
00003 //
00004 // Permission to use, copy, modify, and distribute this software and its
00005 // documentation in source and binary forms for non-commercial purposes
00006 // and without fee is hereby granted, provided that the above copyright
00007 // notice appear in all copies and that both the copyright notice and
00008 // this permission notice appear in supporting documentation. and that
00009 // any documentation, advertising materials, and other materials related
00010 // to such distribution and use acknowledge that the software was
00011 // developed by the University of Southern California, Information
00012 // Sciences Institute.  The name of the University may not be used to
00013 // endorse or promote products derived from this software without
00014 // specific prior written permission.
00015 //
00016 // THE UNIVERSITY OF SOUTHERN CALIFORNIA makes no representations about
00017 // the suitability of this software for any purpose.  THIS SOFTWARE IS
00018 // PROVIDED "AS IS" AND WITHOUT ANY EXPRESS OR IMPLIED WARRANTIES,
00019 // INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTIES OF
00020 // MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE.
00021 //
00022 // Other copyrights might apply to parts of this software and are so
00023 // noted when applicable.
00024 //
00025 // ew.cc (Early warning system)
00026 //   by Xuan Chen (xuanc@isi.edu), USC/ISI
00027 
00028 #include "ip.h"
00029 #include "tcp.h"
00030 #include "tcp-full.h"
00031 #include "random.h"
00032 
00033 #include "ew.h"
00034 
00035 // Definition of High-Low Filter
00036 HLF::HLF() {
00037   alpha = 0;
00038   high = low = 0;
00039 }
00040 
00041 void HLF::reset(double value) {
00042   high = low = value;
00043 }
00044 
00045 void HLF::reset() {
00046   reset(0);
00047 }
00048 
00049 // Set Alpha
00050 void HLF::setAlpha(double value) {
00051   if (value > 1 || value < 0)
00052     return;
00053 
00054   if (value >= 0.5)
00055     alpha = value;
00056   else
00057     alpha = 1 - value;
00058 }
00059 
00060 // Get outputs from HLF
00061 double HLF::getHigh() {
00062   return(high);
00063 }
00064 
00065 double HLF::getLow() {
00066   return(low);
00067 }
00068 
00069 // update high-low filter
00070 // high(t) = alpha * high(t-1) + (1 - alpha) * o(t)
00071 // low(t) = (1 - alpha) * low(t-1) + alpha * o(t)
00072 void HLF::update(double input) {
00073   high = alpha * high + (1 - alpha) * input;
00074   low = (1 - alpha) * low + alpha * input;
00075   //  printf("HLF %d %.2f, %.2f\n", (int)now, high, low);
00076 }
00077 
00078 // Definition for a tocken-bucket rate limitor
00079 TBrateLimitor::TBrateLimitor() {
00080   TBrateLimitor(DEFAULT_TB_RATE_P);
00081 };
00082 
00083 TBrateLimitor::TBrateLimitor(double rate) {
00084   double now = Scheduler::instance().clock();
00085   pkt_mode = 1;
00086   bucket_size = DEFAULT_TB_SIZE;
00087   tocken_rate = 0;
00088   tocken_num = bucket_size;
00089   last_time = now;
00090   ini_tocken_rate = rate;
00091 
00092   setRate(rate);
00093   //printf("TB pkt_mode:%d, bucket_size:%g, tocken_num:%g, last_time:%g\n",
00094   //     pkt_mode, bucket_size, tocken_num, last_time);
00095 
00096   resetScore();
00097 };
00098 
00099 // adjust the rate limit to the default value
00100 void TBrateLimitor::setRate(double rate) {
00101   if (rate != tocken_rate) {
00102     last_tocken_rate = tocken_rate;
00103     tocken_rate = rate;
00104     printf("TR %d %.2f %d %d\n", (int)(Scheduler::instance().clock()), 
00105            tocken_rate, p_score, n_score);
00106   }
00107 }
00108 
00109 // adjust the rate limit to approaching an optimal rate limit
00110 void TBrateLimitor::adjustRate() {
00111   // pay the penalty
00112   adjustScore(-1);
00113 
00114   double rate = tocken_rate;
00115   if (p_score >= n_score)
00116     rate = tocken_rate * (1 + 0.2);
00117   else 
00118     rate = tocken_rate * (1 - 0.2);
00119 
00120   setRate(rate);
00121 }
00122 
00123 // Reset negative / positive score
00124 void TBrateLimitor::resetScore() {
00125   n_score = p_score = 0;
00126 }
00127 
00128 // adjust the score for increasing or decreasing scores
00129 void TBrateLimitor::adjustScore(int score) {
00130   // pay the penalty
00131   if (last_tocken_rate > tocken_rate)
00132     n_score += score;
00133   else
00134     p_score += score;
00135 }
00136 
00137 int TBrateLimitor::run(double incoming, double t_rate) {
00138   double now = Scheduler::instance().clock();
00139   double interval = now - last_time;
00140   
00141   //printf("TB: now:%g last_time:%g interval:%g; ", now, last_time, interval);
00142   tocken_num += interval * t_rate;
00143   last_time = now;
00144 
00145   // more tockens are overflowed
00146   if (tocken_num > bucket_size)
00147     tocken_num = bucket_size;
00148 
00149   //printf("tocken #:%g; ", tocken_num);
00150 
00151   // through (0 dropping probability)
00152   if (tocken_num >= incoming) {
00153     tocken_num -= incoming;
00154     //printf("...through\n");
00155     return 0;
00156   }
00157 
00158   // dropped
00159   //printf("...dropped\n");
00160   return 1;
00161 }
00162 
00163 int TBrateLimitor::run(double incoming) {
00164   return (run(incoming, tocken_rate));
00165 }
00166 
00167 // EW detector
00168 // Constructor
00169 EWdetector::EWdetector() {
00170   ew_src = ew_dst = -1;
00171 
00172   // reset measurement
00173   cur_rate = avg_rate = 0;
00174 
00175   // reset timers
00176   db_timer = dt_timer = 0;
00177 
00178   // reset alarm
00179   resetAlarm();
00180   resetChange();
00181 
00182   // High-low filter
00183   hlf.setAlpha(EW_FADING_FACTOR);
00184 }
00185 
00186 //EWdetector::~EWdetector() {
00187 //};
00188 
00189 // Enable detecting and debugging
00190 void EWdetector::setDt(int inv) {
00191   dt_inv = inv;
00192   //printf("DT: %d\n", dt_inv);
00193 };
00194 void EWdetector::setDb(int inv) {
00195   db_inv = inv;
00196   //printf("DB: %d\n", db_inv);
00197 };
00198 
00199 void EWdetector::setLink(int src, int dst) {
00200   ew_src = src;
00201   ew_dst = dst;
00202   //printf("EW: (%d:%d)\n", ew_src, ew_dst);
00203 };
00204 
00205 void EWdetector::setAlarm() {
00206   alarm = 1;
00207 
00208   // Reset low and high gain filters' input values to the long-term avg
00209   // Actually, there is no change to high gain filter
00210   hlf.reset(avg_rate);
00211 };
00212 
00213 void EWdetector::resetAlarm() {
00214   alarm = 0;
00215 
00216   // Reset low and high gain filters' input values to the long-term avg
00217   // Actually, there is no change to low gain filter
00218   hlf.reset(avg_rate);
00219 };
00220 
00221 // Set and reset change flag
00222 void EWdetector::setChange() {
00223   change = 1;
00224 }
00225 
00226 void EWdetector::resetChange() {
00227   change = 0;
00228 }
00229 
00230 // Test if the alarm has been triggered
00231 int EWdetector::testAlarm() {
00232   if (!change)
00233     return(EW_UNCHANGE);
00234   else 
00235     return(alarm);
00236 }
00237 
00238 // Update long term average
00239 void EWdetector::updateAvg() {
00240   // update the request rate
00241   // update the aggregated response rate
00242   // Update flip-flop filter
00243   hlf.update(cur_rate);
00244   
00245   // Update SWIN, not used any more.
00246   //updateSWin(cur_rate);
00247   //ravgSWin();
00248   //printSWin();
00249   
00250   // Update the long term average value with the output from different filters
00251   if (!alarm) {
00252     // Use low-gain filter for fast response
00253     avg_rate = hlf.getLow();
00254   } else {
00255     // Use high-gain filter to keep the long term average stable
00256     avg_rate = hlf.getHigh();
00257   }
00258 }
00259 
00260 // the detector's engine
00261 void EWdetector::run(Packet *pkt) {
00262   // get the time
00263   now = Scheduler::instance().clock();
00264   
00265   //printf("EW[%d:%d] run ", ew_src, ew_dst);
00266   // update the measurement 
00267   measure(pkt);
00268 
00269   // There is a timeout!
00270   if (now >= dt_timer) {
00271     // Start detection
00272     //printf("Detection timeout(%d)\n", (int)now);
00273     
00274     // 1. Update the current rate from measurement
00275     updateCur();
00276 
00277     // 2. Detect change and Trigger alarm if necessary
00278     // Compare the current rate with the long term average
00279     detect();
00280     
00281     // 3. Update the long term averages
00282     updateAvg();
00283 
00284     // setup the sleeping timer
00285     dt_timer = (int)now + dt_inv;
00286     //printf("%d\n", dt_inv);
00287 
00288     change = 1;
00289   }
00290   
00291   // Schedule debug
00292   if (db_inv && now >= db_timer) {
00293     //printf("debugB ");
00294     trace();
00295     db_timer = (int)now + db_inv;
00296   }
00297 }
00298 
00299 // end of EW detector
00300 
00301 // EW bit rate detector
00302 //Constructor.  
00303 EWdetectorB::EWdetectorB() : EWdetector() {
00304   drop_p = 0;
00305   arr_count = 0;
00306 
00307   adjustor = 1.0;
00308 
00309   // Initialize ALIST
00310   alist.head = alist.tail = NULL;
00311   alist.count = 0;
00312 
00313   swin.head = swin.tail = NULL;
00314   swin.count = swin.ravg = 0;
00315 }
00316 
00317 //Deconstructor.
00318 EWdetectorB::~EWdetectorB(){
00319   resetAList();
00320   resetSWin();
00321 }
00322 
00323 // Initialize the EW parameters
00324 void EWdetectorB::init(int ew_adj) {
00325   // EW adjustor (g = resp rate / request rate)
00326   adjustor = ew_adj;
00327 }
00328 
00329 // Update current measurement 
00330 void EWdetectorB::measure(Packet *pkt) {
00331   //printf(" before UA");
00332   // Conduct detection continously
00333   updateAList(pkt);
00334   //printf(" after UA");
00335 }
00336 
00337 // Update current measurement 
00338 void EWdetectorB::updateCur() {
00339   //printAList();
00340   // Record current aggregated response rate
00341   cur_rate = computeARR();
00342 }
00343 
00344 // Check if the packet belongs to existing flow
00345 int EWdetectorB::exFlow(Packet *pkt) {
00346   // Should check SYN packets to protect existing connections
00347   //   need to use FullTCP
00348   return(0);
00349 }
00350 
00351 // Conduct the measurement
00352 void EWdetectorB::updateAList(Packet *pkt) {
00353   hdr_cmn* hdr = hdr_cmn::access(pkt);
00354   hdr_ip* iph = hdr_ip::access(pkt);
00355   int dst_id = iph->daddr();
00356   int src_id = iph->saddr();
00357   int f_id = iph->flowid(); 
00358 
00359   // Get the corresponding id.
00360   //printf("EW[%d:%d] in detector\n", ew_src, ew_dst);
00361 
00362   AListEntry *p;
00363   p = searchAList(src_id, dst_id, f_id);
00364 
00365   // Add new entry to AList
00366   // keep the bytes sent by each aggregate in AList
00367   if (!p) {
00368     p = newAListEntry(src_id, dst_id, f_id);
00369   }
00370 
00371   // update the existing (or just created) entry in AList
00372   assert(p && p->f_id == f_id && p->src_id == src_id && p->dst_id == dst_id);
00373 
00374   // update the flow's arrival rate using TSW
00375   double bytesInTSW, newBytes;
00376   bytesInTSW = p->avg_rate * p->win_length;
00377   newBytes = bytesInTSW + (double) hdr->size();
00378   p->avg_rate = newBytes / (now - p->t_front + p->win_length);
00379   p->t_front = now;
00380 
00381   //printAListEntry(p, 0);
00382 }
00383 
00384 // Get the median for a part of AList 
00385 //   starting from index with count entries
00386 int EWdetectorB::getMedianAList(int index, int count) {
00387   int m;
00388   
00389   if (!count)
00390     return 0;
00391 
00392   sortAList();
00393   //printAList();
00394 
00395   // Pick the entry with median avg_rate
00396   m = (int) (count / 2);
00397   if (2 * m == count) {
00398     return((getRateAList(index + m - 1) + getRateAList(index + m)) / 2);
00399   } else {
00400     return(getRateAList(index + m));
00401   }
00402 }
00403 
00404 // Get the rate given the index in the list
00405 int EWdetectorB::getRateAList(int index) {
00406   struct AListEntry *p;
00407 
00408   //printf("%d\n", index);
00409   p = alist.head;
00410   for (int i = 0; i < index; i++) {
00411     if (p)
00412       p = p->next;
00413   }
00414   
00415   if (p)
00416     return ((int)p->avg_rate);
00417 
00418   printf("Error in AList!\n");
00419   return(0);
00420 }
00421 
00422 // Calculate the aggragated response rate for high-bandwidth flows
00423 int EWdetectorB::computeARR() {
00424   int i, agg_rate;
00425 
00426   // Explicit garbage collection first 
00427   //  before both choosing HBFs and searching AList
00428   //printf("before timeout ");
00429   timeoutAList();
00430   //printf("after timeout ");
00431 
00432   // do nothing if no entry exists
00433   if (!alist.count) 
00434     return 0;
00435 
00436   // Pick the 10% highest bandwidth flows
00437   arr_count = (int) (alist.count * 0.1 + 1);
00438 
00439   // Sort AList first
00440   sortAList();
00441 
00442   // Calculate the ARR for HBFs
00443   // Use mean
00444   agg_rate = 0;
00445   for (i = 0; i < arr_count; i++) {
00446     agg_rate += getRateAList(i);
00447   }
00448   
00449   if (i)
00450     agg_rate = (int) (agg_rate / i);
00451   else {
00452     printf("No MAX returned from ALIST!!!\n");
00453   }
00454   
00455   // Use median (the median for the list or median for HBFs?)
00456   //agg_rate = getMedianAList(0, k);
00457   //printf("%f %d %d %d\n", now, k, agg_rate, getMedianAList(0, k));
00458   
00459   return(agg_rate);
00460 }
00461 
00462 // Find the matched AList entry
00463 struct AListEntry * EWdetectorB::searchAList(int src_id, int dst_id, int f_id){
00464   AListEntry *p;
00465 
00466   // Explicit garbage collection first.
00467   //printf("before timeout ");
00468   timeoutAList();
00469   //printf("after timeout ");
00470   // Use src and dest pair and flow id:
00471   //   aggregate flows within the same request-response exchange
00472   // Timeout need to be set to a very small value in order to
00473   //   seperate different exchanges.
00474   p = alist.head;
00475   while (p && 
00476          (p->f_id != f_id || p->src_id != src_id || p->dst_id != dst_id)) {
00477     p = p->next;
00478   }
00479   
00480   return(p);
00481 }
00482 
00483 // Add new entry to AList
00484 struct AListEntry * EWdetectorB::newAListEntry(int src_id, int dst_id, int f_id) {
00485   AListEntry *p;
00486 
00487   p = new AListEntry;
00488   p->src_id = src_id;
00489   p->dst_id = dst_id;
00490   p->f_id = f_id;
00491   p->last_update = now;
00492   p->avg_rate = 0;
00493   // Since we are doing random sampling, 
00494   // the t_front should set to the beginning of this period instead of 0.
00495   p->t_front = now;
00496   p->win_length = 1;
00497   p->next = NULL;
00498   
00499   // Add new entry to AList
00500   if (alist.tail)
00501     alist.tail->next = p;
00502   alist.tail = p;
00503   
00504   if (!alist.head)
00505     alist.head = p;
00506   
00507   alist.count++;
00508 
00509   return(p);
00510 }
00511 
00512 // Find the entry with max avg_rate in AList
00513 struct AListEntry * EWdetectorB::getMaxAList() {
00514   struct AListEntry *p, *pp, *max, *pm;
00515 
00516   //printAList();
00517   // find the max entry and remove
00518   p = pp = alist.head;
00519   max = pm = p;
00520   
00521   while (p) {
00522     if (p->avg_rate > max->avg_rate) {
00523       pm = pp;
00524       max = p;
00525     }
00526     
00527     pp = p;
00528     p = p->next;
00529   }
00530   
00531   // remove max from AList
00532   if (alist.head == max)
00533     alist.head = max->next;
00534   
00535   if (pm != max)
00536     pm->next = max->next;
00537   
00538   max->next = NULL;
00539   //printAList();
00540 
00541   return(max);
00542 }
00543 
00544 // Sort AList based on the avg_rate
00545 void EWdetectorB::sortAList() {
00546   struct AListEntry *max, *head, *tail;
00547   
00548   if (!alist.head)
00549     return;
00550 
00551   //printAList();
00552   head = tail = NULL;
00553 
00554   while (alist.head) {
00555     // Get the entry with the max avg_rate
00556     max = getMaxAList();
00557     //printAListEntry(max, i);
00558     
00559     if (max) {
00560       // Add max to the tail of the new list
00561       if (tail)
00562         tail->next = max;
00563       tail = max;
00564       
00565       if (!head)
00566         head = max;
00567     }
00568   }
00569 
00570   alist.head = head;
00571   alist.tail = tail;
00572 
00573   //printAList();
00574 }
00575 
00576 // Timeout AList entries
00577 void EWdetectorB::timeoutAList() {
00578   AListEntry *p, *q;
00579   float to;
00580 
00581   to = EW_FLOW_TIME_OUT;
00582   if (dt_inv)
00583     to = dt_inv;
00584 
00585   // Expire the old entries in AList
00586   p = q = alist.head;
00587   while (p) {
00588     // Garbage collection
00589     if (p->last_update + to < now){
00590       // The coresponding flow is expired.      
00591       if (p == alist.head){
00592         if (p == alist.tail) {
00593           alist.head = alist.tail = NULL;
00594           free(p);
00595           p = q = NULL;
00596         } else {
00597           alist.head = p->next;
00598           free(p);
00599           p = q = alist.head;
00600         }
00601       } else {
00602         q->next = p->next;
00603         if (p == alist.tail)
00604           alist.tail = q;
00605         free(p);
00606         p = q->next;
00607       }
00608       alist.count--;
00609     } else {
00610       q = p;
00611       p = q->next;
00612     }
00613   }
00614 }
00615 
00616 // Reset AList
00617 void EWdetectorB::resetAList() {
00618   struct AListEntry *ap, *aq;
00619 
00620   ap = aq = alist.head;
00621   while (ap) {
00622     aq = ap;
00623     ap = ap->next;
00624     free(aq);
00625   }
00626   
00627   ap = aq = NULL;
00628   alist.head = alist.tail = NULL;  
00629   alist.count = 0;
00630 }
00631 
00632 
00633 
00634 // Reset SWin
00635 void EWdetectorB::resetSWin() {
00636   struct SWinEntry *p, *q;
00637 
00638   p = q = swin.head;
00639   while (p) {
00640     q = p;
00641     p = p->next;
00642     free(q);
00643   }
00644   
00645   p = q = NULL;
00646   swin.head = swin.tail = NULL;  
00647   swin.count = swin.ravg = 0;
00648 }
00649 
00650 // update swin with the latest measurement of aggregated response rate
00651 void EWdetectorB::updateSWin(int rate) {
00652   struct SWinEntry *p, *new_entry;
00653 
00654   new_entry = new SWinEntry;
00655   new_entry->rate = rate;
00656   new_entry->weight = 1;
00657   new_entry->next = NULL;
00658   
00659   if (swin.tail)
00660     swin.tail->next = new_entry;
00661   swin.tail = new_entry;
00662   
00663   if (!swin.head)
00664     swin.head = new_entry;
00665 
00666   // Reset current rate.
00667   if (swin.count < EW_SWIN_SIZE) {
00668     swin.count++;
00669   } else {
00670     p = swin.head;
00671     swin.head = p->next;
00672     free(p);
00673   }
00674 }
00675 
00676 // Calculate the running average over the sliding window
00677 void EWdetectorB::ravgSWin() {
00678   struct SWinEntry *p;
00679   float sum = 0;
00680   float t_weight = 0;
00681 
00682   //printf("Calculate running average over the sliding window:\n");
00683   p = swin.head;
00684   //printf("after p\n");
00685 
00686   while (p) {
00687     //printSWinEntry(p, i++);
00688     sum += p->rate * p->weight;
00689     t_weight += p->weight;
00690     p = p->next;
00691   }
00692   p = NULL;
00693   //printf("\n");  
00694 
00695   swin.ravg = (int)(sum / t_weight);
00696 
00697   //  printf("Ravg: %d\n", swin.ravg);
00698 }
00699 
00700 // detect the traffic change by 
00701 // comparing the current measurement with the long-term average
00702 //   trigger alarm if necessary.
00703 void EWdetectorB::detect() {
00704   // When ALARM:
00705   //  detect if it is the time to release the alarm
00706   // When NO ALARM:
00707   //  detect if it is the time to trigger the alarm
00708   if (alarm) {
00709     // Determine if an alarm should be released
00710     if (cur_rate > avg_rate * (1 + EW_DETECT_RANGE)) {
00711       // reset alarm
00712       resetAlarm();
00713     } 
00714   } else {
00715     // Determine if an alarm should be triggered
00716     //   need to be conservative!
00717     if (cur_rate < avg_rate) {
00718       setAlarm();
00719       
00720       // Initial drop_p to the MAX value whenever alarm triggered
00721       if (drop_p < EW_MAX_DROP_P)
00722         drop_p = EW_MAX_DROP_P;
00723     } else {
00724     }
00725   }
00726   
00727   // Determine the dropping probability
00728   //computeDropP();
00729 }
00730 
00731 // Determine the dropping probability based on current measurement
00732 void EWdetectorB::computeDropP() {
00733   double p = 0;
00734 
00735   if (alarm) {
00736     // Compute the dropping probability as a linear function of current rate
00737     //  p is computed towards the current measurement.
00738     p = 1;
00739     if (cur_rate)
00740       p = (avg_rate - cur_rate) * adjustor / cur_rate;
00741     
00742     // p could be greater than 1
00743     if (p > 1)
00744       p = 1;
00745     // p could also be negative
00746     if (p < 0)
00747       p = 0;
00748     
00749     // Compute the actual drop probability
00750     drop_p = EW_FADING_FACTOR * drop_p + (1 - EW_FADING_FACTOR) * p;    
00751     // adjust drop_p
00752     if (drop_p < EW_MIN_DROP_P)
00753       drop_p = EW_MIN_DROP_P;
00754     if (drop_p > EW_MAX_DROP_P)
00755       drop_p = EW_MAX_DROP_P;
00756   } else {
00757     // Fade out the drop_p when no alarm
00758     if (drop_p > 0) {
00759       if (drop_p <= EW_MIN_DROP_P)
00760         drop_p = 0;
00761       else {
00762         drop_p = EW_FADING_FACTOR * drop_p;
00763       }
00764     }
00765   }
00766 }
00767 
00768 // Decreas the sample interval
00769 void EWdetectorB::decSInv() {
00770   // Need some investigation for the min allowed detection interval
00771   //  if (s_inv / 2 > EW_MIN_SAMPLE_INTERVAL) {
00772   // s_inv = s_inv / 2;
00773     
00774     //printf("SINV decreased by 2.\n");
00775   //}
00776 }
00777 
00778 // Increase the sample interval
00779 void EWdetectorB::incSInv() {
00780   //if(s_inv * 2 <= init_s_inv) {
00781   //  s_inv = s_inv * 2;
00782   
00783   //printf("SINV increased by 2.\n");
00784   // }
00785 }
00786 
00787 // Prints one entry in SWin
00788 void EWdetectorB::printSWin() {
00789   struct SWinEntry *p;
00790   printf("%f SWIN[%d, %d]", now, swin.ravg, swin.count);
00791   p = swin.head;
00792   int i = 0;
00793   while (p) {
00794     printSWinEntry(p, i++);
00795     p = p->next;
00796   }
00797   p = NULL;
00798   printf("\n");
00799 }
00800 
00801 // Print the contents in SWin
00802 void EWdetectorB::printSWinEntry(struct SWinEntry *p, int i) {
00803   if (p)
00804     printf("[%d: %d %.2f] ", i, p->rate, p->weight);
00805 }
00806 
00807 // Print one entry in AList
00808 void EWdetectorB::printAListEntry(struct AListEntry *p, int i) {
00809   if (!p)
00810     return;
00811 
00812   printf("[%d] %d (%d %d) %.2f %.2f\n", i, p->f_id, p->src_id, p->dst_id, 
00813          p->avg_rate, p->last_update);
00814 }
00815 
00816 
00817 // Print the entries in AList
00818 void EWdetectorB::printAList() {
00819   struct AListEntry *p;
00820   printf("%f AList(%d):\n", now, alist.count);
00821 
00822   p = alist.head;
00823   int i = 0;
00824   while (p) {
00825     printAListEntry(p, i);
00826     i++;
00827     p = p->next;
00828   }
00829   p = NULL;
00830   printf("\n");
00831 }
00832 
00833 // Trace bit rate (resp rate)
00834 void EWdetectorB::trace() {
00835   double db_rate = 0;
00836   double m_rate = 0;
00837 
00838   timeoutAList();
00839   m_rate = getMedianAList(0, alist.count);
00840   //printf("B ");
00841   db_rate = computeARR();
00842 
00843   if (!m_rate || !db_rate);
00844     //printAList();
00845 
00846   printf("B %d %.2f %.2f %d %d %.2f %.2f\n", 
00847          (int)now, cur_rate, avg_rate, arr_count, alarm, db_rate, m_rate);
00848 }
00849 
00850 // EW packet detector
00851 EWdetectorP::EWdetectorP() : EWdetector() {
00852   // Packet stats
00853   cur_p.arrival = cur_p.dept = cur_p.drop = 0;
00854   last_p.arrival = last_p.dept = last_p.drop = 0;
00855   last_p_db.arrival = last_p_db.dept = last_p_db.drop = 0;
00856 }
00857 
00858 EWdetectorP::~EWdetectorP(){
00859   // Packet stats
00860   cur_p.arrival = cur_p.dept = cur_p.drop = 0;
00861   last_p.arrival = last_p.dept = last_p.drop = 0;
00862 }
00863 
00864 // get the current request rate
00865 double EWdetectorP::getRate() {
00866   return(cur_rate);
00867 }
00868 
00869 // update packet stats
00870 void EWdetectorP::updateStats(int flag) {
00871   // Packet arrival
00872   if (flag == PKT_ARRIVAL) {
00873     cur_p.arrival++;
00874     return;
00875   }
00876 
00877   // Packet departure
00878   if (flag == PKT_DEPT) {
00879     cur_p.dept++;
00880     return;
00881   }
00882 
00883   // Packet dropped
00884   if (flag == PKT_DROP) {
00885     cur_p.drop++;
00886     return;
00887   }
00888 }
00889 
00890 // Detect changes in packet rate
00891 void EWdetectorP::detect() {
00892   if (cur_rate > avg_rate * (1 + EW_DETECT_RANGE)) {
00893     if (!alarm) {
00894       setAlarm();
00895     }
00896   } else if (cur_rate < avg_rate * (1 - EW_DETECT_RANGE)) {
00897     if (alarm)
00898       resetAlarm();
00899   }
00900 }
00901 
00902 // Update current measurement
00903 void EWdetectorP::updateCur() {
00904   // measure the accepted packet rate (rather than arrival rate)
00905   cur_rate = (cur_p.dept - last_p.dept) / dt_inv;
00906 
00907   // keep the current value
00908   last_p.arrival = cur_p.arrival;
00909   last_p.dept = cur_p.dept;
00910   last_p.drop = cur_p.drop;
00911 }
00912 
00913 // Update long term average
00914 /*void EWdetectorP::updateAvg() {
00915   avg_rate = (int)(hlf.alpha * avg_rate + (1 - hlf.alpha) * cur_rate);
00916 }
00917 */
00918 // Update stats
00919 void EWdetectorP::measure(Packet *pkt) {
00920   
00921 
00922   // stats on packet departure and drop are collect in policer 
00923 }
00924 
00925 // Trace packet incoming rate (req rate)
00926 void EWdetectorP::trace() {
00927   printf("P %d %.2f %.2f %d %d %d %d %d %d %d\n", 
00928          (int)now, cur_rate, avg_rate, alarm,
00929          cur_p.arrival - last_p_db.arrival,
00930          cur_p.dept - last_p_db.dept,
00931          cur_p.drop - last_p_db.drop,  
00932          cur_p.arrival, cur_p.dept, cur_p.drop);
00933 
00934   last_p_db.arrival = cur_p.arrival;
00935   last_p_db.dept = cur_p.dept;
00936   last_p_db.drop = cur_p.drop;
00937 }
00938 
00939 // EW Policy: deal with queueing stuffs.
00940 //Constructor.  
00941 EWPolicy::EWPolicy() : Policy() {
00942   // Initialize detectors
00943   ewB = cewB = NULL;
00944   ewP = cewP = NULL;
00945   
00946   // Initialize rate limitor
00947   rlP = rlB = NULL;
00948 
00949   max_p = max_b = 0;
00950   alarm = pre_alarm = 0;
00951   change = 0;
00952 }
00953 
00954 //Deconstructor.
00955 EWPolicy::~EWPolicy(){
00956   if (ewB)
00957     free(ewB);
00958 
00959   if (ewP)
00960     free(ewP);
00961 
00962   if (cewB)
00963     free(cewB);
00964 
00965   if (cewP)
00966     free(cewP);
00967 }
00968 
00969 // Initialize the EW parameters
00970 void EWPolicy::init(int adj, int src, int dst) {
00971   ew_adj = adj;
00972   qsrc = src;
00973   qdst = dst;
00974 }
00975 
00976 // EW meter: do nothing.
00977 //  measurement is done in policer: we need to know whether the packet is
00978 //    dropped or not.
00979 void EWPolicy::applyMeter(policyTableEntry *policy, Packet *pkt) {
00980   return;
00981 }
00982 
00983 // EW Policer
00984 //  1. do measurement: P: both arrival and departure; B: only departure
00985 //  2. make packet drop decisions
00986 int EWPolicy::applyPolicer(policyTableEntry *policy, policerTableEntry *policer, Packet *pkt) {
00987   //printf("enter applyPolicer ");
00988 
00989   // can't count/penalize ACKs:
00990   //   with resp: may cause inaccurate calculation with TSW(??)
00991   //   with req:  may cause resp retransmission.
00992   // just pass them through
00993   hdr_cmn *th = hdr_cmn::access(pkt);
00994   if (th->ptype() == PT_ACK)
00995     return(policer->initialCodePt);
00996 
00997   // for other packets...
00998 
00999   // Get time
01000   now = Scheduler::instance().clock();
01001 
01002   // keep arrival packet stats
01003   if (ewP)
01004     ewP->updateStats(PKT_ARRIVAL);
01005 
01006   // For other packets:
01007   if (dropPacket(pkt)) {
01008     // keep packet stats
01009     if (ewP)
01010       ewP->updateStats(PKT_DROP);
01011     
01012     //printf("downgrade!\n");   
01013     return(policer->downgrade1);
01014   } else {
01015     // keep packet stats
01016     if (ewP)
01017       ewP->updateStats(PKT_DEPT);
01018 
01019     // conduct EW detection
01020     if (ewP)
01021       ewP->run(pkt);
01022     
01023     if (ewB)
01024       ewB->run(pkt);    
01025 
01026     //printf("initial!\n");     
01027     return(policer->initialCodePt);
01028   }
01029 }
01030 
01031 // detect if there is alarm triggered
01032 void EWPolicy::detect(Packet *pkt) {
01033   int alarm_b, alarm_p;
01034 
01035   alarm_b = alarm_p = 0;
01036 
01037   if (!ewP || ! cewB)
01038     return;
01039   
01040   alarm_b = cewB->testAlarm();
01041   alarm_p = ewP->testAlarm();
01042   
01043 
01044   if (alarm_p == EW_UNCHANGE || alarm_b == EW_UNCHANGE)
01045     return;
01046 
01047   // Need to get info from both parts to make a decision
01048   // Reset change flags
01049   ewP->resetChange();
01050   cewB->resetChange();
01051 
01052   change = 1;
01053   // keep the old value of alarm
01054   pre_alarm = alarm;
01055 
01056   // As long as alarm_b is 0, reset the alarm
01057   if (alarm_b == 0)
01058     alarm = 0;
01059   else if (alarm_p == 0)
01060     alarm = 0;
01061   else 
01062     alarm = 1;
01063 
01064   printf("ALARM %d %d\n", pre_alarm, alarm);
01065 }
01066 
01067 //  make packet drop decisions
01068 int EWPolicy::dropPacket(Packet *pkt) {
01069   // 1. arrival stats is measured in meter (departure and drops here)
01070   // 2. No penalty to response traffic!!
01071   // 3. need to protect existing connections
01072 
01073   // pass EW if there is any
01074   if (cewB && ewP) {
01075     // protecting existing connections
01076     //  drop requests for new connection (SYN packet)
01077     //    if (cewB->exFlow(pkt))
01078     hdr_tcp *tcph = hdr_tcp::access(pkt);
01079     // Protecting non-SYN packets: existing connections
01080     if ((tcph->flags() & TH_SYN) == 0) {
01081       //return(0);
01082     }
01083 
01084     // Check alarm
01085     detect(pkt);
01086 
01087     if (change) {
01088       // for new incoming requests:
01089       //   use EW measurement to adjust the rate limit (to current Rq)
01090       // see if the alarm should be reset
01091       
01092       if (pre_alarm) {
01093         if (alarm) {
01094           // The rate is not right:
01095           //   too low: too few connection admitted;
01096           //   too high: congestion in response
01097           // Adjustment is needed.
01098           if (rlP)
01099             rlP->adjustRate();
01100         } else {
01101           // the current rate is ok, award the current choice
01102           if (rlP)
01103             rlP->adjustScore(1);
01104         }
01105       } else {
01106         if (alarm) {
01107           if (rlP) {
01108             // Start a new round
01109             rlP->resetScore();
01110             // Use current request rate as the rate limit
01111             rlP->setRate(ewP->getRate());
01112           }
01113         } else {
01114           // the current rate is ok
01115         }
01116       }    
01117       
01118       change = 0;
01119     }  
01120   }
01121 
01122   // Passing rate limitor if there is any
01123   if (rlP) {
01124     // rate limiting
01125     return(rlP->run(1));
01126   };
01127   
01128   // through by default
01129   return(0);
01130 }
01131 
01132 // Enable detecting on packet incoming rate (req rate)
01133 void EWPolicy::detectPr(int dt_inv, int db_inv) {
01134   ewP = new EWdetectorP;
01135   ewP->setLink(qsrc, qdst);
01136   ewP->setDt(dt_inv);
01137   ewP->setDb(db_inv);
01138 }
01139 
01140 void EWPolicy::detectPr(int dt_inv) {
01141   detectPr(dt_inv, dt_inv);
01142 }
01143 
01144 void EWPolicy::detectPr() {
01145   detectPr(EW_DT_INV, EW_DB_INV);
01146 }
01147 
01148 // Enable detecting and debugging bit rate (eg: resp rate)
01149 void EWPolicy::detectBr(int dt_inv, int db_inv) {
01150   ewB = new EWdetectorB;
01151   ewB->init(ew_adj);
01152   ewB->setLink(qsrc, qdst);
01153   ewB->setDt(dt_inv);
01154   ewB->setDb(db_inv);
01155 }
01156 
01157 void EWPolicy::detectBr(int dt_inv) {
01158   detectBr(dt_inv, dt_inv);
01159 }
01160 
01161 void EWPolicy::detectBr() {
01162   detectBr(EW_DT_INV, EW_DB_INV);
01163 }
01164 
01165 // Rate limitor: packet rate
01166 void EWPolicy::limitPr(double rate) {
01167   //printf("PR %d\n", rate);
01168   rlP = new TBrateLimitor(rate);
01169 };
01170 
01171 // Rate limitor: bit rate
01172 void EWPolicy::limitBr(double rate) {
01173   //printf("BR %d\n", rate);
01174   rlB = new TBrateLimitor(rate);
01175 };
01176 
01177 // Rate limitor: packet rate
01178 void EWPolicy::limitPr() {
01179   limitPr(DEFAULT_TB_RATE_P);
01180 };
01181 
01182 // Rate limitor: bit rate
01183 void EWPolicy::limitBr() {
01184   limitBr(DEFAULT_TB_RATE_B);
01185 };
01186 
01187 // couple EW detector
01188 void EWPolicy::coupleEW(EWPolicy *ewpc) {
01189   coupleEW(ewpc, 0);
01190 }
01191 
01192 // couple EW detector
01193 void EWPolicy::coupleEW(EWPolicy *ewpc, double rate) {
01194   // couple the EW detector 
01195   cewB = ewpc->ewB;
01196   
01197   // Setup rate limitor with the default limit
01198   if (rate)
01199     limitPr(rate);
01200   else
01201     limitPr();
01202 }
01203 // End of EWP

Generated on Tue Apr 20 12:14:16 2004 for NS2.26SourcesOriginal by doxygen 1.3.3