001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.hadoop.mapreduce.tools; 019 020 import java.io.IOException; 021 import java.io.OutputStreamWriter; 022 import java.io.PrintWriter; 023 import java.util.ArrayList; 024 import java.util.List; 025 026 import org.apache.commons.logging.Log; 027 import org.apache.commons.logging.LogFactory; 028 import org.apache.hadoop.classification.InterfaceAudience; 029 import org.apache.hadoop.classification.InterfaceStability; 030 import org.apache.hadoop.classification.InterfaceAudience.Private; 031 import org.apache.hadoop.conf.Configuration; 032 import org.apache.hadoop.conf.Configured; 033 import org.apache.hadoop.ipc.RemoteException; 034 import org.apache.hadoop.mapred.JobConf; 035 import org.apache.hadoop.mapred.TIPStatus; 036 import org.apache.hadoop.mapreduce.Cluster; 037 import org.apache.hadoop.mapreduce.Counters; 038 import org.apache.hadoop.mapreduce.Job; 039 import org.apache.hadoop.mapreduce.JobID; 040 import org.apache.hadoop.mapreduce.JobPriority; 041 import org.apache.hadoop.mapreduce.JobStatus; 042 import org.apache.hadoop.mapreduce.TaskAttemptID; 043 import org.apache.hadoop.mapreduce.TaskCompletionEvent; 044 import org.apache.hadoop.mapreduce.TaskReport; 045 import org.apache.hadoop.mapreduce.TaskTrackerInfo; 046 import org.apache.hadoop.mapreduce.TaskType; 047 import org.apache.hadoop.mapreduce.jobhistory.HistoryViewer; 048 import org.apache.hadoop.mapreduce.v2.LogParams; 049 import org.apache.hadoop.security.AccessControlException; 050 import org.apache.hadoop.util.Tool; 051 import org.apache.hadoop.util.ToolRunner; 052 import org.apache.hadoop.yarn.logaggregation.LogDumper; 053 054 import com.google.common.base.Charsets; 055 056 /** 057 * Interprets the map reduce cli options 058 */ 059 @InterfaceAudience.Public 060 @InterfaceStability.Stable 061 public class CLI extends Configured implements Tool { 062 private static final Log LOG = LogFactory.getLog(CLI.class); 063 protected Cluster cluster; 064 065 public CLI() { 066 } 067 068 public CLI(Configuration conf) { 069 setConf(conf); 070 } 071 072 public int run(String[] argv) throws Exception { 073 int exitCode = -1; 074 if (argv.length < 1) { 075 displayUsage(""); 076 return exitCode; 077 } 078 // process arguments 079 String cmd = argv[0]; 080 String submitJobFile = null; 081 String jobid = null; 082 String taskid = null; 083 String historyFile = null; 084 String counterGroupName = null; 085 String counterName = null; 086 JobPriority jp = null; 087 String taskType = null; 088 String taskState = null; 089 int fromEvent = 0; 090 int nEvents = 0; 091 boolean getStatus = false; 092 boolean getCounter = false; 093 boolean killJob = false; 094 boolean listEvents = false; 095 boolean viewHistory = false; 096 boolean viewAllHistory = false; 097 boolean listJobs = false; 098 boolean listAllJobs = false; 099 boolean listActiveTrackers = false; 100 boolean listBlacklistedTrackers = false; 101 boolean displayTasks = false; 102 boolean killTask = false; 103 boolean failTask = false; 104 boolean setJobPriority = false; 105 boolean logs = false; 106 107 if ("-submit".equals(cmd)) { 108 if (argv.length != 2) { 109 displayUsage(cmd); 110 return exitCode; 111 } 112 submitJobFile = argv[1]; 113 } else if ("-status".equals(cmd)) { 114 if (argv.length != 2) { 115 displayUsage(cmd); 116 return exitCode; 117 } 118 jobid = argv[1]; 119 getStatus = true; 120 } else if("-counter".equals(cmd)) { 121 if (argv.length != 4) { 122 displayUsage(cmd); 123 return exitCode; 124 } 125 getCounter = true; 126 jobid = argv[1]; 127 counterGroupName = argv[2]; 128 counterName = argv[3]; 129 } else if ("-kill".equals(cmd)) { 130 if (argv.length != 2) { 131 displayUsage(cmd); 132 return exitCode; 133 } 134 jobid = argv[1]; 135 killJob = true; 136 } else if ("-set-priority".equals(cmd)) { 137 if (argv.length != 3) { 138 displayUsage(cmd); 139 return exitCode; 140 } 141 jobid = argv[1]; 142 try { 143 jp = JobPriority.valueOf(argv[2]); 144 } catch (IllegalArgumentException iae) { 145 LOG.info(iae); 146 displayUsage(cmd); 147 return exitCode; 148 } 149 setJobPriority = true; 150 } else if ("-events".equals(cmd)) { 151 if (argv.length != 4) { 152 displayUsage(cmd); 153 return exitCode; 154 } 155 jobid = argv[1]; 156 fromEvent = Integer.parseInt(argv[2]); 157 nEvents = Integer.parseInt(argv[3]); 158 listEvents = true; 159 } else if ("-history".equals(cmd)) { 160 if (argv.length != 2 && !(argv.length == 3 && "all".equals(argv[1]))) { 161 displayUsage(cmd); 162 return exitCode; 163 } 164 viewHistory = true; 165 if (argv.length == 3 && "all".equals(argv[1])) { 166 viewAllHistory = true; 167 historyFile = argv[2]; 168 } else { 169 historyFile = argv[1]; 170 } 171 } else if ("-list".equals(cmd)) { 172 if (argv.length != 1 && !(argv.length == 2 && "all".equals(argv[1]))) { 173 displayUsage(cmd); 174 return exitCode; 175 } 176 if (argv.length == 2 && "all".equals(argv[1])) { 177 listAllJobs = true; 178 } else { 179 listJobs = true; 180 } 181 } else if("-kill-task".equals(cmd)) { 182 if (argv.length != 2) { 183 displayUsage(cmd); 184 return exitCode; 185 } 186 killTask = true; 187 taskid = argv[1]; 188 } else if("-fail-task".equals(cmd)) { 189 if (argv.length != 2) { 190 displayUsage(cmd); 191 return exitCode; 192 } 193 failTask = true; 194 taskid = argv[1]; 195 } else if ("-list-active-trackers".equals(cmd)) { 196 if (argv.length != 1) { 197 displayUsage(cmd); 198 return exitCode; 199 } 200 listActiveTrackers = true; 201 } else if ("-list-blacklisted-trackers".equals(cmd)) { 202 if (argv.length != 1) { 203 displayUsage(cmd); 204 return exitCode; 205 } 206 listBlacklistedTrackers = true; 207 } else if ("-list-attempt-ids".equals(cmd)) { 208 if (argv.length != 4) { 209 displayUsage(cmd); 210 return exitCode; 211 } 212 jobid = argv[1]; 213 taskType = argv[2]; 214 taskState = argv[3]; 215 displayTasks = true; 216 } else if ("-logs".equals(cmd)) { 217 if (argv.length == 2 || argv.length ==3) { 218 logs = true; 219 jobid = argv[1]; 220 if (argv.length == 3) { 221 taskid = argv[2]; 222 } else { 223 taskid = null; 224 } 225 } else { 226 displayUsage(cmd); 227 return exitCode; 228 } 229 } else { 230 displayUsage(cmd); 231 return exitCode; 232 } 233 234 // initialize cluster 235 cluster = new Cluster(getConf()); 236 237 // Submit the request 238 try { 239 if (submitJobFile != null) { 240 Job job = Job.getInstance(new JobConf(submitJobFile)); 241 job.submit(); 242 System.out.println("Created job " + job.getJobID()); 243 exitCode = 0; 244 } else if (getStatus) { 245 Job job = cluster.getJob(JobID.forName(jobid)); 246 if (job == null) { 247 System.out.println("Could not find job " + jobid); 248 } else { 249 Counters counters = job.getCounters(); 250 System.out.println(); 251 System.out.println(job); 252 if (counters != null) { 253 System.out.println(counters); 254 } else { 255 System.out.println("Counters not available. Job is retired."); 256 } 257 exitCode = 0; 258 } 259 } else if (getCounter) { 260 Job job = cluster.getJob(JobID.forName(jobid)); 261 if (job == null) { 262 System.out.println("Could not find job " + jobid); 263 } else { 264 Counters counters = job.getCounters(); 265 if (counters == null) { 266 System.out.println("Counters not available for retired job " + 267 jobid); 268 exitCode = -1; 269 } else { 270 System.out.println(getCounter(counters, 271 counterGroupName, counterName)); 272 exitCode = 0; 273 } 274 } 275 } else if (killJob) { 276 Job job = cluster.getJob(JobID.forName(jobid)); 277 if (job == null) { 278 System.out.println("Could not find job " + jobid); 279 } else { 280 job.killJob(); 281 System.out.println("Killed job " + jobid); 282 exitCode = 0; 283 } 284 } else if (setJobPriority) { 285 Job job = cluster.getJob(JobID.forName(jobid)); 286 if (job == null) { 287 System.out.println("Could not find job " + jobid); 288 } else { 289 job.setPriority(jp); 290 System.out.println("Changed job priority."); 291 exitCode = 0; 292 } 293 } else if (viewHistory) { 294 viewHistory(historyFile, viewAllHistory); 295 exitCode = 0; 296 } else if (listEvents) { 297 listEvents(cluster.getJob(JobID.forName(jobid)), fromEvent, nEvents); 298 exitCode = 0; 299 } else if (listJobs) { 300 listJobs(cluster); 301 exitCode = 0; 302 } else if (listAllJobs) { 303 listAllJobs(cluster); 304 exitCode = 0; 305 } else if (listActiveTrackers) { 306 listActiveTrackers(cluster); 307 exitCode = 0; 308 } else if (listBlacklistedTrackers) { 309 listBlacklistedTrackers(cluster); 310 exitCode = 0; 311 } else if (displayTasks) { 312 displayTasks(cluster.getJob(JobID.forName(jobid)), taskType, taskState); 313 } else if(killTask) { 314 TaskAttemptID taskID = TaskAttemptID.forName(taskid); 315 Job job = cluster.getJob(taskID.getJobID()); 316 if (job == null) { 317 System.out.println("Could not find job " + jobid); 318 } else if (job.killTask(taskID)) { 319 System.out.println("Killed task " + taskid); 320 exitCode = 0; 321 } else { 322 System.out.println("Could not kill task " + taskid); 323 exitCode = -1; 324 } 325 } else if(failTask) { 326 TaskAttemptID taskID = TaskAttemptID.forName(taskid); 327 Job job = cluster.getJob(taskID.getJobID()); 328 if (job == null) { 329 System.out.println("Could not find job " + jobid); 330 } else if(job.failTask(taskID)) { 331 System.out.println("Killed task " + taskID + " by failing it"); 332 exitCode = 0; 333 } else { 334 System.out.println("Could not fail task " + taskid); 335 exitCode = -1; 336 } 337 } else if (logs) { 338 try { 339 JobID jobID = JobID.forName(jobid); 340 TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskid); 341 LogParams logParams = cluster.getLogParams(jobID, taskAttemptID); 342 LogDumper logDumper = new LogDumper(); 343 logDumper.setConf(getConf()); 344 exitCode = logDumper.dumpAContainersLogs(logParams.getApplicationId(), 345 logParams.getContainerId(), logParams.getNodeId(), 346 logParams.getOwner()); 347 } catch (IOException e) { 348 if (e instanceof RemoteException) { 349 throw e; 350 } 351 System.out.println(e.getMessage()); 352 } 353 } 354 } catch (RemoteException re) { 355 IOException unwrappedException = re.unwrapRemoteException(); 356 if (unwrappedException instanceof AccessControlException) { 357 System.out.println(unwrappedException.getMessage()); 358 } else { 359 throw re; 360 } 361 } finally { 362 cluster.close(); 363 } 364 return exitCode; 365 } 366 367 private String getJobPriorityNames() { 368 StringBuffer sb = new StringBuffer(); 369 for (JobPriority p : JobPriority.values()) { 370 sb.append(p.name()).append(" "); 371 } 372 return sb.substring(0, sb.length()-1); 373 } 374 375 private String getTaskTypess() { 376 StringBuffer sb = new StringBuffer(); 377 for (TaskType t : TaskType.values()) { 378 sb.append(t.name()).append(" "); 379 } 380 return sb.substring(0, sb.length()-1); 381 } 382 383 /** 384 * Display usage of the command-line tool and terminate execution. 385 */ 386 private void displayUsage(String cmd) { 387 String prefix = "Usage: CLI "; 388 String jobPriorityValues = getJobPriorityNames(); 389 String taskTypes = getTaskTypess(); 390 String taskStates = "running, completed"; 391 if ("-submit".equals(cmd)) { 392 System.err.println(prefix + "[" + cmd + " <job-file>]"); 393 } else if ("-status".equals(cmd) || "-kill".equals(cmd)) { 394 System.err.println(prefix + "[" + cmd + " <job-id>]"); 395 } else if ("-counter".equals(cmd)) { 396 System.err.println(prefix + "[" + cmd + 397 " <job-id> <group-name> <counter-name>]"); 398 } else if ("-events".equals(cmd)) { 399 System.err.println(prefix + "[" + cmd + 400 " <job-id> <from-event-#> <#-of-events>]. Event #s start from 1."); 401 } else if ("-history".equals(cmd)) { 402 System.err.println(prefix + "[" + cmd + " <jobHistoryFile>]"); 403 } else if ("-list".equals(cmd)) { 404 System.err.println(prefix + "[" + cmd + " [all]]"); 405 } else if ("-kill-task".equals(cmd) || "-fail-task".equals(cmd)) { 406 System.err.println(prefix + "[" + cmd + " <task-attempt-id>]"); 407 } else if ("-set-priority".equals(cmd)) { 408 System.err.println(prefix + "[" + cmd + " <job-id> <priority>]. " + 409 "Valid values for priorities are: " 410 + jobPriorityValues); 411 } else if ("-list-active-trackers".equals(cmd)) { 412 System.err.println(prefix + "[" + cmd + "]"); 413 } else if ("-list-blacklisted-trackers".equals(cmd)) { 414 System.err.println(prefix + "[" + cmd + "]"); 415 } else if ("-list-attempt-ids".equals(cmd)) { 416 System.err.println(prefix + "[" + cmd + 417 " <job-id> <task-type> <task-state>]. " + 418 "Valid values for <task-type> are " + taskTypes + ". " + 419 "Valid values for <task-state> are " + taskStates); 420 } else if ("-logs".equals(cmd)) { 421 System.err.println(prefix + "[" + cmd + 422 " <job-id> <task-attempt-id>]. " + 423 " <task-attempt-id> is optional to get task attempt logs."); 424 } else { 425 System.err.printf(prefix + "<command> <args>%n"); 426 System.err.printf("\t[-submit <job-file>]%n"); 427 System.err.printf("\t[-status <job-id>]%n"); 428 System.err.printf("\t[-counter <job-id> <group-name> <counter-name>]%n"); 429 System.err.printf("\t[-kill <job-id>]%n"); 430 System.err.printf("\t[-set-priority <job-id> <priority>]. " + 431 "Valid values for priorities are: " + jobPriorityValues + "%n"); 432 System.err.printf("\t[-events <job-id> <from-event-#> <#-of-events>]%n"); 433 System.err.printf("\t[-history <jobHistoryFile>]%n"); 434 System.err.printf("\t[-list [all]]%n"); 435 System.err.printf("\t[-list-active-trackers]%n"); 436 System.err.printf("\t[-list-blacklisted-trackers]%n"); 437 System.err.println("\t[-list-attempt-ids <job-id> <task-type> " + 438 "<task-state>]. " + 439 "Valid values for <task-type> are " + taskTypes + ". " + 440 "Valid values for <task-state> are " + taskStates); 441 System.err.printf("\t[-kill-task <task-attempt-id>]%n"); 442 System.err.printf("\t[-fail-task <task-attempt-id>]%n"); 443 System.err.printf("\t[-logs <job-id> <task-attempt-id>]%n%n"); 444 ToolRunner.printGenericCommandUsage(System.out); 445 } 446 } 447 448 private void viewHistory(String historyFile, boolean all) 449 throws IOException { 450 HistoryViewer historyViewer = new HistoryViewer(historyFile, 451 getConf(), all); 452 historyViewer.print(); 453 } 454 455 protected long getCounter(Counters counters, String counterGroupName, 456 String counterName) throws IOException { 457 return counters.findCounter(counterGroupName, counterName).getValue(); 458 } 459 460 /** 461 * List the events for the given job 462 * @param jobId the job id for the job's events to list 463 * @throws IOException 464 */ 465 private void listEvents(Job job, int fromEventId, int numEvents) 466 throws IOException, InterruptedException { 467 TaskCompletionEvent[] events = job. 468 getTaskCompletionEvents(fromEventId, numEvents); 469 System.out.println("Task completion events for " + job.getJobID()); 470 System.out.println("Number of events (from " + fromEventId + ") are: " 471 + events.length); 472 for(TaskCompletionEvent event: events) { 473 System.out.println(event.getStatus() + " " + 474 event.getTaskAttemptId() + " " + 475 getTaskLogURL(event.getTaskAttemptId(), event.getTaskTrackerHttp())); 476 } 477 } 478 479 protected static String getTaskLogURL(TaskAttemptID taskId, String baseUrl) { 480 return (baseUrl + "/tasklog?plaintext=true&attemptid=" + taskId); 481 } 482 483 484 /** 485 * Dump a list of currently running jobs 486 * @throws IOException 487 */ 488 private void listJobs(Cluster cluster) 489 throws IOException, InterruptedException { 490 List<JobStatus> runningJobs = new ArrayList<JobStatus>(); 491 for (JobStatus job : cluster.getAllJobStatuses()) { 492 if (!job.isJobComplete()) { 493 runningJobs.add(job); 494 } 495 } 496 displayJobList(runningJobs.toArray(new JobStatus[0])); 497 } 498 499 /** 500 * Dump a list of all jobs submitted. 501 * @throws IOException 502 */ 503 private void listAllJobs(Cluster cluster) 504 throws IOException, InterruptedException { 505 displayJobList(cluster.getAllJobStatuses()); 506 } 507 508 /** 509 * Display the list of active trackers 510 */ 511 private void listActiveTrackers(Cluster cluster) 512 throws IOException, InterruptedException { 513 TaskTrackerInfo[] trackers = cluster.getActiveTaskTrackers(); 514 for (TaskTrackerInfo tracker : trackers) { 515 System.out.println(tracker.getTaskTrackerName()); 516 } 517 } 518 519 /** 520 * Display the list of blacklisted trackers 521 */ 522 private void listBlacklistedTrackers(Cluster cluster) 523 throws IOException, InterruptedException { 524 TaskTrackerInfo[] trackers = cluster.getBlackListedTaskTrackers(); 525 if (trackers.length > 0) { 526 System.out.println("BlackListedNode \t Reason"); 527 } 528 for (TaskTrackerInfo tracker : trackers) { 529 System.out.println(tracker.getTaskTrackerName() + "\t" + 530 tracker.getReasonForBlacklist()); 531 } 532 } 533 534 private void printTaskAttempts(TaskReport report) { 535 if (report.getCurrentStatus() == TIPStatus.COMPLETE) { 536 System.out.println(report.getSuccessfulTaskAttemptId()); 537 } else if (report.getCurrentStatus() == TIPStatus.RUNNING) { 538 for (TaskAttemptID t : 539 report.getRunningTaskAttemptIds()) { 540 System.out.println(t); 541 } 542 } 543 } 544 545 /** 546 * Display the information about a job's tasks, of a particular type and 547 * in a particular state 548 * 549 * @param job the job 550 * @param type the type of the task (map/reduce/setup/cleanup) 551 * @param state the state of the task 552 * (pending/running/completed/failed/killed) 553 */ 554 protected void displayTasks(Job job, String type, String state) 555 throws IOException, InterruptedException { 556 TaskReport[] reports = job.getTaskReports(TaskType.valueOf(type)); 557 for (TaskReport report : reports) { 558 TIPStatus status = report.getCurrentStatus(); 559 if ((state.equals("pending") && status ==TIPStatus.PENDING) || 560 (state.equals("running") && status ==TIPStatus.RUNNING) || 561 (state.equals("completed") && status == TIPStatus.COMPLETE) || 562 (state.equals("failed") && status == TIPStatus.FAILED) || 563 (state.equals("killed") && status == TIPStatus.KILLED)) { 564 printTaskAttempts(report); 565 } 566 } 567 } 568 569 public void displayJobList(JobStatus[] jobs) 570 throws IOException, InterruptedException { 571 displayJobList(jobs, new PrintWriter(new OutputStreamWriter(System.out, 572 Charsets.UTF_8))); 573 } 574 575 @Private 576 public static String headerPattern = "%23s\t%10s\t%14s\t%12s\t%12s\t%10s\t%15s\t%15s\t%8s\t%8s\t%10s\t%10s\n"; 577 @Private 578 public static String dataPattern = "%23s\t%10s\t%14d\t%12s\t%12s\t%10s\t%15s\t%15s\t%8s\t%8s\t%10s\t%10s\n"; 579 private static String memPattern = "%dM"; 580 private static String UNAVAILABLE = "N/A"; 581 582 @Private 583 public void displayJobList(JobStatus[] jobs, PrintWriter writer) { 584 writer.println("Total jobs:" + jobs.length); 585 writer.printf(headerPattern, "JobId", "State", "StartTime", "UserName", 586 "Queue", "Priority", "UsedContainers", 587 "RsvdContainers", "UsedMem", "RsvdMem", "NeededMem", "AM info"); 588 for (JobStatus job : jobs) { 589 int numUsedSlots = job.getNumUsedSlots(); 590 int numReservedSlots = job.getNumReservedSlots(); 591 int usedMem = job.getUsedMem(); 592 int rsvdMem = job.getReservedMem(); 593 int neededMem = job.getNeededMem(); 594 writer.printf(dataPattern, 595 job.getJobID().toString(), job.getState(), job.getStartTime(), 596 job.getUsername(), job.getQueue(), 597 job.getPriority().name(), 598 numUsedSlots < 0 ? UNAVAILABLE : numUsedSlots, 599 numReservedSlots < 0 ? UNAVAILABLE : numReservedSlots, 600 usedMem < 0 ? UNAVAILABLE : String.format(memPattern, usedMem), 601 rsvdMem < 0 ? UNAVAILABLE : String.format(memPattern, rsvdMem), 602 neededMem < 0 ? UNAVAILABLE : String.format(memPattern, neededMem), 603 job.getSchedulingInfo()); 604 } 605 writer.flush(); 606 } 607 608 public static void main(String[] argv) throws Exception { 609 int res = ToolRunner.run(new CLI(), argv); 610 System.exit(res); 611 } 612 }