00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037 #include "rate-limit-strategy.h"
00038
00039 RateLimitStrategy::RateLimitStrategy(double rate, int ptype, double share, double estimate) {
00040
00041 target_rate_ = rate;
00042 reset_time_ = Scheduler::instance().clock();
00043
00044 ptype_ = ptype;
00045 ptype_share_ = share;
00046
00047
00048
00049 rateEstimator_ = new RateEstimator(estimate);
00050 rateLimiter_ = new TokenBucketRateLimiter();
00051
00052 ptypeRateEstimator_ = new RateEstimator();
00053 ptypeRateLimiter_ = new TokenBucketRateLimiter();
00054
00055 ptypeLog_ = new PacketTypeLog(this);
00056 }
00057
00058 double
00059 RateLimitStrategy::process(Packet *p, int mine, int lowDemand) {
00060
00061 rateEstimator_->estimateRate(p);
00062 ptypeLog_->log(p);
00063
00064 hdr_cmn* hdr = HDR_CMN(p);
00065
00066 int dropped = 0;
00067 if (hdr->ptype() == ptype_) {
00068 ptypeRateEstimator_->estimateRate(p);
00069 dropped = ptypeRateLimiter_->rateLimit(p, ptypeRateEstimator_->estRate_,
00070 target_rate_*ptype_share_, mine, lowDemand);
00071 }
00072
00073 if (dropped) return 1;
00074
00075 dropped = rateLimiter_->rateLimit(p, rateEstimator_->estRate_, target_rate_, mine, lowDemand);
00076 return dropped;
00077 }
00078
00079 void
00080 RateLimitStrategy::restrictPacketType(int type, double share, double actual) {
00081
00082 if (type == ptype_) {
00083 return;
00084 }
00085
00086 printf("RLSt: Restricting type %d to %g hogging=%g at %g \n", type, share, actual,
00087 Scheduler::instance().clock());
00088 ptype_ = type;
00089 ptype_share_ = share;
00090 ptypeRateEstimator_->estRate_ = rateEstimator_->estRate_*actual;
00091 }
00092
00093 double
00094 RateLimitStrategy::getDropRate() {
00095 double inRate = rateEstimator_->estRate_;
00096 double dropRate = (inRate - target_rate_)/inRate;
00097 if (dropRate < 0) dropRate=0;
00098 return dropRate;
00099 }
00100
00101 double
00102 RateLimitStrategy::getArrivalRate() {
00103 return rateEstimator_->estRate_;
00104 }
00105
00106 void
00107 RateLimitStrategy::reset() {
00108 rateLimiter_->reset();
00109 ptypeRateLimiter_->reset();
00110 rateEstimator_->reset();
00111 ptypeRateEstimator_->reset();
00112 }
00113
00114 RateLimitStrategy::~RateLimitStrategy() {
00115 delete(rateLimiter_);
00116 delete(ptypeRateLimiter_);
00117 delete(rateEstimator_);
00118 delete(ptypeRateEstimator_);
00119 delete(ptypeLog_);
00120 }
00121
00122
00123
00124 PacketTypeLog::PacketTypeLog(RateLimitStrategy * rlst) {
00125 count_=0;
00126 for (int i=0; i<MAX_PACKET_TYPES; i++) {
00127 typeCount_[i]=0;
00128 }
00129 rlStrategy_ = rlst;
00130
00131 resched(PACKET_TYPE_TIMER);
00132 }
00133
00134 void
00135 PacketTypeLog::log(Packet *p) {
00136 hdr_cmn* hdr = HDR_CMN(p);
00137 int type = hdr->ptype();
00138
00139 int index = mapTypeToIndex(type);
00140
00141 typeCount_[index]++;
00142 count_++;
00143 }
00144
00145 void
00146 PacketTypeLog::expire(Event * e) {
00147
00148
00149
00150 if (!count_) {
00151 resched(PACKET_TYPE_TIMER);
00152 return;
00153 }
00154
00155 for (int i=0; i<MAX_PACKET_TYPES; i++) {
00156 if (typeCount_[i]!=0) {
00157 double actualShare = ((double)typeCount_[i])/count_;
00158 int type = mapIndexToType(i);
00159 double maxShare = mapTypeToShare(type);
00160 if (actualShare>maxShare) {
00161
00162 rlStrategy_->restrictPacketType(type, maxShare, actualShare);
00163 resched(PACKET_TYPE_TIMER);
00164 return;
00165 }
00166 }
00167 }
00168
00169
00170 rlStrategy_->restrictPacketType(-1, 1, 1);
00171 resched(PACKET_TYPE_TIMER);
00172 return;
00173 }
00174
00175 int
00176 PacketTypeLog::mapTypeToIndex(int type) {
00177
00178 switch (type) {
00179 case PT_PING: return 0;
00180 case PT_UDP: return 1;
00181 case PT_CBR: return 2;
00182 default: return MAX_PACKET_TYPES-1;
00183 }
00184 }
00185
00186 int
00187 PacketTypeLog::mapIndexToType(int index) {
00188 switch (index) {
00189 case 0: return PT_PING;
00190 case 1: return PT_UDP;
00191 case 2: return PT_CBR;
00192 case MAX_PACKET_TYPES-1: return -1;
00193 default: printf("PTLog: invalid index\n"); exit(-1);
00194 }
00195 }
00196
00197
00198 double
00199 PacketTypeLog::mapTypeToShare(int type) {
00200 switch (type) {
00201 case PT_PING: return 1.0;
00202 case PT_UDP: return 1.0;
00203 case PT_CBR: return 1.0;
00204 default: return 1;
00205 }
00206 }
00207
00208 PacketTypeLog::~PacketTypeLog() {
00209 cancel();
00210 }
00211
00212
00213
00214 TokenBucketRateLimiter::TokenBucketRateLimiter() {
00215
00216 bucket_depth_ = DEFAULT_BUCKET_DEPTH;
00217 tbucket_ = bucket_depth_;
00218 total_passed_ = 0.0;
00219 total_dropped_ = 0.0;
00220 time_last_token_ = Scheduler::instance().clock();
00221 }
00222
00223 int
00224 TokenBucketRateLimiter::rateLimit(Packet * p, double arrRate, double targetRate, int mine, int lowDemand) {
00225
00226 hdr_cmn* hdr = HDR_CMN(p);
00227 double now = Scheduler::instance().clock();
00228 double time_elapsed = now - time_last_token_;
00229
00230
00231 tbucket_ += (time_elapsed * targetRate)/8.0;
00232 time_last_token_ = now;
00233
00234 if (tbucket_ > bucket_depth_)
00235 tbucket_ = bucket_depth_;
00236
00237
00238
00239
00240 if ((double)hdr->size_ < tbucket_ || (mine && lowDemand) ) {
00241 tbucket_ -= hdr->size_;
00242 total_passed_ += (double) hdr->size_;
00243
00244 return 0;
00245 }
00246 else {
00247 total_dropped_ += (double) hdr->size_;
00248
00249 return 1;
00250 }
00251 }
00252
00253
00254
00255
00256
00257
00258
00259
00260
00261 void
00262 TokenBucketRateLimiter::reset() {
00263 total_dropped_ = 0;
00264 total_passed_ = 0;
00265 tbucket_ = 0;
00266 time_last_token_ = Scheduler::instance().clock();
00267 }
00268