/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.mapreduce.util.ConfigUtil;
import org.apache.hadoop.mapreduce.v2.LogParams;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;

/**
 * Provides a way to access information about the map/reduce cluster.
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class Cluster {

  /** Coarse state of the JobTracker as reported by the cluster. */
  @InterfaceStability.Evolving
  public enum JobTrackerStatus {INITIALIZING, RUNNING}

  // Provider that successfully created the client protocol; used to close it.
  private ClientProtocolProvider clientProtocolProvider;
  // RPC proxy used for all cluster queries.
  private ClientProtocol client;
  private UserGroupInformation ugi;
  private Configuration conf;
  // Lazily-initialized, cached values (each populated on first access).
  private FileSystem fs = null;
  private Path sysDir = null;
  private Path stagingAreaDir = null;
  private Path jobHistoryDir = null;
  private static final Log LOG = LogFactory.getLog(Cluster.class);

  // Discovers ClientProtocolProvider implementations (e.g. local, YARN)
  // via META-INF/services; iteration is synchronized because ServiceLoader
  // instances are not thread-safe.
  private static ServiceLoader<ClientProtocolProvider> frameworkLoader =
      ServiceLoader.load(ClientProtocolProvider.class);

  static {
    ConfigUtil.loadResources();
  }

  public Cluster(Configuration conf) throws IOException {
    this(null, conf);
  }

  public Cluster(InetSocketAddress jobTrackAddr, Configuration conf)
      throws IOException {
    this.conf = conf;
    this.ugi = UserGroupInformation.getCurrentUser();
    initialize(jobTrackAddr, conf);
  }

  /**
   * Picks the first {@link ClientProtocolProvider} that yields a non-null
   * {@link ClientProtocol} for the given configuration.
   *
   * @param jobTrackAddr explicit jobtracker address, or null to let each
   *        provider derive it from the configuration
   * @param conf the cluster configuration
   * @throws IOException if no provider can supply a client protocol
   */
  private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
      throws IOException {

    synchronized (frameworkLoader) {
      for (ClientProtocolProvider provider : frameworkLoader) {
        LOG.debug("Trying ClientProtocolProvider : "
            + provider.getClass().getName());
        ClientProtocol clientProtocol = null;
        try {
          if (jobTrackAddr == null) {
            clientProtocol = provider.create(conf);
          } else {
            clientProtocol = provider.create(jobTrackAddr, conf);
          }

          if (clientProtocol != null) {
            clientProtocolProvider = provider;
            client = clientProtocol;
            LOG.debug("Picked " + provider.getClass().getName()
                + " as the ClientProtocolProvider");
            break;
          }
          else {
            LOG.debug("Cannot pick " + provider.getClass().getName()
                + " as the ClientProtocolProvider - returned null protocol");
          }
        }
        catch (Exception e) {
          // Log the full exception (not just the message) so a misconfigured
          // or broken provider can be diagnosed; then fall through to the
          // next provider.
          LOG.info("Failed to use " + provider.getClass().getName()
              + " due to error: " + e.getMessage(), e);
        }
      }
    }

    if (null == clientProtocolProvider || null == client) {
      throw new IOException(
          "Cannot initialize Cluster. Please check your configuration for "
              + MRConfig.FRAMEWORK_NAME
              + " and the corresponding server addresses.");
    }
  }

  ClientProtocol getClient() {
    return client;
  }

  Configuration getConf() {
    return conf;
  }

  /**
   * Close the <code>Cluster</code>.
   */
  public synchronized void close() throws IOException {
    clientProtocolProvider.close(client);
  }

  // Wraps an array of JobStatus into Job handles bound to this cluster.
  private Job[] getJobs(JobStatus[] stats) throws IOException {
    List<Job> jobs = new ArrayList<Job>();
    for (JobStatus stat : stats) {
      jobs.add(Job.getInstance(this, stat, new JobConf(stat.getJobFile())));
    }
    return jobs.toArray(new Job[0]);
  }

  /**
   * Get the file system where job-specific files are stored
   *
   * @return object of FileSystem
   * @throws IOException
   * @throws InterruptedException
   */
  public synchronized FileSystem getFileSystem()
      throws IOException, InterruptedException {
    if (this.fs == null) {
      try {
        this.fs = ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
          public FileSystem run() throws IOException, InterruptedException {
            final Path sysDir = new Path(client.getSystemDir());
            return sysDir.getFileSystem(getConf());
          }
        });
      } catch (InterruptedException e) {
        // Restore the interrupt status before converting to an unchecked
        // exception so callers/executors can still observe the interruption.
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
      }
    }
    return fs;
  }

  /**
   * Get job corresponding to jobid.
   *
   * @param jobId
   * @return object of {@link Job}, or null if the job is unknown to the cluster
   * @throws IOException
   * @throws InterruptedException
   */
  public Job getJob(JobID jobId) throws IOException, InterruptedException {
    JobStatus status = client.getJobStatus(jobId);
    if (status != null) {
      return Job.getInstance(this, status, new JobConf(status.getJobFile()));
    }
    return null;
  }

  /**
   * Get all the queues in cluster.
   *
   * @return array of {@link QueueInfo}
   * @throws IOException
   * @throws InterruptedException
   */
  public QueueInfo[] getQueues() throws IOException, InterruptedException {
    return client.getQueues();
  }

  /**
   * Get queue information for the specified name.
   *
   * @param name queuename
   * @return object of {@link QueueInfo}
   * @throws IOException
   * @throws InterruptedException
   */
  public QueueInfo getQueue(String name)
      throws IOException, InterruptedException {
    return client.getQueue(name);
  }

  /**
   * Get log parameters for the specified jobID or taskAttemptID
   * @param jobID the job id.
   * @param taskAttemptID the task attempt id. Optional.
   * @return the LogParams
   * @throws IOException
   * @throws InterruptedException
   */
  public LogParams getLogParams(JobID jobID, TaskAttemptID taskAttemptID)
      throws IOException, InterruptedException {
    return client.getLogFileParams(jobID, taskAttemptID);
  }

  /**
   * Get current cluster status.
   *
   * @return object of {@link ClusterMetrics}
   * @throws IOException
   * @throws InterruptedException
   */
  public ClusterMetrics getClusterStatus() throws IOException, InterruptedException {
    return client.getClusterMetrics();
  }

  /**
   * Get all active trackers in the cluster.
   *
   * @return array of {@link TaskTrackerInfo}
   * @throws IOException
   * @throws InterruptedException
   */
  public TaskTrackerInfo[] getActiveTaskTrackers()
      throws IOException, InterruptedException {
    return client.getActiveTrackers();
  }

  /**
   * Get blacklisted trackers.
   *
   * @return array of {@link TaskTrackerInfo}
   * @throws IOException
   * @throws InterruptedException
   */
  public TaskTrackerInfo[] getBlackListedTaskTrackers()
      throws IOException, InterruptedException {
    return client.getBlacklistedTrackers();
  }

  /**
   * Get all the jobs in cluster.
   *
   * @return array of {@link Job}
   * @throws IOException
   * @throws InterruptedException
   * @deprecated Use {@link #getAllJobStatuses()} instead.
   */
  @Deprecated
  public Job[] getAllJobs() throws IOException, InterruptedException {
    return getJobs(client.getAllJobs());
  }

  /**
   * Get job status for all jobs in the cluster.
   * @return job status for all jobs in cluster
   * @throws IOException
   * @throws InterruptedException
   */
  public JobStatus[] getAllJobStatuses() throws IOException, InterruptedException {
    return client.getAllJobs();
  }

  /**
   * Grab the jobtracker system directory path where
   * job-specific files will be placed.
   *
   * @return the system directory where job-specific files are to be placed.
   */
  public Path getSystemDir() throws IOException, InterruptedException {
    if (sysDir == null) {
      sysDir = new Path(client.getSystemDir());
    }
    return sysDir;
  }

  /**
   * Grab the jobtracker's view of the staging directory path where
   * job-specific files will be placed.
   *
   * @return the staging directory where job-specific files are to be placed.
   */
  public Path getStagingAreaDir() throws IOException, InterruptedException {
    if (stagingAreaDir == null) {
      stagingAreaDir = new Path(client.getStagingAreaDir());
    }
    return stagingAreaDir;
  }

  /**
   * Get the job history file path for a given job id. The job history file at
   * this path may or may not be existing depending on the job completion state.
   * The file is present only for the completed jobs.
   * @param jobId the JobID of the job submitted by the current user.
   * @return the file path of the job history file
   * @throws IOException
   * @throws InterruptedException
   */
  public String getJobHistoryUrl(JobID jobId) throws IOException,
      InterruptedException {
    if (jobHistoryDir == null) {
      jobHistoryDir = new Path(client.getJobHistoryDir());
    }
    return new Path(jobHistoryDir, jobId.toString() + "_"
        + ugi.getShortUserName()).toString();
  }

  /**
   * Gets the Queue ACLs for current user
   * @return array of QueueAclsInfo object for current user.
   * @throws IOException
   */
  public QueueAclsInfo[] getQueueAclsForCurrentUser()
      throws IOException, InterruptedException {
    return client.getQueueAclsForCurrentUser();
  }

  /**
   * Gets the root level queues.
   * @return array of JobQueueInfo object.
   * @throws IOException
   */
  public QueueInfo[] getRootQueues() throws IOException, InterruptedException {
    return client.getRootQueues();
  }

  /**
   * Returns immediate children of queueName.
   * @param queueName
   * @return array of JobQueueInfo which are children of queueName
   * @throws IOException
   */
  public QueueInfo[] getChildQueues(String queueName)
      throws IOException, InterruptedException {
    return client.getChildQueues(queueName);
  }

  /**
   * Get the JobTracker's status.
   *
   * @return {@link JobTrackerStatus} of the JobTracker
   * @throws IOException
   * @throws InterruptedException
   */
  public JobTrackerStatus getJobTrackerStatus() throws IOException,
      InterruptedException {
    return client.getJobTrackerStatus();
  }

  /**
   * Get the tasktracker expiry interval for the cluster
   * @return the expiry interval in msec
   */
  public long getTaskTrackerExpiryInterval() throws IOException,
      InterruptedException {
    return client.getTaskTrackerExpiryInterval();
  }

  /**
   * Get a delegation token for the user from the JobTracker.
   * @param renewer the user who can renew the token
   * @return the new token
   * @throws IOException
   */
  public Token<DelegationTokenIdentifier>
      getDelegationToken(Text renewer) throws IOException, InterruptedException{
    // client has already set the service
    return client.getDelegationToken(renewer);
  }

  /**
   * Renew a delegation token
   * @param token the token to renew
   * @return the new expiration time
   * @throws InvalidToken
   * @throws IOException
   * @deprecated Use {@link Token#renew} instead
   */
  @Deprecated
  public long renewDelegationToken(Token<DelegationTokenIdentifier> token
                                   ) throws InvalidToken, IOException,
                                            InterruptedException {
    return token.renew(getConf());
  }

  /**
   * Cancel a delegation token from the JobTracker
   * @param token the token to cancel
   * @throws IOException
   * @deprecated Use {@link Token#cancel} instead
   */
  @Deprecated
  public void cancelDelegationToken(Token<DelegationTokenIdentifier> token
                                    ) throws IOException,
                                             InterruptedException {
    token.cancel(getConf());
  }

}