/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs;

import java.io.*;
import java.util.Arrays;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.PureJavaCrc32;

/****************************************************************
 * Abstract checksummed FileSystem.
 * It provides a basic implementation of a checksummed FileSystem,
 * which creates a checksum file for each raw file.
 * It generates and verifies checksums at the client side.
 *
 *****************************************************************/
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class ChecksumFileSystem extends FilterFileSystem {
  /** Marker written at the start of every checksum file. */
  private static final byte[] CHECKSUM_VERSION = new byte[] {'c', 'r', 'c', 0};
  private int bytesPerChecksum = 512;
  private boolean verifyChecksum = true;
  private boolean writeChecksum = true;

  /**
   * Estimate the checksum length for a file as a fixed fraction
   * ({@link ChecksumFSOutputSummer#CHKSUM_AS_FRACTION}) of its size.
   *
   * @param size the length of the data file in bytes
   * @return the approximate checksum length in bytes
   */
  public static double getApproxChkSumLength(long size) {
    return ChecksumFSOutputSummer.CHKSUM_AS_FRACTION * size;
  }

  public ChecksumFileSystem(FileSystem fs) {
    super(fs);
  }

  /**
   * Store the configuration and, when it is non-null, refresh the
   * bytes-per-checksum setting from it.
   */
  @Override
  public void setConf(Configuration conf) {
    super.setConf(conf);
    if (conf != null) {
      bytesPerChecksum = conf.getInt(
          LocalFileSystemConfigKeys.LOCAL_FS_BYTES_PER_CHECKSUM_KEY,
          LocalFileSystemConfigKeys.LOCAL_FS_BYTES_PER_CHECKSUM_DEFAULT);
    }
  }

  /**
   * Set whether to verify checksum.
   */
  @Override
  public void setVerifyChecksum(boolean verifyChecksum) {
    this.verifyChecksum = verifyChecksum;
  }

  /**
   * Set whether to write a checksum file when creating files.
   */
  @Override
  public void setWriteChecksum(boolean writeChecksum) {
    this.writeChecksum = writeChecksum;
  }

  /** Get the raw file system. */
  @Override
  public FileSystem getRawFileSystem() {
    return fs;
  }

  /** Return the name of the checksum file associated with a file. */
  public Path getChecksumFile(Path file) {
    return new Path(file.getParent(), "." + file.getName() + ".crc");
  }

  /** Return true iff file is a checksum file name. */
  public static boolean isChecksumFile(Path file) {
    String name = file.getName();
    return name.startsWith(".") && name.endsWith(".crc");
  }

  /**
   * Return the length of the checksum file given the size of the
   * actual file.
   */
  public long getChecksumFileLength(Path file, long fileSize) {
    return getChecksumLength(fileSize, getBytesPerSum());
  }

  /** Return the bytes per checksum. */
  public int getBytesPerSum() {
    return bytesPerChecksum;
  }

  /**
   * Choose the buffer size for a checksum stream: the largest of
   * <code>bytesPerSum</code>, <code>bufferSize / bytesPerSum</code> and the
   * configured local stream buffer size.
   */
  private int getSumBufferSize(int bytesPerSum, int bufferSize) {
    int defaultBufferSize = getConf().getInt(
        LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_KEY,
        LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_DEFAULT);
    int proportionalBufferSize = bufferSize / bytesPerSum;
    return Math.max(bytesPerSum,
        Math.max(proportionalBufferSize, defaultBufferSize));
  }

  /*******************************************************
   * For open()'s FSInputStream.
   * It verifies that data matches checksums.
   *******************************************************/
  private static class ChecksumFSInputChecker extends FSInputChecker {
    public static final Log LOG
        = LogFactory.getLog(FSInputChecker.class);

    private final ChecksumFileSystem fs;
    private final FSDataInputStream datas;
    /** Stream on the checksum file; stays null if it could not be opened. */
    private FSDataInputStream sums;

    /** Bytes preceding the first checksum: version marker plus an int. */
    private static final int HEADER_LENGTH = 8;

    private int bytesPerSum = 1;

    public ChecksumFSInputChecker(ChecksumFileSystem fs, Path file)
        throws IOException {
      this(fs, file, fs.getConf().getInt(
          LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_KEY,
          LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_DEFAULT));
    }

    /**
     * Open the data file and try to open its checksum file. When the
     * checksum file is missing or unreadable the stream is configured
     * without a checksum, so data is returned unverified.
     */
    public ChecksumFSInputChecker(ChecksumFileSystem fs, Path file,
        int bufferSize) throws IOException {
      super(file, fs.getFileStatus(file).getReplication());
      this.datas = fs.getRawFileSystem().open(file, bufferSize);
      this.fs = fs;
      Path sumFile = fs.getChecksumFile(file);
      try {
        int sumBufferSize =
            fs.getSumBufferSize(fs.getBytesPerSum(), bufferSize);
        sums = fs.getRawFileSystem().open(sumFile, sumBufferSize);

        byte[] version = new byte[CHECKSUM_VERSION.length];
        sums.readFully(version);
        if (!Arrays.equals(version, CHECKSUM_VERSION)) {
          throw new IOException("Not a checksum file: "+sumFile);
        }
        this.bytesPerSum = sums.readInt();
        set(fs.verifyChecksum, new PureJavaCrc32(), bytesPerSum, 4);
      } catch (FileNotFoundException e) {         // quietly ignore
        set(fs.verifyChecksum, null, 1, 0);
      } catch (IOException e) {                   // loudly ignore
        LOG.warn("Problem opening checksum file: "+ file +
                 ".  Ignoring exception: " , e);
        set(fs.verifyChecksum, null, 1, 0);
      }
    }

    /** Map a data-file position to the position of its checksum entry. */
    private long getChecksumFilePos(long dataPos) {
      return HEADER_LENGTH + 4*(dataPos/bytesPerSum);
    }

    /** Round a data position down to the start of its checksum chunk. */
    @Override
    protected long getChunkPosition(long dataPos) {
      return dataPos/bytesPerSum*bytesPerSum;
    }

    @Override
    public int available() throws IOException {
      return datas.available() + super.available();
    }

    /**
     * Positioned read. Uses a separate, temporary checker on the same
     * file so this stream's own position is not touched.
     */
    @Override
    public int read(long position, byte[] b, int off, int len)
        throws IOException {
      // parameter check
      if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
        throw new IndexOutOfBoundsException();
      } else if (len == 0) {
        return 0;
      }
      if (position < 0) {
        throw new IllegalArgumentException(
            "Parameter position can not to be negative");
      }

      ChecksumFSInputChecker checker = new ChecksumFSInputChecker(fs, file);
      try {
        checker.seek(position);
        return checker.read(b, off, len);
      } finally {
        // release both underlying streams even when seek/read fails
        checker.close();
      }
    }

    @Override
    public void close() throws IOException {
      try {
        datas.close();
      } finally {
        // still close the checksum stream if closing the data stream failed
        if (sums != null) {
          sums.close();
        }
      }
      set(fs.verifyChecksum, null, 1, 0);
    }

    /**
     * Report the failure to the file system, then ask both underlying
     * streams to switch to another source.
     */
    @Override
    public boolean seekToNewSource(long targetPos) throws IOException {
      long sumsPos = getChecksumFilePos(targetPos);
      fs.reportChecksumFailure(file, datas, targetPos, sums, sumsPos);
      boolean newDataSource = datas.seekToNewSource(targetPos);
      // sums is null when the checksum file could not be opened
      return (sums != null && sums.seekToNewSource(sumsPos)) || newDataSource;
    }

    /**
     * Read data starting at <code>pos</code> into <code>buf</code> and,
     * when checksums are needed, the matching checksums into
     * <code>checksum</code>.
     *
     * @return the number of data bytes read
     * @throws ChecksumException if the checksum file length is not a
     *         multiple of the checksum size, or if data is present past
     *         the end of the checksum file
     */
    @Override
    protected int readChunk(long pos, byte[] buf, int offset, int len,
        byte[] checksum) throws IOException {

      boolean eof = false;
      if (needChecksum()) {
        assert checksum != null; // we have a checksum buffer
        assert checksum.length % CHECKSUM_SIZE == 0; // it is sane length
        assert len >= bytesPerSum; // we must read at least one chunk

        final int checksumsToRead = Math.min(
            len/bytesPerSum, // number of checksums based on len to read
            checksum.length / CHECKSUM_SIZE); // size of checksum buffer
        long checksumPos = getChecksumFilePos(pos);
        if (checksumPos != sums.getPos()) {
          sums.seek(checksumPos);
        }

        int sumLenRead =
            sums.read(checksum, 0, CHECKSUM_SIZE * checksumsToRead);
        if (sumLenRead >= 0 && sumLenRead % CHECKSUM_SIZE != 0) {
          throw new ChecksumException(
              "Checksum file not a length multiple of checksum size " +
              "in " + file + " at " + pos + " checksumpos: " + checksumPos +
              " sumLenread: " + sumLenRead,
              pos);
        }
        if (sumLenRead <= 0) { // we're at the end of the file
          eof = true;
        } else {
          // Adjust amount of data to read based on how many checksum
          // chunks we read
          len = Math.min(len, bytesPerSum * (sumLenRead / CHECKSUM_SIZE));
        }
      }
      if (pos != datas.getPos()) {
        datas.seek(pos);
      }
      int nread = readFully(datas, buf, offset, len);
      // data without any checksum left to cover it
      if (eof && nread > 0) {
        throw new ChecksumException("Checksum error: "+file+" at "+pos, pos);
      }
      return nread;
    }
  }

  /** Input stream that refuses to skip or seek past the end of the file. */
  private static class FSDataBoundedInputStream extends FSDataInputStream {
    private final FileSystem fs;
    private final Path file;
    /** Cached file length; -1 until first looked up. */
    private long fileLen = -1L;

    FSDataBoundedInputStream(FileSystem fs, Path file, InputStream in)
        throws IOException {
      super(in);
      this.fs = fs;
      this.file = file;
    }

    @Override
    public boolean markSupported() {
      return false;
    }

    /* Return the file length, looking it up on first use. */
    private long getFileLength() throws IOException {
      if (fileLen == -1L) {
        fileLen = fs.getContentSummary(file).getLength();
      }
      return fileLen;
    }

    /**
     * Skips over and discards <code>n</code> bytes of data from the
     * input stream.
     *
     * The <code>skip</code> method skips over some smaller number of bytes
     * when reaching end of file before <code>n</code> bytes have been skipped.
     * The actual number of bytes skipped is returned.  If <code>n</code> is
     * negative, no bytes are skipped.
     *
     * @param      n   the number of bytes to be skipped.
     * @return     the actual number of bytes skipped.
     * @exception  IOException  if an I/O error occurs.
     *             ChecksumException if the chunk to skip to is corrupted
     */
    @Override
    public synchronized long skip(long n) throws IOException {
      long curPos = getPos();
      long fileLength = getFileLength();
      // compare against the remaining bytes; n + curPos could overflow
      if (n > fileLength - curPos) {
        n = fileLength - curPos;
      }
      return super.skip(n);
    }

    /**
     * Seek to the given position in the stream.
     * The next read() will be from that position.
     *
     * <p>This method does not allow seek past the end of the file.
     * This produces IOException.
     *
     * @param      pos   the position to seek to.
     * @exception  IOException  if an I/O error occurs or seeks after EOF
     *             ChecksumException if the chunk to seek to is corrupted
     */
    @Override
    public synchronized void seek(long pos) throws IOException {
      if (pos > getFileLength()) {
        throw new IOException("Cannot seek after EOF");
      }
      super.seek(pos);
    }

  }

  /**
   * Opens an FSDataInputStream at the indicated Path.
   * When checksum verification is enabled the data is read through a
   * checksum-verifying stream; otherwise the raw file system is read
   * directly.
   *
   * @param f the file name to open
   * @param bufferSize the size of the buffer to be used.
   */
  @Override
  public FSDataInputStream open(Path f, int bufferSize) throws IOException {
    FileSystem fs;
    InputStream in;
    if (verifyChecksum) {
      fs = this;
      in = new ChecksumFSInputChecker(this, f, bufferSize);
    } else {
      fs = getRawFileSystem();
      in = fs.open(f, bufferSize);
    }
    return new FSDataBoundedInputStream(fs, f, in);
  }

  /** Append is not supported; always throws IOException. */
  @Override
  public FSDataOutputStream append(Path f, int bufferSize,
      Progressable progress) throws IOException {
    throw new IOException("Not supported");
  }

  /**
   * Calculates the length of the checksum file in bytes.
   * @param size the length of the data file in bytes
   * @param bytesPerSum the number of bytes in a checksum block
   * @return the number of bytes in the checksum file
   */
  public static long getChecksumLength(long size, int bytesPerSum) {
    // the checksum length is equal to size passed divided by bytesPerSum
    // (rounded up, 4 bytes per checksum) + bytes written in the beginning
    // of the checksum file.
    return ((size + bytesPerSum - 1) / bytesPerSum) * 4 +
             CHECKSUM_VERSION.length + 4;
  }

  /**
   * This class provides an output stream for a checksummed file.
   * It generates checksums for data.
   */
  private static class ChecksumFSOutputSummer extends FSOutputSummer {
    private final FSDataOutputStream datas;
    private final FSDataOutputStream sums;
    private static final float CHKSUM_AS_FRACTION = 0.01f;

    public ChecksumFSOutputSummer(ChecksumFileSystem fs,
                          Path file,
                          boolean overwrite,
                          short replication,
                          long blockSize,
                          Configuration conf)
        throws IOException {
      this(fs, file, overwrite,
           conf.getInt(LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_KEY,
                       LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_DEFAULT),
           replication, blockSize, null);
    }

    /**
     * Create the data file and its checksum file on the raw file system
     * and write the checksum file header (version marker followed by the
     * bytes-per-checksum value).
     */
    public ChecksumFSOutputSummer(ChecksumFileSystem fs,
                          Path file,
                          boolean overwrite,
                          int bufferSize,
                          short replication,
                          long blockSize,
                          Progressable progress)
        throws IOException {
      super(new PureJavaCrc32(), fs.getBytesPerSum(), 4);
      int bytesPerSum = fs.getBytesPerSum();
      FSDataOutputStream dataOut = fs.getRawFileSystem().create(file,
          overwrite, bufferSize, replication, blockSize, progress);
      FSDataOutputStream sumOut = null;
      boolean opened = false;
      try {
        int sumBufferSize = fs.getSumBufferSize(bytesPerSum, bufferSize);
        sumOut = fs.getRawFileSystem().create(fs.getChecksumFile(file), true,
                                              sumBufferSize, replication,
                                              blockSize);
        sumOut.write(CHECKSUM_VERSION, 0, CHECKSUM_VERSION.length);
        sumOut.writeInt(bytesPerSum);
        opened = true;
      } finally {
        if (!opened) {
          // do not leave streams open when setting up the checksum file fails
          closeQuietly(sumOut);
          closeQuietly(dataOut);
        }
      }
      this.datas = dataOut;
      this.sums = sumOut;
    }

    /** Close a stream on a failure path, ignoring any secondary error. */
    private static void closeQuietly(Closeable stream) {
      if (stream != null) {
        try {
          stream.close();
        } catch (IOException ignored) {
          // the failure that brought us here is the one worth reporting
        }
      }
    }

    @Override
    public void close() throws IOException {
      try {
        flushBuffer();
      } finally {
        // close both streams even if flushing or the first close fails
        try {
          sums.close();
        } finally {
          datas.close();
        }
      }
    }

    /** Write one chunk of data and its checksum to their streams. */
    @Override
    protected void writeChunk(byte[] b, int offset, int len, byte[] checksum)
        throws IOException {
      datas.write(b, offset, len);
      sums.write(checksum);
    }
  }

  /** Create a file, creating missing parent directories. */
  @Override
  public FSDataOutputStream create(Path f, FsPermission permission,
      boolean overwrite, int bufferSize, short replication, long blockSize,
      Progressable progress) throws IOException {
    return create(f, permission, overwrite, true, bufferSize,
        replication, blockSize, progress);
  }

  /**
   * Shared implementation of create and createNonRecursive.
   *
   * @param createParent if false, a missing parent directory is an error
   * @throws FileNotFoundException if <code>createParent</code> is false and
   *         the parent directory does not exist
   */
  private FSDataOutputStream create(Path f, FsPermission permission,
      boolean overwrite, boolean createParent, int bufferSize,
      short replication, long blockSize,
      Progressable progress) throws IOException {
    Path parent = f.getParent();
    if (parent != null) {
      if (!createParent && !exists(parent)) {
        throw new FileNotFoundException("Parent directory doesn't exist: "
            + parent);
      } else if (!mkdirs(parent)) {
        throw new IOException("Mkdirs failed to create " + parent);
      }
    }
    final FSDataOutputStream out;
    if (writeChecksum) {
      out = new FSDataOutputStream(
          new ChecksumFSOutputSummer(this, f, overwrite, bufferSize,
              replication, blockSize, progress), null);
    } else {
      out = fs.create(f, permission, overwrite, bufferSize, replication,
          blockSize, progress);
      // remove the checksum file since we aren't writing one
      Path checkFile = getChecksumFile(f);
      if (fs.exists(checkFile)) {
        fs.delete(checkFile, true);
      }
    }
    if (permission != null) {
      setPermission(f, permission);
    }
    return out;
  }

  /** Create a file, failing if the parent directory does not exist. */
  @Override
  public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
      boolean overwrite, int bufferSize, short replication, long blockSize,
      Progressable progress) throws IOException {
    return create(f, permission, overwrite, false, bufferSize, replication,
        blockSize, progress);
  }

  /**
   * Set replication for an existing file.
   * Implement the abstract <tt>setReplication</tt> of <tt>FileSystem</tt>.
   * The checksum file, if present, gets the same replication.
   * @param src file name
   * @param replication new replication
   * @throws IOException
   * @return true if successful;
   *         false if file does not exist or is a directory
   */
  @Override
  public boolean setReplication(Path src, short replication)
      throws IOException {
    boolean value = fs.setReplication(src, replication);
    if (!value) {
      return false;
    }

    Path checkFile = getChecksumFile(src);
    if (exists(checkFile)) {
      fs.setReplication(checkFile, replication);
    }

    return true;
  }

  /**
   * Rename files/dirs. For a file, the checksum file is renamed along
   * with it; if the source has no checksum file, a stale one at the
   * destination is removed.
   */
  @Override
  public boolean rename(Path src, Path dst) throws IOException {
    if (fs.isDirectory(src)) {
      return fs.rename(src, dst);
    } else {
      if (fs.isDirectory(dst)) {
        dst = new Path(dst, src.getName());
      }

      boolean value = fs.rename(src, dst);
      if (!value) {
        return false;
      }

      Path srcCheckFile = getChecksumFile(src);
      Path dstCheckFile = getChecksumFile(dst);
      if (fs.exists(srcCheckFile)) { //try to rename checksum
        value = fs.rename(srcCheckFile, dstCheckFile);
      } else if (fs.exists(dstCheckFile)) {
        // no src checksum, so remove dst checksum
        value = fs.delete(dstCheckFile, true);
      }

      return value;
    }
  }

  /**
   * Implement the delete(Path, boolean) in checksum
   * file system.
   *
   * @return false if the path does not exist, otherwise the result of
   *         deleting it on the underlying file system
   */
  @Override
  public boolean delete(Path f, boolean recursive) throws IOException {
    FileStatus fstatus = null;
    try {
      fstatus = fs.getFileStatus(f);
    } catch (FileNotFoundException e) {
      return false;
    }
    if (fstatus.isDirectory()) {
      // this works since the crcs are in the same
      // directories as the files, so we just delete
      // everything in the underlying filesystem
      return fs.delete(f, recursive);
    } else {
      Path checkFile = getChecksumFile(f);
      if (fs.exists(checkFile)) {
        fs.delete(checkFile, true);
      }
      return fs.delete(f, true);
    }
  }

  /** Filter that hides checksum files from listings. */
  private static final PathFilter DEFAULT_FILTER = new PathFilter() {
    @Override
    public boolean accept(Path file) {
      return !isChecksumFile(file);
    }
  };

  /**
   * List the statuses of the files/directories in the given path if the path is
   * a directory. Checksum files are left out.
   *
   * @param f
   *          given path
   * @return the statuses of the files/directories in the given path
   * @throws IOException
   */
  @Override
  public FileStatus[] listStatus(Path f) throws IOException {
    return fs.listStatus(f, DEFAULT_FILTER);
  }

  /**
   * List the located statuses of the files/directories in the given path if
   * the path is a directory. Checksum files are left out.
   *
   * @param f
   *          given path
   * @return the statuses of the files/directories in the given path
   * @throws IOException
   */
  @Override
  public RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f)
      throws IOException {
    return fs.listLocatedStatus(f, DEFAULT_FILTER);
  }

  @Override
  public boolean mkdirs(Path f) throws IOException {
    return fs.mkdirs(f);
  }

  /**
   * The src file is on the local disk, and the dst is under FS.
   * Copy it from the local src name into FS control.
   */
  @Override
  public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
      throws IOException {
    Configuration conf = getConf();
    FileUtil.copy(getLocal(conf), src, this, dst, delSrc, conf);
  }

  /**
   * The src file is under FS, and the dst is on the local disk.
   * Copy it from FS control to the local dst name.
   */
  @Override
  public void copyToLocalFile(boolean delSrc, Path src, Path dst)
      throws IOException {
    Configuration conf = getConf();
    FileUtil.copy(this, src, getLocal(conf), dst, delSrc, conf);
  }

  /**
   * The src file is under FS, and the dst is on the local disk.
   * Copy it from FS control to the local dst name.
   * If src and dst are directories, the copyCrc parameter
   * determines whether to copy CRC files.
   */
  public void copyToLocalFile(Path src, Path dst, boolean copyCrc)
      throws IOException {
    if (!fs.isDirectory(src)) { // source is a file
      fs.copyToLocalFile(src, dst);
      FileSystem localFs = getLocal(getConf()).getRawFileSystem();
      if (localFs.isDirectory(dst)) {
        dst = new Path(dst, src.getName());
      }
      dst = getChecksumFile(dst);
      if (localFs.exists(dst)) { //remove old local checksum file
        localFs.delete(dst, true);
      }
      Path checksumFile = getChecksumFile(src);
      if (copyCrc && fs.exists(checksumFile)) { //copy checksum file
        fs.copyToLocalFile(checksumFile, dst);
      }
    } else {
      FileStatus[] srcs = listStatus(src);
      for (FileStatus srcFile : srcs) {
        copyToLocalFile(srcFile.getPath(),
                        new Path(dst, srcFile.getPath().getName()), copyCrc);
      }
    }
  }

  /** Returns <code>tmpLocalFile</code> unchanged. */
  @Override
  public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
      throws IOException {
    return tmpLocalFile;
  }

  /** Moves <code>tmpLocalFile</code> to <code>fsOutputFile</code>. */
  @Override
  public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
      throws IOException {
    moveFromLocalFile(tmpLocalFile, fsOutputFile);
  }

  /**
   * Report a checksum error to the file system.
   * This implementation does nothing and returns false.
   * @param f the file name containing the error
   * @param in the stream open on the file
   * @param inPos the position of the beginning of the bad data in the file
   * @param sums the stream open on the checksum file
   * @param sumsPos the position of the beginning of the bad data in the checksum file
   * @return if retry is necessary
   */
  public boolean reportChecksumFailure(Path f, FSDataInputStream in,
      long inPos, FSDataInputStream sums, long sumsPos) {
    return false;
  }
}