/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.io.compress;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.io.compress.bzip2.BZip2Constants;
import org.apache.hadoop.io.compress.bzip2.BZip2DummyCompressor;
import org.apache.hadoop.io.compress.bzip2.BZip2DummyDecompressor;
import org.apache.hadoop.io.compress.bzip2.CBZip2InputStream;
import org.apache.hadoop.io.compress.bzip2.CBZip2OutputStream;

/**
 * This class provides CompressionOutputStream and CompressionInputStream for
 * compression and decompression. Currently we don't have an implementation of
 * the Compressor and Decompressor interfaces, so those methods of
 * CompressionCodec which have a Compressor or Decompressor type argument
 * return dummy implementations whose operations throw
 * UnsupportedOperationException.
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class BZip2Codec implements SplittableCompressionCodec {

  /** Magic characters that begin a well-formed bzip2 stream. */
  private static final String HEADER = "BZ";
  private static final int HEADER_LEN = HEADER.length();
  /** Remainder of the 4-byte "BZh9" stream header (Huffman, block size 9). */
  private static final String SUB_HEADER = "h9";
  private static final int SUB_HEADER_LEN = SUB_HEADER.length();

  /**
   * Creates a new instance of BZip2Codec.
   */
  public BZip2Codec() { }

  /**
   * Creates a CompressionOutputStream for BZip2.
   *
   * @param out
   *          the output stream
   * @return the BZip2 CompressionOutputStream
   * @throws java.io.IOException
   *           if the stream cannot be created
   */
  @Override
  public CompressionOutputStream createOutputStream(OutputStream out)
      throws IOException {
    return new BZip2CompressionOutputStream(out);
  }

  /**
   * Creates a CompressionOutputStream using the given OutputStream.
   * The supplied Compressor is ignored; compression is performed by the
   * stream itself.
   *
   * @return CompressionOutputStream
   * @throws java.io.IOException
   *           if the stream cannot be created
   */
  @Override
  public CompressionOutputStream createOutputStream(OutputStream out,
      Compressor compressor) throws IOException {
    return createOutputStream(out);
  }

  /**
   * This functionality is currently not supported.
   *
   * @return BZip2DummyCompressor.class
   */
  @Override
  public Class<? extends org.apache.hadoop.io.compress.Compressor> getCompressorType() {
    return BZip2DummyCompressor.class;
  }

  /**
   * This functionality is currently not supported.
   *
   * @return Compressor
   */
  @Override
  public Compressor createCompressor() {
    return new BZip2DummyCompressor();
  }

  /**
   * Creates a CompressionInputStream to be used to read off uncompressed data.
   *
   * @param in
   *          the InputStream
   * @return a CompressionInputStream for BZip2
   * @throws java.io.IOException
   *           if the stream cannot be created
   */
  @Override
  public CompressionInputStream createInputStream(InputStream in)
      throws IOException {
    return new BZip2CompressionInputStream(in);
  }

  /**
   * This functionality is currently not supported.
   * The supplied Decompressor is ignored.
   *
   * @return CompressionInputStream
   */
  @Override
  public CompressionInputStream createInputStream(InputStream in,
      Decompressor decompressor) throws IOException {
    return createInputStream(in);
  }

  /**
   * Creates a CompressionInputStream to be used to read off uncompressed data
   * in one of the two reading modes, i.e. continuous or blocked reading mode.
   *
   * @param seekableIn the InputStream (must also implement Seekable)
   * @param start the start offset into the compressed stream
   * @param end the end offset into the compressed stream
   * @param readMode controls whether progress is reported continuously or
   *                 only at block boundaries
   *
   * @return CompressionInputStream for BZip2 aligned at block boundaries
   */
  @Override
  public SplitCompressionInputStream createInputStream(InputStream seekableIn,
      Decompressor decompressor, long start, long end, READ_MODE readMode)
      throws IOException {

    if (!(seekableIn instanceof Seekable)) {
      throw new IOException("seekableIn must be an instance of " +
          Seekable.class.getName());
    }

    // Find the position of the first BZip2 start-of-block marker.
    ((Seekable)seekableIn).seek(0);

    // BZip2 start-of-block markers are 6 bytes. But the very first block
    // also has the "BZh9" stream header, making it 10 bytes. This is the
    // common case; at times, however, a stream might start without a
    // leading "BZ".
    final long FIRST_BZIP2_BLOCK_MARKER_POSITION =
        CBZip2InputStream.numberOfBytesTillNextMarker(seekableIn);
    long adjStart = Math.max(0L, start - FIRST_BZIP2_BLOCK_MARKER_POSITION);

    ((Seekable)seekableIn).seek(adjStart);
    SplitCompressionInputStream in =
        new BZip2CompressionInputStream(seekableIn, adjStart, end, readMode);

    // The following if clause handles the following case:
    // Assume the following scenario in a BZip2 compressed stream where
    // . represents compressed data.
    // .....[48 bit Block].....[48 bit Block].....[48 bit Block]...
    // ........................[47 bits][1 bit].....[48 bit Block]...
    // ................................^[Assume a Byte alignment here]
    // ........................................^^[current position of stream]
    // .....................^^[We go back 10 Bytes in stream and find a Block marker]
    // ........................................^^[We align at wrong position!]
    // ...........................................................^^[While this pos is correct]

    if (in.getPos() <= start) {
      ((Seekable)seekableIn).seek(start);
      in = new BZip2CompressionInputStream(seekableIn, start, end, readMode);
    }

    return in;
  }

  /**
   * This functionality is currently not supported.
   *
   * @return BZip2DummyDecompressor.class
   */
  @Override
  public Class<? extends org.apache.hadoop.io.compress.Decompressor> getDecompressorType() {
    return BZip2DummyDecompressor.class;
  }

  /**
   * This functionality is currently not supported.
   *
   * @return Decompressor
   */
  @Override
  public Decompressor createDecompressor() {
    return new BZip2DummyDecompressor();
  }

  /**
   * .bz2 is recognized as the default extension for compressed BZip2 files.
   *
   * @return a String telling the default bzip2 file extension
   */
  @Override
  public String getDefaultExtension() {
    return ".bz2";
  }

  private static class BZip2CompressionOutputStream extends
      CompressionOutputStream {

    // class data starts here//
    private CBZip2OutputStream output;
    // True until the "BZ" header has been written and the underlying
    // CBZip2OutputStream has been (re)created. Lazily reset on first write
    // because 'out' may not be ready at resetState() time.
    private boolean needsReset;
    // class data ends here//

    public BZip2CompressionOutputStream(OutputStream out)
        throws IOException {
      super(out);
      needsReset = true;
    }

    private void writeStreamHeader() throws IOException {
      if (super.out != null) {
        // The compressed bzip2 stream should start with the
        // identifying characters BZ. The caller of CBZip2OutputStream,
        // i.e. this class, must write these characters.
        // Use an explicit charset: HEADER is ASCII, and the platform
        // default charset must not influence the on-disk magic bytes.
        out.write(HEADER.getBytes(StandardCharsets.UTF_8));
      }
    }

    @Override
    public void finish() throws IOException {
      if (needsReset) {
        // In the case that nothing is written to this stream, we still need to
        // write out the header before closing, otherwise the stream won't be
        // recognized by BZip2CompressionInputStream.
        internalReset();
      }
      this.output.finish();
      needsReset = true;
    }

    private void internalReset() throws IOException {
      if (needsReset) {
        needsReset = false;
        writeStreamHeader();
        this.output = new CBZip2OutputStream(out);
      }
    }

    @Override
    public void resetState() throws IOException {
      // Cannot write to out at this point because out might not be ready
      // yet, as in the SequenceFile.Writer implementation.
      needsReset = true;
    }

    @Override
    public void write(int b) throws IOException {
      if (needsReset) {
        internalReset();
      }
      this.output.write(b);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
      if (needsReset) {
        internalReset();
      }
      this.output.write(b, off, len);
    }

    @Override
    public void close() throws IOException {
      if (needsReset) {
        // In the case that nothing is written to this stream, we still need to
        // write out the header before closing, otherwise the stream won't be
        // recognized by BZip2CompressionInputStream.
        internalReset();
      }
      this.output.flush();
      this.output.close();
      needsReset = true;
    }

  }// end of class BZip2CompressionOutputStream

  /**
   * This class is capable of de-compressing BZip2 data in two modes,
   * CONTINUOUS and BYBLOCK. BYBLOCK mode makes it possible to
   * do decompression starting at any arbitrary position in the stream.
   *
   * So this facility can easily be used to parallelize decompression
   * of a large BZip2 file for performance reasons. (It is exactly
   * done so for the Hadoop framework. See LineRecordReader for an
   * example.) So one can break the file (of course logically) into
   * chunks for parallel processing. These "splits" should be like
   * default Hadoop splits (e.g. as in FileInputFormat's getSplit method).
   * So this code is designed and tested for FileInputFormat's way
   * of splitting only.
   */
  private static class BZip2CompressionInputStream extends
      SplitCompressionInputStream {

    // class data starts here//
    private CBZip2InputStream input;
    boolean needsReset;
    private BufferedInputStream bufferedIn;
    private boolean isHeaderStripped = false;
    private boolean isSubHeaderStripped = false;
    private READ_MODE readMode = READ_MODE.CONTINUOUS;
    // Offset of this stream within the underlying compressed stream,
    // captured at construction time; all advertised positions are
    // relative to this base.
    private long startingPos = 0L;

    // The following state machine handles the different states of the
    // compressed stream position:
    // HOLD : Don't advertise compressed stream position
    // ADVERTISE : Read 1 more character and advertise stream position
    // See more comments about it before the updatePos method.
    private enum POS_ADVERTISEMENT_STATE_MACHINE {
      HOLD, ADVERTISE
    };

    POS_ADVERTISEMENT_STATE_MACHINE posSM = POS_ADVERTISEMENT_STATE_MACHINE.HOLD;
    long compressedStreamPosition = 0;

    // class data ends here//

    public BZip2CompressionInputStream(InputStream in) throws IOException {
      this(in, 0L, Long.MAX_VALUE, READ_MODE.CONTINUOUS);
    }

    public BZip2CompressionInputStream(InputStream in, long start, long end,
        READ_MODE readMode) throws IOException {
      super(in, start, end);
      needsReset = false;
      bufferedIn = new BufferedInputStream(super.in);
      this.startingPos = super.getPos();
      this.readMode = readMode;
      if (this.startingPos == 0) {
        // We only strip the header if it is the start of the file.
        bufferedIn = readStreamHeader();
      }
      input = new CBZip2InputStream(bufferedIn, readMode);
      // Header bytes consumed here were never seen by CBZip2InputStream;
      // account for them so reported positions stay accurate.
      if (this.isHeaderStripped) {
        input.updateReportedByteCount(HEADER_LEN);
      }

      if (this.isSubHeaderStripped) {
        input.updateReportedByteCount(SUB_HEADER_LEN);
      }

      this.updatePos(false);
    }

    private BufferedInputStream readStreamHeader() throws IOException {
      // We are flexible enough to allow the compressed stream not to
      // start with the header of BZ. So it works fine whether we have
      // the header or not.
      if (super.in != null) {
        bufferedIn.mark(HEADER_LEN);
        byte[] headerBytes = new byte[HEADER_LEN];
        int actualRead = bufferedIn.read(headerBytes, 0, HEADER_LEN);
        if (actualRead != -1) {
          // Explicit charset: the 2-byte magic is ASCII and must not be
          // decoded with the platform default charset.
          String header = new String(headerBytes, StandardCharsets.UTF_8);
          if (!HEADER.equals(header)) {
            bufferedIn.reset();
          } else {
            this.isHeaderStripped = true;
            // In the case of BYBLOCK mode, we also want to strip off
            // the remaining two characters of the header.
            if (this.readMode == READ_MODE.BYBLOCK) {
              actualRead = bufferedIn.read(headerBytes, 0,
                  SUB_HEADER_LEN);
              if (actualRead != -1) {
                this.isSubHeaderStripped = true;
              }
            }
          }
        }
      }

      if (bufferedIn == null) {
        throw new IOException("Failed to read bzip2 stream.");
      }

      return bufferedIn;

    }// end of method

    @Override
    public void close() throws IOException {
      if (!needsReset) {
        input.close();
        needsReset = true;
      }
    }

    /**
     * This method updates the compressed stream position exactly when the
     * client of this code has read off at least one byte past any BZip2
     * end of block marker.
     *
     * This mechanism is very helpful to deal with data level record
     * boundaries. Please see the constructor and next methods of
     * org.apache.hadoop.mapred.LineRecordReader as an example usage of this
     * feature. We elaborate it with an example in the following:
     *
     * Assume two different scenarios of the BZip2 compressed stream, where
     * [m] represents end of block, \n is the line delimiter and . represents
     * compressed data.
     *
     * ............[m]......\n.......
     *
     * ..........\n[m]......\n.......
     *
     * Assume that end is right after [m]. In the first case the reading
     * will stop at \n and there is no need to read one more line. (The
     * reason for reading one more line in the next() method is explained in
     * LineRecordReader.) While in the second example LineRecordReader needs
     * to read one more line (till the second \n). Now since BZip2Codecs only
     * update the position at least one byte past a marker, it is straight
     * forward to differentiate between the two cases mentioned.
     */
    @Override
    public int read(byte[] b, int off, int len) throws IOException {
      if (needsReset) {
        internalReset();
      }

      int result = this.input.read(b, off, len);
      if (result == BZip2Constants.END_OF_BLOCK) {
        this.posSM = POS_ADVERTISEMENT_STATE_MACHINE.ADVERTISE;
      }

      if (this.posSM == POS_ADVERTISEMENT_STATE_MACHINE.ADVERTISE) {
        // Read exactly ONE byte past the end-of-block marker. The previous
        // code passed (off + 1) as the length argument, which for off > 0
        // could read many bytes and over-advance past the marker.
        result = this.input.read(b, off, 1);
        // This is the precise time to update the compressed stream position
        // to the client of this code.
        this.updatePos(true);
        this.posSM = POS_ADVERTISEMENT_STATE_MACHINE.HOLD;
      }

      return result;

    }

    @Override
    public int read() throws IOException {
      byte b[] = new byte[1];
      int result = this.read(b, 0, 1);
      return (result < 0) ? result : (b[0] & 0xff);
    }

    private void internalReset() throws IOException {
      if (needsReset) {
        needsReset = false;
        BufferedInputStream bufferedIn = readStreamHeader();
        input = new CBZip2InputStream(bufferedIn, this.readMode);
      }
    }

    @Override
    public void resetState() throws IOException {
      // Cannot read from bufferedIn at this point because bufferedIn
      // might not be ready
      // yet, as in the SequenceFile.Reader implementation.
      needsReset = true;
    }

    @Override
    public long getPos() {
      return this.compressedStreamPosition;
    }

    /*
     * As the comments before the read method tell, the
     * compressed stream position is advertised when at least
     * one byte past EOB has been read off. But
     * there is an exception to this rule: when we
     * construct the stream we advertise the position
     * exactly at EOB. In the following method the
     * shouldAddOn boolean captures this exception.
     */
    private void updatePos(boolean shouldAddOn) {
      int addOn = shouldAddOn ? 1 : 0;
      this.compressedStreamPosition = this.startingPos
          + this.input.getProcessedByteCount() + addOn;
    }

  }// end of BZip2CompressionInputStream

}