Main Page | Namespace List | Class Hierarchy | Alphabetical List | Compound List | File List | Compound Members | File Members

mftp_rcv.cc

Go to the documentation of this file.
00001 /*
00002  * (c) 1997-98 StarBurst Communications Inc.
00003  *
00004  * THIS SOFTWARE IS PROVIDED BY THE CONTRIBUTORS ``AS IS'' AND
00005  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00006  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00007  * ARE DISCLAIMED.  IN NO EVENT SHALL THE CONTRIBUTORS BE LIABLE
00008  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
00009  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
00010  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
00011  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00012  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
00013  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
00014  * SUCH DAMAGE.
00015  *
00016  * Author: Christoph Haenle, chris@cs.vu.nl
00017  * File: mftp_rcv.cc
00018  * Last change: Dec 14, 1998
00019  *
00020  * This software may freely be used only for non-commercial purposes
00021  *
00022  * $Header: /nfs/jade/vint/CVSROOT/ns-2/apps/mftp_rcv.cc,v 1.8 2000/09/01 03:04:06 haoboy Exp $
00023  */
00024 
00025 // This file contains functionality specific to an MFTP receiver.
00026 
00027 
00028 #include <assert.h>
00029 
00030 #include "config.h"
00031 #include "tclcl.h"
00032 #include "mftp_rcv.h"
00033 
00034 #include "ip.h"            // due to declaration of hdr_ip
00035 
00036 #define min(a, b)       ((a) < (b) ? (a) : (b))
00037 
00038 static class MFTPRcvAgentClass : public TclClass {
00039 public:
00040         MFTPRcvAgentClass() : TclClass("Agent/MFTP/Rcv") {}
00041         TclObject* create(int, const char*const*) {
00042                 return (new MFTPRcvAgent());
00043         }
00044 } class_mftprcv_agent;
00045 
00046 
00047 MFTPRcvAgent::MFTPRcvAgent() 
00048     : MFTPAgent(),
00049       CurrentPass(0),
00050       CurrentGroup(0),
00051       CwPat(0),
00052       FileDGramsReceived(0),
00053       FseekOffset(0),
00054       cw_matrixline_buf(NULL)
00055 {
00056     bind("reply_addr_", (int*)&reply_.addr_);
00057     bind("reply_port_", (int*)&reply_.port_);
00058 }
00059 
00060 // inspect a Tcl command (overloaded function):
00061 int MFTPRcvAgent::command(int argc, const char*const* argv)
00062 {
00063     Tcl& tcl = Tcl::instance();
00064     if(strcmp(argv[1], "send") == 0) {
00065         if(strcmp(argv[2], "nak") == 0) {
00066             unsigned long pass_nb, block_nb;
00067             int nb_scanned = 0;
00068 
00069             nb_scanned += sscanf(argv[3], "%lu", &pass_nb);
00070             nb_scanned += sscanf(argv[4], "%lu", &block_nb);
00071             assert(nb_scanned == 2);
00072             send_nak(pass_nb, block_nb); 
00073             return TCL_OK;
00074         }
00075     } else if (strcmp(argv[1], "start") == 0) {
00076         if(dtusPerBlock_ % 8 != 0) {
00077             tcl.resultf("%s: dtusPerBlock_ must be a multiple of 8", name_);
00078             return TCL_ERROR;
00079         }
00080         init();
00081         return TCL_OK;
00082     }
00083     return Agent::command(argc, argv);
00084 }
00085 
00086 // process reception of a packet
00087 void MFTPRcvAgent::recv(Packet* p, Handler* h)
00088 {
00089     hdr_ip* ih = hdr_ip::access(p);
00090     hdr_mftp* mh = hdr_mftp::access(p);
00091 
00092     if(ih->daddr() == 0) {
00093         // packet from local agent
00094         fprintf(stderr, "%s: send not allowed with Agent/MFTP/Rcv\n", name_);
00095         assert(false);
00096     } else {
00097         switch(mh->type) {
00098         case hdr_mftp::PDU_DATA_TRANSFER:
00099             recv_data(mh->spec.data);
00100             break;
00101         case hdr_mftp::PDU_STATUS_REQUEST:
00102             recv_status_req(mh->spec.statReq);
00103             break;
00104         case hdr_mftp::PDU_NAK:
00105             // as we are a member of the group as well, we receive all data we have sent.
00106             break;
00107         default:
00108             assert(false); // received unknown packet type
00109         }
00110         Packet::free(p);
00111     }
00112 }
00113 
00114 // Destructor:
00115 MFTPRcvAgent::~MFTPRcvAgent()
00116 {
00117     // Note: delete on a NULL-pointer has no effect
00118     delete [] cw_matrixline_buf;
00119 }
00120 
00121 
00122 void MFTPRcvAgent::init()
00123 {
00124     MFTPAgent::init();
00125 
00126     // allocate cw_matrix_line_buf
00127     assert(cw_matrixline_buf == NULL);
00128     cw_matrixline_buf = new CW_MATRIXLINE_t[FileDGrams];
00129     assert(cw_matrixline_buf != NULL);  // or else no memory is left!
00130     // should return an error instead of terminating the program
00131 
00132     // reset array:
00133     cw_matrixlines_reset();
00134 }
00135 
00136 
00137 /* process a received status request packet */
00138 void MFTPRcvAgent::recv_status_req(hdr_mftp::Spec::StatReq& statreq)
00139 {
00140     Tcl& tcl = Tcl::instance();
00141 
00142     // read the PDU_STATUS_REQUEST-specific fields:
00143     tcl.evalf("%s recv status-req %lu %lu %lu %lf", name_,
00144               (unsigned long) statreq.pass_nb,
00145               (unsigned long) statreq.block_lo,
00146               (unsigned long) statreq.block_hi,
00147               (double) statreq.RspBackoffWindow);
00148 }
00149 
00150 // send a nak packet:
00151 void MFTPRcvAgent::send_nak(unsigned long pass_nb, unsigned long block_nb)
00152 {
00153     assert(FileDGrams > 0);
00154     assert(0 <= block_nb && block_nb < nb_blocks());
00155 
00156     Tcl& tcl = Tcl::instance();
00157 
00158     // start_group_nb corresponds to first bit of NAK-bitmap:
00159     unsigned long start_group_nb = dtus_per_block * block_nb;
00160 
00161     // end_group_nb corresponds to last group number of NAK-bitmap plus one
00162     unsigned long end_group_nb = min(nb_groups, dtus_per_block * (block_nb + 1));
00163 
00164     // number of valid bits in the outgoing nak-bitmap
00165     unsigned long n = end_group_nb - start_group_nb;
00166 
00167     // number of status bytes in pdu
00168     const unsigned long nak_bytes = (n+7) / 8;
00169 
00170     unsigned long bit_count = 0;
00171 
00172     // allocate (get) new packet and dynamically allocate extra space for nak-bitmap:
00173     Packet* p = Agent::allocpkt((n+7) / 8);
00174 
00175     unsigned char* nak_bitmap = (unsigned char*) p->accessdata();
00176 
00177     // clear NAK-bitmap first:
00178     memset(nak_bitmap, 0, nak_bytes);
00179     
00180     // loop over all groups in rangs of nak and set nak-bit for those that are not still full
00181     for(unsigned long group_nb = start_group_nb, bit = 1 << (start_group_nb % 8);
00182         group_nb < end_group_nb; ++group_nb) {
00183         if(is_group_full(group_nb) == false) {
00184             *nak_bitmap |= bit;
00185             bit_count++;
00186         }
00187         if(bit == 128) {
00188             bit = 1;
00189             nak_bitmap++;
00190         } else {
00191             bit <<= 1;
00192         }
00193     }
00194 
00195     if(bit_count > 0) {
00196         hdr_ip* iph = hdr_ip::access(p);
00197         hdr_mftp* hdr = hdr_mftp::access(p);
00198         hdr_cmn* ch = hdr_cmn::access(p);
00199 
00200         // now generate the header
00201         iph->dst() = reply_;    // overwrite settings from Agent::allocpkt()
00202         ch->size() = sizeof(hdr_mftp);
00203 
00204         hdr->type = hdr_mftp::PDU_NAK;
00205         hdr->spec.nak.pass_nb   = pass_nb;
00206         hdr->spec.nak.block_nb  = block_nb;
00207         hdr->spec.nak.nak_count = bit_count;
00208 
00209         // transmit packet
00210         target_->recv(p);
00211     }
00212     else {
00213         Packet::free(p);  // do not transmit NAK-packet if it consists of 0 NAK-bits !!
00214                           // HACK: @ requires optimation still!
00215     }
00216     tcl.resultf("%lu", bit_count);
00217 }
00218 
00219 // process_packet: decides if a received packet is "useful" and
00220 // stores meta-information for proper decoding
00221 int MFTPRcvAgent::process_packet(CW_PATTERN_t cw_pat,
00222                                  unsigned long group_nb, unsigned long dtu_nb)
00223 {
00224     CW_PATTERN_t bit;
00225     CW_MATRIXLINE_t new_row;
00226 
00227     unsigned long j;    // j iterates over the dtus of group "group_nb"
00228     unsigned long finish = get_dtus_per_group(group_nb);
00229                         // finish counts the number of dtus in group "group_nb"
00230 
00231     new_row.left = cw_pat;
00232 
00233     for(j = 0; j < finish; j++) {
00234         CW_PATTERN_t line_pat = cw_matrixline_buf[j * nb_groups + group_nb].left;
00235         if(line_pat != 0) {
00236             bit = new_row.left & ((CW_PATTERN_t) 1 << minbit(line_pat));
00237             if(bit != 0) {
00238                 new_row.left ^= line_pat;
00239             }
00240         }
00241     }
00242     if(new_row.left != 0) { // linear independent?
00243         bit = (CW_PATTERN_t) 1 << minbit(new_row.left);
00244         for(j = 0; j < finish; j++) {
00245             if((bit & cw_matrixline_buf[j * nb_groups + group_nb].left) != 0) {
00246                 cw_matrixline_buf[j * nb_groups + group_nb].left ^= new_row.left;
00247             }
00248         }
00249         // register pattern of codeword the received packet is composed of (possibly altered).
00250         // must be done at last for that this line gets not erased by XORing with itself
00251         // in the previous loop.
00252         cw_matrixline_buf[dtu_nb * nb_groups + group_nb].left = new_row.left;
00253         return 1; // packet was a "useful" packet (i.e. is linear independent
00254                   // from the other ones received so far)
00255     }
00256     else {
00257         return 0; //linear dependent codeword-pattern received, i.e. useless
00258     }
00259 }
00260 
00261 
00262 int MFTPRcvAgent::findStoreLocation(unsigned long group_nb, unsigned long seek_offset, unsigned long* dtu_nb)
00263 {
00264     unsigned long start_dtu_nb;
00265 
00266     assert(0 <= group_nb && group_nb < nb_groups);
00267     assert(seek_offset % dtu_size == 0 ||
00268            seek_offset == FileSize);
00269 
00270     if(seek_offset == FileSize) {
00271         *dtu_nb = group_nb;    // start over from the beginning
00272     } else {
00273         unsigned long curr_dtu_nb = FseekOffset / dtu_size;
00274 
00275         // pay attention to "unsigned" when substracting
00276         *dtu_nb = curr_dtu_nb - curr_dtu_nb % nb_groups;
00277         *dtu_nb += group_nb;
00278 
00279         // check if seeking backwards. If yes, increment dtu_nb by nb_groups to
00280         // always seeks forwards (unless end of file is reached):
00281         if(*dtu_nb < curr_dtu_nb) {
00282             *dtu_nb += nb_groups;
00283         }
00284     }
00285     if(*dtu_nb >= FileDGrams) {
00286         // this might happen if some groups have less packets than
00287         // dtus_per_group:
00288         *dtu_nb = group_nb;    // start over from the beginning
00289     }
00290     start_dtu_nb = *dtu_nb;
00291     assert(start_dtu_nb < FileDGrams);
00292 
00293     do {
00294         if(! cw_matrixline_buf[*dtu_nb].left) {
00295             return 1;
00296         }
00297         *dtu_nb += nb_groups;
00298         if(*dtu_nb >= FileDGrams) {
00299             *dtu_nb = group_nb;    // start over from the beginning
00300         }
00301     } while(*dtu_nb != start_dtu_nb);
00302     return 0;    // group "group_nb" is already full
00303 }
00304 
00305 
00306 // initializes all matrix-lines to zero, i.e. no (encoded) packets
00307 // at all are received so far:
00308 void MFTPRcvAgent::cw_matrixlines_reset()
00309 {
00310     assert(0 <= FileDGrams);
00311     memset(cw_matrixline_buf, 0, sizeof(CW_MATRIXLINE_t) * FileDGrams);
00312 }
00313 
00314 
00315 // returns true if group "group_nb" is full, false if there is at least one packet missing
00316 bool MFTPRcvAgent::is_group_full(unsigned long group_nb)
00317 {
00318     unsigned long nb_dtus = get_dtus_per_group(group_nb);
00319     unsigned long i;
00320 
00321     assert(0 <= group_nb && group_nb < nb_groups);
00322 
00323     for(i = 0; i < nb_dtus &&
00324             cw_matrixline_buf[i * nb_groups + group_nb].left != 0; ++i)
00325         ;
00326     return (i == nb_dtus) ? true : false; // if loop was left before nb_dtus was reached,
00327                                           // then there is some line in the matrix that is
00328                                           // all "0", i.e. a packet is still missing.
00329 }
00330 
00331 // recv_data: process received data packet;
00332 // takes (received) coded packets and processes them.
00333 int MFTPRcvAgent::recv_data(hdr_mftp::Spec::Data& data)
00334 {
00335     Tcl& tcl = Tcl::instance();
00336     unsigned long seek_offset;
00337     unsigned long dtu_nb; // position (in terms of datagram number) where incoming
00338                           // packet is stored in file
00339     
00340     // read the PDU_DATA_TRANSFER-specific fields:
00341     CurrentPass  = data.pass_nb;
00342     CurrentGroup = data.group_nb;
00343     CwPat = data.cw_pat;
00344 
00345     // validate fields:
00346     // (actually, assert should be replaced by just ignoring the packet in case
00347     // the parameters are invalid, as some corrupt packet might reach the receiver
00348     // in real world, i.e. this would not be a bug in the software!)
00349     assert(0 <= CurrentPass);
00350     assert(0 <= CurrentGroup && CurrentGroup < nb_groups);
00351 
00352 
00353     if(findStoreLocation(CurrentGroup, FseekOffset, &dtu_nb)) {
00354         // arriving packet belongs to a not already full group:
00355         assert(dtu_nb % nb_groups == CurrentGroup);
00356         assert(0 <= dtu_nb && dtu_nb < FileDGrams);
00357 
00358         if(process_packet(CwPat,
00359                           CurrentGroup,
00360                           dtu_nb / nb_groups)) {
00361             cw_matrixline_buf[dtu_nb].right = CwPat;
00362             // arriving packet is useful (i.e. linearly independent from the others
00363             // of the group, thus store packet on disk:
00364 
00365             char buf[8 * sizeof(CW_PATTERN_t) + 1];
00366             CwPat.print(buf);
00367             tcl.evalf("%s recv useful %lu %lu %s",
00368                       name_,
00369                       (unsigned long) CurrentPass,
00370                       (unsigned long) CurrentGroup,
00371                       (char*) buf);
00372 
00373                 
00374             seek_offset = dtu_nb * dtu_size;
00375             if(dtu_nb == FileDGrams - 1) {
00376                 // the last dtu of the file might not fit into the file, as with
00377                 // erasure correction, the dtu size must always be the same,
00378                 // i.e. dtu_size. So we don't write the packet on disk.
00379                 // Rather, we store it in a special place in main memory.
00380                 // (ommitted)
00381             } else {
00382                 // prepare to write the new packet to the file system:
00383                 if(FseekOffset != seek_offset) {
00384                     // seek to file-position seek_offset (omitted)
00385                     FseekOffset = seek_offset;
00386                     seekCount_++;
00387                 }
00388                 // write data to file here (omitted)
00389                 FseekOffset += dtu_size;
00390             } // else
00391             // increment number of good dtus received
00392             FileDGramsReceived++;
00393             
00394             // if all packets have been received, decode the file and send a done-message
00395             if (FileDGramsReceived == FileDGrams) {
00396                 // decode file here. Involves the file, the cw_matrixline_buf-array and
00397                 // the last packet (cached in memory). Additional disk activity for the
00398                 // receivers will be required.
00399                 // (omitted)
00400                 char buf[8 * sizeof(CW_PATTERN_t) + 1];
00401                 CwPat.print(buf);
00402                 tcl.evalf("%s done-notify %lu %lu %s",
00403                           name_,
00404                           (unsigned long) CurrentPass,
00405                           (unsigned long) CurrentGroup,
00406                           (char*) buf);
00407                 return(0);   // we are ready!
00408             }
00409         } // if(process_packet...)
00410         else {
00411             char buf[8 * sizeof(CW_PATTERN_t) + 1];
00412             CwPat.print(buf);
00413             tcl.evalf("%s recv dependent %lu %lu %s",
00414                       name_,
00415                       (unsigned long) CurrentPass,
00416                       (unsigned long) CurrentGroup,
00417                       (char*) buf);
00418             return(0);   // we are ready!
00419         }                
00420     } // if(findStoreLocation...)
00421     else {
00422         // we received a packet that belongs to an already full group
00423         char buf[8 * sizeof(CW_PATTERN_t) + 1];
00424         CwPat.print(buf);
00425         tcl.evalf("%s recv group-full %lu %lu %s",
00426                   name_,
00427                   (unsigned long) CurrentPass,
00428                   (unsigned long) CurrentGroup,
00429                   (char*) buf);
00430     }
00431     return(0);
00432 }

Generated on Tue Apr 20 12:14:24 2004 for NS2.26SourcesOriginal by doxygen 1.3.3