001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapreduce;
020    
021    import java.io.IOException;
022    import java.net.InetSocketAddress;
023    import java.security.PrivilegedExceptionAction;
024    import java.util.ArrayList;
025    import java.util.List;
026    import java.util.ServiceLoader;
027    
028    import org.apache.commons.logging.Log;
029    import org.apache.commons.logging.LogFactory;
030    import org.apache.hadoop.classification.InterfaceAudience;
031    import org.apache.hadoop.classification.InterfaceStability;
032    import org.apache.hadoop.conf.Configuration;
033    import org.apache.hadoop.fs.FileSystem;
034    import org.apache.hadoop.fs.Path;
035    import org.apache.hadoop.io.Text;
036    import org.apache.hadoop.mapred.JobConf;
037    import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
038    import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;
039    import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
040    import org.apache.hadoop.mapreduce.util.ConfigUtil;
041    import org.apache.hadoop.mapreduce.v2.LogParams;
042    import org.apache.hadoop.security.UserGroupInformation;
043    import org.apache.hadoop.security.token.SecretManager.InvalidToken;
044    import org.apache.hadoop.security.token.Token;
045    
046    /**
047     * Provides a way to access information about the map/reduce cluster.
048     */
049    @InterfaceAudience.Public
050    @InterfaceStability.Evolving
051    public class Cluster {
052      
053      @InterfaceStability.Evolving
054      public static enum JobTrackerStatus {INITIALIZING, RUNNING};
055      
056      private ClientProtocolProvider clientProtocolProvider;
057      private ClientProtocol client;
058      private UserGroupInformation ugi;
059      private Configuration conf;
060      private FileSystem fs = null;
061      private Path sysDir = null;
062      private Path stagingAreaDir = null;
063      private Path jobHistoryDir = null;
064      private static final Log LOG = LogFactory.getLog(Cluster.class);
065    
066      private static ServiceLoader<ClientProtocolProvider> frameworkLoader =
067          ServiceLoader.load(ClientProtocolProvider.class);
068      
069      static {
070        ConfigUtil.loadResources();
071      }
072      
073      public Cluster(Configuration conf) throws IOException {
074        this(null, conf);
075      }
076    
077      public Cluster(InetSocketAddress jobTrackAddr, Configuration conf) 
078          throws IOException {
079        this.conf = conf;
080        this.ugi = UserGroupInformation.getCurrentUser();
081        initialize(jobTrackAddr, conf);
082      }
083      
084      private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
085          throws IOException {
086    
087        synchronized (frameworkLoader) {
088          for (ClientProtocolProvider provider : frameworkLoader) {
089            LOG.debug("Trying ClientProtocolProvider : "
090                + provider.getClass().getName());
091            ClientProtocol clientProtocol = null; 
092            try {
093              if (jobTrackAddr == null) {
094                clientProtocol = provider.create(conf);
095              } else {
096                clientProtocol = provider.create(jobTrackAddr, conf);
097              }
098    
099              if (clientProtocol != null) {
100                clientProtocolProvider = provider;
101                client = clientProtocol;
102                LOG.debug("Picked " + provider.getClass().getName()
103                    + " as the ClientProtocolProvider");
104                break;
105              }
106              else {
107                LOG.debug("Cannot pick " + provider.getClass().getName()
108                    + " as the ClientProtocolProvider - returned null protocol");
109              }
110            } 
111            catch (Exception e) {
112              LOG.info("Failed to use " + provider.getClass().getName()
113                  + " due to error: " + e.getMessage());
114            }
115          }
116        }
117    
118        if (null == clientProtocolProvider || null == client) {
119          throw new IOException(
120              "Cannot initialize Cluster. Please check your configuration for "
121                  + MRConfig.FRAMEWORK_NAME
122                  + " and the correspond server addresses.");
123        }
124      }
125    
126      ClientProtocol getClient() {
127        return client;
128      }
129      
130      Configuration getConf() {
131        return conf;
132      }
133      
134      /**
135       * Close the <code>Cluster</code>.
136       */
137      public synchronized void close() throws IOException {
138        clientProtocolProvider.close(client);
139      }
140    
141      private Job[] getJobs(JobStatus[] stats) throws IOException {
142        List<Job> jobs = new ArrayList<Job>();
143        for (JobStatus stat : stats) {
144          jobs.add(Job.getInstance(this, stat, new JobConf(stat.getJobFile())));
145        }
146        return jobs.toArray(new Job[0]);
147      }
148    
149      /**
150       * Get the file system where job-specific files are stored
151       * 
152       * @return object of FileSystem
153       * @throws IOException
154       * @throws InterruptedException
155       */
156      public synchronized FileSystem getFileSystem() 
157          throws IOException, InterruptedException {
158        if (this.fs == null) {
159          try {
160            this.fs = ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
161              public FileSystem run() throws IOException, InterruptedException {
162                final Path sysDir = new Path(client.getSystemDir());
163                return sysDir.getFileSystem(getConf());
164              }
165            });
166          } catch (InterruptedException e) {
167            throw new RuntimeException(e);
168          }
169        }
170        return fs;
171      }
172    
173      /**
174       * Get job corresponding to jobid.
175       * 
176       * @param jobId
177       * @return object of {@link Job}
178       * @throws IOException
179       * @throws InterruptedException
180       */
181      public Job getJob(JobID jobId) throws IOException, InterruptedException {
182        JobStatus status = client.getJobStatus(jobId);
183        if (status != null) {
184          return Job.getInstance(this, status, new JobConf(status.getJobFile()));
185        }
186        return null;
187      }
188      
189      /**
190       * Get all the queues in cluster.
191       * 
192       * @return array of {@link QueueInfo}
193       * @throws IOException
194       * @throws InterruptedException
195       */
196      public QueueInfo[] getQueues() throws IOException, InterruptedException {
197        return client.getQueues();
198      }
199      
200      /**
201       * Get queue information for the specified name.
202       * 
203       * @param name queuename
204       * @return object of {@link QueueInfo}
205       * @throws IOException
206       * @throws InterruptedException
207       */
208      public QueueInfo getQueue(String name) 
209          throws IOException, InterruptedException {
210        return client.getQueue(name);
211      }
212    
213      /**
214       * Get log parameters for the specified jobID or taskAttemptID
215       * @param jobID the job id.
216       * @param taskAttemptID the task attempt id. Optional.
217       * @return the LogParams
218       * @throws IOException
219       * @throws InterruptedException
220       */
221      public LogParams getLogParams(JobID jobID, TaskAttemptID taskAttemptID)
222          throws IOException, InterruptedException {
223        return client.getLogFileParams(jobID, taskAttemptID);
224      }
225    
226      /**
227       * Get current cluster status.
228       * 
229       * @return object of {@link ClusterMetrics}
230       * @throws IOException
231       * @throws InterruptedException
232       */
233      public ClusterMetrics getClusterStatus() throws IOException, InterruptedException {
234        return client.getClusterMetrics();
235      }
236      
237      /**
238       * Get all active trackers in the cluster.
239       * 
240       * @return array of {@link TaskTrackerInfo}
241       * @throws IOException
242       * @throws InterruptedException
243       */
244      public TaskTrackerInfo[] getActiveTaskTrackers() 
245          throws IOException, InterruptedException  {
246        return client.getActiveTrackers();
247      }
248      
249      /**
250       * Get blacklisted trackers.
251       * 
252       * @return array of {@link TaskTrackerInfo}
253       * @throws IOException
254       * @throws InterruptedException
255       */
256      public TaskTrackerInfo[] getBlackListedTaskTrackers() 
257          throws IOException, InterruptedException  {
258        return client.getBlacklistedTrackers();
259      }
260      
261      /**
262       * Get all the jobs in cluster.
263       * 
264       * @return array of {@link Job}
265       * @throws IOException
266       * @throws InterruptedException
267       * @deprecated Use {@link #getAllJobStatuses()} instead.
268       */
269      @Deprecated
270      public Job[] getAllJobs() throws IOException, InterruptedException {
271        return getJobs(client.getAllJobs());
272      }
273    
274      /**
275       * Get job status for all jobs in the cluster.
276       * @return job status for all jobs in cluster
277       * @throws IOException
278       * @throws InterruptedException
279       */
280      public JobStatus[] getAllJobStatuses() throws IOException, InterruptedException {
281        return client.getAllJobs();
282      }
283    
284      /**
285       * Grab the jobtracker system directory path where 
286       * job-specific files will  be placed.
287       * 
288       * @return the system directory where job-specific files are to be placed.
289       */
290      public Path getSystemDir() throws IOException, InterruptedException {
291        if (sysDir == null) {
292          sysDir = new Path(client.getSystemDir());
293        }
294        return sysDir;
295      }
296      
297      /**
298       * Grab the jobtracker's view of the staging directory path where 
299       * job-specific files will  be placed.
300       * 
301       * @return the staging directory where job-specific files are to be placed.
302       */
303      public Path getStagingAreaDir() throws IOException, InterruptedException {
304        if (stagingAreaDir == null) {
305          stagingAreaDir = new Path(client.getStagingAreaDir());
306        }
307        return stagingAreaDir;
308      }
309    
310      /**
311       * Get the job history file path for a given job id. The job history file at 
312       * this path may or may not be existing depending on the job completion state.
313       * The file is present only for the completed jobs.
314       * @param jobId the JobID of the job submitted by the current user.
315       * @return the file path of the job history file
316       * @throws IOException
317       * @throws InterruptedException
318       */
319      public String getJobHistoryUrl(JobID jobId) throws IOException, 
320        InterruptedException {
321        if (jobHistoryDir == null) {
322          jobHistoryDir = new Path(client.getJobHistoryDir());
323        }
324        return new Path(jobHistoryDir, jobId.toString() + "_"
325                        + ugi.getShortUserName()).toString();
326      }
327    
328      /**
329       * Gets the Queue ACLs for current user
330       * @return array of QueueAclsInfo object for current user.
331       * @throws IOException
332       */
333      public QueueAclsInfo[] getQueueAclsForCurrentUser() 
334          throws IOException, InterruptedException  {
335        return client.getQueueAclsForCurrentUser();
336      }
337    
338      /**
339       * Gets the root level queues.
340       * @return array of JobQueueInfo object.
341       * @throws IOException
342       */
343      public QueueInfo[] getRootQueues() throws IOException, InterruptedException {
344        return client.getRootQueues();
345      }
346      
347      /**
348       * Returns immediate children of queueName.
349       * @param queueName
350       * @return array of JobQueueInfo which are children of queueName
351       * @throws IOException
352       */
353      public QueueInfo[] getChildQueues(String queueName) 
354          throws IOException, InterruptedException {
355        return client.getChildQueues(queueName);
356      }
357      
358      /**
359       * Get the JobTracker's status.
360       * 
361       * @return {@link JobTrackerStatus} of the JobTracker
362       * @throws IOException
363       * @throws InterruptedException
364       */
365      public JobTrackerStatus getJobTrackerStatus() throws IOException,
366          InterruptedException {
367        return client.getJobTrackerStatus();
368      }
369      
370      /**
371       * Get the tasktracker expiry interval for the cluster
372       * @return the expiry interval in msec
373       */
374      public long getTaskTrackerExpiryInterval() throws IOException,
375          InterruptedException {
376        return client.getTaskTrackerExpiryInterval();
377      }
378    
379      /**
380       * Get a delegation token for the user from the JobTracker.
381       * @param renewer the user who can renew the token
382       * @return the new token
383       * @throws IOException
384       */
385      public Token<DelegationTokenIdentifier> 
386          getDelegationToken(Text renewer) throws IOException, InterruptedException{
387        // client has already set the service
388        return client.getDelegationToken(renewer);
389      }
390    
391      /**
392       * Renew a delegation token
393       * @param token the token to renew
394       * @return the new expiration time
395       * @throws InvalidToken
396       * @throws IOException
397       * @deprecated Use {@link Token#renew} instead
398       */
399      public long renewDelegationToken(Token<DelegationTokenIdentifier> token
400                                       ) throws InvalidToken, IOException,
401                                                InterruptedException {
402        return token.renew(getConf());
403      }
404    
405      /**
406       * Cancel a delegation token from the JobTracker
407       * @param token the token to cancel
408       * @throws IOException
409       * @deprecated Use {@link Token#cancel} instead
410       */
411      public void cancelDelegationToken(Token<DelegationTokenIdentifier> token
412                                        ) throws IOException,
413                                                 InterruptedException {
414        token.cancel(getConf());
415      }
416    
417    }