001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.hadoop.mapreduce.tools;
019    
020    import java.io.IOException;
021    import java.io.OutputStreamWriter;
022    import java.io.PrintWriter;
023    import java.util.ArrayList;
024    import java.util.List;
025    
026    import org.apache.commons.logging.Log;
027    import org.apache.commons.logging.LogFactory;
028    import org.apache.hadoop.classification.InterfaceAudience;
029    import org.apache.hadoop.classification.InterfaceStability;
030    import org.apache.hadoop.classification.InterfaceAudience.Private;
031    import org.apache.hadoop.conf.Configuration;
032    import org.apache.hadoop.conf.Configured;
033    import org.apache.hadoop.ipc.RemoteException;
034    import org.apache.hadoop.mapred.JobConf;
035    import org.apache.hadoop.mapred.TIPStatus;
036    import org.apache.hadoop.mapreduce.Cluster;
037    import org.apache.hadoop.mapreduce.Counters;
038    import org.apache.hadoop.mapreduce.Job;
039    import org.apache.hadoop.mapreduce.JobID;
040    import org.apache.hadoop.mapreduce.JobPriority;
041    import org.apache.hadoop.mapreduce.JobStatus;
042    import org.apache.hadoop.mapreduce.TaskAttemptID;
043    import org.apache.hadoop.mapreduce.TaskCompletionEvent;
044    import org.apache.hadoop.mapreduce.TaskReport;
045    import org.apache.hadoop.mapreduce.TaskTrackerInfo;
046    import org.apache.hadoop.mapreduce.TaskType;
047    import org.apache.hadoop.mapreduce.jobhistory.HistoryViewer;
048    import org.apache.hadoop.mapreduce.v2.LogParams;
049    import org.apache.hadoop.security.AccessControlException;
050    import org.apache.hadoop.util.Tool;
051    import org.apache.hadoop.util.ToolRunner;
052    import org.apache.hadoop.yarn.logaggregation.LogDumper;
053    
054    import com.google.common.base.Charsets;
055    
056    /**
057     * Interprets the map reduce cli options 
058     */
059    @InterfaceAudience.Public
060    @InterfaceStability.Stable
061    public class CLI extends Configured implements Tool {
062      private static final Log LOG = LogFactory.getLog(CLI.class);
063      protected Cluster cluster;
064    
065      public CLI() {
066      }
067      
068      public CLI(Configuration conf) {
069        setConf(conf);
070      }
071      
072      public int run(String[] argv) throws Exception {
073        int exitCode = -1;
074        if (argv.length < 1) {
075          displayUsage("");
076          return exitCode;
077        }    
078        // process arguments
079        String cmd = argv[0];
080        String submitJobFile = null;
081        String jobid = null;
082        String taskid = null;
083        String historyFile = null;
084        String counterGroupName = null;
085        String counterName = null;
086        JobPriority jp = null;
087        String taskType = null;
088        String taskState = null;
089        int fromEvent = 0;
090        int nEvents = 0;
091        boolean getStatus = false;
092        boolean getCounter = false;
093        boolean killJob = false;
094        boolean listEvents = false;
095        boolean viewHistory = false;
096        boolean viewAllHistory = false;
097        boolean listJobs = false;
098        boolean listAllJobs = false;
099        boolean listActiveTrackers = false;
100        boolean listBlacklistedTrackers = false;
101        boolean displayTasks = false;
102        boolean killTask = false;
103        boolean failTask = false;
104        boolean setJobPriority = false;
105        boolean logs = false;
106    
107        if ("-submit".equals(cmd)) {
108          if (argv.length != 2) {
109            displayUsage(cmd);
110            return exitCode;
111          }
112          submitJobFile = argv[1];
113        } else if ("-status".equals(cmd)) {
114          if (argv.length != 2) {
115            displayUsage(cmd);
116            return exitCode;
117          }
118          jobid = argv[1];
119          getStatus = true;
120        } else if("-counter".equals(cmd)) {
121          if (argv.length != 4) {
122            displayUsage(cmd);
123            return exitCode;
124          }
125          getCounter = true;
126          jobid = argv[1];
127          counterGroupName = argv[2];
128          counterName = argv[3];
129        } else if ("-kill".equals(cmd)) {
130          if (argv.length != 2) {
131            displayUsage(cmd);
132            return exitCode;
133          }
134          jobid = argv[1];
135          killJob = true;
136        } else if ("-set-priority".equals(cmd)) {
137          if (argv.length != 3) {
138            displayUsage(cmd);
139            return exitCode;
140          }
141          jobid = argv[1];
142          try {
143            jp = JobPriority.valueOf(argv[2]); 
144          } catch (IllegalArgumentException iae) {
145            LOG.info(iae);
146            displayUsage(cmd);
147            return exitCode;
148          }
149          setJobPriority = true; 
150        } else if ("-events".equals(cmd)) {
151          if (argv.length != 4) {
152            displayUsage(cmd);
153            return exitCode;
154          }
155          jobid = argv[1];
156          fromEvent = Integer.parseInt(argv[2]);
157          nEvents = Integer.parseInt(argv[3]);
158          listEvents = true;
159        } else if ("-history".equals(cmd)) {
160          if (argv.length != 2 && !(argv.length == 3 && "all".equals(argv[1]))) {
161             displayUsage(cmd);
162             return exitCode;
163          }
164          viewHistory = true;
165          if (argv.length == 3 && "all".equals(argv[1])) {
166            viewAllHistory = true;
167            historyFile = argv[2];
168          } else {
169            historyFile = argv[1];
170          }
171        } else if ("-list".equals(cmd)) {
172          if (argv.length != 1 && !(argv.length == 2 && "all".equals(argv[1]))) {
173            displayUsage(cmd);
174            return exitCode;
175          }
176          if (argv.length == 2 && "all".equals(argv[1])) {
177            listAllJobs = true;
178          } else {
179            listJobs = true;
180          }
181        } else if("-kill-task".equals(cmd)) {
182          if (argv.length != 2) {
183            displayUsage(cmd);
184            return exitCode;
185          }
186          killTask = true;
187          taskid = argv[1];
188        } else if("-fail-task".equals(cmd)) {
189          if (argv.length != 2) {
190            displayUsage(cmd);
191            return exitCode;
192          }
193          failTask = true;
194          taskid = argv[1];
195        } else if ("-list-active-trackers".equals(cmd)) {
196          if (argv.length != 1) {
197            displayUsage(cmd);
198            return exitCode;
199          }
200          listActiveTrackers = true;
201        } else if ("-list-blacklisted-trackers".equals(cmd)) {
202          if (argv.length != 1) {
203            displayUsage(cmd);
204            return exitCode;
205          }
206          listBlacklistedTrackers = true;
207        } else if ("-list-attempt-ids".equals(cmd)) {
208          if (argv.length != 4) {
209            displayUsage(cmd);
210            return exitCode;
211          }
212          jobid = argv[1];
213          taskType = argv[2];
214          taskState = argv[3];
215          displayTasks = true;
216        } else if ("-logs".equals(cmd)) {
217          if (argv.length == 2 || argv.length ==3) {
218            logs = true;
219            jobid = argv[1];
220            if (argv.length == 3) {
221              taskid = argv[2];
222            }  else {
223              taskid = null;
224            }
225          } else {
226            displayUsage(cmd);
227            return exitCode;
228          }
229        } else {
230          displayUsage(cmd);
231          return exitCode;
232        }
233    
234        // initialize cluster
235        cluster = new Cluster(getConf());
236            
237        // Submit the request
238        try {
239          if (submitJobFile != null) {
240            Job job = Job.getInstance(new JobConf(submitJobFile));
241            job.submit();
242            System.out.println("Created job " + job.getJobID());
243            exitCode = 0;
244          } else if (getStatus) {
245            Job job = cluster.getJob(JobID.forName(jobid));
246            if (job == null) {
247              System.out.println("Could not find job " + jobid);
248            } else {
249              Counters counters = job.getCounters();
250              System.out.println();
251              System.out.println(job);
252              if (counters != null) {
253                System.out.println(counters);
254              } else {
255                System.out.println("Counters not available. Job is retired.");
256              }
257              exitCode = 0;
258            }
259          } else if (getCounter) {
260            Job job = cluster.getJob(JobID.forName(jobid));
261            if (job == null) {
262              System.out.println("Could not find job " + jobid);
263            } else {
264              Counters counters = job.getCounters();
265              if (counters == null) {
266                System.out.println("Counters not available for retired job " + 
267                jobid);
268                exitCode = -1;
269              } else {
270                System.out.println(getCounter(counters,
271                  counterGroupName, counterName));
272                exitCode = 0;
273              }
274            }
275          } else if (killJob) {
276            Job job = cluster.getJob(JobID.forName(jobid));
277            if (job == null) {
278              System.out.println("Could not find job " + jobid);
279            } else {
280              job.killJob();
281              System.out.println("Killed job " + jobid);
282              exitCode = 0;
283            }
284          } else if (setJobPriority) {
285            Job job = cluster.getJob(JobID.forName(jobid));
286            if (job == null) {
287              System.out.println("Could not find job " + jobid);
288            } else {
289              job.setPriority(jp);
290              System.out.println("Changed job priority.");
291              exitCode = 0;
292            } 
293          } else if (viewHistory) {
294            viewHistory(historyFile, viewAllHistory);
295            exitCode = 0;
296          } else if (listEvents) {
297            listEvents(cluster.getJob(JobID.forName(jobid)), fromEvent, nEvents);
298            exitCode = 0;
299          } else if (listJobs) {
300            listJobs(cluster);
301            exitCode = 0;
302          } else if (listAllJobs) {
303            listAllJobs(cluster);
304            exitCode = 0;
305          } else if (listActiveTrackers) {
306            listActiveTrackers(cluster);
307            exitCode = 0;
308          } else if (listBlacklistedTrackers) {
309            listBlacklistedTrackers(cluster);
310            exitCode = 0;
311          } else if (displayTasks) {
312            displayTasks(cluster.getJob(JobID.forName(jobid)), taskType, taskState);
313          } else if(killTask) {
314            TaskAttemptID taskID = TaskAttemptID.forName(taskid);
315            Job job = cluster.getJob(taskID.getJobID());
316            if (job == null) {
317              System.out.println("Could not find job " + jobid);
318            } else if (job.killTask(taskID)) {
319              System.out.println("Killed task " + taskid);
320              exitCode = 0;
321            } else {
322              System.out.println("Could not kill task " + taskid);
323              exitCode = -1;
324            }
325          } else if(failTask) {
326            TaskAttemptID taskID = TaskAttemptID.forName(taskid);
327            Job job = cluster.getJob(taskID.getJobID());
328            if (job == null) {
329                System.out.println("Could not find job " + jobid);
330            } else if(job.failTask(taskID)) {
331              System.out.println("Killed task " + taskID + " by failing it");
332              exitCode = 0;
333            } else {
334              System.out.println("Could not fail task " + taskid);
335              exitCode = -1;
336            }
337          } else if (logs) {
338            try {
339            JobID jobID = JobID.forName(jobid);
340            TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskid);
341            LogParams logParams = cluster.getLogParams(jobID, taskAttemptID);
342            LogDumper logDumper = new LogDumper();
343            logDumper.setConf(getConf());
344            exitCode = logDumper.dumpAContainersLogs(logParams.getApplicationId(),
345                logParams.getContainerId(), logParams.getNodeId(),
346                logParams.getOwner());
347            } catch (IOException e) {
348              if (e instanceof RemoteException) {
349                throw e;
350              } 
351              System.out.println(e.getMessage());
352            }
353          }
354        } catch (RemoteException re) {
355          IOException unwrappedException = re.unwrapRemoteException();
356          if (unwrappedException instanceof AccessControlException) {
357            System.out.println(unwrappedException.getMessage());
358          } else {
359            throw re;
360          }
361        } finally {
362          cluster.close();
363        }
364        return exitCode;
365      }
366    
367      private String getJobPriorityNames() {
368        StringBuffer sb = new StringBuffer();
369        for (JobPriority p : JobPriority.values()) {
370          sb.append(p.name()).append(" ");
371        }
372        return sb.substring(0, sb.length()-1);
373      }
374    
375      private String getTaskTypess() {
376        StringBuffer sb = new StringBuffer();
377        for (TaskType t : TaskType.values()) {
378          sb.append(t.name()).append(" ");
379        }
380        return sb.substring(0, sb.length()-1);
381      }
382    
383      /**
384       * Display usage of the command-line tool and terminate execution.
385       */
386      private void displayUsage(String cmd) {
387        String prefix = "Usage: CLI ";
388        String jobPriorityValues = getJobPriorityNames();
389        String taskTypes = getTaskTypess();
390        String taskStates = "running, completed";
391        if ("-submit".equals(cmd)) {
392          System.err.println(prefix + "[" + cmd + " <job-file>]");
393        } else if ("-status".equals(cmd) || "-kill".equals(cmd)) {
394          System.err.println(prefix + "[" + cmd + " <job-id>]");
395        } else if ("-counter".equals(cmd)) {
396          System.err.println(prefix + "[" + cmd + 
397            " <job-id> <group-name> <counter-name>]");
398        } else if ("-events".equals(cmd)) {
399          System.err.println(prefix + "[" + cmd + 
400            " <job-id> <from-event-#> <#-of-events>]. Event #s start from 1.");
401        } else if ("-history".equals(cmd)) {
402          System.err.println(prefix + "[" + cmd + " <jobHistoryFile>]");
403        } else if ("-list".equals(cmd)) {
404          System.err.println(prefix + "[" + cmd + " [all]]");
405        } else if ("-kill-task".equals(cmd) || "-fail-task".equals(cmd)) {
406          System.err.println(prefix + "[" + cmd + " <task-attempt-id>]");
407        } else if ("-set-priority".equals(cmd)) {
408          System.err.println(prefix + "[" + cmd + " <job-id> <priority>]. " +
409              "Valid values for priorities are: " 
410              + jobPriorityValues); 
411        } else if ("-list-active-trackers".equals(cmd)) {
412          System.err.println(prefix + "[" + cmd + "]");
413        } else if ("-list-blacklisted-trackers".equals(cmd)) {
414          System.err.println(prefix + "[" + cmd + "]");
415        } else if ("-list-attempt-ids".equals(cmd)) {
416          System.err.println(prefix + "[" + cmd + 
417              " <job-id> <task-type> <task-state>]. " +
418              "Valid values for <task-type> are " + taskTypes + ". " +
419              "Valid values for <task-state> are " + taskStates);
420        } else if ("-logs".equals(cmd)) {
421          System.err.println(prefix + "[" + cmd +
422              " <job-id> <task-attempt-id>]. " +
423              " <task-attempt-id> is optional to get task attempt logs.");      
424        } else {
425          System.err.printf(prefix + "<command> <args>%n");
426          System.err.printf("\t[-submit <job-file>]%n");
427          System.err.printf("\t[-status <job-id>]%n");
428          System.err.printf("\t[-counter <job-id> <group-name> <counter-name>]%n");
429          System.err.printf("\t[-kill <job-id>]%n");
430          System.err.printf("\t[-set-priority <job-id> <priority>]. " +
431            "Valid values for priorities are: " + jobPriorityValues + "%n");
432          System.err.printf("\t[-events <job-id> <from-event-#> <#-of-events>]%n");
433          System.err.printf("\t[-history <jobHistoryFile>]%n");
434          System.err.printf("\t[-list [all]]%n");
435          System.err.printf("\t[-list-active-trackers]%n");
436          System.err.printf("\t[-list-blacklisted-trackers]%n");
437          System.err.println("\t[-list-attempt-ids <job-id> <task-type> " +
438            "<task-state>]. " +
439            "Valid values for <task-type> are " + taskTypes + ". " +
440            "Valid values for <task-state> are " + taskStates);
441          System.err.printf("\t[-kill-task <task-attempt-id>]%n");
442          System.err.printf("\t[-fail-task <task-attempt-id>]%n");
443          System.err.printf("\t[-logs <job-id> <task-attempt-id>]%n%n");
444          ToolRunner.printGenericCommandUsage(System.out);
445        }
446      }
447        
448      private void viewHistory(String historyFile, boolean all) 
449        throws IOException {
450        HistoryViewer historyViewer = new HistoryViewer(historyFile,
451                                            getConf(), all);
452        historyViewer.print();
453      }
454    
455      protected long getCounter(Counters counters, String counterGroupName,
456          String counterName) throws IOException {
457        return counters.findCounter(counterGroupName, counterName).getValue();
458      }
459      
460      /**
461       * List the events for the given job
462       * @param jobId the job id for the job's events to list
463       * @throws IOException
464       */
465      private void listEvents(Job job, int fromEventId, int numEvents)
466          throws IOException, InterruptedException {
467        TaskCompletionEvent[] events = job.
468          getTaskCompletionEvents(fromEventId, numEvents);
469        System.out.println("Task completion events for " + job.getJobID());
470        System.out.println("Number of events (from " + fromEventId + ") are: " 
471          + events.length);
472        for(TaskCompletionEvent event: events) {
473          System.out.println(event.getStatus() + " " + 
474            event.getTaskAttemptId() + " " + 
475            getTaskLogURL(event.getTaskAttemptId(), event.getTaskTrackerHttp()));
476        }
477      }
478    
479      protected static String getTaskLogURL(TaskAttemptID taskId, String baseUrl) {
480        return (baseUrl + "/tasklog?plaintext=true&attemptid=" + taskId); 
481      }
482      
483    
484      /**
485       * Dump a list of currently running jobs
486       * @throws IOException
487       */
488      private void listJobs(Cluster cluster) 
489          throws IOException, InterruptedException {
490        List<JobStatus> runningJobs = new ArrayList<JobStatus>();
491        for (JobStatus job : cluster.getAllJobStatuses()) {
492          if (!job.isJobComplete()) {
493            runningJobs.add(job);
494          }
495        }
496        displayJobList(runningJobs.toArray(new JobStatus[0]));
497      }
498        
499      /**
500       * Dump a list of all jobs submitted.
501       * @throws IOException
502       */
503      private void listAllJobs(Cluster cluster) 
504          throws IOException, InterruptedException {
505        displayJobList(cluster.getAllJobStatuses());
506      }
507      
508      /**
509       * Display the list of active trackers
510       */
511      private void listActiveTrackers(Cluster cluster) 
512          throws IOException, InterruptedException {
513        TaskTrackerInfo[] trackers = cluster.getActiveTaskTrackers();
514        for (TaskTrackerInfo tracker : trackers) {
515          System.out.println(tracker.getTaskTrackerName());
516        }
517      }
518    
519      /**
520       * Display the list of blacklisted trackers
521       */
522      private void listBlacklistedTrackers(Cluster cluster) 
523          throws IOException, InterruptedException {
524        TaskTrackerInfo[] trackers = cluster.getBlackListedTaskTrackers();
525        if (trackers.length > 0) {
526          System.out.println("BlackListedNode \t Reason");
527        }
528        for (TaskTrackerInfo tracker : trackers) {
529          System.out.println(tracker.getTaskTrackerName() + "\t" + 
530            tracker.getReasonForBlacklist());
531        }
532      }
533    
534      private void printTaskAttempts(TaskReport report) {
535        if (report.getCurrentStatus() == TIPStatus.COMPLETE) {
536          System.out.println(report.getSuccessfulTaskAttemptId());
537        } else if (report.getCurrentStatus() == TIPStatus.RUNNING) {
538          for (TaskAttemptID t : 
539            report.getRunningTaskAttemptIds()) {
540            System.out.println(t);
541          }
542        }
543      }
544    
545      /**
546       * Display the information about a job's tasks, of a particular type and
547       * in a particular state
548       * 
549       * @param job the job
550       * @param type the type of the task (map/reduce/setup/cleanup)
551       * @param state the state of the task 
552       * (pending/running/completed/failed/killed)
553       */
554      protected void displayTasks(Job job, String type, String state) 
555      throws IOException, InterruptedException {
556        TaskReport[] reports = job.getTaskReports(TaskType.valueOf(type));
557        for (TaskReport report : reports) {
558          TIPStatus status = report.getCurrentStatus();
559          if ((state.equals("pending") && status ==TIPStatus.PENDING) ||
560              (state.equals("running") && status ==TIPStatus.RUNNING) ||
561              (state.equals("completed") && status == TIPStatus.COMPLETE) ||
562              (state.equals("failed") && status == TIPStatus.FAILED) ||
563              (state.equals("killed") && status == TIPStatus.KILLED)) {
564            printTaskAttempts(report);
565          }
566        }
567      }
568    
569      public void displayJobList(JobStatus[] jobs) 
570          throws IOException, InterruptedException {
571        displayJobList(jobs, new PrintWriter(new OutputStreamWriter(System.out,
572            Charsets.UTF_8)));
573      }
574    
575      @Private
576      public static String headerPattern = "%23s\t%10s\t%14s\t%12s\t%12s\t%10s\t%15s\t%15s\t%8s\t%8s\t%10s\t%10s\n";
577      @Private
578      public static String dataPattern   = "%23s\t%10s\t%14d\t%12s\t%12s\t%10s\t%15s\t%15s\t%8s\t%8s\t%10s\t%10s\n";
579      private static String memPattern   = "%dM";
580      private static String UNAVAILABLE  = "N/A";
581    
582      @Private
583      public void displayJobList(JobStatus[] jobs, PrintWriter writer) {
584        writer.println("Total jobs:" + jobs.length);
585        writer.printf(headerPattern, "JobId", "State", "StartTime", "UserName",
586          "Queue", "Priority", "UsedContainers",
587          "RsvdContainers", "UsedMem", "RsvdMem", "NeededMem", "AM info");
588        for (JobStatus job : jobs) {
589          int numUsedSlots = job.getNumUsedSlots();
590          int numReservedSlots = job.getNumReservedSlots();
591          int usedMem = job.getUsedMem();
592          int rsvdMem = job.getReservedMem();
593          int neededMem = job.getNeededMem();
594          writer.printf(dataPattern,
595              job.getJobID().toString(), job.getState(), job.getStartTime(),
596              job.getUsername(), job.getQueue(), 
597              job.getPriority().name(),
598              numUsedSlots < 0 ? UNAVAILABLE : numUsedSlots,
599              numReservedSlots < 0 ? UNAVAILABLE : numReservedSlots,
600              usedMem < 0 ? UNAVAILABLE : String.format(memPattern, usedMem),
601              rsvdMem < 0 ? UNAVAILABLE : String.format(memPattern, rsvdMem),
602              neededMem < 0 ? UNAVAILABLE : String.format(memPattern, neededMem),
603              job.getSchedulingInfo());
604        }
605        writer.flush();
606      }
607      
608      public static void main(String[] argv) throws Exception {
609        int res = ToolRunner.run(new CLI(), argv);
610        System.exit(res);
611      }
612    }