Main Page | Namespace List | Class Hierarchy | Alphabetical List | Compound List | File List | Compound Members | File Members

mftp_snd.cc

Go to the documentation of this file.
00001 /*
00002  * (c) 1997-98 StarBurst Communications Inc.
00003  *
00004  * THIS SOFTWARE IS PROVIDED BY THE CONTRIBUTORS ``AS IS'' AND
00005  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00006  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00007  * ARE DISCLAIMED.  IN NO EVENT SHALL THE CONTRIBUTORS BE LIABLE
00008  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
00009  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
00010  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
00011  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00012  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
00013  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
00014  * SUCH DAMAGE.
00015  *
00016  * Author: Christoph Haenle, chris@cs.vu.nl
00017  * File: mftp_snd.cc
00018  * Last change: Dec 07, 1998
00019  *
00020  * This software may freely be used only for non-commercial purposes
00021  *
00022  * $Header: /nfs/jade/vint/CVSROOT/ns-2/apps/mftp_snd.cc,v 1.9 2000/09/01 03:04:06 haoboy Exp $
00023  */
00024 
00025 
00026 // This file contains functionality specific to the MFTP sender.
00027 
00028 #include <stdlib.h>     // strtoul, etc.
00029 #include <assert.h>
00030 
00031 #include <stdio.h>
00032 
00033 #include "config.h"
00034 #include "tclcl.h"
00035 #include "agent.h"
00036 #include "packet.h"
00037 #include "ip.h"
00038 #include "mftp_snd.h"
00039 #include "trace.h"
00040 #include "bitops.h"       // due to IS_BITSET, etc.
00041 
00042 #define min(a, b)       ((a) < (b) ? (a) : (b))
00043 
00044 
00045 
00046 static class MFTPSndAgentClass : public TclClass {
00047 public:
00048     MFTPSndAgentClass() : TclClass("Agent/MFTP/Snd") {}
00049     TclObject* create(int, const char*const*) {
00050         return (new MFTPSndAgent());
00051     }
00052 } class_mftpsnd_agent;
00053 
00054 int hdr_mftp::offset_;
00055 static class MFTPHeaderClass : public PacketHeaderClass {
00056 public:
00057     MFTPHeaderClass() : PacketHeaderClass("PacketHeader/MFTP",
00058                                           sizeof(hdr_mftp)) {
00059             bind_offset(&hdr_mftp::offset_);
00060     }
00061 } class_mftphdr;
00062 
00063 
00064 MFTPSndAgent::MFTPSndAgent()
00065     : MFTPAgent(),
00066       naks(0),
00067       retx(0),
00068       fseek_offset(0),
00069       read_ahead_bufsize(0),
00070       CurrentPass(0),
00071       CurrentGroup(0),
00072       CwPat(0),
00073       MinGroupNbInBuf(0),
00074       NbGroupsInBuf(0)
00075 {
00076     bind("readAheadBufsize_", &readAheadBufsize_);
00077     bind_time("txStatusDelay_", &txStatusDelay_);
00078     bind("nakCount_", &nakCount_);
00079 }
00080 
00081 MFTPSndAgent::~MFTPSndAgent()
00082 {
00083     delete [] naks;  // NOTE: delete on NULL pointer has no effect
00084     delete [] retx;
00085 }
00086 
00087 int MFTPSndAgent::command(int argc, const char*const* argv)
00088 {
00089     Tcl& tcl = Tcl::instance();
00090 
00091     if(strcmp(argv[1], "send") == 0) {
00092         if(strcmp(argv[2], "data") == 0) {
00093             return send_data();
00094         } else if(strcmp(argv[2], "statreq") == 0) {
00095             unsigned long pass_nb, block_lo, block_hi;
00096             double rsp_backoff_window=2343.2343;
00097             int nb_scanned = 0;
00098             if(argc == 7) {
00099                 nb_scanned += sscanf(argv[3], "%lu", &pass_nb);
00100                 nb_scanned += sscanf(argv[4], "%lu", &block_lo);
00101                 nb_scanned += sscanf(argv[5], "%lu", &block_hi);
00102                 nb_scanned += sscanf(argv[6], "%lf", &rsp_backoff_window);
00103             }
00104             if(nb_scanned != 4) {
00105                 tcl.resultf("%s: wrong number of parameters for \"send statreq\"", name_);
00106                 return TCL_ERROR;
00107             }
00108             send_status_request(pass_nb, block_lo, block_hi, rsp_backoff_window); 
00109             return TCL_OK;
00110         }
00111     }
00112     if(strcmp(argv[1], "start") == 0) {
00113         if(MFTPAgent::init() == TCL_ERROR) {
00114             return TCL_ERROR;
00115         };
00116         init_user_file((unsigned long) readAheadBufsize_);
00117         return TCL_OK;
00118     }
00119     return Agent::command(argc, argv);
00120 }
00121 
00122 void MFTPSndAgent::recv(Packet* p, Handler* h)
00123 {
00124     hdr_ip* ih = hdr_ip::access(p);
00125     hdr_mftp* mh = hdr_mftp::access(p);
00126 
00127     if(ih->daddr() == 0) {
00128         assert(false);        // Packet from local agent.
00129     } else {
00130         switch(mh->type) {
00131         case hdr_mftp::PDU_DATA_TRANSFER:
00132         case hdr_mftp::PDU_STATUS_REQUEST:
00133             // as the sender is a member of the multicast group as well,
00134             // it receives all data it has sent. So just ignore it.
00135             break;
00136         case hdr_mftp::PDU_NAK:
00137             process_nak(mh->spec.nak, p->accessdata(), CurrentPass-1); // -1 because we have
00138             // incremented the pass-number already in send_data.
00139             break;
00140         default:
00141             assert(false);  // unknown packet type (also possible: just ignore packet rather than exit)
00142         }
00143         Packet::free(p);
00144     }
00145 }
00146 
00147 void MFTPSndAgent::send_status_request(unsigned long pass_nb,
00148                                        unsigned long block_lo,
00149                                        unsigned long block_hi,
00150                                        double rsp_backoff_window)
00151 {
00152     Packet* p = Agent::allocpkt();
00153     hdr_mftp* hdr = hdr_mftp::access(p);
00154 
00155     assert(FileDGrams > 0);                    // we need this requirement here
00156 
00157     // initialize the header of the status request packet:
00158     hdr->type = hdr_mftp::PDU_STATUS_REQUEST;
00159     hdr->spec.statReq.pass_nb  = pass_nb;
00160     hdr->spec.statReq.block_lo = block_lo;
00161     hdr->spec.statReq.block_hi = block_hi;
00162     hdr->spec.statReq.RspBackoffWindow = rsp_backoff_window;
00163 
00164     // transmit packet
00165     hdr_cmn* ch = hdr_cmn::access(p);
00166     ch->size() = sizeof(hdr_mftp);
00167     target_->recv(p);
00168 }
00169 
00170 
00171 // process incoming nak:
00172 void MFTPSndAgent::process_nak(hdr_mftp::Spec::Nak& nak,
00173                                   unsigned char* nak_bitmap,
00174                                   unsigned long currentPass)
00175 {
00176     assert(1 <= nak.nak_count && nak.nak_count <= nb_groups); // or else some receiver is fooling us.
00177     assert(nak.pass_nb <= currentPass);  // pass greater than requested? => a receiver is fooling us.
00178 
00179     Tcl& tcl = Tcl::instance();
00180     tcl.evalf("%s recv nak %lu %lu %lu", name_,
00181               (unsigned long) nak.pass_nb,
00182               (unsigned long) nak.block_nb,
00183               (unsigned long) nak.nak_count);
00184 
00185     assert(dtus_per_block % 8 == 0);         // This property is required for the following
00186 
00187     // start_group_nb corresponds to first bit of NAK-bitmap:
00188     const unsigned long start_group_nb = dtus_per_block * nak.block_nb;
00189 
00190     // end_group_nb corresponds to last group number of NAK-bitmap plus one
00191     const unsigned long end_group_nb = min(nb_groups, dtus_per_block * (nak.block_nb + 1));
00192 
00193     // get starting index into naks-array for this block
00194     const unsigned long nak_index = start_group_nb / 8;
00195 
00196     // number of status bytes in pdu
00197     const unsigned long nak_bytes = (end_group_nb - start_group_nb + 7) / 8;
00198 
00199     // pointer to location in array at which the received nak bitmap must be
00200     // or'd to the sender-bitmap (the bitmap in which the sender collects the naks)
00201     unsigned char* nak_array = naks + nak_index;
00202 
00203     // if this nak pdu is from a previous pass (i.e. a delayed nak), ignore the status
00204     // bits for dtu's that we've just retransmitted in the current pass:
00205     if(nak.pass_nb < currentPass) {
00206         unsigned char* retx_array = retx + nak_index;
00207         for(unsigned long i = 0; i < nak_bytes; i++) {
00208             if(*nak_bitmap) {
00209                 // "AND out" bits for already transmitted packets and
00210                 // "OR in" the result into newly constructed NAK bitmap
00211                 *nak_array |= (*nak_bitmap & (~*retx_array));
00212             }            
00213             nak_array++;
00214             retx_array++;
00215             nak_bitmap++;
00216         }
00217     }
00218     else {
00219         assert(nak.pass_nb == currentPass); // this nak belongs to the current pass
00220         for(unsigned long i = 0; i < nak_bytes; i++) {
00221             if(*nak_bitmap) {
00222                 // "OR in" NAK byte into newly constructed NAK bitmap
00223                 *nak_array |= *nak_bitmap;
00224             }
00225             nak_array++;
00226             nak_bitmap++;
00227         }
00228     }
00229     nakCount_++;    // increment total number of received nak-packets
00230 }
00231 
00232 
00233 
00234 void MFTPSndAgent::init_user_file(unsigned long readAheadBufsize)
00235 {
00236     read_ahead_bufsize = readAheadBufsize;
00237     fseek_offset = 0;
00238 
00239     // initialize codeword pattern
00240     iterator.setSourceWordLen(dtus_per_group);
00241     CwPat = iterator.getNextCwPat();
00242     // free arrays from a possible previous transmission (no effect on NULL pointers)
00243     delete [] naks;
00244     delete [] retx;
00245 
00246     // allocate naks bitmap init'd to all nak'd:
00247     naks = new unsigned char[(nb_groups + 7) / 8];
00248     assert(naks != NULL);           // or else we ran out of memory
00249     SET_ALL_BITS(naks, nb_groups);
00250 
00251     // allocate retransmission bitmap init'd to none retransmitted:
00252     retx = new unsigned char[(nb_groups + 7) / 8];
00253     assert(retx != NULL);           // or else we ran out of memory
00254     RESET_ALL_BITS(retx, nb_groups);
00255 
00256     CurrentPass = CurrentGroup = MinGroupNbInBuf = NbGroupsInBuf = 0;
00257 }
00258 
00259 
00260 
00261 
00262 // reads as many groups into the read-ahead-buffer as there is space, starting with
00263 // group CurrentGroup. The groups that were not not NACK'd by anyone will be
00264 // skipped (and the corresponding areas in the read-ahead-buffer will be skipped
00265 // as well).
00266 void MFTPSndAgent::fill_read_ahead_buf()
00267 {
00268     unsigned int dtu_pos;        // loops over [0..dtus_per_group)
00269     unsigned long seek_offset;   // where to position the head for disk seeks
00270     unsigned long buf_pos = 0;   // position where data is written (into main memory) when
00271                                  // read from disk, relative to the start of read_ahead_buf
00272     CW_PATTERN_t cw_pat_tmp = CwPat;
00273     unsigned long i;
00274     unsigned long len;
00275 
00276     // switch to next group that must be read:
00277     MinGroupNbInBuf = CurrentGroup;
00278     NbGroupsInBuf = min(read_ahead_bufsize / (bitcount(CwPat) * dtu_size),
00279                         nb_groups - MinGroupNbInBuf);
00280     while(cw_pat_tmp != 0) {
00281         dtu_pos = minbit(cw_pat_tmp);
00282         assert(0 <= dtu_pos && dtu_pos < dtus_per_group);
00283         assert(MinGroupNbInBuf + NbGroupsInBuf <= nb_groups);
00284 
00285         cw_pat_tmp &= ~((CW_PATTERN_t) 1 << dtu_pos);  // clear bit at position "dtu_pos"
00286 
00287         for(i = MinGroupNbInBuf;
00288             i < MinGroupNbInBuf + NbGroupsInBuf; ++i) {
00289 
00290             // continue with for-loop if group i was not NACKed by anyone
00291             if(IS_BIT_CLEARED(naks, i)) {
00292                 buf_pos += dtu_size;
00293                 continue;
00294             }
00295 
00296             // Note: there is never data accessed "outside" the file as the while-loop
00297             // is left as soon as the last (possibly partial) DTU has been read.
00298             seek_offset = (dtu_pos * nb_groups + i) * dtu_size;
00299             if(seek_offset >= FileSize) {
00300                 // we can get there if the last group(s) have fewer than
00301                 // dtus_per_group packets. If we get here, we are ready.
00302                 return; // OK
00303             }
00304 
00305             if (fseek_offset != seek_offset) {
00306                 // do the fseek here (omitted)
00307                 seekCount_++;
00308                 fseek_offset = seek_offset;
00309             }
00310 
00311             // determine number of bytes to read
00312             len = min(dtu_size, FileSize - fseek_offset);
00313 
00314             // read len bytes from file here (omitted)
00315             fseek_offset += len;
00316 
00317             buf_pos += len;
00318             if(len < dtu_size) {
00319                 // we get here if the last dtu is smaller than dtu_size and if
00320                 // we have just read that last dtu
00321 
00322                 assert(fseek_offset == FileSize); // we must be at EOF
00323 
00324                 // clear rest of read-ahead-buffer here (omitted)
00325                 buf_pos = bitcount(CwPat) * NbGroupsInBuf * dtu_size;
00326                 return;                  // that's it, no more packets to process
00327              }
00328             assert(len == dtu_size);
00329             assert(buf_pos <= bitcount(CwPat) * NbGroupsInBuf * dtu_size);
00330         } // for
00331     } // while
00332     // we get here only if no group was read with less than dtus_per_group packets and
00333     // the if not the last packet was read (in case it is too short)
00334     assert(buf_pos == bitcount(CwPat) * NbGroupsInBuf * dtu_size);
00335 }
00336 
00337 
00338 // send_data() sends next data packet.
00339 // In tcl's result buffer, return
00340 //    0, if current pass not yet finished
00341 //   -1, if reached the end of the current pass 
00342 int MFTPSndAgent::send_data()
00343 {
00344     Packet* p = Agent::allocpkt();
00345     hdr_mftp* hdr = hdr_mftp::access(p);
00346     CW_PATTERN_t mask;
00347     Tcl& tcl = Tcl::instance();
00348 
00349     assert(0 <= CurrentGroup && CurrentGroup < nb_groups);
00350     assert(NbGroupsInBuf >= 0);
00351     assert(0 <= MinGroupNbInBuf && MinGroupNbInBuf + NbGroupsInBuf <= nb_groups);
00352 
00353     // now comes NACK processing: loop until end of file or until
00354     // a nak bit is detected:
00355     while(CurrentGroup < nb_groups && IS_BIT_CLEARED(naks, CurrentGroup)) {
00356         CurrentGroup++;         // proceed to next bit of the nak bitmap
00357     }
00358 
00359     // do not transmit packet if
00360     // (1) CurrentGroup has reached the total number of groups ("end of pass") or
00361     // (2) CwPat has only bits set that refer to some packets that are cut off in
00362     //     the current group (for example, if CurrentGroup has only 5 packets
00363     //     with nb_groups=8 and if CwPat=64+32)
00364     if(CurrentGroup != nb_groups &&
00365        ((mask = (~(CW_PATTERN_t) 0) >> (8 * sizeof(CW_PATTERN_t) - get_dtus_per_group(CurrentGroup)))
00366         & CwPat) != 0) {
00367         assert(CurrentGroup < nb_groups);
00368 
00369         // see if the read-ahead-buffer is exhausted so that we must load new data
00370         // from file. Only groups with a corresponding NAK-bit are read, that is,
00371         // those that were requested for retransmission
00372         assert(MinGroupNbInBuf <= CurrentGroup);
00373         if(CurrentGroup >= MinGroupNbInBuf + NbGroupsInBuf) { // exhausted?
00374             fill_read_ahead_buf();               // load new data from file
00375         }
00376         assert(MinGroupNbInBuf <= CurrentGroup &&
00377                CurrentGroup < MinGroupNbInBuf + NbGroupsInBuf);
00378 
00379         // produce an encoded packet here (omitted)
00380 
00381         // generate the header
00382         hdr->type = hdr_mftp::PDU_DATA_TRANSFER;
00383         hdr->spec.data.pass_nb  = CurrentPass;
00384         hdr->spec.data.group_nb = CurrentGroup;
00385         hdr->spec.data.cw_pat   = CwPat & mask;
00386 
00387         char buf[8 * sizeof(CW_PATTERN_t) + 1];
00388         (CwPat & mask).print(buf);
00389         tcl.evalf("%s send notify %lu %lu %s",
00390                   name_,
00391                   (unsigned long) CurrentPass,
00392                   (unsigned long) CurrentGroup,
00393                   (char*) buf);
00394 
00395         hdr_cmn* ch = hdr_cmn::access(p);
00396         ch->size() = sizeof(hdr_mftp) + dtu_size;
00397 
00398         // transmit packet
00399         target_->recv(p);
00400 
00401         RESET_BIT(naks, CurrentGroup); // reset the dtu status bit in the nak bitmap
00402         SET_BIT(retx, CurrentGroup);   // set the dtus status bit in the retransmission bitmap
00403 
00404         CurrentGroup++;
00405     } // if
00406 
00407     // if last group of the file:
00408     if(CurrentGroup == nb_groups || !(CwPat & mask)) { // end of pass?
00409         do {
00410             CwPat = iterator.getNextCwPat(); // get next codeword for new pass
00411         } while(!(CwPat & ((~(CW_PATTERN_t) 0) >> (8 * sizeof(CW_PATTERN_t) - get_dtus_per_group(0)))));
00412         //        } while(!(CwPat & ((((CW_PATTERN_t) 1) << get_dtus_per_group(0)) - 1)));
00413 
00414         // prepare a new pas:
00415         MinGroupNbInBuf = 0;
00416         NbGroupsInBuf = 0;
00417         CurrentGroup = 0;
00418 
00419         // reset retransmission bitmap for dealing with of latent naks
00420         RESET_ALL_BITS(retx, nb_groups);
00421 
00422         // the first dtus_per_group passes must be transmitted "in full"
00423         if(CurrentPass < dtus_per_group - 1) {
00424             SET_ALL_BITS(naks, nb_groups);
00425         }
00426         tcl.evalf("%s pass-finished %lu %lu", name_,
00427                   (unsigned long) CurrentPass,
00428                   (unsigned long) nb_blocks());
00429         CurrentPass++;
00430         tcl.result("-1");  // return end-of-pass to the caller
00431 
00432         return TCL_OK;
00433     }
00434     tcl.result("0");       // end-of-pass not yet reached
00435     return TCL_OK;
00436 }
00437 
00438 

Generated on Tue Apr 20 12:14:24 2004 for NS2.26SourcesOriginal by doxygen 1.3.3