001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.fs;
020    
021    import java.io.*;
022    import java.util.Arrays;
023    
024    import org.apache.commons.logging.Log;
025    import org.apache.commons.logging.LogFactory;
026    import org.apache.hadoop.classification.InterfaceAudience;
027    import org.apache.hadoop.classification.InterfaceStability;
028    import org.apache.hadoop.conf.Configuration;
029    import org.apache.hadoop.fs.permission.FsPermission;
030    import org.apache.hadoop.util.Progressable;
031    import org.apache.hadoop.util.PureJavaCrc32;
032    
/****************************************************************
 * Abstract checksummed FileSystem.
 * It provides a basic implementation of a checksummed FileSystem,
 * which creates a checksum file for each raw file.
 * It generates and verifies checksums at the client side.
 *
 *****************************************************************/
040    @InterfaceAudience.Public
041    @InterfaceStability.Stable
042    public abstract class ChecksumFileSystem extends FilterFileSystem {
  // Header bytes written at the start of every checksum file and checked
  // again when a checksum file is opened for reading.
  private static final byte[] CHECKSUM_VERSION = new byte[] {'c', 'r', 'c', 0};
  // Number of data bytes covered by one checksum; refreshed from the
  // configuration in setConf().
  private int bytesPerChecksum = 512;
  // When true, open() returns a stream that verifies data against checksums.
  private boolean verifyChecksum = true;
  // When true, create() writes a checksum file alongside the data file.
  private boolean writeChecksum = true;
047    
048      public static double getApproxChkSumLength(long size) {
049        return ChecksumFSOutputSummer.CHKSUM_AS_FRACTION * size;
050      }
051      
  /**
   * Creates a checksummed view on top of the given file system.
   *
   * @param fs the file system handed to the superclass constructor
   */
  public ChecksumFileSystem(FileSystem fs) {
    super(fs);
  }
055    
056      @Override
057      public void setConf(Configuration conf) {
058        super.setConf(conf);
059        if (conf != null) {
060          bytesPerChecksum = conf.getInt(LocalFileSystemConfigKeys.LOCAL_FS_BYTES_PER_CHECKSUM_KEY,
061                                         LocalFileSystemConfigKeys.LOCAL_FS_BYTES_PER_CHECKSUM_DEFAULT);
062        }
063      }
064      
  /**
   * Set whether to verify checksums when reading.
   * The flag is consulted by {@code open()} to decide whether the returned
   * stream checks data against the checksum file.
   *
   * @param verifyChecksum true to verify checksums on read
   */
  @Override
  public void setVerifyChecksum(boolean verifyChecksum) {
    this.verifyChecksum = verifyChecksum;
  }
072    
  /**
   * Set whether to write a checksum file when creating files.
   * The flag is consulted by the file-creation path of this class.
   *
   * @param writeChecksum true to write checksum files on create
   */
  @Override
  public void setWriteChecksum(boolean writeChecksum) {
    this.writeChecksum = writeChecksum;
  }
077      
  /**
   * Get the raw file system.
   *
   * @return the wrapped file system that this class delegates to
   */
  @Override
  public FileSystem getRawFileSystem() {
    return fs;
  }
083    
084      /** Return the name of the checksum file associated with a file.*/
085      public Path getChecksumFile(Path file) {
086        return new Path(file.getParent(), "." + file.getName() + ".crc");
087      }
088    
089      /** Return true iff file is a checksum file name.*/
090      public static boolean isChecksumFile(Path file) {
091        String name = file.getName();
092        return name.startsWith(".") && name.endsWith(".crc");
093      }
094    
095      /** Return the length of the checksum file given the size of the 
096       * actual file.
097       **/
098      public long getChecksumFileLength(Path file, long fileSize) {
099        return getChecksumLength(fileSize, getBytesPerSum());
100      }
101    
  /**
   * Return the bytes per checksum.
   *
   * @return the number of data bytes covered by each checksum
   */
  public int getBytesPerSum() {
    return bytesPerChecksum;
  }
106    
107      private int getSumBufferSize(int bytesPerSum, int bufferSize) {
108        int defaultBufferSize = getConf().getInt(
109                           LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_KEY,
110                           LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_DEFAULT);
111        int proportionalBufferSize = bufferSize / bytesPerSum;
112        return Math.max(bytesPerSum,
113                        Math.max(proportionalBufferSize, defaultBufferSize));
114      }
115    
116      /*******************************************************
117       * For open()'s FSInputStream
118       * It verifies that data matches checksums.
119       *******************************************************/
120      private static class ChecksumFSInputChecker extends FSInputChecker {
121        public static final Log LOG 
122          = LogFactory.getLog(FSInputChecker.class);
123        
124        private ChecksumFileSystem fs;
125        private FSDataInputStream datas;
126        private FSDataInputStream sums;
127        
128        private static final int HEADER_LENGTH = 8;
129        
130        private int bytesPerSum = 1;
131        
132        public ChecksumFSInputChecker(ChecksumFileSystem fs, Path file)
133          throws IOException {
134          this(fs, file, fs.getConf().getInt(
135                           LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_KEY, 
136                           LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_DEFAULT));
137        }
138        
139        public ChecksumFSInputChecker(ChecksumFileSystem fs, Path file, int bufferSize)
140          throws IOException {
141          super( file, fs.getFileStatus(file).getReplication() );
142          this.datas = fs.getRawFileSystem().open(file, bufferSize);
143          this.fs = fs;
144          Path sumFile = fs.getChecksumFile(file);
145          try {
146            int sumBufferSize = fs.getSumBufferSize(fs.getBytesPerSum(), bufferSize);
147            sums = fs.getRawFileSystem().open(sumFile, sumBufferSize);
148    
149            byte[] version = new byte[CHECKSUM_VERSION.length];
150            sums.readFully(version);
151            if (!Arrays.equals(version, CHECKSUM_VERSION))
152              throw new IOException("Not a checksum file: "+sumFile);
153            this.bytesPerSum = sums.readInt();
154            set(fs.verifyChecksum, new PureJavaCrc32(), bytesPerSum, 4);
155          } catch (FileNotFoundException e) {         // quietly ignore
156            set(fs.verifyChecksum, null, 1, 0);
157          } catch (IOException e) {                   // loudly ignore
158            LOG.warn("Problem opening checksum file: "+ file + 
159                     ".  Ignoring exception: " , e); 
160            set(fs.verifyChecksum, null, 1, 0);
161          }
162        }
163        
164        private long getChecksumFilePos( long dataPos ) {
165          return HEADER_LENGTH + 4*(dataPos/bytesPerSum);
166        }
167        
168        @Override
169        protected long getChunkPosition( long dataPos ) {
170          return dataPos/bytesPerSum*bytesPerSum;
171        }
172        
173        @Override
174        public int available() throws IOException {
175          return datas.available() + super.available();
176        }
177        
178        @Override
179        public int read(long position, byte[] b, int off, int len)
180          throws IOException {
181          // parameter check
182          if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
183            throw new IndexOutOfBoundsException();
184          } else if (len == 0) {
185            return 0;
186          }
187          if( position<0 ) {
188            throw new IllegalArgumentException(
189                "Parameter position can not to be negative");
190          }
191    
192          ChecksumFSInputChecker checker = new ChecksumFSInputChecker(fs, file);
193          checker.seek(position);
194          int nread = checker.read(b, off, len);
195          checker.close();
196          return nread;
197        }
198        
199        @Override
200        public void close() throws IOException {
201          datas.close();
202          if( sums != null ) {
203            sums.close();
204          }
205          set(fs.verifyChecksum, null, 1, 0);
206        }
207        
208    
209        @Override
210        public boolean seekToNewSource(long targetPos) throws IOException {
211          long sumsPos = getChecksumFilePos(targetPos);
212          fs.reportChecksumFailure(file, datas, targetPos, sums, sumsPos);
213          boolean newDataSource = datas.seekToNewSource(targetPos);
214          return sums.seekToNewSource(sumsPos) || newDataSource;
215        }
216    
217        @Override
218        protected int readChunk(long pos, byte[] buf, int offset, int len,
219            byte[] checksum) throws IOException {
220    
221          boolean eof = false;
222          if (needChecksum()) {
223            assert checksum != null; // we have a checksum buffer
224            assert checksum.length % CHECKSUM_SIZE == 0; // it is sane length
225            assert len >= bytesPerSum; // we must read at least one chunk
226    
227            final int checksumsToRead = Math.min(
228              len/bytesPerSum, // number of checksums based on len to read
229              checksum.length / CHECKSUM_SIZE); // size of checksum buffer
230            long checksumPos = getChecksumFilePos(pos); 
231            if(checksumPos != sums.getPos()) {
232              sums.seek(checksumPos);
233            }
234    
235            int sumLenRead = sums.read(checksum, 0, CHECKSUM_SIZE * checksumsToRead);
236            if (sumLenRead >= 0 && sumLenRead % CHECKSUM_SIZE != 0) {
237              throw new ChecksumException(
238                "Checksum file not a length multiple of checksum size " +
239                "in " + file + " at " + pos + " checksumpos: " + checksumPos +
240                " sumLenread: " + sumLenRead,
241                pos);
242            }
243            if (sumLenRead <= 0) { // we're at the end of the file
244              eof = true;
245            } else {
246              // Adjust amount of data to read based on how many checksum chunks we read
247              len = Math.min(len, bytesPerSum * (sumLenRead / CHECKSUM_SIZE));
248            }
249          }
250          if(pos != datas.getPos()) {
251            datas.seek(pos);
252          }
253          int nread = readFully(datas, buf, offset, len);
254          if (eof && nread > 0) {
255            throw new ChecksumException("Checksum error: "+file+" at "+pos, pos);
256          }
257          return nread;
258        }
259      }
260      
261      private static class FSDataBoundedInputStream extends FSDataInputStream {
262        private FileSystem fs;
263        private Path file;
264        private long fileLen = -1L;
265    
266        FSDataBoundedInputStream(FileSystem fs, Path file, InputStream in)
267            throws IOException {
268          super(in);
269          this.fs = fs;
270          this.file = file;
271        }
272        
273        @Override
274        public boolean markSupported() {
275          return false;
276        }
277        
278        /* Return the file length */
279        private long getFileLength() throws IOException {
280          if( fileLen==-1L ) {
281            fileLen = fs.getContentSummary(file).getLength();
282          }
283          return fileLen;
284        }
285        
286        /**
287         * Skips over and discards <code>n</code> bytes of data from the
288         * input stream.
289         *
290         *The <code>skip</code> method skips over some smaller number of bytes
291         * when reaching end of file before <code>n</code> bytes have been skipped.
292         * The actual number of bytes skipped is returned.  If <code>n</code> is
293         * negative, no bytes are skipped.
294         *
295         * @param      n   the number of bytes to be skipped.
296         * @return     the actual number of bytes skipped.
297         * @exception  IOException  if an I/O error occurs.
298         *             ChecksumException if the chunk to skip to is corrupted
299         */
300        @Override
301        public synchronized long skip(long n) throws IOException {
302          long curPos = getPos();
303          long fileLength = getFileLength();
304          if( n+curPos > fileLength ) {
305            n = fileLength - curPos;
306          }
307          return super.skip(n);
308        }
309        
310        /**
311         * Seek to the given position in the stream.
312         * The next read() will be from that position.
313         * 
314         * <p>This method does not allow seek past the end of the file.
315         * This produces IOException.
316         *
317         * @param      pos   the postion to seek to.
318         * @exception  IOException  if an I/O error occurs or seeks after EOF
319         *             ChecksumException if the chunk to seek to is corrupted
320         */
321    
322        @Override
323        public synchronized void seek(long pos) throws IOException {
324          if(pos>getFileLength()) {
325            throw new IOException("Cannot seek after EOF");
326          }
327          super.seek(pos);
328        }
329    
330      }
331    
332      /**
333       * Opens an FSDataInputStream at the indicated Path.
334       * @param f the file name to open
335       * @param bufferSize the size of the buffer to be used.
336       */
337      @Override
338      public FSDataInputStream open(Path f, int bufferSize) throws IOException {
339        FileSystem fs;
340        InputStream in;
341        if (verifyChecksum) {
342          fs = this;
343          in = new ChecksumFSInputChecker(this, f, bufferSize);
344        } else {
345          fs = getRawFileSystem();
346          in = fs.open(f, bufferSize);
347        }
348        return new FSDataBoundedInputStream(fs, f, in);
349      }
350    
  /**
   * Append is not supported by this file system.
   *
   * @throws IOException always, with the message "Not supported"
   */
  @Override
  public FSDataOutputStream append(Path f, int bufferSize,
      Progressable progress) throws IOException {
    throw new IOException("Not supported");
  }
356    
357      /**
358       * Calculated the length of the checksum file in bytes.
359       * @param size the length of the data file in bytes
360       * @param bytesPerSum the number of bytes in a checksum block
361       * @return the number of bytes in the checksum file
362       */
363      public static long getChecksumLength(long size, int bytesPerSum) {
364        //the checksum length is equal to size passed divided by bytesPerSum +
365        //bytes written in the beginning of the checksum file.  
366        return ((size + bytesPerSum - 1) / bytesPerSum) * 4 +
367                 CHECKSUM_VERSION.length + 4;  
368      }
369    
370      /** This class provides an output stream for a checksummed file.
371       * It generates checksums for data. */
372      private static class ChecksumFSOutputSummer extends FSOutputSummer {
373        private FSDataOutputStream datas;    
374        private FSDataOutputStream sums;
375        private static final float CHKSUM_AS_FRACTION = 0.01f;
376        
377        public ChecksumFSOutputSummer(ChecksumFileSystem fs, 
378                              Path file, 
379                              boolean overwrite, 
380                              short replication,
381                              long blockSize,
382                              Configuration conf)
383          throws IOException {
384          this(fs, file, overwrite, 
385               conf.getInt(LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_KEY,
386                           LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_DEFAULT),
387               replication, blockSize, null);
388        }
389        
390        public ChecksumFSOutputSummer(ChecksumFileSystem fs, 
391                              Path file, 
392                              boolean overwrite,
393                              int bufferSize,
394                              short replication,
395                              long blockSize,
396                              Progressable progress)
397          throws IOException {
398          super(new PureJavaCrc32(), fs.getBytesPerSum(), 4);
399          int bytesPerSum = fs.getBytesPerSum();
400          this.datas = fs.getRawFileSystem().create(file, overwrite, bufferSize, 
401                                             replication, blockSize, progress);
402          int sumBufferSize = fs.getSumBufferSize(bytesPerSum, bufferSize);
403          this.sums = fs.getRawFileSystem().create(fs.getChecksumFile(file), true, 
404                                                   sumBufferSize, replication,
405                                                   blockSize);
406          sums.write(CHECKSUM_VERSION, 0, CHECKSUM_VERSION.length);
407          sums.writeInt(bytesPerSum);
408        }
409        
410        @Override
411        public void close() throws IOException {
412          flushBuffer();
413          sums.close();
414          datas.close();
415        }
416        
417        @Override
418        protected void writeChunk(byte[] b, int offset, int len, byte[] checksum)
419        throws IOException {
420          datas.write(b, offset, len);
421          sums.write(checksum);
422        }
423      }
424    
  /**
   * Creates a file, creating missing parent directories as needed
   * (delegates to the private overload with {@code createParent} set to true).
   */
  @Override
  public FSDataOutputStream create(Path f, FsPermission permission,
      boolean overwrite, int bufferSize, short replication, long blockSize,
      Progressable progress) throws IOException {
    return create(f, permission, overwrite, true, bufferSize,
        replication, blockSize, progress);
  }
432    
433      private FSDataOutputStream create(Path f, FsPermission permission,
434          boolean overwrite, boolean createParent, int bufferSize,
435          short replication, long blockSize,
436          Progressable progress) throws IOException {
437        Path parent = f.getParent();
438        if (parent != null) {
439          if (!createParent && !exists(parent)) {
440            throw new FileNotFoundException("Parent directory doesn't exist: "
441                + parent);
442          } else if (!mkdirs(parent)) {
443            throw new IOException("Mkdirs failed to create " + parent);
444          }
445        }
446        final FSDataOutputStream out;
447        if (writeChecksum) {
448          out = new FSDataOutputStream(
449              new ChecksumFSOutputSummer(this, f, overwrite, bufferSize, replication,
450                  blockSize, progress), null);
451        } else {
452          out = fs.create(f, permission, overwrite, bufferSize, replication,
453              blockSize, progress);
454          // remove the checksum file since we aren't writing one
455          Path checkFile = getChecksumFile(f);
456          if (fs.exists(checkFile)) {
457            fs.delete(checkFile, true);
458          }
459        }
460        if (permission != null) {
461          setPermission(f, permission);
462        }
463        return out;
464      }
465    
  /**
   * Creates a file without creating a missing parent directory
   * (delegates to the private overload with {@code createParent} set to false).
   */
  @Override
  public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
      boolean overwrite, int bufferSize, short replication, long blockSize,
      Progressable progress) throws IOException {
    return create(f, permission, overwrite, false, bufferSize, replication,
        blockSize, progress);
  }
473    
474      /**
475       * Set replication for an existing file.
476       * Implement the abstract <tt>setReplication</tt> of <tt>FileSystem</tt>
477       * @param src file name
478       * @param replication new replication
479       * @throws IOException
480       * @return true if successful;
481       *         false if file does not exist or is a directory
482       */
483      @Override
484      public boolean setReplication(Path src, short replication) throws IOException {
485        boolean value = fs.setReplication(src, replication);
486        if (!value)
487          return false;
488    
489        Path checkFile = getChecksumFile(src);
490        if (exists(checkFile))
491          fs.setReplication(checkFile, replication);
492    
493        return true;
494      }
495    
496      /**
497       * Rename files/dirs
498       */
499      @Override
500      public boolean rename(Path src, Path dst) throws IOException {
501        if (fs.isDirectory(src)) {
502          return fs.rename(src, dst);
503        } else {
504          if (fs.isDirectory(dst)) {
505            dst = new Path(dst, src.getName());
506          }
507    
508          boolean value = fs.rename(src, dst);
509          if (!value)
510            return false;
511    
512          Path srcCheckFile = getChecksumFile(src);
513          Path dstCheckFile = getChecksumFile(dst);
514          if (fs.exists(srcCheckFile)) { //try to rename checksum
515            value = fs.rename(srcCheckFile, dstCheckFile);
516          } else if (fs.exists(dstCheckFile)) {
517            // no src checksum, so remove dst checksum
518            value = fs.delete(dstCheckFile, true); 
519          }
520    
521          return value;
522        }
523      }
524    
525      /**
526       * Implement the delete(Path, boolean) in checksum
527       * file system.
528       */
529      @Override
530      public boolean delete(Path f, boolean recursive) throws IOException{
531        FileStatus fstatus = null;
532        try {
533          fstatus = fs.getFileStatus(f);
534        } catch(FileNotFoundException e) {
535          return false;
536        }
537        if (fstatus.isDirectory()) {
538          //this works since the crcs are in the same
539          //directories and the files. so we just delete
540          //everything in the underlying filesystem
541          return fs.delete(f, recursive);
542        } else {
543          Path checkFile = getChecksumFile(f);
544          if (fs.exists(checkFile)) {
545            fs.delete(checkFile, true);
546          }
547          return fs.delete(f, true);
548        }
549      }
550        
551      final private static PathFilter DEFAULT_FILTER = new PathFilter() {
552        @Override
553        public boolean accept(Path file) {
554          return !isChecksumFile(file);
555        }
556      };
557    
  /**
   * List the statuses of the files/directories in the given path if the path is
   * a directory. Checksum files are filtered out of the result.
   * 
   * @param f
   *          given path
   * @return the statuses of the files/directories in the given path
   * @throws IOException
   */
  @Override
  public FileStatus[] listStatus(Path f) throws IOException {
    return fs.listStatus(f, DEFAULT_FILTER);
  }
571      
  /**
   * List the located statuses of the files/directories in the given path if
   * the path is a directory. Checksum files are filtered out of the result.
   * 
   * @param f
   *          given path
   * @return an iterator over the statuses of the files/directories in the
   *         given path
   * @throws IOException
   */
  @Override
  public RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f)
  throws IOException {
    return fs.listLocatedStatus(f, DEFAULT_FILTER);
  }
586      
  /**
   * Creates the directory on the underlying file system.
   *
   * @param f directory to create
   * @return the result reported by the underlying file system
   */
  @Override
  public boolean mkdirs(Path f) throws IOException {
    return fs.mkdirs(f);
  }
591    
592      @Override
593      public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
594        throws IOException {
595        Configuration conf = getConf();
596        FileUtil.copy(getLocal(conf), src, this, dst, delSrc, conf);
597      }
598    
599      /**
600       * The src file is under FS, and the dst is on the local disk.
601       * Copy it from FS control to the local dst name.
602       */
603      @Override
604      public void copyToLocalFile(boolean delSrc, Path src, Path dst)
605        throws IOException {
606        Configuration conf = getConf();
607        FileUtil.copy(this, src, getLocal(conf), dst, delSrc, conf);
608      }
609    
610      /**
611       * The src file is under FS, and the dst is on the local disk.
612       * Copy it from FS control to the local dst name.
613       * If src and dst are directories, the copyCrc parameter
614       * determines whether to copy CRC files.
615       */
616      public void copyToLocalFile(Path src, Path dst, boolean copyCrc)
617        throws IOException {
618        if (!fs.isDirectory(src)) { // source is a file
619          fs.copyToLocalFile(src, dst);
620          FileSystem localFs = getLocal(getConf()).getRawFileSystem();
621          if (localFs.isDirectory(dst)) {
622            dst = new Path(dst, src.getName());
623          }
624          dst = getChecksumFile(dst);
625          if (localFs.exists(dst)) { //remove old local checksum file
626            localFs.delete(dst, true);
627          }
628          Path checksumFile = getChecksumFile(src);
629          if (copyCrc && fs.exists(checksumFile)) { //copy checksum file
630            fs.copyToLocalFile(checksumFile, dst);
631          }
632        } else {
633          FileStatus[] srcs = listStatus(src);
634          for (FileStatus srcFile : srcs) {
635            copyToLocalFile(srcFile.getPath(), 
636                            new Path(dst, srcFile.getPath().getName()), copyCrc);
637          }
638        }
639      }
640    
  /**
   * Returns the local temporary file unchanged; output is written there and
   * moved into place by {@code completeLocalOutput}.
   *
   * @param fsOutputFile the final destination (not used here)
   * @param tmpLocalFile the local file to write to
   * @return {@code tmpLocalFile}
   */
  @Override
  public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
    throws IOException {
    return tmpLocalFile;
  }
646    
  /**
   * Moves the finished local temporary file to its destination on this
   * file system.
   *
   * @param fsOutputFile the destination path
   * @param tmpLocalFile the local file that was written
   */
  @Override
  public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
    throws IOException {
    moveFromLocalFile(tmpLocalFile, fsOutputFile);
  }
652    
  /**
   * Report a checksum error to the file system.
   * This base implementation does nothing and always returns false.
   *
   * @param f the file name containing the error
   * @param in the stream open on the file
   * @param inPos the position of the beginning of the bad data in the file
   * @param sums the stream open on the checksum file
   * @param sumsPos the position of the beginning of the bad data in the checksum file
   * @return whether a retry is necessary
   */
  public boolean reportChecksumFailure(Path f, FSDataInputStream in,
                                       long inPos, FSDataInputStream sums, long sumsPos) {
    return false;
  }
666    }