/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;

/**
 * A base class for file-based {@link InputFormat}.
 *
 * <p><code>FileInputFormat</code> is the base class for all file-based
 * <code>InputFormat</code>s. This provides a generic implementation of
 * {@link #getSplits(JobConf, int)}.
 * Subclasses of <code>FileInputFormat</code> can also override the
 * {@link #isSplitable(FileSystem, Path)} method to ensure input files are
 * not split up and are processed as a whole by {@link Mapper}s.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {

  public static final Log LOG =
    LogFactory.getLog(FileInputFormat.class);

  public static final String NUM_INPUT_FILES =
    org.apache.hadoop.mapreduce.lib.input.FileInputFormat.NUM_INPUT_FILES;

  private static final double SPLIT_SLOP = 1.1;   // 10% slop

  private long minSplitSize = 1;

  private static final PathFilter hiddenFileFilter = new PathFilter() {
    public boolean accept(Path p) {
      String name = p.getName();
      return !name.startsWith("_") && !name.startsWith(".");
    }
  };

  protected void setMinSplitSize(long minSplitSize) {
    this.minSplitSize = minSplitSize;
  }

  /**
   * Proxy PathFilter that accepts a path only if all filters given in the
   * constructor do. Used by listStatus() to apply the built-in
   * hiddenFileFilter together with a user-provided one (if any).
   */
  private static class MultiPathFilter implements PathFilter {
    private List<PathFilter> filters;

    public MultiPathFilter(List<PathFilter> filters) {
      this.filters = filters;
    }

    public boolean accept(Path path) {
      for (PathFilter filter : filters) {
        if (!filter.accept(path)) {
          return false;
        }
      }
      return true;
    }
  }

  /**
   * Is the given filename splitable? Usually true, but if the file is
   * stream compressed, it will not be.
   *
   * <code>FileInputFormat</code> implementations can override this and return
   * <code>false</code> to ensure that individual input files are never split
   * up, so that {@link Mapper}s process entire files.
   *
   * @param fs the file system that the file is on
   * @param filename the file name to check
   * @return is this file splitable?
   */
  protected boolean isSplitable(FileSystem fs, Path filename) {
    return true;
  }
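
  // Illustrative sketch (not part of the original class): as the javadoc above
  // notes, a subclass can override isSplitable(FileSystem, Path) and return
  // false so that every input file is handed to a single mapper. The class
  // name "WholeFileTextInputFormat" is made up for this example.
  //
  //   public class WholeFileTextInputFormat extends TextInputFormat {
  //     @Override
  //     protected boolean isSplitable(FileSystem fs, Path file) {
  //       return false;   // never split: each file becomes exactly one split
  //     }
  //   }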

  public abstract RecordReader<K, V> getRecordReader(InputSplit split,
                                                     JobConf job,
                                                     Reporter reporter)
    throws IOException;

  /**
   * Set a PathFilter to be applied to the input paths for the map-reduce job.
   *
   * @param conf the job configuration
   * @param filter the PathFilter class to use for filtering the input paths.
   */
  public static void setInputPathFilter(JobConf conf,
                                        Class<? extends PathFilter> filter) {
    conf.setClass(org.apache.hadoop.mapreduce.lib.input.
      FileInputFormat.PATHFILTER_CLASS, filter, PathFilter.class);
  }

  /**
   * Get a PathFilter instance of the filter set for the input paths.
   *
   * @param conf the job configuration
   * @return the PathFilter instance set for the job, null if none has been set.
   */
  public static PathFilter getInputPathFilter(JobConf conf) {
    Class<? extends PathFilter> filterClass = conf.getClass(
      org.apache.hadoop.mapreduce.lib.input.FileInputFormat.PATHFILTER_CLASS,
      null, PathFilter.class);
    return (filterClass != null) ?
        ReflectionUtils.newInstance(filterClass, conf) : null;
  }
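
  // Illustrative sketch (not part of the original class): registering a custom
  // filter with setInputPathFilter(). Only paths accepted by BOTH this filter
  // and the built-in hiddenFileFilter are listed, because listStatus() wraps
  // them in a MultiPathFilter. The filter class name is hypothetical; it needs
  // a no-argument constructor since it is instantiated via ReflectionUtils.
  //
  //   public static class AvroOnlyFilter implements PathFilter {
  //     public boolean accept(Path path) {
  //       return path.getName().endsWith(".avro");
  //     }
  //   }
  //
  //   JobConf conf = new JobConf();
  //   FileInputFormat.setInputPathFilter(conf, AvroOnlyFilter.class);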

  /**
   * Add files in the input path recursively into the results.
   * @param result
   *          The List to store all files.
   * @param fs
   *          The FileSystem.
   * @param path
   *          The input path.
   * @param inputFilter
   *          The input filter that can be used to filter files/dirs.
   * @throws IOException
   */
  protected void addInputPathRecursively(List<FileStatus> result,
      FileSystem fs, Path path, PathFilter inputFilter)
      throws IOException {
    for (FileStatus stat: fs.listStatus(path, inputFilter)) {
      if (stat.isDirectory()) {
        addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
      } else {
        result.add(stat);
      }
    }
  }

  /** List input directories.
   * Subclasses may override to, e.g., select only files matching a regular
   * expression.
   *
   * @param job the job to list input paths for
   * @return array of FileStatus objects
   * @throws IOException if zero input paths are specified or an input path
   *         does not exist.
   */
  protected FileStatus[] listStatus(JobConf job) throws IOException {
    Path[] dirs = getInputPaths(job);
    if (dirs.length == 0) {
      throw new IOException("No input paths specified in job");
    }

    // get tokens for all the required FileSystems..
    TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs, job);

    // Whether we need to recursively look into the directory structure
    boolean recursive = job.getBoolean("mapred.input.dir.recursive", false);

    List<FileStatus> result = new ArrayList<FileStatus>();
    List<IOException> errors = new ArrayList<IOException>();

    // creates a MultiPathFilter with the hiddenFileFilter and the
    // user provided one (if any).
    List<PathFilter> filters = new ArrayList<PathFilter>();
    filters.add(hiddenFileFilter);
    PathFilter jobFilter = getInputPathFilter(job);
    if (jobFilter != null) {
      filters.add(jobFilter);
    }
    PathFilter inputFilter = new MultiPathFilter(filters);

    for (Path p: dirs) {
      FileSystem fs = p.getFileSystem(job);
      FileStatus[] matches = fs.globStatus(p, inputFilter);
      if (matches == null) {
        errors.add(new IOException("Input path does not exist: " + p));
      } else if (matches.length == 0) {
        errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
      } else {
        for (FileStatus globStat: matches) {
          if (globStat.isDirectory()) {
            for (FileStatus stat: fs.listStatus(globStat.getPath(),
                inputFilter)) {
              if (recursive && stat.isDirectory()) {
                addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
              } else {
                result.add(stat);
              }
            }
          } else {
            result.add(globStat);
          }
        }
      }
    }

    if (!errors.isEmpty()) {
      throw new InvalidInputException(errors);
    }
    LOG.info("Total input paths to process : " + result.size());
    return result.toArray(new FileStatus[result.size()]);
  }
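
  // Illustrative sketch (not part of the original class): by default
  // listStatus() only descends one level below each input directory. Setting
  // the "mapred.input.dir.recursive" flag read above makes it walk the whole
  // subtree via addInputPathRecursively(). The input path is made up.
  //
  //   JobConf conf = new JobConf();
  //   conf.setBoolean("mapred.input.dir.recursive", true);
  //   FileInputFormat.setInputPaths(conf, new Path("/data/logs"));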

  /**
   * A factory that makes the split for this class. It can be overridden
   * by sub-classes to make sub-types.
   */
  protected FileSplit makeSplit(Path file, long start, long length,
                                String[] hosts) {
    return new FileSplit(file, start, length, hosts);
  }

  /** Splits files returned by {@link #listStatus(JobConf)} when
   * they're too big. */
  @SuppressWarnings("deprecation")
  public InputSplit[] getSplits(JobConf job, int numSplits)
      throws IOException {
    FileStatus[] files = listStatus(job);

    // Save the number of input files for metrics/loadgen
    job.setLong(NUM_INPUT_FILES, files.length);
    long totalSize = 0;                           // compute total size
    for (FileStatus file: files) {                // check we have valid files
      if (file.isDirectory()) {
        throw new IOException("Not a file: " + file.getPath());
      }
      totalSize += file.getLen();
    }

    long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
    long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
      FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);

    // generate splits
    ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
    NetworkTopology clusterMap = new NetworkTopology();
    for (FileStatus file: files) {
      Path path = file.getPath();
      FileSystem fs = path.getFileSystem(job);
      long length = file.getLen();
      BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
      if ((length != 0) && isSplitable(fs, path)) {
        long blockSize = file.getBlockSize();
        long splitSize = computeSplitSize(goalSize, minSize, blockSize);

        long bytesRemaining = length;
        while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
          String[] splitHosts = getSplitHosts(blkLocations,
              length - bytesRemaining, splitSize, clusterMap);
          splits.add(makeSplit(path, length - bytesRemaining, splitSize,
              splitHosts));
          bytesRemaining -= splitSize;
        }

        if (bytesRemaining != 0) {
          String[] splitHosts = getSplitHosts(blkLocations, length
              - bytesRemaining, bytesRemaining, clusterMap);
          splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
              splitHosts));
        }
      } else if (length != 0) {
        String[] splitHosts = getSplitHosts(blkLocations, 0, length, clusterMap);
        splits.add(makeSplit(path, 0, length, splitHosts));
      } else {
        // Create empty hosts array for zero length files
        splits.add(makeSplit(path, 0, length, new String[0]));
      }
    }
    LOG.debug("Total # of splits: " + splits.size());
    return splits.toArray(new FileSplit[splits.size()]);
  }

  protected long computeSplitSize(long goalSize, long minSize,
                                  long blockSize) {
    return Math.max(minSize, Math.min(goalSize, blockSize));
  }
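
  // Worked example (illustrative, with made-up numbers): for a single 1 GB
  // file with a 128 MB block size, numSplits = 4 and no explicit minimum,
  // goalSize is 256 MB and computeSplitSize(256 MB, 1, 128 MB)
  // = max(1, min(256 MB, 128 MB)) = 128 MB. The loop in getSplits() then cuts
  // 128 MB splits until the remainder is no more than SPLIT_SLOP (1.1) times
  // the split size, so a final chunk of up to ~140.8 MB is kept as one split
  // instead of producing a tiny trailing split. To force larger splits, the
  // minimum can be raised through the same key getSplits() reads above:
  //
  //   conf.setLong(org.apache.hadoop.mapreduce.lib.input.
  //       FileInputFormat.SPLIT_MINSIZE, 256 * 1024 * 1024L);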

  protected int getBlockIndex(BlockLocation[] blkLocations,
                              long offset) {
    for (int i = 0; i < blkLocations.length; i++) {
      // is the offset inside this block?
      if ((blkLocations[i].getOffset() <= offset) &&
          (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())) {
        return i;
      }
    }
    BlockLocation last = blkLocations[blkLocations.length - 1];
    long fileLength = last.getOffset() + last.getLength() - 1;
    throw new IllegalArgumentException("Offset " + offset +
                                       " is outside of file (0.." +
                                       fileLength + ")");
  }

  /**
   * Sets the given comma separated paths as the list of inputs
   * for the map-reduce job.
   *
   * @param conf Configuration of the job
   * @param commaSeparatedPaths Comma separated paths to be set as
   *        the list of inputs for the map-reduce job.
   */
  public static void setInputPaths(JobConf conf, String commaSeparatedPaths) {
    setInputPaths(conf, StringUtils.stringToPath(
        getPathStrings(commaSeparatedPaths)));
  }

  /**
   * Add the given comma separated paths to the list of inputs for
   * the map-reduce job.
   *
   * @param conf The configuration of the job
   * @param commaSeparatedPaths Comma separated paths to be added to
   *        the list of inputs for the map-reduce job.
   */
  public static void addInputPaths(JobConf conf, String commaSeparatedPaths) {
    for (String str : getPathStrings(commaSeparatedPaths)) {
      addInputPath(conf, new Path(str));
    }
  }

  /**
   * Set the array of {@link Path}s as the list of inputs
   * for the map-reduce job.
   *
   * @param conf Configuration of the job.
   * @param inputPaths the {@link Path}s of the input directories/files
   *        for the map-reduce job.
   */
  public static void setInputPaths(JobConf conf, Path... inputPaths) {
    Path path = new Path(conf.getWorkingDirectory(), inputPaths[0]);
    StringBuffer str = new StringBuffer(StringUtils.escapeString(path.toString()));
    for (int i = 1; i < inputPaths.length; i++) {
      str.append(StringUtils.COMMA_STR);
      path = new Path(conf.getWorkingDirectory(), inputPaths[i]);
      str.append(StringUtils.escapeString(path.toString()));
    }
    conf.set(org.apache.hadoop.mapreduce.lib.input.
      FileInputFormat.INPUT_DIR, str.toString());
  }

  /**
   * Add a {@link Path} to the list of inputs for the map-reduce job.
   *
   * @param conf The configuration of the job
   * @param path {@link Path} to be added to the list of inputs for
   *        the map-reduce job.
   */
  public static void addInputPath(JobConf conf, Path path) {
    path = new Path(conf.getWorkingDirectory(), path);
    String dirStr = StringUtils.escapeString(path.toString());
    String dirs = conf.get(org.apache.hadoop.mapreduce.lib.input.
      FileInputFormat.INPUT_DIR);
    conf.set(org.apache.hadoop.mapreduce.lib.input.
      FileInputFormat.INPUT_DIR, dirs == null ? dirStr :
      dirs + StringUtils.COMMA_STR + dirStr);
  }

  // Splits the comma separated path list, treating commas inside curly-brace
  // glob patterns (e.g. {a,b}) as part of the pattern rather than separators.
  private static String[] getPathStrings(String commaSeparatedPaths) {
    int length = commaSeparatedPaths.length();
    int curlyOpen = 0;
    int pathStart = 0;
    boolean globPattern = false;
    List<String> pathStrings = new ArrayList<String>();

    for (int i = 0; i < length; i++) {
      char ch = commaSeparatedPaths.charAt(i);
      switch (ch) {
        case '{' : {
          curlyOpen++;
          if (!globPattern) {
            globPattern = true;
          }
          break;
        }
        case '}' : {
          curlyOpen--;
          if (curlyOpen == 0 && globPattern) {
            globPattern = false;
          }
          break;
        }
        case ',' : {
          if (!globPattern) {
            pathStrings.add(commaSeparatedPaths.substring(pathStart, i));
            pathStart = i + 1;
          }
          break;
        }
        default:
          continue; // nothing special to do for this character
      }
    }
    pathStrings.add(commaSeparatedPaths.substring(pathStart, length));

    return pathStrings.toArray(new String[0]);
  }

  /**
   * Get the list of input {@link Path}s for the map-reduce job.
   *
   * @param conf The configuration of the job
   * @return the list of input {@link Path}s for the map-reduce job.
   */
  public static Path[] getInputPaths(JobConf conf) {
    String dirs = conf.get(org.apache.hadoop.mapreduce.lib.input.
      FileInputFormat.INPUT_DIR, "");
    String[] list = StringUtils.split(dirs);
    Path[] result = new Path[list.length];
    for (int i = 0; i < list.length; i++) {
      result[i] = new Path(StringUtils.unEscapeString(list[i]));
    }
    return result;
  }
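
  // Illustrative sketch (not part of the original class): because
  // getPathStrings() only splits on commas outside curly braces, a brace glob
  // can itself contain commas. The paths below are made up.
  //
  //   JobConf conf = new JobConf();
  //   FileInputFormat.setInputPaths(conf,
  //       "/data/2015/{01,02,03}/*.log,/data/archive");
  //   // -> two input paths: "/data/2015/{01,02,03}/*.log" and "/data/archive"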

  private void sortInDescendingOrder(List<NodeInfo> mylist) {
    Collections.sort(mylist, new Comparator<NodeInfo>() {
      public int compare(NodeInfo obj1, NodeInfo obj2) {

        if (obj1 == null || obj2 == null)
          return -1;

        if (obj1.getValue() == obj2.getValue()) {
          return 0;
        }
        else {
          return ((obj1.getValue() < obj2.getValue()) ? 1 : -1);
        }
      }
    });
  }

  /**
   * This function identifies and returns the hosts that contribute the most
   * to a given split. When calculating the contribution, rack locality is
   * treated on par with host locality, so hosts from racks that contribute
   * the most are preferred over hosts on racks that contribute less.
   * @param blkLocations The list of block locations
   * @param offset the start offset of the split within the file
   * @param splitSize the length of the split in bytes
   * @param clusterMap the network topology used to group hosts into racks
   * @return array of hosts that contribute most to this split
   * @throws IOException
   */
  protected String[] getSplitHosts(BlockLocation[] blkLocations,
      long offset, long splitSize, NetworkTopology clusterMap)
      throws IOException {

    int startIndex = getBlockIndex(blkLocations, offset);

    long bytesInThisBlock = blkLocations[startIndex].getOffset() +
        blkLocations[startIndex].getLength() - offset;

    // If the split fits entirely inside this block, just return its hosts
    if (bytesInThisBlock >= splitSize) {
      return blkLocations[startIndex].getHosts();
    }

    long bytesInFirstBlock = bytesInThisBlock;
    int index = startIndex + 1;
    splitSize -= bytesInThisBlock;

    while (splitSize > 0) {
      bytesInThisBlock =
        Math.min(splitSize, blkLocations[index++].getLength());
      splitSize -= bytesInThisBlock;
    }

    long bytesInLastBlock = bytesInThisBlock;
    int endIndex = index - 1;

    Map<Node, NodeInfo> hostsMap = new IdentityHashMap<Node, NodeInfo>();
    Map<Node, NodeInfo> racksMap = new IdentityHashMap<Node, NodeInfo>();
    String[] allTopos = new String[0];

    // Build the hierarchy and aggregate the contribution of
    // bytes at each level. See TestGetSplitHosts.java

    for (index = startIndex; index <= endIndex; index++) {

      // Establish the bytes in this block
      if (index == startIndex) {
        bytesInThisBlock = bytesInFirstBlock;
      }
      else if (index == endIndex) {
        bytesInThisBlock = bytesInLastBlock;
      }
      else {
        bytesInThisBlock = blkLocations[index].getLength();
      }

      allTopos = blkLocations[index].getTopologyPaths();

      // If no topology information is available, just
      // prefix a fakeRack
      if (allTopos.length == 0) {
        allTopos = fakeRacks(blkLocations, index);
      }

      // NOTE: This code currently works only for one level of
      // hierarchy (rack/host). However, it is relatively easy
      // to extend this to support aggregation at different
      // levels

      for (String topo: allTopos) {

        Node node, parentNode;
        NodeInfo nodeInfo, parentNodeInfo;

        node = clusterMap.getNode(topo);

        if (node == null) {
          node = new NodeBase(topo);
          clusterMap.add(node);
        }

        nodeInfo = hostsMap.get(node);

        if (nodeInfo == null) {
          nodeInfo = new NodeInfo(node);
          hostsMap.put(node, nodeInfo);
          parentNode = node.getParent();
          parentNodeInfo = racksMap.get(parentNode);
          if (parentNodeInfo == null) {
            parentNodeInfo = new NodeInfo(parentNode);
            racksMap.put(parentNode, parentNodeInfo);
          }
          parentNodeInfo.addLeaf(nodeInfo);
        }
        else {
          nodeInfo = hostsMap.get(node);
          parentNode = node.getParent();
          parentNodeInfo = racksMap.get(parentNode);
        }

        nodeInfo.addValue(index, bytesInThisBlock);
        parentNodeInfo.addValue(index, bytesInThisBlock);

      } // for all topos

    } // for all indices

    return identifyHosts(allTopos.length, racksMap);
  }
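
  // Worked illustration (made-up topology): suppose a split covers 100 MB of
  // block B1 (replicas on /rackA/h1, /rackA/h2, /rackB/h3) and 20 MB of block
  // B2 (replicas on /rackB/h3, /rackC/h4, /rackC/h5). Host h3 contributes
  // 120 MB; rackA contributes 100 MB and rackC 20 MB (a rack counts each block
  // only once even if it holds several replicas, thanks to the blockIds set in
  // NodeInfo.addValue). identifyHosts() below sorts racks first, so rackB
  // (120 MB) is examined before rackA, and with a replication factor of 3 the
  // returned hosts are h3 followed by the two rackA hosts.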

  private String[] identifyHosts(int replicationFactor,
                                 Map<Node, NodeInfo> racksMap) {

    String[] retVal = new String[replicationFactor];

    List<NodeInfo> rackList = new LinkedList<NodeInfo>();

    rackList.addAll(racksMap.values());

    // Sort the racks based on their contribution to this split
    sortInDescendingOrder(rackList);

    boolean done = false;
    int index = 0;

    // Get the host list for all our aggregated items, sort
    // them and return the top entries
    for (NodeInfo ni: rackList) {

      Set<NodeInfo> hostSet = ni.getLeaves();

      List<NodeInfo> hostList = new LinkedList<NodeInfo>();
      hostList.addAll(hostSet);

      // Sort the hosts in this rack based on their contribution
      sortInDescendingOrder(hostList);

      for (NodeInfo host: hostList) {
        // Strip out the port number from the host name
        retVal[index++] = host.node.getName().split(":")[0];
        if (index == replicationFactor) {
          done = true;
          break;
        }
      }

      if (done) {
        break;
      }
    }
    return retVal;
  }

  private String[] fakeRacks(BlockLocation[] blkLocations, int index)
      throws IOException {
    String[] allHosts = blkLocations[index].getHosts();
    String[] allTopos = new String[allHosts.length];
    for (int i = 0; i < allHosts.length; i++) {
      allTopos[i] = NetworkTopology.DEFAULT_RACK + "/" + allHosts[i];
    }
    return allTopos;
  }


  private static class NodeInfo {
    final Node node;
    final Set<Integer> blockIds;
    final Set<NodeInfo> leaves;

    private long value;

    NodeInfo(Node node) {
      this.node = node;
      blockIds = new HashSet<Integer>();
      leaves = new HashSet<NodeInfo>();
    }

    long getValue() { return value; }

    // Count a block's bytes only once per node, even if the same node shows
    // up more than once for that block
    void addValue(int blockIndex, long value) {
      if (blockIds.add(blockIndex)) {
        this.value += value;
      }
    }

    Set<NodeInfo> getLeaves() { return leaves; }

    void addLeaf(NodeInfo nodeInfo) {
      leaves.add(nodeInfo);
    }
  }
}