00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028 #include <assert.h>
00029
00030 #include "config.h"
00031 #include "tclcl.h"
00032 #include "mftp_rcv.h"
00033
00034 #include "ip.h"
00035
00036 #define min(a, b) ((a) < (b) ? (a) : (b))
00037
00038 static class MFTPRcvAgentClass : public TclClass {
00039 public:
00040 MFTPRcvAgentClass() : TclClass("Agent/MFTP/Rcv") {}
00041 TclObject* create(int, const char*const*) {
00042 return (new MFTPRcvAgent());
00043 }
00044 } class_mftprcv_agent;
00045
00046
00047 MFTPRcvAgent::MFTPRcvAgent()
00048 : MFTPAgent(),
00049 CurrentPass(0),
00050 CurrentGroup(0),
00051 CwPat(0),
00052 FileDGramsReceived(0),
00053 FseekOffset(0),
00054 cw_matrixline_buf(NULL)
00055 {
00056 bind("reply_addr_", (int*)&reply_.addr_);
00057 bind("reply_port_", (int*)&reply_.port_);
00058 }
00059
00060
00061 int MFTPRcvAgent::command(int argc, const char*const* argv)
00062 {
00063 Tcl& tcl = Tcl::instance();
00064 if(strcmp(argv[1], "send") == 0) {
00065 if(strcmp(argv[2], "nak") == 0) {
00066 unsigned long pass_nb, block_nb;
00067 int nb_scanned = 0;
00068
00069 nb_scanned += sscanf(argv[3], "%lu", &pass_nb);
00070 nb_scanned += sscanf(argv[4], "%lu", &block_nb);
00071 assert(nb_scanned == 2);
00072 send_nak(pass_nb, block_nb);
00073 return TCL_OK;
00074 }
00075 } else if (strcmp(argv[1], "start") == 0) {
00076 if(dtusPerBlock_ % 8 != 0) {
00077 tcl.resultf("%s: dtusPerBlock_ must be a multiple of 8", name_);
00078 return TCL_ERROR;
00079 }
00080 init();
00081 return TCL_OK;
00082 }
00083 return Agent::command(argc, argv);
00084 }
00085
00086
00087 void MFTPRcvAgent::recv(Packet* p, Handler* h)
00088 {
00089 hdr_ip* ih = hdr_ip::access(p);
00090 hdr_mftp* mh = hdr_mftp::access(p);
00091
00092 if(ih->daddr() == 0) {
00093
00094 fprintf(stderr, "%s: send not allowed with Agent/MFTP/Rcv\n", name_);
00095 assert(false);
00096 } else {
00097 switch(mh->type) {
00098 case hdr_mftp::PDU_DATA_TRANSFER:
00099 recv_data(mh->spec.data);
00100 break;
00101 case hdr_mftp::PDU_STATUS_REQUEST:
00102 recv_status_req(mh->spec.statReq);
00103 break;
00104 case hdr_mftp::PDU_NAK:
00105
00106 break;
00107 default:
00108 assert(false);
00109 }
00110 Packet::free(p);
00111 }
00112 }
00113
00114
00115 MFTPRcvAgent::~MFTPRcvAgent()
00116 {
00117
00118 delete [] cw_matrixline_buf;
00119 }
00120
00121
00122 void MFTPRcvAgent::init()
00123 {
00124 MFTPAgent::init();
00125
00126
00127 assert(cw_matrixline_buf == NULL);
00128 cw_matrixline_buf = new CW_MATRIXLINE_t[FileDGrams];
00129 assert(cw_matrixline_buf != NULL);
00130
00131
00132
00133 cw_matrixlines_reset();
00134 }
00135
00136
00137
00138 void MFTPRcvAgent::recv_status_req(hdr_mftp::Spec::StatReq& statreq)
00139 {
00140 Tcl& tcl = Tcl::instance();
00141
00142
00143 tcl.evalf("%s recv status-req %lu %lu %lu %lf", name_,
00144 (unsigned long) statreq.pass_nb,
00145 (unsigned long) statreq.block_lo,
00146 (unsigned long) statreq.block_hi,
00147 (double) statreq.RspBackoffWindow);
00148 }
00149
00150
00151 void MFTPRcvAgent::send_nak(unsigned long pass_nb, unsigned long block_nb)
00152 {
00153 assert(FileDGrams > 0);
00154 assert(0 <= block_nb && block_nb < nb_blocks());
00155
00156 Tcl& tcl = Tcl::instance();
00157
00158
00159 unsigned long start_group_nb = dtus_per_block * block_nb;
00160
00161
00162 unsigned long end_group_nb = min(nb_groups, dtus_per_block * (block_nb + 1));
00163
00164
00165 unsigned long n = end_group_nb - start_group_nb;
00166
00167
00168 const unsigned long nak_bytes = (n+7) / 8;
00169
00170 unsigned long bit_count = 0;
00171
00172
00173 Packet* p = Agent::allocpkt((n+7) / 8);
00174
00175 unsigned char* nak_bitmap = (unsigned char*) p->accessdata();
00176
00177
00178 memset(nak_bitmap, 0, nak_bytes);
00179
00180
00181 for(unsigned long group_nb = start_group_nb, bit = 1 << (start_group_nb % 8);
00182 group_nb < end_group_nb; ++group_nb) {
00183 if(is_group_full(group_nb) == false) {
00184 *nak_bitmap |= bit;
00185 bit_count++;
00186 }
00187 if(bit == 128) {
00188 bit = 1;
00189 nak_bitmap++;
00190 } else {
00191 bit <<= 1;
00192 }
00193 }
00194
00195 if(bit_count > 0) {
00196 hdr_ip* iph = hdr_ip::access(p);
00197 hdr_mftp* hdr = hdr_mftp::access(p);
00198 hdr_cmn* ch = hdr_cmn::access(p);
00199
00200
00201 iph->dst() = reply_;
00202 ch->size() = sizeof(hdr_mftp);
00203
00204 hdr->type = hdr_mftp::PDU_NAK;
00205 hdr->spec.nak.pass_nb = pass_nb;
00206 hdr->spec.nak.block_nb = block_nb;
00207 hdr->spec.nak.nak_count = bit_count;
00208
00209
00210 target_->recv(p);
00211 }
00212 else {
00213 Packet::free(p);
00214
00215 }
00216 tcl.resultf("%lu", bit_count);
00217 }
00218
00219
00220
00221 int MFTPRcvAgent::process_packet(CW_PATTERN_t cw_pat,
00222 unsigned long group_nb, unsigned long dtu_nb)
00223 {
00224 CW_PATTERN_t bit;
00225 CW_MATRIXLINE_t new_row;
00226
00227 unsigned long j;
00228 unsigned long finish = get_dtus_per_group(group_nb);
00229
00230
00231 new_row.left = cw_pat;
00232
00233 for(j = 0; j < finish; j++) {
00234 CW_PATTERN_t line_pat = cw_matrixline_buf[j * nb_groups + group_nb].left;
00235 if(line_pat != 0) {
00236 bit = new_row.left & ((CW_PATTERN_t) 1 << minbit(line_pat));
00237 if(bit != 0) {
00238 new_row.left ^= line_pat;
00239 }
00240 }
00241 }
00242 if(new_row.left != 0) {
00243 bit = (CW_PATTERN_t) 1 << minbit(new_row.left);
00244 for(j = 0; j < finish; j++) {
00245 if((bit & cw_matrixline_buf[j * nb_groups + group_nb].left) != 0) {
00246 cw_matrixline_buf[j * nb_groups + group_nb].left ^= new_row.left;
00247 }
00248 }
00249
00250
00251
00252 cw_matrixline_buf[dtu_nb * nb_groups + group_nb].left = new_row.left;
00253 return 1;
00254
00255 }
00256 else {
00257 return 0;
00258 }
00259 }
00260
00261
00262 int MFTPRcvAgent::findStoreLocation(unsigned long group_nb, unsigned long seek_offset, unsigned long* dtu_nb)
00263 {
00264 unsigned long start_dtu_nb;
00265
00266 assert(0 <= group_nb && group_nb < nb_groups);
00267 assert(seek_offset % dtu_size == 0 ||
00268 seek_offset == FileSize);
00269
00270 if(seek_offset == FileSize) {
00271 *dtu_nb = group_nb;
00272 } else {
00273 unsigned long curr_dtu_nb = FseekOffset / dtu_size;
00274
00275
00276 *dtu_nb = curr_dtu_nb - curr_dtu_nb % nb_groups;
00277 *dtu_nb += group_nb;
00278
00279
00280
00281 if(*dtu_nb < curr_dtu_nb) {
00282 *dtu_nb += nb_groups;
00283 }
00284 }
00285 if(*dtu_nb >= FileDGrams) {
00286
00287
00288 *dtu_nb = group_nb;
00289 }
00290 start_dtu_nb = *dtu_nb;
00291 assert(start_dtu_nb < FileDGrams);
00292
00293 do {
00294 if(! cw_matrixline_buf[*dtu_nb].left) {
00295 return 1;
00296 }
00297 *dtu_nb += nb_groups;
00298 if(*dtu_nb >= FileDGrams) {
00299 *dtu_nb = group_nb;
00300 }
00301 } while(*dtu_nb != start_dtu_nb);
00302 return 0;
00303 }
00304
00305
00306
00307
00308 void MFTPRcvAgent::cw_matrixlines_reset()
00309 {
00310 assert(0 <= FileDGrams);
00311 memset(cw_matrixline_buf, 0, sizeof(CW_MATRIXLINE_t) * FileDGrams);
00312 }
00313
00314
00315
00316 bool MFTPRcvAgent::is_group_full(unsigned long group_nb)
00317 {
00318 unsigned long nb_dtus = get_dtus_per_group(group_nb);
00319 unsigned long i;
00320
00321 assert(0 <= group_nb && group_nb < nb_groups);
00322
00323 for(i = 0; i < nb_dtus &&
00324 cw_matrixline_buf[i * nb_groups + group_nb].left != 0; ++i)
00325 ;
00326 return (i == nb_dtus) ? true : false;
00327
00328
00329 }
00330
00331
00332
00333 int MFTPRcvAgent::recv_data(hdr_mftp::Spec::Data& data)
00334 {
00335 Tcl& tcl = Tcl::instance();
00336 unsigned long seek_offset;
00337 unsigned long dtu_nb;
00338
00339
00340
00341 CurrentPass = data.pass_nb;
00342 CurrentGroup = data.group_nb;
00343 CwPat = data.cw_pat;
00344
00345
00346
00347
00348
00349 assert(0 <= CurrentPass);
00350 assert(0 <= CurrentGroup && CurrentGroup < nb_groups);
00351
00352
00353 if(findStoreLocation(CurrentGroup, FseekOffset, &dtu_nb)) {
00354
00355 assert(dtu_nb % nb_groups == CurrentGroup);
00356 assert(0 <= dtu_nb && dtu_nb < FileDGrams);
00357
00358 if(process_packet(CwPat,
00359 CurrentGroup,
00360 dtu_nb / nb_groups)) {
00361 cw_matrixline_buf[dtu_nb].right = CwPat;
00362
00363
00364
00365 char buf[8 * sizeof(CW_PATTERN_t) + 1];
00366 CwPat.print(buf);
00367 tcl.evalf("%s recv useful %lu %lu %s",
00368 name_,
00369 (unsigned long) CurrentPass,
00370 (unsigned long) CurrentGroup,
00371 (char*) buf);
00372
00373
00374 seek_offset = dtu_nb * dtu_size;
00375 if(dtu_nb == FileDGrams - 1) {
00376
00377
00378
00379
00380
00381 } else {
00382
00383 if(FseekOffset != seek_offset) {
00384
00385 FseekOffset = seek_offset;
00386 seekCount_++;
00387 }
00388
00389 FseekOffset += dtu_size;
00390 }
00391
00392 FileDGramsReceived++;
00393
00394
00395 if (FileDGramsReceived == FileDGrams) {
00396
00397
00398
00399
00400 char buf[8 * sizeof(CW_PATTERN_t) + 1];
00401 CwPat.print(buf);
00402 tcl.evalf("%s done-notify %lu %lu %s",
00403 name_,
00404 (unsigned long) CurrentPass,
00405 (unsigned long) CurrentGroup,
00406 (char*) buf);
00407 return(0);
00408 }
00409 }
00410 else {
00411 char buf[8 * sizeof(CW_PATTERN_t) + 1];
00412 CwPat.print(buf);
00413 tcl.evalf("%s recv dependent %lu %lu %s",
00414 name_,
00415 (unsigned long) CurrentPass,
00416 (unsigned long) CurrentGroup,
00417 (char*) buf);
00418 return(0);
00419 }
00420 }
00421 else {
00422
00423 char buf[8 * sizeof(CW_PATTERN_t) + 1];
00424 CwPat.print(buf);
00425 tcl.evalf("%s recv group-full %lu %lu %s",
00426 name_,
00427 (unsigned long) CurrentPass,
00428 (unsigned long) CurrentGroup,
00429 (char*) buf);
00430 }
00431 return(0);
00432 }