00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028 #include <stdlib.h>
00029 #include <assert.h>
00030
00031 #include <stdio.h>
00032
00033 #include "config.h"
00034 #include "tclcl.h"
00035 #include "agent.h"
00036 #include "packet.h"
00037 #include "ip.h"
00038 #include "mftp_snd.h"
00039 #include "trace.h"
00040 #include "bitops.h"
00041
00042 #define min(a, b) ((a) < (b) ? (a) : (b))
00043
00044
00045
00046 static class MFTPSndAgentClass : public TclClass {
00047 public:
00048 MFTPSndAgentClass() : TclClass("Agent/MFTP/Snd") {}
00049 TclObject* create(int, const char*const*) {
00050 return (new MFTPSndAgent());
00051 }
00052 } class_mftpsnd_agent;
00053
00054 int hdr_mftp::offset_;
00055 static class MFTPHeaderClass : public PacketHeaderClass {
00056 public:
00057 MFTPHeaderClass() : PacketHeaderClass("PacketHeader/MFTP",
00058 sizeof(hdr_mftp)) {
00059 bind_offset(&hdr_mftp::offset_);
00060 }
00061 } class_mftphdr;
00062
00063
00064 MFTPSndAgent::MFTPSndAgent()
00065 : MFTPAgent(),
00066 naks(0),
00067 retx(0),
00068 fseek_offset(0),
00069 read_ahead_bufsize(0),
00070 CurrentPass(0),
00071 CurrentGroup(0),
00072 CwPat(0),
00073 MinGroupNbInBuf(0),
00074 NbGroupsInBuf(0)
00075 {
00076 bind("readAheadBufsize_", &readAheadBufsize_);
00077 bind_time("txStatusDelay_", &txStatusDelay_);
00078 bind("nakCount_", &nakCount_);
00079 }
00080
00081 MFTPSndAgent::~MFTPSndAgent()
00082 {
00083 delete [] naks;
00084 delete [] retx;
00085 }
00086
00087 int MFTPSndAgent::command(int argc, const char*const* argv)
00088 {
00089 Tcl& tcl = Tcl::instance();
00090
00091 if(strcmp(argv[1], "send") == 0) {
00092 if(strcmp(argv[2], "data") == 0) {
00093 return send_data();
00094 } else if(strcmp(argv[2], "statreq") == 0) {
00095 unsigned long pass_nb, block_lo, block_hi;
00096 double rsp_backoff_window=2343.2343;
00097 int nb_scanned = 0;
00098 if(argc == 7) {
00099 nb_scanned += sscanf(argv[3], "%lu", &pass_nb);
00100 nb_scanned += sscanf(argv[4], "%lu", &block_lo);
00101 nb_scanned += sscanf(argv[5], "%lu", &block_hi);
00102 nb_scanned += sscanf(argv[6], "%lf", &rsp_backoff_window);
00103 }
00104 if(nb_scanned != 4) {
00105 tcl.resultf("%s: wrong number of parameters for \"send statreq\"", name_);
00106 return TCL_ERROR;
00107 }
00108 send_status_request(pass_nb, block_lo, block_hi, rsp_backoff_window);
00109 return TCL_OK;
00110 }
00111 }
00112 if(strcmp(argv[1], "start") == 0) {
00113 if(MFTPAgent::init() == TCL_ERROR) {
00114 return TCL_ERROR;
00115 };
00116 init_user_file((unsigned long) readAheadBufsize_);
00117 return TCL_OK;
00118 }
00119 return Agent::command(argc, argv);
00120 }
00121
00122 void MFTPSndAgent::recv(Packet* p, Handler* h)
00123 {
00124 hdr_ip* ih = hdr_ip::access(p);
00125 hdr_mftp* mh = hdr_mftp::access(p);
00126
00127 if(ih->daddr() == 0) {
00128 assert(false);
00129 } else {
00130 switch(mh->type) {
00131 case hdr_mftp::PDU_DATA_TRANSFER:
00132 case hdr_mftp::PDU_STATUS_REQUEST:
00133
00134
00135 break;
00136 case hdr_mftp::PDU_NAK:
00137 process_nak(mh->spec.nak, p->accessdata(), CurrentPass-1);
00138
00139 break;
00140 default:
00141 assert(false);
00142 }
00143 Packet::free(p);
00144 }
00145 }
00146
00147 void MFTPSndAgent::send_status_request(unsigned long pass_nb,
00148 unsigned long block_lo,
00149 unsigned long block_hi,
00150 double rsp_backoff_window)
00151 {
00152 Packet* p = Agent::allocpkt();
00153 hdr_mftp* hdr = hdr_mftp::access(p);
00154
00155 assert(FileDGrams > 0);
00156
00157
00158 hdr->type = hdr_mftp::PDU_STATUS_REQUEST;
00159 hdr->spec.statReq.pass_nb = pass_nb;
00160 hdr->spec.statReq.block_lo = block_lo;
00161 hdr->spec.statReq.block_hi = block_hi;
00162 hdr->spec.statReq.RspBackoffWindow = rsp_backoff_window;
00163
00164
00165 hdr_cmn* ch = hdr_cmn::access(p);
00166 ch->size() = sizeof(hdr_mftp);
00167 target_->recv(p);
00168 }
00169
00170
00171
00172 void MFTPSndAgent::process_nak(hdr_mftp::Spec::Nak& nak,
00173 unsigned char* nak_bitmap,
00174 unsigned long currentPass)
00175 {
00176 assert(1 <= nak.nak_count && nak.nak_count <= nb_groups);
00177 assert(nak.pass_nb <= currentPass);
00178
00179 Tcl& tcl = Tcl::instance();
00180 tcl.evalf("%s recv nak %lu %lu %lu", name_,
00181 (unsigned long) nak.pass_nb,
00182 (unsigned long) nak.block_nb,
00183 (unsigned long) nak.nak_count);
00184
00185 assert(dtus_per_block % 8 == 0);
00186
00187
00188 const unsigned long start_group_nb = dtus_per_block * nak.block_nb;
00189
00190
00191 const unsigned long end_group_nb = min(nb_groups, dtus_per_block * (nak.block_nb + 1));
00192
00193
00194 const unsigned long nak_index = start_group_nb / 8;
00195
00196
00197 const unsigned long nak_bytes = (end_group_nb - start_group_nb + 7) / 8;
00198
00199
00200
00201 unsigned char* nak_array = naks + nak_index;
00202
00203
00204
00205 if(nak.pass_nb < currentPass) {
00206 unsigned char* retx_array = retx + nak_index;
00207 for(unsigned long i = 0; i < nak_bytes; i++) {
00208 if(*nak_bitmap) {
00209
00210
00211 *nak_array |= (*nak_bitmap & (~*retx_array));
00212 }
00213 nak_array++;
00214 retx_array++;
00215 nak_bitmap++;
00216 }
00217 }
00218 else {
00219 assert(nak.pass_nb == currentPass);
00220 for(unsigned long i = 0; i < nak_bytes; i++) {
00221 if(*nak_bitmap) {
00222
00223 *nak_array |= *nak_bitmap;
00224 }
00225 nak_array++;
00226 nak_bitmap++;
00227 }
00228 }
00229 nakCount_++;
00230 }
00231
00232
00233
00234 void MFTPSndAgent::init_user_file(unsigned long readAheadBufsize)
00235 {
00236 read_ahead_bufsize = readAheadBufsize;
00237 fseek_offset = 0;
00238
00239
00240 iterator.setSourceWordLen(dtus_per_group);
00241 CwPat = iterator.getNextCwPat();
00242
00243 delete [] naks;
00244 delete [] retx;
00245
00246
00247 naks = new unsigned char[(nb_groups + 7) / 8];
00248 assert(naks != NULL);
00249 SET_ALL_BITS(naks, nb_groups);
00250
00251
00252 retx = new unsigned char[(nb_groups + 7) / 8];
00253 assert(retx != NULL);
00254 RESET_ALL_BITS(retx, nb_groups);
00255
00256 CurrentPass = CurrentGroup = MinGroupNbInBuf = NbGroupsInBuf = 0;
00257 }
00258
00259
00260
00261
00262
00263
00264
00265
00266 void MFTPSndAgent::fill_read_ahead_buf()
00267 {
00268 unsigned int dtu_pos;
00269 unsigned long seek_offset;
00270 unsigned long buf_pos = 0;
00271
00272 CW_PATTERN_t cw_pat_tmp = CwPat;
00273 unsigned long i;
00274 unsigned long len;
00275
00276
00277 MinGroupNbInBuf = CurrentGroup;
00278 NbGroupsInBuf = min(read_ahead_bufsize / (bitcount(CwPat) * dtu_size),
00279 nb_groups - MinGroupNbInBuf);
00280 while(cw_pat_tmp != 0) {
00281 dtu_pos = minbit(cw_pat_tmp);
00282 assert(0 <= dtu_pos && dtu_pos < dtus_per_group);
00283 assert(MinGroupNbInBuf + NbGroupsInBuf <= nb_groups);
00284
00285 cw_pat_tmp &= ~((CW_PATTERN_t) 1 << dtu_pos);
00286
00287 for(i = MinGroupNbInBuf;
00288 i < MinGroupNbInBuf + NbGroupsInBuf; ++i) {
00289
00290
00291 if(IS_BIT_CLEARED(naks, i)) {
00292 buf_pos += dtu_size;
00293 continue;
00294 }
00295
00296
00297
00298 seek_offset = (dtu_pos * nb_groups + i) * dtu_size;
00299 if(seek_offset >= FileSize) {
00300
00301
00302 return;
00303 }
00304
00305 if (fseek_offset != seek_offset) {
00306
00307 seekCount_++;
00308 fseek_offset = seek_offset;
00309 }
00310
00311
00312 len = min(dtu_size, FileSize - fseek_offset);
00313
00314
00315 fseek_offset += len;
00316
00317 buf_pos += len;
00318 if(len < dtu_size) {
00319
00320
00321
00322 assert(fseek_offset == FileSize);
00323
00324
00325 buf_pos = bitcount(CwPat) * NbGroupsInBuf * dtu_size;
00326 return;
00327 }
00328 assert(len == dtu_size);
00329 assert(buf_pos <= bitcount(CwPat) * NbGroupsInBuf * dtu_size);
00330 }
00331 }
00332
00333
00334 assert(buf_pos == bitcount(CwPat) * NbGroupsInBuf * dtu_size);
00335 }
00336
00337
00338
00339
00340
00341
00342 int MFTPSndAgent::send_data()
00343 {
00344 Packet* p = Agent::allocpkt();
00345 hdr_mftp* hdr = hdr_mftp::access(p);
00346 CW_PATTERN_t mask;
00347 Tcl& tcl = Tcl::instance();
00348
00349 assert(0 <= CurrentGroup && CurrentGroup < nb_groups);
00350 assert(NbGroupsInBuf >= 0);
00351 assert(0 <= MinGroupNbInBuf && MinGroupNbInBuf + NbGroupsInBuf <= nb_groups);
00352
00353
00354
00355 while(CurrentGroup < nb_groups && IS_BIT_CLEARED(naks, CurrentGroup)) {
00356 CurrentGroup++;
00357 }
00358
00359
00360
00361
00362
00363
00364 if(CurrentGroup != nb_groups &&
00365 ((mask = (~(CW_PATTERN_t) 0) >> (8 * sizeof(CW_PATTERN_t) - get_dtus_per_group(CurrentGroup)))
00366 & CwPat) != 0) {
00367 assert(CurrentGroup < nb_groups);
00368
00369
00370
00371
00372 assert(MinGroupNbInBuf <= CurrentGroup);
00373 if(CurrentGroup >= MinGroupNbInBuf + NbGroupsInBuf) {
00374 fill_read_ahead_buf();
00375 }
00376 assert(MinGroupNbInBuf <= CurrentGroup &&
00377 CurrentGroup < MinGroupNbInBuf + NbGroupsInBuf);
00378
00379
00380
00381
00382 hdr->type = hdr_mftp::PDU_DATA_TRANSFER;
00383 hdr->spec.data.pass_nb = CurrentPass;
00384 hdr->spec.data.group_nb = CurrentGroup;
00385 hdr->spec.data.cw_pat = CwPat & mask;
00386
00387 char buf[8 * sizeof(CW_PATTERN_t) + 1];
00388 (CwPat & mask).print(buf);
00389 tcl.evalf("%s send notify %lu %lu %s",
00390 name_,
00391 (unsigned long) CurrentPass,
00392 (unsigned long) CurrentGroup,
00393 (char*) buf);
00394
00395 hdr_cmn* ch = hdr_cmn::access(p);
00396 ch->size() = sizeof(hdr_mftp) + dtu_size;
00397
00398
00399 target_->recv(p);
00400
00401 RESET_BIT(naks, CurrentGroup);
00402 SET_BIT(retx, CurrentGroup);
00403
00404 CurrentGroup++;
00405 }
00406
00407
00408 if(CurrentGroup == nb_groups || !(CwPat & mask)) {
00409 do {
00410 CwPat = iterator.getNextCwPat();
00411 } while(!(CwPat & ((~(CW_PATTERN_t) 0) >> (8 * sizeof(CW_PATTERN_t) - get_dtus_per_group(0)))));
00412
00413
00414
00415 MinGroupNbInBuf = 0;
00416 NbGroupsInBuf = 0;
00417 CurrentGroup = 0;
00418
00419
00420 RESET_ALL_BITS(retx, nb_groups);
00421
00422
00423 if(CurrentPass < dtus_per_group - 1) {
00424 SET_ALL_BITS(naks, nb_groups);
00425 }
00426 tcl.evalf("%s pass-finished %lu %lu", name_,
00427 (unsigned long) CurrentPass,
00428 (unsigned long) nb_blocks());
00429 CurrentPass++;
00430 tcl.result("-1");
00431
00432 return TCL_OK;
00433 }
00434 tcl.result("0");
00435 return TCL_OK;
00436 }
00437
00438