/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapred;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URL;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.ClusterStatus.BlackListInfo;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.ClusterMetrics;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.QueueInfo;
import org.apache.hadoop.mapreduce.TaskTrackerInfo;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.mapreduce.tools.CLI;
import org.apache.hadoop.mapreduce.util.ConfigUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * <code>JobClient</code> is the primary interface for the user-job to interact
 * with the cluster.
 *
 * <code>JobClient</code> provides facilities to submit jobs, track their
 * progress, access component-tasks' reports/logs, get the Map-Reduce cluster
 * status information, etc.
 *
 * <p>The job submission process involves:
 * <ol>
 * <li>
 * Checking the input and output specifications of the job.
 * </li>
 * <li>
 * Computing the {@link InputSplit}s for the job.
 * </li>
 * <li>
 * Setting up the requisite accounting information for the
 * {@link DistributedCache} of the job, if necessary.
 * </li>
 * <li>
 * Copying the job's jar and configuration to the map-reduce system directory
 * on the distributed file-system.
 * </li>
 * <li>
 * Submitting the job to the cluster and optionally monitoring
 * its status.
 * </li>
 * </ol></p>
 *
 * Normally the user creates the application, describes various facets of the
 * job via {@link JobConf} and then uses the <code>JobClient</code> to submit
 * the job and monitor its progress.
 *
 * <p>Here is an example on how to use <code>JobClient</code>:</p>
 * <p><blockquote><pre>
 *     // Create a new JobConf
 *     JobConf job = new JobConf(new Configuration(), MyJob.class);
 *
 *     // Specify various job-specific parameters
 *     job.setJobName("myjob");
 *
 *     FileInputFormat.setInputPaths(job, new Path("in"));
 *     FileOutputFormat.setOutputPath(job, new Path("out"));
 *
 *     job.setMapperClass(MyJob.MyMapper.class);
 *     job.setReducerClass(MyJob.MyReducer.class);
 *
 *     // Submit the job, then poll for progress until the job is complete
 *     JobClient.runJob(job);
 * </pre></blockquote></p>
 *
 * <h4 id="JobControl">Job Control</h4>
 *
 * <p>At times clients would chain map-reduce jobs to accomplish complex tasks
 * which cannot be done via a single map-reduce job. This is fairly easy since
 * the output of the job typically goes to the distributed file-system, and
 * that can be used as the input for the next job.</p>
 *
 * <p>However, this also means that the onus of ensuring jobs are complete
 * (success/failure) lies squarely on the clients. In such situations the
 * various job-control options are (a chaining sketch follows the list):
 * <ol>
 * <li>
 * {@link #runJob(JobConf)} : submits the job and returns only after
 * the job has completed.
 * </li>
 * <li>
 * {@link #submitJob(JobConf)} : only submits the job; the client can then
 * poll the returned handle to the {@link RunningJob} to query status and
 * make scheduling decisions.
 * </li>
 * <li>
 * {@link JobConf#setJobEndNotificationURI(String)} : sets up a notification
 * on job-completion, thus avoiding polling.
 * </li>
 * </ol></p>
 *
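 * <p>For example, a minimal sketch of chaining two jobs, where the first
 * job's output directory (<code>inter</code>, a name assumed here) becomes
 * the second job's input; <code>MyJob</code> and <code>MyOtherJob</code> are
 * likewise hypothetical:</p>
 * <p><blockquote><pre>
 *     JobConf first = new JobConf(MyJob.class);
 *     FileInputFormat.setInputPaths(first, new Path("in"));
 *     FileOutputFormat.setOutputPath(first, new Path("inter"));
 *     JobClient.runJob(first);    // returns only once the first job is done
 *
 *     JobConf second = new JobConf(MyOtherJob.class);
 *     FileInputFormat.setInputPaths(second, new Path("inter"));
 *     FileOutputFormat.setOutputPath(second, new Path("out"));
 *     JobClient.runJob(second);
 * </pre></blockquote></p>
 *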
 * @see JobConf
 * @see ClusterStatus
 * @see Tool
 * @see DistributedCache
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class JobClient extends CLI {
  public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }
  private TaskStatusFilter taskOutputFilter = TaskStatusFilter.FAILED;
  /* Notes that getDelegationToken was called. Again, this is a hack for Oozie
   * to make sure we add history server delegation tokens to the credentials
   * for the job. Since the API only allows one delegation token to be
   * returned, we have to add this hack.
   */
  private boolean getDelegationTokenCalled = false;
  /* Do we need a HS delegation token for this client? */
  static final String HS_DELEGATION_TOKEN_REQUIRED
      = "mapreduce.history.server.delegationtoken.required";

  static {
    ConfigUtil.loadResources();
  }

  /**
   * A NetworkedJob is an implementation of RunningJob.  It wraps a
   * {@link Job} to provide some info, and interacts with the
   * remote service to provide certain functionality.
   */
  static class NetworkedJob implements RunningJob {
    Job job;
    /**
     * We wrap the new-API {@link Job}. If the job is null, then we cannot
     * perform any of the tasks. The job might be null if the cluster
     * has completely forgotten about the job. (e.g., 24 hours after the
     * job completes.)
     */
    public NetworkedJob(JobStatus status, Cluster cluster) throws IOException {
      job = Job.getInstance(cluster, status, new JobConf(status.getJobFile()));
    }

    public NetworkedJob(Job job) throws IOException {
      this.job = job;
    }

    public Configuration getConfiguration() {
      return job.getConfiguration();
    }

    /**
     * An identifier for the job.
     */
    public JobID getID() {
      return JobID.downgrade(job.getJobID());
    }

    /** @deprecated Applications should rather use {@link #getID()}. */
    @Deprecated
    public String getJobID() {
      return getID().toString();
    }

    /**
     * The user-specified job name.
     */
    public String getJobName() {
      return job.getJobName();
    }

    /**
     * The name of the job file.
     */
    public String getJobFile() {
      return job.getJobFile();
    }

    /**
     * A URL where the job's status can be seen.
     */
    public String getTrackingURL() {
      return job.getTrackingURL();
    }

    /**
     * A float between 0.0 and 1.0, indicating the % of map work
     * completed.
     */
    public float mapProgress() throws IOException {
      try {
        return job.mapProgress();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /**
     * A float between 0.0 and 1.0, indicating the % of reduce work
     * completed.
     */
    public float reduceProgress() throws IOException {
      try {
        return job.reduceProgress();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /**
     * A float between 0.0 and 1.0, indicating the % of cleanup work
     * completed.
     */
    public float cleanupProgress() throws IOException {
      try {
        return job.cleanupProgress();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /**
     * A float between 0.0 and 1.0, indicating the % of setup work
     * completed.
     */
    public float setupProgress() throws IOException {
      try {
        return job.setupProgress();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /**
     * Returns immediately whether the whole job is done yet or not.
     */
    public synchronized boolean isComplete() throws IOException {
      try {
        return job.isComplete();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /**
     * True iff the job completed successfully.
     */
    public synchronized boolean isSuccessful() throws IOException {
      try {
        return job.isSuccessful();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /**
     * Blocks until the job is finished.
     */
    public void waitForCompletion() throws IOException {
      try {
        job.waitForCompletion(false);
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      } catch (ClassNotFoundException ce) {
        throw new IOException(ce);
      }
    }

    /**
     * Tells the service to get the state of the current job.
     */
    public synchronized int getJobState() throws IOException {
      try {
        return job.getJobState().getValue();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /**
     * Tells the service to terminate the current job.
     */
    public synchronized void killJob() throws IOException {
      try {
        job.killJob();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /** Set the priority of the job.
     * @param priority new priority of the job.
     */
    public synchronized void setJobPriority(String priority)
        throws IOException {
      try {
        job.setPriority(
            org.apache.hadoop.mapreduce.JobPriority.valueOf(priority));
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /**
     * Kill the indicated task attempt.
     * @param taskId the id of the task to kill.
     * @param shouldFail if true the task is failed and added to the failed
     * tasks list, otherwise it is just killed, without affecting the job's
     * failure status.
     */
    public synchronized void killTask(TaskAttemptID taskId,
        boolean shouldFail) throws IOException {
      try {
        if (shouldFail) {
          job.failTask(taskId);
        } else {
          job.killTask(taskId);
        }
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /** @deprecated Applications should rather use
     * {@link #killTask(TaskAttemptID, boolean)}. */
    @Deprecated
    public synchronized void killTask(String taskId, boolean shouldFail)
        throws IOException {
      killTask(TaskAttemptID.forName(taskId), shouldFail);
    }

    /**
     * Fetch task completion events from the cluster for this job.
     */
    public synchronized TaskCompletionEvent[] getTaskCompletionEvents(
        int startFrom) throws IOException {
      try {
        org.apache.hadoop.mapreduce.TaskCompletionEvent[] events =
            job.getTaskCompletionEvents(startFrom, 10);
        TaskCompletionEvent[] ret = new TaskCompletionEvent[events.length];
        for (int i = 0; i < events.length; i++) {
          ret[i] = TaskCompletionEvent.downgrade(events[i]);
        }
        return ret;
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /**
     * Dump stats to screen.
     */
    @Override
    public String toString() {
      return job.toString();
    }

    /**
     * Returns the counters for this job.
     */
    public Counters getCounters() throws IOException {
      try {
        Counters result = null;
        org.apache.hadoop.mapreduce.Counters temp = job.getCounters();
        if (temp != null) {
          result = Counters.downgrade(temp);
        }
        return result;
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    @Override
    public String[] getTaskDiagnostics(TaskAttemptID id) throws IOException {
      try {
        return job.getTaskDiagnostics(id);
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    public String getHistoryUrl() throws IOException {
      try {
        return job.getHistoryUrl();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    public boolean isRetired() throws IOException {
      try {
        return job.isRetired();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    boolean monitorAndPrintJob() throws IOException, InterruptedException {
      return job.monitorAndPrintJob();
    }

    @Override
    public String getFailureInfo() throws IOException {
      try {
        return job.getStatus().getFailureInfo();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    @Override
    public JobStatus getJobStatus() throws IOException {
      try {
        return JobStatus.downgrade(job.getStatus());
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }
  }

  /**
   * Ugi of the client. We store this ugi when the client is created and
   * then make sure that the same ugi is used to run the various protocols.
   */
  UserGroupInformation clientUgi;

  /**
   * Create a job client.
   */
  public JobClient() {
  }

  /**
   * Build a job client with the given {@link JobConf}, and connect to the
   * default cluster.
   *
   * @param conf the job configuration.
   * @throws IOException
   */
  public JobClient(JobConf conf) throws IOException {
    init(conf);
  }

  /**
   * Build a job client with the given {@link Configuration},
   * and connect to the default cluster.
   *
   * @param conf the configuration.
   * @throws IOException
   */
  public JobClient(Configuration conf) throws IOException {
    init(new JobConf(conf));
  }

  /**
   * Connect to the default cluster.
   * @param conf the job configuration.
   * @throws IOException
   */
  public void init(JobConf conf) throws IOException {
    setConf(conf);
    cluster = new Cluster(conf);
    clientUgi = UserGroupInformation.getCurrentUser();
  }

  /**
   * Build a job client and connect to the indicated job tracker.
   *
   * @param jobTrackAddr the job tracker to connect to.
   * @param conf configuration.
   */
  public JobClient(InetSocketAddress jobTrackAddr,
                   Configuration conf) throws IOException {
    cluster = new Cluster(jobTrackAddr, conf);
    clientUgi = UserGroupInformation.getCurrentUser();
  }

  /**
   * Close the <code>JobClient</code>.
   */
  public synchronized void close() throws IOException {
    cluster.close();
  }

  /**
   * Get a filesystem handle. We need this to prepare jobs
   * for submission to the MapReduce system.
   *
   * @return the filesystem handle.
   */
  public synchronized FileSystem getFs() throws IOException {
    try {
      return cluster.getFileSystem();
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Get a handle to the Cluster.
   */
  public Cluster getClusterHandle() {
    return cluster;
  }

  /**
   * Submit a job to the MR system.
   *
   * This returns a handle to the {@link RunningJob} which can be used to track
   * the running-job.
   *
   * @param jobFile the job configuration.
   * @return a handle to the {@link RunningJob} which can be used to track the
   *         running-job.
   * @throws FileNotFoundException
   * @throws InvalidJobConfException
   * @throws IOException
   */
  public RunningJob submitJob(String jobFile) throws FileNotFoundException,
                                                     InvalidJobConfException,
                                                     IOException {
    // Load in the submitted job details
    JobConf job = new JobConf(jobFile);
    return submitJob(job);
  }

  /**
   * Submit a job to the MR system.
   * This returns a handle to the {@link RunningJob} which can be used to track
   * the running-job.
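   *
   * <p>For example, a sketch of submitting and then polling for completion
   * (assumes an already-populated <code>JobConf conf</code>):</p>
   * <p><blockquote><pre>
   *     JobClient jc = new JobClient(conf);
   *     RunningJob rj = jc.submitJob(conf);
   *     while (!rj.isComplete()) {
   *       Thread.sleep(5000);          // poll every five seconds
   *     }
   *     if (!rj.isSuccessful()) {
   *       System.err.println("Job " + rj.getID() + " failed");
   *     }
   * </pre></blockquote></p>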
   *
   * @param conf the job configuration.
   * @return a handle to the {@link RunningJob} which can be used to track the
   *         running-job.
   * @throws FileNotFoundException
   * @throws IOException
   */
  public RunningJob submitJob(final JobConf conf) throws FileNotFoundException,
                                                         IOException {
    try {
      conf.setBooleanIfUnset("mapred.mapper.new-api", false);
      conf.setBooleanIfUnset("mapred.reducer.new-api", false);
      if (getDelegationTokenCalled) {
        conf.setBoolean(HS_DELEGATION_TOKEN_REQUIRED, getDelegationTokenCalled);
        getDelegationTokenCalled = false;
      }
      Job job = clientUgi.doAs(new PrivilegedExceptionAction<Job>() {
        @Override
        public Job run() throws IOException, ClassNotFoundException,
            InterruptedException {
          Job job = Job.getInstance(conf);
          job.submit();
          return job;
        }
      });
      // Update our Cluster instance with the one created by Job for submission
      // (we can't pass our Cluster instance to Job, since Job wraps the config
      // instance, and the two configs would then diverge).
      cluster = job.getCluster();
      return new NetworkedJob(job);
    } catch (InterruptedException ie) {
      throw new IOException("interrupted", ie);
    }
  }

  private Job getJobUsingCluster(final JobID jobid) throws IOException,
      InterruptedException {
    return clientUgi.doAs(new PrivilegedExceptionAction<Job>() {
      public Job run() throws IOException, InterruptedException {
        return cluster.getJob(jobid);
      }
    });
  }

  /**
   * Get a {@link RunningJob} object to track an ongoing job.  Returns
   * null if the id does not correspond to any known job.
   *
   * @param jobid the jobid of the job.
   * @return the {@link RunningJob} handle to track the job, null if the
   *         <code>jobid</code> doesn't correspond to any known job.
   * @throws IOException
   */
  public RunningJob getJob(final JobID jobid) throws IOException {
    try {
      Job job = getJobUsingCluster(jobid);
      if (job != null) {
        JobStatus status = JobStatus.downgrade(job.getStatus());
        if (status != null) {
          return new NetworkedJob(status, cluster);
        }
      }
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
    return null;
  }

  /** @deprecated Applications should rather use {@link #getJob(JobID)}. */
  @Deprecated
  public RunningJob getJob(String jobid) throws IOException {
    return getJob(JobID.forName(jobid));
  }

  private static final TaskReport[] EMPTY_TASK_REPORTS = new TaskReport[0];

  /**
   * Get the information of the current state of the map tasks of a job.
   *
   * @param jobId the job to query.
   * @return the list of all of the map tips.
   * @throws IOException
   */
  public TaskReport[] getMapTaskReports(JobID jobId) throws IOException {
    return getTaskReports(jobId, TaskType.MAP);
  }

  private TaskReport[] getTaskReports(final JobID jobId, TaskType type)
      throws IOException {
    try {
      Job j = getJobUsingCluster(jobId);
      if (j == null) {
        return EMPTY_TASK_REPORTS;
      }
      return TaskReport.downgradeArray(j.getTaskReports(type));
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /** @deprecated Applications should rather use
   * {@link #getMapTaskReports(JobID)}. */
  @Deprecated
  public TaskReport[] getMapTaskReports(String jobId) throws IOException {
    return getMapTaskReports(JobID.forName(jobId));
  }

  /**
   * Get the information of the current state of the reduce tasks of a job.
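   *
   * <p>For example, a sketch of printing each reduce task's progress
   * (<code>jc</code> and <code>id</code> are assumed to exist):</p>
   * <p><blockquote><pre>
   *     for (TaskReport r : jc.getReduceTaskReports(id)) {
   *       System.out.println(r.getTaskID() + " " + r.getProgress());
   *     }
   * </pre></blockquote></p>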
   *
   * @param jobId the job to query.
   * @return the list of all of the reduce tips.
   * @throws IOException
   */
  public TaskReport[] getReduceTaskReports(JobID jobId) throws IOException {
    return getTaskReports(jobId, TaskType.REDUCE);
  }

  /**
   * Get the information of the current state of the cleanup tasks of a job.
   *
   * @param jobId the job to query.
   * @return the list of all of the cleanup tips.
   * @throws IOException
   */
  public TaskReport[] getCleanupTaskReports(JobID jobId) throws IOException {
    return getTaskReports(jobId, TaskType.JOB_CLEANUP);
  }

  /**
   * Get the information of the current state of the setup tasks of a job.
   *
   * @param jobId the job to query.
   * @return the list of all of the setup tips.
   * @throws IOException
   */
  public TaskReport[] getSetupTaskReports(JobID jobId) throws IOException {
    return getTaskReports(jobId, TaskType.JOB_SETUP);
  }

  /** @deprecated Applications should rather use
   * {@link #getReduceTaskReports(JobID)} */
  @Deprecated
  public TaskReport[] getReduceTaskReports(String jobId) throws IOException {
    return getReduceTaskReports(JobID.forName(jobId));
  }

  /**
   * Display the information about a job's tasks, of a particular type and
   * in a particular state
   *
   * @param jobId the ID of the job
   * @param type the type of the task (map/reduce/setup/cleanup)
   * @param state the state of the task
   * (pending/running/completed/failed/killed)
   */
  public void displayTasks(final JobID jobId, String type, String state)
      throws IOException {
    try {
      Job job = getJobUsingCluster(jobId);
      super.displayTasks(job, type, state);
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Get status information about the Map-Reduce cluster.
   *
   * @return the status information about the Map-Reduce cluster as an object
   *         of {@link ClusterStatus}.
   * @throws IOException
   */
  public ClusterStatus getClusterStatus() throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<ClusterStatus>() {
        public ClusterStatus run() throws IOException, InterruptedException {
          ClusterMetrics metrics = cluster.getClusterStatus();
          return new ClusterStatus(metrics.getTaskTrackerCount(),
              metrics.getBlackListedTaskTrackerCount(),
              cluster.getTaskTrackerExpiryInterval(),
              metrics.getOccupiedMapSlots(),
              metrics.getOccupiedReduceSlots(), metrics.getMapSlotCapacity(),
              metrics.getReduceSlotCapacity(),
              cluster.getJobTrackerStatus(),
              metrics.getDecommissionedTaskTrackerCount());
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  private Collection<String> arrayToStringList(TaskTrackerInfo[] objs) {
    Collection<String> list = new ArrayList<String>();
    for (TaskTrackerInfo info : objs) {
      list.add(info.getTaskTrackerName());
    }
    return list;
  }

  private Collection<BlackListInfo> arrayToBlackListInfo(TaskTrackerInfo[] objs) {
    Collection<BlackListInfo> list = new ArrayList<BlackListInfo>();
    for (TaskTrackerInfo info : objs) {
      BlackListInfo binfo = new BlackListInfo();
      binfo.setTrackerName(info.getTaskTrackerName());
      binfo.setReasonForBlackListing(info.getReasonForBlacklist());
      binfo.setBlackListReport(info.getBlacklistReport());
      list.add(binfo);
    }
    return list;
  }

  /**
   * Get status information about the Map-Reduce cluster.
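   *
   * <p>For example, a sketch of listing the active trackers by name, given an
   * existing client <code>jc</code>:</p>
   * <p><blockquote><pre>
   *     ClusterStatus status = jc.getClusterStatus(true);
   *     for (String tracker : status.getActiveTrackerNames()) {
   *       System.out.println(tracker);
   *     }
   * </pre></blockquote></p>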
   *
   * @param detailed if true then get a detailed status including the
   *        tracker names
   * @return the status information about the Map-Reduce cluster as an object
   *         of {@link ClusterStatus}.
   * @throws IOException
   */
  public ClusterStatus getClusterStatus(boolean detailed) throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<ClusterStatus>() {
        public ClusterStatus run() throws IOException, InterruptedException {
          ClusterMetrics metrics = cluster.getClusterStatus();
          return new ClusterStatus(
              arrayToStringList(cluster.getActiveTaskTrackers()),
              arrayToBlackListInfo(cluster.getBlackListedTaskTrackers()),
              cluster.getTaskTrackerExpiryInterval(),
              metrics.getOccupiedMapSlots(),
              metrics.getOccupiedReduceSlots(), metrics.getMapSlotCapacity(),
              metrics.getReduceSlotCapacity(),
              cluster.getJobTrackerStatus());
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Get the jobs that are not completed and not failed.
   *
   * @return array of {@link JobStatus} for the running/to-be-run jobs.
   * @throws IOException
   */
  public JobStatus[] jobsToComplete() throws IOException {
    List<JobStatus> stats = new ArrayList<JobStatus>();
    for (JobStatus stat : getAllJobs()) {
      if (!stat.isJobComplete()) {
        stats.add(stat);
      }
    }
    return stats.toArray(new JobStatus[0]);
  }

  /**
   * Get the jobs that are submitted.
   *
   * @return array of {@link JobStatus} for the submitted jobs.
   * @throws IOException
   */
  public JobStatus[] getAllJobs() throws IOException {
    try {
      org.apache.hadoop.mapreduce.JobStatus[] jobs =
          clientUgi.doAs(new PrivilegedExceptionAction<
              org.apache.hadoop.mapreduce.JobStatus[]>() {
            public org.apache.hadoop.mapreduce.JobStatus[] run()
                throws IOException, InterruptedException {
              return cluster.getAllJobStatuses();
            }
          });
      JobStatus[] stats = new JobStatus[jobs.length];
      for (int i = 0; i < jobs.length; i++) {
        stats[i] = JobStatus.downgrade(jobs[i]);
      }
      return stats;
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Utility that submits a job, then polls for progress until the job is
   * complete.
   *
   * @param job the job configuration.
   * @throws IOException if the job fails
   */
  public static RunningJob runJob(JobConf job) throws IOException {
    JobClient jc = new JobClient(job);
    RunningJob rj = jc.submitJob(job);
    try {
      if (!jc.monitorAndPrintJob(job, rj)) {
        throw new IOException("Job failed!");
      }
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
    }
    return rj;
  }

  /**
   * Monitor a job and print status in real-time as progress is made and tasks
   * fail.
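   *
   * <p>A sketch of using this together with {@link #submitJob(JobConf)}:</p>
   * <p><blockquote><pre>
   *     RunningJob rj = jc.submitJob(conf);
   *     boolean success = jc.monitorAndPrintJob(conf, rj);
   * </pre></blockquote></p>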
   * @param conf the job's configuration
   * @param job the job to track
   * @return true if the job succeeded
   * @throws IOException if communication to the JobTracker fails
   */
  public boolean monitorAndPrintJob(JobConf conf,
                                    RunningJob job
                                    ) throws IOException, InterruptedException {
    return ((NetworkedJob) job).monitorAndPrintJob();
  }

  static String getTaskLogURL(TaskAttemptID taskId, String baseUrl) {
    return (baseUrl + "/tasklog?plaintext=true&attemptid=" + taskId);
  }

  static Configuration getConfiguration(String jobTrackerSpec) {
    Configuration conf = new Configuration();
    if (jobTrackerSpec != null) {
      if (jobTrackerSpec.indexOf(":") >= 0) {
        conf.set("mapred.job.tracker", jobTrackerSpec);
      } else {
        String classpathFile = "hadoop-" + jobTrackerSpec + ".xml";
        URL validate = conf.getResource(classpathFile);
        if (validate == null) {
          throw new RuntimeException(classpathFile + " not found on CLASSPATH");
        }
        conf.addResource(classpathFile);
      }
    }
    return conf;
  }

  /**
   * Sets the output filter for tasks. Only those tasks whose output matches
   * the filter are printed.
   * @param newValue task filter.
   */
  @Deprecated
  public void setTaskOutputFilter(TaskStatusFilter newValue) {
    this.taskOutputFilter = newValue;
  }

  /**
   * Get the task output filter out of the JobConf.
   *
   * @param job the JobConf to examine.
   * @return the filter level.
   */
  public static TaskStatusFilter getTaskOutputFilter(JobConf job) {
    return TaskStatusFilter.valueOf(job.get("jobclient.output.filter",
                                            "FAILED"));
  }

  /**
   * Modify the JobConf to set the task output filter.
   *
   * @param job the JobConf to modify.
   * @param newValue the value to set.
   */
  public static void setTaskOutputFilter(JobConf job,
                                         TaskStatusFilter newValue) {
    job.set("jobclient.output.filter", newValue.toString());
  }

  /**
   * Returns the task output filter.
   * @return task filter.
   */
  @Deprecated
  public TaskStatusFilter getTaskOutputFilter() {
    return this.taskOutputFilter;
  }

  protected long getCounter(org.apache.hadoop.mapreduce.Counters cntrs,
      String counterGroupName, String counterName) throws IOException {
    Counters counters = Counters.downgrade(cntrs);
    return counters.findCounter(counterGroupName, counterName).getValue();
  }

  /**
   * Get status information about the max available Maps in the cluster.
   *
   * @return the max available Maps in the cluster
   * @throws IOException
   */
  public int getDefaultMaps() throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<Integer>() {
        @Override
        public Integer run() throws IOException, InterruptedException {
          return cluster.getClusterStatus().getMapSlotCapacity();
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Get status information about the max available Reduces in the cluster.
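   *
   * <p>For example, a sketch that caps a job's reduce count at the
   * cluster-wide capacity (<code>desiredReduces</code> is assumed):</p>
   * <p><blockquote><pre>
   *     conf.setNumReduceTasks(Math.min(desiredReduces,
   *                                     jc.getDefaultReduces()));
   * </pre></blockquote></p>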
   *
   * @return the max available Reduces in the cluster
   * @throws IOException
   */
  public int getDefaultReduces() throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<Integer>() {
        @Override
        public Integer run() throws IOException, InterruptedException {
          return cluster.getClusterStatus().getReduceSlotCapacity();
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Grab the jobtracker system directory path where job-specific files are
   * to be placed.
   *
   * @return the system directory where job-specific files are to be placed.
   */
  public Path getSystemDir() {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<Path>() {
        @Override
        public Path run() throws IOException, InterruptedException {
          return cluster.getSystemDir();
        }
      });
    } catch (IOException ioe) {
      return null;
    } catch (InterruptedException ie) {
      return null;
    }
  }

  private JobQueueInfo getJobQueueInfo(QueueInfo queue) {
    JobQueueInfo ret = new JobQueueInfo(queue);
    // make sure to convert any children
    if (queue.getQueueChildren().size() > 0) {
      List<JobQueueInfo> childQueues = new ArrayList<JobQueueInfo>(
          queue.getQueueChildren().size());
      for (QueueInfo child : queue.getQueueChildren()) {
        childQueues.add(getJobQueueInfo(child));
      }
      ret.setChildren(childQueues);
    }
    return ret;
  }

  private JobQueueInfo[] getJobQueueInfoArray(QueueInfo[] queues)
      throws IOException {
    JobQueueInfo[] ret = new JobQueueInfo[queues.length];
    for (int i = 0; i < queues.length; i++) {
      ret[i] = getJobQueueInfo(queues[i]);
    }
    return ret;
  }

  /**
   * Returns an array of queue information objects about root level queues
   * configured
   *
   * @return the array of root level JobQueueInfo objects
   * @throws IOException
   */
  public JobQueueInfo[] getRootQueues() throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<JobQueueInfo[]>() {
        public JobQueueInfo[] run() throws IOException, InterruptedException {
          return getJobQueueInfoArray(cluster.getRootQueues());
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Returns an array of queue information objects about immediate children
   * of queue queueName.
   *
   * @param queueName
   * @return the array of immediate children JobQueueInfo objects
   * @throws IOException
   */
  public JobQueueInfo[] getChildQueues(final String queueName)
      throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<JobQueueInfo[]>() {
        public JobQueueInfo[] run() throws IOException, InterruptedException {
          return getJobQueueInfoArray(cluster.getChildQueues(queueName));
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Return an array of queue information objects about all the Job Queues
   * configured.
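   *
   * <p>For example, a sketch of printing every configured queue name:</p>
   * <p><blockquote><pre>
   *     for (JobQueueInfo q : jc.getQueues()) {
   *       System.out.println(q.getQueueName());
   *     }
   * </pre></blockquote></p>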
   *
   * @return Array of JobQueueInfo objects
   * @throws IOException
   */
  public JobQueueInfo[] getQueues() throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<JobQueueInfo[]>() {
        public JobQueueInfo[] run() throws IOException, InterruptedException {
          return getJobQueueInfoArray(cluster.getQueues());
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Gets all the jobs which were added to a particular Job Queue.
   *
   * @param queueName name of the Job Queue
   * @return Array of jobs present in the job queue
   * @throws IOException
   */
  public JobStatus[] getJobsFromQueue(final String queueName)
      throws IOException {
    try {
      QueueInfo queue = clientUgi.doAs(new PrivilegedExceptionAction<QueueInfo>() {
        @Override
        public QueueInfo run() throws IOException, InterruptedException {
          return cluster.getQueue(queueName);
        }
      });
      if (queue == null) {
        return null;
      }
      org.apache.hadoop.mapreduce.JobStatus[] stats =
          queue.getJobStatuses();
      JobStatus[] ret = new JobStatus[stats.length];
      for (int i = 0; i < stats.length; i++) {
        ret[i] = JobStatus.downgrade(stats[i]);
      }
      return ret;
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Gets the queue information associated with a particular Job Queue.
   *
   * @param queueName name of the job queue.
   * @return Queue information associated with the particular queue.
   * @throws IOException
   */
  public JobQueueInfo getQueueInfo(final String queueName) throws IOException {
    try {
      QueueInfo queueInfo = clientUgi.doAs(new
          PrivilegedExceptionAction<QueueInfo>() {
        public QueueInfo run() throws IOException, InterruptedException {
          return cluster.getQueue(queueName);
        }
      });
      if (queueInfo != null) {
        return new JobQueueInfo(queueInfo);
      }
      return null;
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Gets the Queue ACLs for the current user.
   * @return array of QueueAclsInfo object for current user.
   * @throws IOException
   */
  public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException {
    try {
      org.apache.hadoop.mapreduce.QueueAclsInfo[] acls =
          clientUgi.doAs(new PrivilegedExceptionAction
              <org.apache.hadoop.mapreduce.QueueAclsInfo[]>() {
            public org.apache.hadoop.mapreduce.QueueAclsInfo[] run()
                throws IOException, InterruptedException {
              return cluster.getQueueAclsForCurrentUser();
            }
          });
      QueueAclsInfo[] ret = new QueueAclsInfo[acls.length];
      for (int i = 0; i < acls.length; i++) {
        ret[i] = QueueAclsInfo.downgrade(acls[i]);
      }
      return ret;
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Get a delegation token for the user from the JobTracker.
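   *
   * <p>A sketch of fetching a token and adding it to a job's credentials
   * (the renewer name here is only an example):</p>
   * <p><blockquote><pre>
   *     Token&lt;DelegationTokenIdentifier&gt; token =
   *         jc.getDelegationToken(new Text("renewer"));
   *     conf.getCredentials().addToken(token.getService(), token);
   * </pre></blockquote></p>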
   * @param renewer the user who can renew the token
   * @return the new token
   * @throws IOException
   */
  public Token<DelegationTokenIdentifier>
      getDelegationToken(final Text renewer)
      throws IOException, InterruptedException {
    getDelegationTokenCalled = true;
    return clientUgi.doAs(new
        PrivilegedExceptionAction<Token<DelegationTokenIdentifier>>() {
      public Token<DelegationTokenIdentifier> run() throws IOException,
          InterruptedException {
        return cluster.getDelegationToken(renewer);
      }
    });
  }

  /**
   * Renew a delegation token.
   * @param token the token to renew
   * @return the new expiration time if the renewal went well
   * @throws InvalidToken
   * @throws IOException
   * @deprecated Use {@link Token#renew} instead
   */
  public long renewDelegationToken(Token<DelegationTokenIdentifier> token)
      throws InvalidToken, IOException, InterruptedException {
    return token.renew(getConf());
  }

  /**
   * Cancel a delegation token from the JobTracker.
   * @param token the token to cancel
   * @throws IOException
   * @deprecated Use {@link Token#cancel} instead
   */
  public void cancelDelegationToken(Token<DelegationTokenIdentifier> token)
      throws InvalidToken, IOException, InterruptedException {
    token.cancel(getConf());
  }

  /**
   * Run the <code>JobClient</code> as a command-line tool.
   */
  public static void main(String[] argv) throws Exception {
    int res = ToolRunner.run(new JobClient(), argv);
    System.exit(res);
  }
}