001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019 package org.apache.hadoop.mapred; 020 021 022 import java.io.IOException; 023 import java.net.URL; 024 import java.net.URLDecoder; 025 import java.util.Enumeration; 026 import java.util.regex.Pattern; 027 028 import org.apache.commons.logging.Log; 029 import org.apache.commons.logging.LogFactory; 030 import org.apache.hadoop.classification.InterfaceAudience; 031 import org.apache.hadoop.classification.InterfaceAudience.Private; 032 import org.apache.hadoop.classification.InterfaceStability; 033 import org.apache.hadoop.conf.Configuration; 034 import org.apache.hadoop.fs.FileStatus; 035 import org.apache.hadoop.fs.FileSystem; 036 import org.apache.hadoop.fs.Path; 037 import org.apache.hadoop.io.LongWritable; 038 import org.apache.hadoop.io.RawComparator; 039 import org.apache.hadoop.io.Text; 040 import org.apache.hadoop.io.WritableComparable; 041 import org.apache.hadoop.io.WritableComparator; 042 import org.apache.hadoop.io.compress.CompressionCodec; 043 import org.apache.hadoop.mapred.lib.HashPartitioner; 044 import org.apache.hadoop.mapred.lib.IdentityMapper; 045 import org.apache.hadoop.mapred.lib.IdentityReducer; 046 import org.apache.hadoop.mapred.lib.KeyFieldBasedComparator; 047 import org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner; 048 import org.apache.hadoop.mapreduce.MRConfig; 049 import org.apache.hadoop.mapreduce.MRJobConfig; 050 import org.apache.hadoop.mapreduce.filecache.DistributedCache; 051 import org.apache.hadoop.mapreduce.util.ConfigUtil; 052 import org.apache.hadoop.security.Credentials; 053 import org.apache.hadoop.util.ReflectionUtils; 054 import org.apache.hadoop.util.Tool; 055 import org.apache.log4j.Level; 056 057 /** 058 * A map/reduce job configuration. 059 * 060 * <p><code>JobConf</code> is the primary interface for a user to describe a 061 * map-reduce job to the Hadoop framework for execution. The framework tries to 062 * faithfully execute the job as-is described by <code>JobConf</code>, however: 063 * <ol> 064 * <li> 065 * Some configuration parameters might have been marked as 066 * <a href="{@docRoot}/org/apache/hadoop/conf/Configuration.html#FinalParams"> 067 * final</a> by administrators and hence cannot be altered. 068 * </li> 069 * <li> 070 * While some job parameters are straight-forward to set 071 * (e.g. {@link #setNumReduceTasks(int)}), some parameters interact subtly 072 * rest of the framework and/or job-configuration and is relatively more 073 * complex for the user to control finely (e.g. {@link #setNumMapTasks(int)}). 074 * </li> 075 * </ol></p> 076 * 077 * <p><code>JobConf</code> typically specifies the {@link Mapper}, combiner 078 * (if any), {@link Partitioner}, {@link Reducer}, {@link InputFormat} and 079 * {@link OutputFormat} implementations to be used etc. 080 * 081 * <p>Optionally <code>JobConf</code> is used to specify other advanced facets 082 * of the job such as <code>Comparator</code>s to be used, files to be put in 083 * the {@link DistributedCache}, whether or not intermediate and/or job outputs 084 * are to be compressed (and how), debugability via user-provided scripts 085 * ( {@link #setMapDebugScript(String)}/{@link #setReduceDebugScript(String)}), 086 * for doing post-processing on task logs, task's stdout, stderr, syslog. 087 * and etc.</p> 088 * 089 * <p>Here is an example on how to configure a job via <code>JobConf</code>:</p> 090 * <p><blockquote><pre> 091 * // Create a new JobConf 092 * JobConf job = new JobConf(new Configuration(), MyJob.class); 093 * 094 * // Specify various job-specific parameters 095 * job.setJobName("myjob"); 096 * 097 * FileInputFormat.setInputPaths(job, new Path("in")); 098 * FileOutputFormat.setOutputPath(job, new Path("out")); 099 * 100 * job.setMapperClass(MyJob.MyMapper.class); 101 * job.setCombinerClass(MyJob.MyReducer.class); 102 * job.setReducerClass(MyJob.MyReducer.class); 103 * 104 * job.setInputFormat(SequenceFileInputFormat.class); 105 * job.setOutputFormat(SequenceFileOutputFormat.class); 106 * </pre></blockquote></p> 107 * 108 * @see JobClient 109 * @see ClusterStatus 110 * @see Tool 111 * @see DistributedCache 112 */ 113 @InterfaceAudience.Public 114 @InterfaceStability.Stable 115 public class JobConf extends Configuration { 116 117 private static final Log LOG = LogFactory.getLog(JobConf.class); 118 119 static{ 120 ConfigUtil.loadResources(); 121 } 122 123 /** 124 * @deprecated Use {@link #MAPRED_JOB_MAP_MEMORY_MB_PROPERTY} and 125 * {@link #MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY} 126 */ 127 @Deprecated 128 public static final String MAPRED_TASK_MAXVMEM_PROPERTY = 129 "mapred.task.maxvmem"; 130 131 /** 132 * @deprecated 133 */ 134 @Deprecated 135 public static final String UPPER_LIMIT_ON_TASK_VMEM_PROPERTY = 136 "mapred.task.limit.maxvmem"; 137 138 /** 139 * @deprecated 140 */ 141 @Deprecated 142 public static final String MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY = 143 "mapred.task.default.maxvmem"; 144 145 /** 146 * @deprecated 147 */ 148 @Deprecated 149 public static final String MAPRED_TASK_MAXPMEM_PROPERTY = 150 "mapred.task.maxpmem"; 151 152 /** 153 * A value which if set for memory related configuration options, 154 * indicates that the options are turned off. 155 */ 156 public static final long DISABLED_MEMORY_LIMIT = -1L; 157 158 /** 159 * Property name for the configuration property mapreduce.cluster.local.dir 160 */ 161 public static final String MAPRED_LOCAL_DIR_PROPERTY = MRConfig.LOCAL_DIR; 162 163 /** 164 * Name of the queue to which jobs will be submitted, if no queue 165 * name is mentioned. 166 */ 167 public static final String DEFAULT_QUEUE_NAME = "default"; 168 169 static final String MAPRED_JOB_MAP_MEMORY_MB_PROPERTY = 170 JobContext.MAP_MEMORY_MB; 171 172 static final String MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY = 173 JobContext.REDUCE_MEMORY_MB; 174 175 /** Pattern for the default unpacking behavior for job jars */ 176 public static final Pattern UNPACK_JAR_PATTERN_DEFAULT = 177 Pattern.compile("(?:classes/|lib/).*"); 178 179 /** 180 * Configuration key to set the java command line options for the child 181 * map and reduce tasks. 182 * 183 * Java opts for the task tracker child processes. 184 * The following symbol, if present, will be interpolated: @taskid@. 185 * It is replaced by current TaskID. Any other occurrences of '@' will go 186 * unchanged. 187 * For example, to enable verbose gc logging to a file named for the taskid in 188 * /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of: 189 * -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc 190 * 191 * The configuration variable {@link #MAPRED_TASK_ENV} can be used to pass 192 * other environment variables to the child processes. 193 * 194 * @deprecated Use {@link #MAPRED_MAP_TASK_JAVA_OPTS} or 195 * {@link #MAPRED_REDUCE_TASK_JAVA_OPTS} 196 */ 197 @Deprecated 198 public static final String MAPRED_TASK_JAVA_OPTS = "mapred.child.java.opts"; 199 200 /** 201 * Configuration key to set the java command line options for the map tasks. 202 * 203 * Java opts for the task tracker child map processes. 204 * The following symbol, if present, will be interpolated: @taskid@. 205 * It is replaced by current TaskID. Any other occurrences of '@' will go 206 * unchanged. 207 * For example, to enable verbose gc logging to a file named for the taskid in 208 * /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of: 209 * -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc 210 * 211 * The configuration variable {@link #MAPRED_MAP_TASK_ENV} can be used to pass 212 * other environment variables to the map processes. 213 */ 214 public static final String MAPRED_MAP_TASK_JAVA_OPTS = 215 JobContext.MAP_JAVA_OPTS; 216 217 /** 218 * Configuration key to set the java command line options for the reduce tasks. 219 * 220 * Java opts for the task tracker child reduce processes. 221 * The following symbol, if present, will be interpolated: @taskid@. 222 * It is replaced by current TaskID. Any other occurrences of '@' will go 223 * unchanged. 224 * For example, to enable verbose gc logging to a file named for the taskid in 225 * /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of: 226 * -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc 227 * 228 * The configuration variable {@link #MAPRED_REDUCE_TASK_ENV} can be used to 229 * pass process environment variables to the reduce processes. 230 */ 231 public static final String MAPRED_REDUCE_TASK_JAVA_OPTS = 232 JobContext.REDUCE_JAVA_OPTS; 233 234 public static final String DEFAULT_MAPRED_TASK_JAVA_OPTS = "-Xmx200m"; 235 236 /** 237 * @deprecated 238 * Configuration key to set the maximum virtual memory available to the child 239 * map and reduce tasks (in kilo-bytes). This has been deprecated and will no 240 * longer have any effect. 241 */ 242 @Deprecated 243 public static final String MAPRED_TASK_ULIMIT = "mapred.child.ulimit"; 244 245 /** 246 * @deprecated 247 * Configuration key to set the maximum virtual memory available to the 248 * map tasks (in kilo-bytes). This has been deprecated and will no 249 * longer have any effect. 250 */ 251 @Deprecated 252 public static final String MAPRED_MAP_TASK_ULIMIT = "mapreduce.map.ulimit"; 253 254 /** 255 * @deprecated 256 * Configuration key to set the maximum virtual memory available to the 257 * reduce tasks (in kilo-bytes). This has been deprecated and will no 258 * longer have any effect. 259 */ 260 @Deprecated 261 public static final String MAPRED_REDUCE_TASK_ULIMIT = 262 "mapreduce.reduce.ulimit"; 263 264 265 /** 266 * Configuration key to set the environment of the child map/reduce tasks. 267 * 268 * The format of the value is <code>k1=v1,k2=v2</code>. Further it can 269 * reference existing environment variables via <code>$key</code>. 270 * 271 * Example: 272 * <ul> 273 * <li> A=foo - This will set the env variable A to foo. </li> 274 * <li> B=$X:c This is inherit tasktracker's X env variable. </li> 275 * </ul> 276 * 277 * @deprecated Use {@link #MAPRED_MAP_TASK_ENV} or 278 * {@link #MAPRED_REDUCE_TASK_ENV} 279 */ 280 @Deprecated 281 public static final String MAPRED_TASK_ENV = "mapred.child.env"; 282 283 /** 284 * Configuration key to set the maximum virutal memory available to the 285 * map tasks. 286 * 287 * The format of the value is <code>k1=v1,k2=v2</code>. Further it can 288 * reference existing environment variables via <code>$key</code>. 289 * 290 * Example: 291 * <ul> 292 * <li> A=foo - This will set the env variable A to foo. </li> 293 * <li> B=$X:c This is inherit tasktracker's X env variable. </li> 294 * </ul> 295 */ 296 public static final String MAPRED_MAP_TASK_ENV = JobContext.MAP_ENV; 297 298 /** 299 * Configuration key to set the maximum virutal memory available to the 300 * reduce tasks. 301 * 302 * The format of the value is <code>k1=v1,k2=v2</code>. Further it can 303 * reference existing environment variables via <code>$key</code>. 304 * 305 * Example: 306 * <ul> 307 * <li> A=foo - This will set the env variable A to foo. </li> 308 * <li> B=$X:c This is inherit tasktracker's X env variable. </li> 309 * </ul> 310 */ 311 public static final String MAPRED_REDUCE_TASK_ENV = JobContext.REDUCE_ENV; 312 313 private Credentials credentials = new Credentials(); 314 315 /** 316 * Configuration key to set the logging {@link Level} for the map task. 317 * 318 * The allowed logging levels are: 319 * OFF, FATAL, ERROR, WARN, INFO, DEBUG, TRACE and ALL. 320 */ 321 public static final String MAPRED_MAP_TASK_LOG_LEVEL = 322 JobContext.MAP_LOG_LEVEL; 323 324 /** 325 * Configuration key to set the logging {@link Level} for the reduce task. 326 * 327 * The allowed logging levels are: 328 * OFF, FATAL, ERROR, WARN, INFO, DEBUG, TRACE and ALL. 329 */ 330 public static final String MAPRED_REDUCE_TASK_LOG_LEVEL = 331 JobContext.REDUCE_LOG_LEVEL; 332 333 /** 334 * Default logging level for map/reduce tasks. 335 */ 336 public static final Level DEFAULT_LOG_LEVEL = Level.INFO; 337 338 339 /** 340 * Construct a map/reduce job configuration. 341 */ 342 public JobConf() { 343 checkAndWarnDeprecation(); 344 } 345 346 /** 347 * Construct a map/reduce job configuration. 348 * 349 * @param exampleClass a class whose containing jar is used as the job's jar. 350 */ 351 public JobConf(Class exampleClass) { 352 setJarByClass(exampleClass); 353 checkAndWarnDeprecation(); 354 } 355 356 /** 357 * Construct a map/reduce job configuration. 358 * 359 * @param conf a Configuration whose settings will be inherited. 360 */ 361 public JobConf(Configuration conf) { 362 super(conf); 363 364 if (conf instanceof JobConf) { 365 JobConf that = (JobConf)conf; 366 credentials = that.credentials; 367 } 368 369 checkAndWarnDeprecation(); 370 } 371 372 373 /** Construct a map/reduce job configuration. 374 * 375 * @param conf a Configuration whose settings will be inherited. 376 * @param exampleClass a class whose containing jar is used as the job's jar. 377 */ 378 public JobConf(Configuration conf, Class exampleClass) { 379 this(conf); 380 setJarByClass(exampleClass); 381 } 382 383 384 /** Construct a map/reduce configuration. 385 * 386 * @param config a Configuration-format XML job description file. 387 */ 388 public JobConf(String config) { 389 this(new Path(config)); 390 } 391 392 /** Construct a map/reduce configuration. 393 * 394 * @param config a Configuration-format XML job description file. 395 */ 396 public JobConf(Path config) { 397 super(); 398 addResource(config); 399 checkAndWarnDeprecation(); 400 } 401 402 /** A new map/reduce configuration where the behavior of reading from the 403 * default resources can be turned off. 404 * <p/> 405 * If the parameter {@code loadDefaults} is false, the new instance 406 * will not load resources from the default files. 407 * 408 * @param loadDefaults specifies whether to load from the default files 409 */ 410 public JobConf(boolean loadDefaults) { 411 super(loadDefaults); 412 checkAndWarnDeprecation(); 413 } 414 415 /** 416 * Get credentials for the job. 417 * @return credentials for the job 418 */ 419 public Credentials getCredentials() { 420 return credentials; 421 } 422 423 @Private 424 public void setCredentials(Credentials credentials) { 425 this.credentials = credentials; 426 } 427 428 /** 429 * Get the user jar for the map-reduce job. 430 * 431 * @return the user jar for the map-reduce job. 432 */ 433 public String getJar() { return get(JobContext.JAR); } 434 435 /** 436 * Set the user jar for the map-reduce job. 437 * 438 * @param jar the user jar for the map-reduce job. 439 */ 440 public void setJar(String jar) { set(JobContext.JAR, jar); } 441 442 /** 443 * Get the pattern for jar contents to unpack on the tasktracker 444 */ 445 public Pattern getJarUnpackPattern() { 446 return getPattern(JobContext.JAR_UNPACK_PATTERN, UNPACK_JAR_PATTERN_DEFAULT); 447 } 448 449 450 /** 451 * Set the job's jar file by finding an example class location. 452 * 453 * @param cls the example class. 454 */ 455 public void setJarByClass(Class cls) { 456 String jar = findContainingJar(cls); 457 if (jar != null) { 458 setJar(jar); 459 } 460 } 461 462 public String[] getLocalDirs() throws IOException { 463 return getTrimmedStrings(MRConfig.LOCAL_DIR); 464 } 465 466 /** 467 * Use MRAsyncDiskService.moveAndDeleteAllVolumes instead. 468 */ 469 @Deprecated 470 public void deleteLocalFiles() throws IOException { 471 String[] localDirs = getLocalDirs(); 472 for (int i = 0; i < localDirs.length; i++) { 473 FileSystem.getLocal(this).delete(new Path(localDirs[i]), true); 474 } 475 } 476 477 public void deleteLocalFiles(String subdir) throws IOException { 478 String[] localDirs = getLocalDirs(); 479 for (int i = 0; i < localDirs.length; i++) { 480 FileSystem.getLocal(this).delete(new Path(localDirs[i], subdir), true); 481 } 482 } 483 484 /** 485 * Constructs a local file name. Files are distributed among configured 486 * local directories. 487 */ 488 public Path getLocalPath(String pathString) throws IOException { 489 return getLocalPath(MRConfig.LOCAL_DIR, pathString); 490 } 491 492 /** 493 * Get the reported username for this job. 494 * 495 * @return the username 496 */ 497 public String getUser() { 498 return get(JobContext.USER_NAME); 499 } 500 501 /** 502 * Set the reported username for this job. 503 * 504 * @param user the username for this job. 505 */ 506 public void setUser(String user) { 507 set(JobContext.USER_NAME, user); 508 } 509 510 511 512 /** 513 * Set whether the framework should keep the intermediate files for 514 * failed tasks. 515 * 516 * @param keep <code>true</code> if framework should keep the intermediate files 517 * for failed tasks, <code>false</code> otherwise. 518 * 519 */ 520 public void setKeepFailedTaskFiles(boolean keep) { 521 setBoolean(JobContext.PRESERVE_FAILED_TASK_FILES, keep); 522 } 523 524 /** 525 * Should the temporary files for failed tasks be kept? 526 * 527 * @return should the files be kept? 528 */ 529 public boolean getKeepFailedTaskFiles() { 530 return getBoolean(JobContext.PRESERVE_FAILED_TASK_FILES, false); 531 } 532 533 /** 534 * Set a regular expression for task names that should be kept. 535 * The regular expression ".*_m_000123_0" would keep the files 536 * for the first instance of map 123 that ran. 537 * 538 * @param pattern the java.util.regex.Pattern to match against the 539 * task names. 540 */ 541 public void setKeepTaskFilesPattern(String pattern) { 542 set(JobContext.PRESERVE_FILES_PATTERN, pattern); 543 } 544 545 /** 546 * Get the regular expression that is matched against the task names 547 * to see if we need to keep the files. 548 * 549 * @return the pattern as a string, if it was set, othewise null. 550 */ 551 public String getKeepTaskFilesPattern() { 552 return get(JobContext.PRESERVE_FILES_PATTERN); 553 } 554 555 /** 556 * Set the current working directory for the default file system. 557 * 558 * @param dir the new current working directory. 559 */ 560 public void setWorkingDirectory(Path dir) { 561 dir = new Path(getWorkingDirectory(), dir); 562 set(JobContext.WORKING_DIR, dir.toString()); 563 } 564 565 /** 566 * Get the current working directory for the default file system. 567 * 568 * @return the directory name. 569 */ 570 public Path getWorkingDirectory() { 571 String name = get(JobContext.WORKING_DIR); 572 if (name != null) { 573 return new Path(name); 574 } else { 575 try { 576 Path dir = FileSystem.get(this).getWorkingDirectory(); 577 set(JobContext.WORKING_DIR, dir.toString()); 578 return dir; 579 } catch (IOException e) { 580 throw new RuntimeException(e); 581 } 582 } 583 } 584 585 /** 586 * Sets the number of tasks that a spawned task JVM should run 587 * before it exits 588 * @param numTasks the number of tasks to execute; defaults to 1; 589 * -1 signifies no limit 590 */ 591 public void setNumTasksToExecutePerJvm(int numTasks) { 592 setInt(JobContext.JVM_NUMTASKS_TORUN, numTasks); 593 } 594 595 /** 596 * Get the number of tasks that a spawned JVM should execute 597 */ 598 public int getNumTasksToExecutePerJvm() { 599 return getInt(JobContext.JVM_NUMTASKS_TORUN, 1); 600 } 601 602 /** 603 * Get the {@link InputFormat} implementation for the map-reduce job, 604 * defaults to {@link TextInputFormat} if not specified explicity. 605 * 606 * @return the {@link InputFormat} implementation for the map-reduce job. 607 */ 608 public InputFormat getInputFormat() { 609 return ReflectionUtils.newInstance(getClass("mapred.input.format.class", 610 TextInputFormat.class, 611 InputFormat.class), 612 this); 613 } 614 615 /** 616 * Set the {@link InputFormat} implementation for the map-reduce job. 617 * 618 * @param theClass the {@link InputFormat} implementation for the map-reduce 619 * job. 620 */ 621 public void setInputFormat(Class<? extends InputFormat> theClass) { 622 setClass("mapred.input.format.class", theClass, InputFormat.class); 623 } 624 625 /** 626 * Get the {@link OutputFormat} implementation for the map-reduce job, 627 * defaults to {@link TextOutputFormat} if not specified explicity. 628 * 629 * @return the {@link OutputFormat} implementation for the map-reduce job. 630 */ 631 public OutputFormat getOutputFormat() { 632 return ReflectionUtils.newInstance(getClass("mapred.output.format.class", 633 TextOutputFormat.class, 634 OutputFormat.class), 635 this); 636 } 637 638 /** 639 * Get the {@link OutputCommitter} implementation for the map-reduce job, 640 * defaults to {@link FileOutputCommitter} if not specified explicitly. 641 * 642 * @return the {@link OutputCommitter} implementation for the map-reduce job. 643 */ 644 public OutputCommitter getOutputCommitter() { 645 return (OutputCommitter)ReflectionUtils.newInstance( 646 getClass("mapred.output.committer.class", FileOutputCommitter.class, 647 OutputCommitter.class), this); 648 } 649 650 /** 651 * Set the {@link OutputCommitter} implementation for the map-reduce job. 652 * 653 * @param theClass the {@link OutputCommitter} implementation for the map-reduce 654 * job. 655 */ 656 public void setOutputCommitter(Class<? extends OutputCommitter> theClass) { 657 setClass("mapred.output.committer.class", theClass, OutputCommitter.class); 658 } 659 660 /** 661 * Set the {@link OutputFormat} implementation for the map-reduce job. 662 * 663 * @param theClass the {@link OutputFormat} implementation for the map-reduce 664 * job. 665 */ 666 public void setOutputFormat(Class<? extends OutputFormat> theClass) { 667 setClass("mapred.output.format.class", theClass, OutputFormat.class); 668 } 669 670 /** 671 * Should the map outputs be compressed before transfer? 672 * Uses the SequenceFile compression. 673 * 674 * @param compress should the map outputs be compressed? 675 */ 676 public void setCompressMapOutput(boolean compress) { 677 setBoolean(JobContext.MAP_OUTPUT_COMPRESS, compress); 678 } 679 680 /** 681 * Are the outputs of the maps be compressed? 682 * 683 * @return <code>true</code> if the outputs of the maps are to be compressed, 684 * <code>false</code> otherwise. 685 */ 686 public boolean getCompressMapOutput() { 687 return getBoolean(JobContext.MAP_OUTPUT_COMPRESS, false); 688 } 689 690 /** 691 * Set the given class as the {@link CompressionCodec} for the map outputs. 692 * 693 * @param codecClass the {@link CompressionCodec} class that will compress 694 * the map outputs. 695 */ 696 public void 697 setMapOutputCompressorClass(Class<? extends CompressionCodec> codecClass) { 698 setCompressMapOutput(true); 699 setClass(JobContext.MAP_OUTPUT_COMPRESS_CODEC, codecClass, 700 CompressionCodec.class); 701 } 702 703 /** 704 * Get the {@link CompressionCodec} for compressing the map outputs. 705 * 706 * @param defaultValue the {@link CompressionCodec} to return if not set 707 * @return the {@link CompressionCodec} class that should be used to compress the 708 * map outputs. 709 * @throws IllegalArgumentException if the class was specified, but not found 710 */ 711 public Class<? extends CompressionCodec> 712 getMapOutputCompressorClass(Class<? extends CompressionCodec> defaultValue) { 713 Class<? extends CompressionCodec> codecClass = defaultValue; 714 String name = get(JobContext.MAP_OUTPUT_COMPRESS_CODEC); 715 if (name != null) { 716 try { 717 codecClass = getClassByName(name).asSubclass(CompressionCodec.class); 718 } catch (ClassNotFoundException e) { 719 throw new IllegalArgumentException("Compression codec " + name + 720 " was not found.", e); 721 } 722 } 723 return codecClass; 724 } 725 726 /** 727 * Get the key class for the map output data. If it is not set, use the 728 * (final) output key class. This allows the map output key class to be 729 * different than the final output key class. 730 * 731 * @return the map output key class. 732 */ 733 public Class<?> getMapOutputKeyClass() { 734 Class<?> retv = getClass(JobContext.MAP_OUTPUT_KEY_CLASS, null, Object.class); 735 if (retv == null) { 736 retv = getOutputKeyClass(); 737 } 738 return retv; 739 } 740 741 /** 742 * Set the key class for the map output data. This allows the user to 743 * specify the map output key class to be different than the final output 744 * value class. 745 * 746 * @param theClass the map output key class. 747 */ 748 public void setMapOutputKeyClass(Class<?> theClass) { 749 setClass(JobContext.MAP_OUTPUT_KEY_CLASS, theClass, Object.class); 750 } 751 752 /** 753 * Get the value class for the map output data. If it is not set, use the 754 * (final) output value class This allows the map output value class to be 755 * different than the final output value class. 756 * 757 * @return the map output value class. 758 */ 759 public Class<?> getMapOutputValueClass() { 760 Class<?> retv = getClass(JobContext.MAP_OUTPUT_VALUE_CLASS, null, 761 Object.class); 762 if (retv == null) { 763 retv = getOutputValueClass(); 764 } 765 return retv; 766 } 767 768 /** 769 * Set the value class for the map output data. This allows the user to 770 * specify the map output value class to be different than the final output 771 * value class. 772 * 773 * @param theClass the map output value class. 774 */ 775 public void setMapOutputValueClass(Class<?> theClass) { 776 setClass(JobContext.MAP_OUTPUT_VALUE_CLASS, theClass, Object.class); 777 } 778 779 /** 780 * Get the key class for the job output data. 781 * 782 * @return the key class for the job output data. 783 */ 784 public Class<?> getOutputKeyClass() { 785 return getClass(JobContext.OUTPUT_KEY_CLASS, 786 LongWritable.class, Object.class); 787 } 788 789 /** 790 * Set the key class for the job output data. 791 * 792 * @param theClass the key class for the job output data. 793 */ 794 public void setOutputKeyClass(Class<?> theClass) { 795 setClass(JobContext.OUTPUT_KEY_CLASS, theClass, Object.class); 796 } 797 798 /** 799 * Get the {@link RawComparator} comparator used to compare keys. 800 * 801 * @return the {@link RawComparator} comparator used to compare keys. 802 */ 803 public RawComparator getOutputKeyComparator() { 804 Class<? extends RawComparator> theClass = getClass( 805 JobContext.KEY_COMPARATOR, null, RawComparator.class); 806 if (theClass != null) 807 return ReflectionUtils.newInstance(theClass, this); 808 return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class)); 809 } 810 811 /** 812 * Set the {@link RawComparator} comparator used to compare keys. 813 * 814 * @param theClass the {@link RawComparator} comparator used to 815 * compare keys. 816 * @see #setOutputValueGroupingComparator(Class) 817 */ 818 public void setOutputKeyComparatorClass(Class<? extends RawComparator> theClass) { 819 setClass(JobContext.KEY_COMPARATOR, 820 theClass, RawComparator.class); 821 } 822 823 /** 824 * Set the {@link KeyFieldBasedComparator} options used to compare keys. 825 * 826 * @param keySpec the key specification of the form -k pos1[,pos2], where, 827 * pos is of the form f[.c][opts], where f is the number 828 * of the key field to use, and c is the number of the first character from 829 * the beginning of the field. Fields and character posns are numbered 830 * starting with 1; a character position of zero in pos2 indicates the 831 * field's last character. If '.c' is omitted from pos1, it defaults to 1 832 * (the beginning of the field); if omitted from pos2, it defaults to 0 833 * (the end of the field). opts are ordering options. The supported options 834 * are: 835 * -n, (Sort numerically) 836 * -r, (Reverse the result of comparison) 837 */ 838 public void setKeyFieldComparatorOptions(String keySpec) { 839 setOutputKeyComparatorClass(KeyFieldBasedComparator.class); 840 set(KeyFieldBasedComparator.COMPARATOR_OPTIONS, keySpec); 841 } 842 843 /** 844 * Get the {@link KeyFieldBasedComparator} options 845 */ 846 public String getKeyFieldComparatorOption() { 847 return get(KeyFieldBasedComparator.COMPARATOR_OPTIONS); 848 } 849 850 /** 851 * Set the {@link KeyFieldBasedPartitioner} options used for 852 * {@link Partitioner} 853 * 854 * @param keySpec the key specification of the form -k pos1[,pos2], where, 855 * pos is of the form f[.c][opts], where f is the number 856 * of the key field to use, and c is the number of the first character from 857 * the beginning of the field. Fields and character posns are numbered 858 * starting with 1; a character position of zero in pos2 indicates the 859 * field's last character. If '.c' is omitted from pos1, it defaults to 1 860 * (the beginning of the field); if omitted from pos2, it defaults to 0 861 * (the end of the field). 862 */ 863 public void setKeyFieldPartitionerOptions(String keySpec) { 864 setPartitionerClass(KeyFieldBasedPartitioner.class); 865 set(KeyFieldBasedPartitioner.PARTITIONER_OPTIONS, keySpec); 866 } 867 868 /** 869 * Get the {@link KeyFieldBasedPartitioner} options 870 */ 871 public String getKeyFieldPartitionerOption() { 872 return get(KeyFieldBasedPartitioner.PARTITIONER_OPTIONS); 873 } 874 875 /** 876 * Get the user defined {@link WritableComparable} comparator for 877 * grouping keys of inputs to the reduce. 878 * 879 * @return comparator set by the user for grouping values. 880 * @see #setOutputValueGroupingComparator(Class) for details. 881 */ 882 public RawComparator getOutputValueGroupingComparator() { 883 Class<? extends RawComparator> theClass = getClass( 884 JobContext.GROUP_COMPARATOR_CLASS, null, RawComparator.class); 885 if (theClass == null) { 886 return getOutputKeyComparator(); 887 } 888 889 return ReflectionUtils.newInstance(theClass, this); 890 } 891 892 /** 893 * Set the user defined {@link RawComparator} comparator for 894 * grouping keys in the input to the reduce. 895 * 896 * <p>This comparator should be provided if the equivalence rules for keys 897 * for sorting the intermediates are different from those for grouping keys 898 * before each call to 899 * {@link Reducer#reduce(Object, java.util.Iterator, OutputCollector, Reporter)}.</p> 900 * 901 * <p>For key-value pairs (K1,V1) and (K2,V2), the values (V1, V2) are passed 902 * in a single call to the reduce function if K1 and K2 compare as equal.</p> 903 * 904 * <p>Since {@link #setOutputKeyComparatorClass(Class)} can be used to control 905 * how keys are sorted, this can be used in conjunction to simulate 906 * <i>secondary sort on values</i>.</p> 907 * 908 * <p><i>Note</i>: This is not a guarantee of the reduce sort being 909 * <i>stable</i> in any sense. (In any case, with the order of available 910 * map-outputs to the reduce being non-deterministic, it wouldn't make 911 * that much sense.)</p> 912 * 913 * @param theClass the comparator class to be used for grouping keys. 914 * It should implement <code>RawComparator</code>. 915 * @see #setOutputKeyComparatorClass(Class) 916 */ 917 public void setOutputValueGroupingComparator( 918 Class<? extends RawComparator> theClass) { 919 setClass(JobContext.GROUP_COMPARATOR_CLASS, 920 theClass, RawComparator.class); 921 } 922 923 /** 924 * Should the framework use the new context-object code for running 925 * the mapper? 926 * @return true, if the new api should be used 927 */ 928 public boolean getUseNewMapper() { 929 return getBoolean("mapred.mapper.new-api", false); 930 } 931 /** 932 * Set whether the framework should use the new api for the mapper. 933 * This is the default for jobs submitted with the new Job api. 934 * @param flag true, if the new api should be used 935 */ 936 public void setUseNewMapper(boolean flag) { 937 setBoolean("mapred.mapper.new-api", flag); 938 } 939 940 /** 941 * Should the framework use the new context-object code for running 942 * the reducer? 943 * @return true, if the new api should be used 944 */ 945 public boolean getUseNewReducer() { 946 return getBoolean("mapred.reducer.new-api", false); 947 } 948 /** 949 * Set whether the framework should use the new api for the reducer. 950 * This is the default for jobs submitted with the new Job api. 951 * @param flag true, if the new api should be used 952 */ 953 public void setUseNewReducer(boolean flag) { 954 setBoolean("mapred.reducer.new-api", flag); 955 } 956 957 /** 958 * Get the value class for job outputs. 959 * 960 * @return the value class for job outputs. 961 */ 962 public Class<?> getOutputValueClass() { 963 return getClass(JobContext.OUTPUT_VALUE_CLASS, Text.class, Object.class); 964 } 965 966 /** 967 * Set the value class for job outputs. 968 * 969 * @param theClass the value class for job outputs. 970 */ 971 public void setOutputValueClass(Class<?> theClass) { 972 setClass(JobContext.OUTPUT_VALUE_CLASS, theClass, Object.class); 973 } 974 975 /** 976 * Get the {@link Mapper} class for the job. 977 * 978 * @return the {@link Mapper} class for the job. 979 */ 980 public Class<? extends Mapper> getMapperClass() { 981 return getClass("mapred.mapper.class", IdentityMapper.class, Mapper.class); 982 } 983 984 /** 985 * Set the {@link Mapper} class for the job. 986 * 987 * @param theClass the {@link Mapper} class for the job. 988 */ 989 public void setMapperClass(Class<? extends Mapper> theClass) { 990 setClass("mapred.mapper.class", theClass, Mapper.class); 991 } 992 993 /** 994 * Get the {@link MapRunnable} class for the job. 995 * 996 * @return the {@link MapRunnable} class for the job. 997 */ 998 public Class<? extends MapRunnable> getMapRunnerClass() { 999 return getClass("mapred.map.runner.class", 1000 MapRunner.class, MapRunnable.class); 1001 } 1002 1003 /** 1004 * Expert: Set the {@link MapRunnable} class for the job. 1005 * 1006 * Typically used to exert greater control on {@link Mapper}s. 1007 * 1008 * @param theClass the {@link MapRunnable} class for the job. 1009 */ 1010 public void setMapRunnerClass(Class<? extends MapRunnable> theClass) { 1011 setClass("mapred.map.runner.class", theClass, MapRunnable.class); 1012 } 1013 1014 /** 1015 * Get the {@link Partitioner} used to partition {@link Mapper}-outputs 1016 * to be sent to the {@link Reducer}s. 1017 * 1018 * @return the {@link Partitioner} used to partition map-outputs. 1019 */ 1020 public Class<? extends Partitioner> getPartitionerClass() { 1021 return getClass("mapred.partitioner.class", 1022 HashPartitioner.class, Partitioner.class); 1023 } 1024 1025 /** 1026 * Set the {@link Partitioner} class used to partition 1027 * {@link Mapper}-outputs to be sent to the {@link Reducer}s. 1028 * 1029 * @param theClass the {@link Partitioner} used to partition map-outputs. 1030 */ 1031 public void setPartitionerClass(Class<? extends Partitioner> theClass) { 1032 setClass("mapred.partitioner.class", theClass, Partitioner.class); 1033 } 1034 1035 /** 1036 * Get the {@link Reducer} class for the job. 1037 * 1038 * @return the {@link Reducer} class for the job. 1039 */ 1040 public Class<? extends Reducer> getReducerClass() { 1041 return getClass("mapred.reducer.class", 1042 IdentityReducer.class, Reducer.class); 1043 } 1044 1045 /** 1046 * Set the {@link Reducer} class for the job. 1047 * 1048 * @param theClass the {@link Reducer} class for the job. 1049 */ 1050 public void setReducerClass(Class<? extends Reducer> theClass) { 1051 setClass("mapred.reducer.class", theClass, Reducer.class); 1052 } 1053 1054 /** 1055 * Get the user-defined <i>combiner</i> class used to combine map-outputs 1056 * before being sent to the reducers. Typically the combiner is same as the 1057 * the {@link Reducer} for the job i.e. {@link #getReducerClass()}. 1058 * 1059 * @return the user-defined combiner class used to combine map-outputs. 1060 */ 1061 public Class<? extends Reducer> getCombinerClass() { 1062 return getClass("mapred.combiner.class", null, Reducer.class); 1063 } 1064 1065 /** 1066 * Set the user-defined <i>combiner</i> class used to combine map-outputs 1067 * before being sent to the reducers. 1068 * 1069 * <p>The combiner is an application-specified aggregation operation, which 1070 * can help cut down the amount of data transferred between the 1071 * {@link Mapper} and the {@link Reducer}, leading to better performance.</p> 1072 * 1073 * <p>The framework may invoke the combiner 0, 1, or multiple times, in both 1074 * the mapper and reducer tasks. In general, the combiner is called as the 1075 * sort/merge result is written to disk. The combiner must: 1076 * <ul> 1077 * <li> be side-effect free</li> 1078 * <li> have the same input and output key types and the same input and 1079 * output value types</li> 1080 * </ul></p> 1081 * 1082 * <p>Typically the combiner is same as the <code>Reducer</code> for the 1083 * job i.e. {@link #setReducerClass(Class)}.</p> 1084 * 1085 * @param theClass the user-defined combiner class used to combine 1086 * map-outputs. 1087 */ 1088 public void setCombinerClass(Class<? extends Reducer> theClass) { 1089 setClass("mapred.combiner.class", theClass, Reducer.class); 1090 } 1091 1092 /** 1093 * Should speculative execution be used for this job? 1094 * Defaults to <code>true</code>. 1095 * 1096 * @return <code>true</code> if speculative execution be used for this job, 1097 * <code>false</code> otherwise. 1098 */ 1099 public boolean getSpeculativeExecution() { 1100 return (getMapSpeculativeExecution() || getReduceSpeculativeExecution()); 1101 } 1102 1103 /** 1104 * Turn speculative execution on or off for this job. 1105 * 1106 * @param speculativeExecution <code>true</code> if speculative execution 1107 * should be turned on, else <code>false</code>. 1108 */ 1109 public void setSpeculativeExecution(boolean speculativeExecution) { 1110 setMapSpeculativeExecution(speculativeExecution); 1111 setReduceSpeculativeExecution(speculativeExecution); 1112 } 1113 1114 /** 1115 * Should speculative execution be used for this job for map tasks? 1116 * Defaults to <code>true</code>. 1117 * 1118 * @return <code>true</code> if speculative execution be 1119 * used for this job for map tasks, 1120 * <code>false</code> otherwise. 1121 */ 1122 public boolean getMapSpeculativeExecution() { 1123 return getBoolean(JobContext.MAP_SPECULATIVE, true); 1124 } 1125 1126 /** 1127 * Turn speculative execution on or off for this job for map tasks. 1128 * 1129 * @param speculativeExecution <code>true</code> if speculative execution 1130 * should be turned on for map tasks, 1131 * else <code>false</code>. 1132 */ 1133 public void setMapSpeculativeExecution(boolean speculativeExecution) { 1134 setBoolean(JobContext.MAP_SPECULATIVE, speculativeExecution); 1135 } 1136 1137 /** 1138 * Should speculative execution be used for this job for reduce tasks? 1139 * Defaults to <code>true</code>. 1140 * 1141 * @return <code>true</code> if speculative execution be used 1142 * for reduce tasks for this job, 1143 * <code>false</code> otherwise. 1144 */ 1145 public boolean getReduceSpeculativeExecution() { 1146 return getBoolean(JobContext.REDUCE_SPECULATIVE, true); 1147 } 1148 1149 /** 1150 * Turn speculative execution on or off for this job for reduce tasks. 1151 * 1152 * @param speculativeExecution <code>true</code> if speculative execution 1153 * should be turned on for reduce tasks, 1154 * else <code>false</code>. 1155 */ 1156 public void setReduceSpeculativeExecution(boolean speculativeExecution) { 1157 setBoolean(JobContext.REDUCE_SPECULATIVE, 1158 speculativeExecution); 1159 } 1160 1161 /** 1162 * Get configured the number of reduce tasks for this job. 1163 * Defaults to <code>1</code>. 1164 * 1165 * @return the number of reduce tasks for this job. 1166 */ 1167 public int getNumMapTasks() { return getInt(JobContext.NUM_MAPS, 1); } 1168 1169 /** 1170 * Set the number of map tasks for this job. 1171 * 1172 * <p><i>Note</i>: This is only a <i>hint</i> to the framework. The actual 1173 * number of spawned map tasks depends on the number of {@link InputSplit}s 1174 * generated by the job's {@link InputFormat#getSplits(JobConf, int)}. 1175 * 1176 * A custom {@link InputFormat} is typically used to accurately control 1177 * the number of map tasks for the job.</p> 1178 * 1179 * <h4 id="NoOfMaps">How many maps?</h4> 1180 * 1181 * <p>The number of maps is usually driven by the total size of the inputs 1182 * i.e. total number of blocks of the input files.</p> 1183 * 1184 * <p>The right level of parallelism for maps seems to be around 10-100 maps 1185 * per-node, although it has been set up to 300 or so for very cpu-light map 1186 * tasks. Task setup takes awhile, so it is best if the maps take at least a 1187 * minute to execute.</p> 1188 * 1189 * <p>The default behavior of file-based {@link InputFormat}s is to split the 1190 * input into <i>logical</i> {@link InputSplit}s based on the total size, in 1191 * bytes, of input files. However, the {@link FileSystem} blocksize of the 1192 * input files is treated as an upper bound for input splits. A lower bound 1193 * on the split size can be set via 1194 * <a href="{@docRoot}/../mapred-default.html#mapreduce.input.fileinputformat.split.minsize"> 1195 * mapreduce.input.fileinputformat.split.minsize</a>.</p> 1196 * 1197 * <p>Thus, if you expect 10TB of input data and have a blocksize of 128MB, 1198 * you'll end up with 82,000 maps, unless {@link #setNumMapTasks(int)} is 1199 * used to set it even higher.</p> 1200 * 1201 * @param n the number of map tasks for this job. 1202 * @see InputFormat#getSplits(JobConf, int) 1203 * @see FileInputFormat 1204 * @see FileSystem#getDefaultBlockSize() 1205 * @see FileStatus#getBlockSize() 1206 */ 1207 public void setNumMapTasks(int n) { setInt(JobContext.NUM_MAPS, n); } 1208 1209 /** 1210 * Get configured the number of reduce tasks for this job. Defaults to 1211 * <code>1</code>. 1212 * 1213 * @return the number of reduce tasks for this job. 1214 */ 1215 public int getNumReduceTasks() { return getInt(JobContext.NUM_REDUCES, 1); } 1216 1217 /** 1218 * Set the requisite number of reduce tasks for this job. 1219 * 1220 * <h4 id="NoOfReduces">How many reduces?</h4> 1221 * 1222 * <p>The right number of reduces seems to be <code>0.95</code> or 1223 * <code>1.75</code> multiplied by (<<i>no. of nodes</i>> * 1224 * <a href="{@docRoot}/../mapred-default.html#mapreduce.tasktracker.reduce.tasks.maximum"> 1225 * mapreduce.tasktracker.reduce.tasks.maximum</a>). 1226 * </p> 1227 * 1228 * <p>With <code>0.95</code> all of the reduces can launch immediately and 1229 * start transfering map outputs as the maps finish. With <code>1.75</code> 1230 * the faster nodes will finish their first round of reduces and launch a 1231 * second wave of reduces doing a much better job of load balancing.</p> 1232 * 1233 * <p>Increasing the number of reduces increases the framework overhead, but 1234 * increases load balancing and lowers the cost of failures.</p> 1235 * 1236 * <p>The scaling factors above are slightly less than whole numbers to 1237 * reserve a few reduce slots in the framework for speculative-tasks, failures 1238 * etc.</p> 1239 * 1240 * <h4 id="ReducerNone">Reducer NONE</h4> 1241 * 1242 * <p>It is legal to set the number of reduce-tasks to <code>zero</code>.</p> 1243 * 1244 * <p>In this case the output of the map-tasks directly go to distributed 1245 * file-system, to the path set by 1246 * {@link FileOutputFormat#setOutputPath(JobConf, Path)}. Also, the 1247 * framework doesn't sort the map-outputs before writing it out to HDFS.</p> 1248 * 1249 * @param n the number of reduce tasks for this job. 1250 */ 1251 public void setNumReduceTasks(int n) { setInt(JobContext.NUM_REDUCES, n); } 1252 1253 /** 1254 * Get the configured number of maximum attempts that will be made to run a 1255 * map task, as specified by the <code>mapreduce.map.maxattempts</code> 1256 * property. If this property is not already set, the default is 4 attempts. 1257 * 1258 * @return the max number of attempts per map task. 1259 */ 1260 public int getMaxMapAttempts() { 1261 return getInt(JobContext.MAP_MAX_ATTEMPTS, 4); 1262 } 1263 1264 /** 1265 * Expert: Set the number of maximum attempts that will be made to run a 1266 * map task. 1267 * 1268 * @param n the number of attempts per map task. 1269 */ 1270 public void setMaxMapAttempts(int n) { 1271 setInt(JobContext.MAP_MAX_ATTEMPTS, n); 1272 } 1273 1274 /** 1275 * Get the configured number of maximum attempts that will be made to run a 1276 * reduce task, as specified by the <code>mapreduce.reduce.maxattempts</code> 1277 * property. If this property is not already set, the default is 4 attempts. 1278 * 1279 * @return the max number of attempts per reduce task. 1280 */ 1281 public int getMaxReduceAttempts() { 1282 return getInt(JobContext.REDUCE_MAX_ATTEMPTS, 4); 1283 } 1284 /** 1285 * Expert: Set the number of maximum attempts that will be made to run a 1286 * reduce task. 1287 * 1288 * @param n the number of attempts per reduce task. 1289 */ 1290 public void setMaxReduceAttempts(int n) { 1291 setInt(JobContext.REDUCE_MAX_ATTEMPTS, n); 1292 } 1293 1294 /** 1295 * Get the user-specified job name. This is only used to identify the 1296 * job to the user. 1297 * 1298 * @return the job's name, defaulting to "". 1299 */ 1300 public String getJobName() { 1301 return get(JobContext.JOB_NAME, ""); 1302 } 1303 1304 /** 1305 * Set the user-specified job name. 1306 * 1307 * @param name the job's new name. 1308 */ 1309 public void setJobName(String name) { 1310 set(JobContext.JOB_NAME, name); 1311 } 1312 1313 /** 1314 * Get the user-specified session identifier. The default is the empty string. 1315 * 1316 * The session identifier is used to tag metric data that is reported to some 1317 * performance metrics system via the org.apache.hadoop.metrics API. The 1318 * session identifier is intended, in particular, for use by Hadoop-On-Demand 1319 * (HOD) which allocates a virtual Hadoop cluster dynamically and transiently. 1320 * HOD will set the session identifier by modifying the mapred-site.xml file 1321 * before starting the cluster. 1322 * 1323 * When not running under HOD, this identifer is expected to remain set to 1324 * the empty string. 1325 * 1326 * @return the session identifier, defaulting to "". 1327 */ 1328 @Deprecated 1329 public String getSessionId() { 1330 return get("session.id", ""); 1331 } 1332 1333 /** 1334 * Set the user-specified session identifier. 1335 * 1336 * @param sessionId the new session id. 1337 */ 1338 @Deprecated 1339 public void setSessionId(String sessionId) { 1340 set("session.id", sessionId); 1341 } 1342 1343 /** 1344 * Set the maximum no. of failures of a given job per tasktracker. 1345 * If the no. of task failures exceeds <code>noFailures</code>, the 1346 * tasktracker is <i>blacklisted</i> for this job. 1347 * 1348 * @param noFailures maximum no. of failures of a given job per tasktracker. 1349 */ 1350 public void setMaxTaskFailuresPerTracker(int noFailures) { 1351 setInt(JobContext.MAX_TASK_FAILURES_PER_TRACKER, noFailures); 1352 } 1353 1354 /** 1355 * Expert: Get the maximum no. of failures of a given job per tasktracker. 1356 * If the no. of task failures exceeds this, the tasktracker is 1357 * <i>blacklisted</i> for this job. 1358 * 1359 * @return the maximum no. of failures of a given job per tasktracker. 1360 */ 1361 public int getMaxTaskFailuresPerTracker() { 1362 return getInt(JobContext.MAX_TASK_FAILURES_PER_TRACKER, 3); 1363 } 1364 1365 /** 1366 * Get the maximum percentage of map tasks that can fail without 1367 * the job being aborted. 1368 * 1369 * Each map task is executed a minimum of {@link #getMaxMapAttempts()} 1370 * attempts before being declared as <i>failed</i>. 1371 * 1372 * Defaults to <code>zero</code>, i.e. <i>any</i> failed map-task results in 1373 * the job being declared as {@link JobStatus#FAILED}. 1374 * 1375 * @return the maximum percentage of map tasks that can fail without 1376 * the job being aborted. 1377 */ 1378 public int getMaxMapTaskFailuresPercent() { 1379 return getInt(JobContext.MAP_FAILURES_MAX_PERCENT, 0); 1380 } 1381 1382 /** 1383 * Expert: Set the maximum percentage of map tasks that can fail without the 1384 * job being aborted. 1385 * 1386 * Each map task is executed a minimum of {@link #getMaxMapAttempts} attempts 1387 * before being declared as <i>failed</i>. 1388 * 1389 * @param percent the maximum percentage of map tasks that can fail without 1390 * the job being aborted. 1391 */ 1392 public void setMaxMapTaskFailuresPercent(int percent) { 1393 setInt(JobContext.MAP_FAILURES_MAX_PERCENT, percent); 1394 } 1395 1396 /** 1397 * Get the maximum percentage of reduce tasks that can fail without 1398 * the job being aborted. 1399 * 1400 * Each reduce task is executed a minimum of {@link #getMaxReduceAttempts()} 1401 * attempts before being declared as <i>failed</i>. 1402 * 1403 * Defaults to <code>zero</code>, i.e. <i>any</i> failed reduce-task results 1404 * in the job being declared as {@link JobStatus#FAILED}. 1405 * 1406 * @return the maximum percentage of reduce tasks that can fail without 1407 * the job being aborted. 1408 */ 1409 public int getMaxReduceTaskFailuresPercent() { 1410 return getInt(JobContext.REDUCE_FAILURES_MAXPERCENT, 0); 1411 } 1412 1413 /** 1414 * Set the maximum percentage of reduce tasks that can fail without the job 1415 * being aborted. 1416 * 1417 * Each reduce task is executed a minimum of {@link #getMaxReduceAttempts()} 1418 * attempts before being declared as <i>failed</i>. 1419 * 1420 * @param percent the maximum percentage of reduce tasks that can fail without 1421 * the job being aborted. 1422 */ 1423 public void setMaxReduceTaskFailuresPercent(int percent) { 1424 setInt(JobContext.REDUCE_FAILURES_MAXPERCENT, percent); 1425 } 1426 1427 /** 1428 * Set {@link JobPriority} for this job. 1429 * 1430 * @param prio the {@link JobPriority} for this job. 1431 */ 1432 public void setJobPriority(JobPriority prio) { 1433 set(JobContext.PRIORITY, prio.toString()); 1434 } 1435 1436 /** 1437 * Get the {@link JobPriority} for this job. 1438 * 1439 * @return the {@link JobPriority} for this job. 1440 */ 1441 public JobPriority getJobPriority() { 1442 String prio = get(JobContext.PRIORITY); 1443 if(prio == null) { 1444 return JobPriority.NORMAL; 1445 } 1446 1447 return JobPriority.valueOf(prio); 1448 } 1449 1450 /** 1451 * Set JobSubmitHostName for this job. 1452 * 1453 * @param hostname the JobSubmitHostName for this job. 1454 */ 1455 void setJobSubmitHostName(String hostname) { 1456 set(MRJobConfig.JOB_SUBMITHOST, hostname); 1457 } 1458 1459 /** 1460 * Get the JobSubmitHostName for this job. 1461 * 1462 * @return the JobSubmitHostName for this job. 1463 */ 1464 String getJobSubmitHostName() { 1465 String hostname = get(MRJobConfig.JOB_SUBMITHOST); 1466 1467 return hostname; 1468 } 1469 1470 /** 1471 * Set JobSubmitHostAddress for this job. 1472 * 1473 * @param hostadd the JobSubmitHostAddress for this job. 1474 */ 1475 void setJobSubmitHostAddress(String hostadd) { 1476 set(MRJobConfig.JOB_SUBMITHOSTADDR, hostadd); 1477 } 1478 1479 /** 1480 * Get JobSubmitHostAddress for this job. 1481 * 1482 * @return JobSubmitHostAddress for this job. 1483 */ 1484 String getJobSubmitHostAddress() { 1485 String hostadd = get(MRJobConfig.JOB_SUBMITHOSTADDR); 1486 1487 return hostadd; 1488 } 1489 1490 /** 1491 * Get whether the task profiling is enabled. 1492 * @return true if some tasks will be profiled 1493 */ 1494 public boolean getProfileEnabled() { 1495 return getBoolean(JobContext.TASK_PROFILE, false); 1496 } 1497 1498 /** 1499 * Set whether the system should collect profiler information for some of 1500 * the tasks in this job? The information is stored in the user log 1501 * directory. 1502 * @param newValue true means it should be gathered 1503 */ 1504 public void setProfileEnabled(boolean newValue) { 1505 setBoolean(JobContext.TASK_PROFILE, newValue); 1506 } 1507 1508 /** 1509 * Get the profiler configuration arguments. 1510 * 1511 * The default value for this property is 1512 * "-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s" 1513 * 1514 * @return the parameters to pass to the task child to configure profiling 1515 */ 1516 public String getProfileParams() { 1517 return get(JobContext.TASK_PROFILE_PARAMS, 1518 "-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y," + 1519 "verbose=n,file=%s"); 1520 } 1521 1522 /** 1523 * Set the profiler configuration arguments. If the string contains a '%s' it 1524 * will be replaced with the name of the profiling output file when the task 1525 * runs. 1526 * 1527 * This value is passed to the task child JVM on the command line. 1528 * 1529 * @param value the configuration string 1530 */ 1531 public void setProfileParams(String value) { 1532 set(JobContext.TASK_PROFILE_PARAMS, value); 1533 } 1534 1535 /** 1536 * Get the range of maps or reduces to profile. 1537 * @param isMap is the task a map? 1538 * @return the task ranges 1539 */ 1540 public IntegerRanges getProfileTaskRange(boolean isMap) { 1541 return getRange((isMap ? JobContext.NUM_MAP_PROFILES : 1542 JobContext.NUM_REDUCE_PROFILES), "0-2"); 1543 } 1544 1545 /** 1546 * Set the ranges of maps or reduces to profile. setProfileEnabled(true) 1547 * must also be called. 1548 * @param newValue a set of integer ranges of the map ids 1549 */ 1550 public void setProfileTaskRange(boolean isMap, String newValue) { 1551 // parse the value to make sure it is legal 1552 new Configuration.IntegerRanges(newValue); 1553 set((isMap ? JobContext.NUM_MAP_PROFILES : JobContext.NUM_REDUCE_PROFILES), 1554 newValue); 1555 } 1556 1557 /** 1558 * Set the debug script to run when the map tasks fail. 1559 * 1560 * <p>The debug script can aid debugging of failed map tasks. The script is 1561 * given task's stdout, stderr, syslog, jobconf files as arguments.</p> 1562 * 1563 * <p>The debug command, run on the node where the map failed, is:</p> 1564 * <p><pre><blockquote> 1565 * $script $stdout $stderr $syslog $jobconf. 1566 * </blockquote></pre></p> 1567 * 1568 * <p> The script file is distributed through {@link DistributedCache} 1569 * APIs. The script needs to be symlinked. </p> 1570 * 1571 * <p>Here is an example on how to submit a script 1572 * <p><blockquote><pre> 1573 * job.setMapDebugScript("./myscript"); 1574 * DistributedCache.createSymlink(job); 1575 * DistributedCache.addCacheFile("/debug/scripts/myscript#myscript"); 1576 * </pre></blockquote></p> 1577 * 1578 * @param mDbgScript the script name 1579 */ 1580 public void setMapDebugScript(String mDbgScript) { 1581 set(JobContext.MAP_DEBUG_SCRIPT, mDbgScript); 1582 } 1583 1584 /** 1585 * Get the map task's debug script. 1586 * 1587 * @return the debug Script for the mapred job for failed map tasks. 1588 * @see #setMapDebugScript(String) 1589 */ 1590 public String getMapDebugScript() { 1591 return get(JobContext.MAP_DEBUG_SCRIPT); 1592 } 1593 1594 /** 1595 * Set the debug script to run when the reduce tasks fail. 1596 * 1597 * <p>The debug script can aid debugging of failed reduce tasks. The script 1598 * is given task's stdout, stderr, syslog, jobconf files as arguments.</p> 1599 * 1600 * <p>The debug command, run on the node where the map failed, is:</p> 1601 * <p><pre><blockquote> 1602 * $script $stdout $stderr $syslog $jobconf. 1603 * </blockquote></pre></p> 1604 * 1605 * <p> The script file is distributed through {@link DistributedCache} 1606 * APIs. The script file needs to be symlinked </p> 1607 * 1608 * <p>Here is an example on how to submit a script 1609 * <p><blockquote><pre> 1610 * job.setReduceDebugScript("./myscript"); 1611 * DistributedCache.createSymlink(job); 1612 * DistributedCache.addCacheFile("/debug/scripts/myscript#myscript"); 1613 * </pre></blockquote></p> 1614 * 1615 * @param rDbgScript the script name 1616 */ 1617 public void setReduceDebugScript(String rDbgScript) { 1618 set(JobContext.REDUCE_DEBUG_SCRIPT, rDbgScript); 1619 } 1620 1621 /** 1622 * Get the reduce task's debug Script 1623 * 1624 * @return the debug script for the mapred job for failed reduce tasks. 1625 * @see #setReduceDebugScript(String) 1626 */ 1627 public String getReduceDebugScript() { 1628 return get(JobContext.REDUCE_DEBUG_SCRIPT); 1629 } 1630 1631 /** 1632 * Get the uri to be invoked in-order to send a notification after the job 1633 * has completed (success/failure). 1634 * 1635 * @return the job end notification uri, <code>null</code> if it hasn't 1636 * been set. 1637 * @see #setJobEndNotificationURI(String) 1638 */ 1639 public String getJobEndNotificationURI() { 1640 return get(JobContext.MR_JOB_END_NOTIFICATION_URL); 1641 } 1642 1643 /** 1644 * Set the uri to be invoked in-order to send a notification after the job 1645 * has completed (success/failure). 1646 * 1647 * <p>The uri can contain 2 special parameters: <tt>$jobId</tt> and 1648 * <tt>$jobStatus</tt>. Those, if present, are replaced by the job's 1649 * identifier and completion-status respectively.</p> 1650 * 1651 * <p>This is typically used by application-writers to implement chaining of 1652 * Map-Reduce jobs in an <i>asynchronous manner</i>.</p> 1653 * 1654 * @param uri the job end notification uri 1655 * @see JobStatus 1656 * @see <a href="{@docRoot}/org/apache/hadoop/mapred/JobClient.html# 1657 * JobCompletionAndChaining">Job Completion and Chaining</a> 1658 */ 1659 public void setJobEndNotificationURI(String uri) { 1660 set(JobContext.MR_JOB_END_NOTIFICATION_URL, uri); 1661 } 1662 1663 /** 1664 * Get job-specific shared directory for use as scratch space 1665 * 1666 * <p> 1667 * When a job starts, a shared directory is created at location 1668 * <code> 1669 * ${mapreduce.cluster.local.dir}/taskTracker/$user/jobcache/$jobid/work/ </code>. 1670 * This directory is exposed to the users through 1671 * <code>mapreduce.job.local.dir </code>. 1672 * So, the tasks can use this space 1673 * as scratch space and share files among them. </p> 1674 * This value is available as System property also. 1675 * 1676 * @return The localized job specific shared directory 1677 */ 1678 public String getJobLocalDir() { 1679 return get(JobContext.JOB_LOCAL_DIR); 1680 } 1681 1682 /** 1683 * Get memory required to run a map task of the job, in MB. 1684 * 1685 * If a value is specified in the configuration, it is returned. 1686 * Else, it returns {@link #DISABLED_MEMORY_LIMIT}. 1687 * <p/> 1688 * For backward compatibility, if the job configuration sets the 1689 * key {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to a value different 1690 * from {@link #DISABLED_MEMORY_LIMIT}, that value will be used 1691 * after converting it from bytes to MB. 1692 * @return memory required to run a map task of the job, in MB, 1693 * or {@link #DISABLED_MEMORY_LIMIT} if unset. 1694 */ 1695 public long getMemoryForMapTask() { 1696 long value = getDeprecatedMemoryValue(); 1697 if (value == DISABLED_MEMORY_LIMIT) { 1698 value = normalizeMemoryConfigValue( 1699 getLong(JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY, 1700 DISABLED_MEMORY_LIMIT)); 1701 } 1702 return value; 1703 } 1704 1705 public void setMemoryForMapTask(long mem) { 1706 setLong(JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY, mem); 1707 } 1708 1709 /** 1710 * Get memory required to run a reduce task of the job, in MB. 1711 * 1712 * If a value is specified in the configuration, it is returned. 1713 * Else, it returns {@link #DISABLED_MEMORY_LIMIT}. 1714 * <p/> 1715 * For backward compatibility, if the job configuration sets the 1716 * key {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to a value different 1717 * from {@link #DISABLED_MEMORY_LIMIT}, that value will be used 1718 * after converting it from bytes to MB. 1719 * @return memory required to run a reduce task of the job, in MB, 1720 * or {@link #DISABLED_MEMORY_LIMIT} if unset. 1721 */ 1722 public long getMemoryForReduceTask() { 1723 long value = getDeprecatedMemoryValue(); 1724 if (value == DISABLED_MEMORY_LIMIT) { 1725 value = normalizeMemoryConfigValue( 1726 getLong(JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY, 1727 DISABLED_MEMORY_LIMIT)); 1728 } 1729 return value; 1730 } 1731 1732 // Return the value set to the key MAPRED_TASK_MAXVMEM_PROPERTY, 1733 // converted into MBs. 1734 // Returns DISABLED_MEMORY_LIMIT if unset, or set to a negative 1735 // value. 1736 private long getDeprecatedMemoryValue() { 1737 long oldValue = getLong(MAPRED_TASK_MAXVMEM_PROPERTY, 1738 DISABLED_MEMORY_LIMIT); 1739 oldValue = normalizeMemoryConfigValue(oldValue); 1740 if (oldValue != DISABLED_MEMORY_LIMIT) { 1741 oldValue /= (1024*1024); 1742 } 1743 return oldValue; 1744 } 1745 1746 public void setMemoryForReduceTask(long mem) { 1747 setLong(JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY, mem); 1748 } 1749 1750 /** 1751 * Return the name of the queue to which this job is submitted. 1752 * Defaults to 'default'. 1753 * 1754 * @return name of the queue 1755 */ 1756 public String getQueueName() { 1757 return get(JobContext.QUEUE_NAME, DEFAULT_QUEUE_NAME); 1758 } 1759 1760 /** 1761 * Set the name of the queue to which this job should be submitted. 1762 * 1763 * @param queueName Name of the queue 1764 */ 1765 public void setQueueName(String queueName) { 1766 set(JobContext.QUEUE_NAME, queueName); 1767 } 1768 1769 /** 1770 * Normalize the negative values in configuration 1771 * 1772 * @param val 1773 * @return normalized value 1774 */ 1775 public static long normalizeMemoryConfigValue(long val) { 1776 if (val < 0) { 1777 val = DISABLED_MEMORY_LIMIT; 1778 } 1779 return val; 1780 } 1781 1782 /** 1783 * Compute the number of slots required to run a single map task-attempt 1784 * of this job. 1785 * @param slotSizePerMap cluster-wide value of the amount of memory required 1786 * to run a map-task 1787 * @return the number of slots required to run a single map task-attempt 1788 * 1 if memory parameters are disabled. 1789 */ 1790 int computeNumSlotsPerMap(long slotSizePerMap) { 1791 if ((slotSizePerMap==DISABLED_MEMORY_LIMIT) || 1792 (getMemoryForMapTask()==DISABLED_MEMORY_LIMIT)) { 1793 return 1; 1794 } 1795 return (int)(Math.ceil((float)getMemoryForMapTask() / (float)slotSizePerMap)); 1796 } 1797 1798 /** 1799 * Compute the number of slots required to run a single reduce task-attempt 1800 * of this job. 1801 * @param slotSizePerReduce cluster-wide value of the amount of memory 1802 * required to run a reduce-task 1803 * @return the number of slots required to run a single reduce task-attempt 1804 * 1 if memory parameters are disabled 1805 */ 1806 int computeNumSlotsPerReduce(long slotSizePerReduce) { 1807 if ((slotSizePerReduce==DISABLED_MEMORY_LIMIT) || 1808 (getMemoryForReduceTask()==DISABLED_MEMORY_LIMIT)) { 1809 return 1; 1810 } 1811 return 1812 (int)(Math.ceil((float)getMemoryForReduceTask() / (float)slotSizePerReduce)); 1813 } 1814 1815 /** 1816 * Find a jar that contains a class of the same name, if any. 1817 * It will return a jar file, even if that is not the first thing 1818 * on the class path that has a class with the same name. 1819 * 1820 * @param my_class the class to find. 1821 * @return a jar file that contains the class, or null. 1822 * @throws IOException 1823 */ 1824 public static String findContainingJar(Class my_class) { 1825 ClassLoader loader = my_class.getClassLoader(); 1826 String class_file = my_class.getName().replaceAll("\\.", "/") + ".class"; 1827 try { 1828 for(Enumeration itr = loader.getResources(class_file); 1829 itr.hasMoreElements();) { 1830 URL url = (URL) itr.nextElement(); 1831 if ("jar".equals(url.getProtocol())) { 1832 String toReturn = url.getPath(); 1833 if (toReturn.startsWith("file:")) { 1834 toReturn = toReturn.substring("file:".length()); 1835 } 1836 // URLDecoder is a misnamed class, since it actually decodes 1837 // x-www-form-urlencoded MIME type rather than actual 1838 // URL encoding (which the file path has). Therefore it would 1839 // decode +s to ' 's which is incorrect (spaces are actually 1840 // either unencoded or encoded as "%20"). Replace +s first, so 1841 // that they are kept sacred during the decoding process. 1842 toReturn = toReturn.replaceAll("\\+", "%2B"); 1843 toReturn = URLDecoder.decode(toReturn, "UTF-8"); 1844 return toReturn.replaceAll("!.*$", ""); 1845 } 1846 } 1847 } catch (IOException e) { 1848 throw new RuntimeException(e); 1849 } 1850 return null; 1851 } 1852 1853 1854 /** 1855 * Get the memory required to run a task of this job, in bytes. See 1856 * {@link #MAPRED_TASK_MAXVMEM_PROPERTY} 1857 * <p/> 1858 * This method is deprecated. Now, different memory limits can be 1859 * set for map and reduce tasks of a job, in MB. 1860 * <p/> 1861 * For backward compatibility, if the job configuration sets the 1862 * key {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to a value different 1863 * from {@link #DISABLED_MEMORY_LIMIT}, that value is returned. 1864 * Otherwise, this method will return the larger of the values returned by 1865 * {@link #getMemoryForMapTask()} and {@link #getMemoryForReduceTask()} 1866 * after converting them into bytes. 1867 * 1868 * @return Memory required to run a task of this job, in bytes, 1869 * or {@link #DISABLED_MEMORY_LIMIT}, if unset. 1870 * @see #setMaxVirtualMemoryForTask(long) 1871 * @deprecated Use {@link #getMemoryForMapTask()} and 1872 * {@link #getMemoryForReduceTask()} 1873 */ 1874 @Deprecated 1875 public long getMaxVirtualMemoryForTask() { 1876 LOG.warn( 1877 "getMaxVirtualMemoryForTask() is deprecated. " + 1878 "Instead use getMemoryForMapTask() and getMemoryForReduceTask()"); 1879 1880 long value = getLong(MAPRED_TASK_MAXVMEM_PROPERTY, DISABLED_MEMORY_LIMIT); 1881 value = normalizeMemoryConfigValue(value); 1882 if (value == DISABLED_MEMORY_LIMIT) { 1883 value = Math.max(getMemoryForMapTask(), getMemoryForReduceTask()); 1884 value = normalizeMemoryConfigValue(value); 1885 if (value != DISABLED_MEMORY_LIMIT) { 1886 value *= 1024*1024; 1887 } 1888 } 1889 return value; 1890 } 1891 1892 /** 1893 * Set the maximum amount of memory any task of this job can use. See 1894 * {@link #MAPRED_TASK_MAXVMEM_PROPERTY} 1895 * <p/> 1896 * mapred.task.maxvmem is split into 1897 * mapreduce.map.memory.mb 1898 * and mapreduce.map.memory.mb,mapred 1899 * each of the new key are set 1900 * as mapred.task.maxvmem / 1024 1901 * as new values are in MB 1902 * 1903 * @param vmem Maximum amount of virtual memory in bytes any task of this job 1904 * can use. 1905 * @see #getMaxVirtualMemoryForTask() 1906 * @deprecated 1907 * Use {@link #setMemoryForMapTask(long mem)} and 1908 * Use {@link #setMemoryForReduceTask(long mem)} 1909 */ 1910 @Deprecated 1911 public void setMaxVirtualMemoryForTask(long vmem) { 1912 LOG.warn("setMaxVirtualMemoryForTask() is deprecated."+ 1913 "Instead use setMemoryForMapTask() and setMemoryForReduceTask()"); 1914 if(vmem != DISABLED_MEMORY_LIMIT && vmem < 0) { 1915 setMemoryForMapTask(DISABLED_MEMORY_LIMIT); 1916 setMemoryForReduceTask(DISABLED_MEMORY_LIMIT); 1917 } 1918 1919 if(get(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY) == null) { 1920 setMemoryForMapTask(vmem / (1024 * 1024)); //Changing bytes to mb 1921 setMemoryForReduceTask(vmem / (1024 * 1024));//Changing bytes to mb 1922 }else{ 1923 this.setLong(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY,vmem); 1924 } 1925 } 1926 1927 /** 1928 * @deprecated this variable is deprecated and nolonger in use. 1929 */ 1930 @Deprecated 1931 public long getMaxPhysicalMemoryForTask() { 1932 LOG.warn("The API getMaxPhysicalMemoryForTask() is deprecated." 1933 + " Refer to the APIs getMemoryForMapTask() and" 1934 + " getMemoryForReduceTask() for details."); 1935 return -1; 1936 } 1937 1938 /* 1939 * @deprecated this 1940 */ 1941 @Deprecated 1942 public void setMaxPhysicalMemoryForTask(long mem) { 1943 LOG.warn("The API setMaxPhysicalMemoryForTask() is deprecated." 1944 + " The value set is ignored. Refer to " 1945 + " setMemoryForMapTask() and setMemoryForReduceTask() for details."); 1946 } 1947 1948 static String deprecatedString(String key) { 1949 return "The variable " + key + " is no longer used."; 1950 } 1951 1952 private void checkAndWarnDeprecation() { 1953 if(get(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY) != null) { 1954 LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY) 1955 + " Instead use " + JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY 1956 + " and " + JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY); 1957 } 1958 if(get(JobConf.MAPRED_TASK_ULIMIT) != null ) { 1959 LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_TASK_ULIMIT)); 1960 } 1961 if(get(JobConf.MAPRED_MAP_TASK_ULIMIT) != null ) { 1962 LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_MAP_TASK_ULIMIT)); 1963 } 1964 if(get(JobConf.MAPRED_REDUCE_TASK_ULIMIT) != null ) { 1965 LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_REDUCE_TASK_ULIMIT)); 1966 } 1967 } 1968 1969 1970 } 1971