001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019 package org.apache.hadoop.yarn.client; 020 021 import java.net.InetSocketAddress; 022 import java.util.ArrayList; 023 import java.util.List; 024 025 import org.apache.commons.logging.Log; 026 import org.apache.commons.logging.LogFactory; 027 import org.apache.hadoop.classification.InterfaceAudience; 028 import org.apache.hadoop.classification.InterfaceStability; 029 import org.apache.hadoop.conf.Configuration; 030 import org.apache.hadoop.io.Text; 031 import org.apache.hadoop.ipc.RPC; 032 import org.apache.hadoop.yarn.api.ClientRMProtocol; 033 import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest; 034 import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse; 035 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; 036 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; 037 import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest; 038 import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse; 039 import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest; 040 import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse; 041 import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest; 042 import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse; 043 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; 044 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; 045 import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest; 046 import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest; 047 import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; 048 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; 049 import org.apache.hadoop.yarn.api.records.ApplicationId; 050 import org.apache.hadoop.yarn.api.records.ApplicationReport; 051 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; 052 import org.apache.hadoop.yarn.api.records.DelegationToken; 053 import org.apache.hadoop.yarn.api.records.NodeReport; 054 import org.apache.hadoop.yarn.api.records.QueueInfo; 055 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; 056 import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; 057 import org.apache.hadoop.yarn.conf.YarnConfiguration; 058 import org.apache.hadoop.yarn.exceptions.YarnRemoteException; 059 import org.apache.hadoop.yarn.ipc.YarnRPC; 060 import org.apache.hadoop.yarn.service.AbstractService; 061 import org.apache.hadoop.yarn.util.Records; 062 063 @InterfaceAudience.Public 064 @InterfaceStability.Evolving 065 public class YarnClientImpl extends AbstractService implements YarnClient { 066 067 private static final Log LOG = LogFactory.getLog(YarnClientImpl.class); 068 069 protected ClientRMProtocol rmClient; 070 protected InetSocketAddress rmAddress; 071 072 private static final String ROOT = "root"; 073 074 public YarnClientImpl() { 075 this(null); 076 } 077 078 public YarnClientImpl(InetSocketAddress rmAddress) { 079 super(YarnClientImpl.class.getName()); 080 this.rmAddress = rmAddress; 081 } 082 083 private static InetSocketAddress getRmAddress(Configuration conf) { 084 return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS, 085 YarnConfiguration.DEFAULT_RM_ADDRESS, YarnConfiguration.DEFAULT_RM_PORT); 086 } 087 088 @Override 089 public synchronized void init(Configuration conf) { 090 if (this.rmAddress == null) { 091 this.rmAddress = getRmAddress(conf); 092 } 093 super.init(conf); 094 } 095 096 @Override 097 public synchronized void start() { 098 YarnRPC rpc = YarnRPC.create(getConfig()); 099 100 this.rmClient = (ClientRMProtocol) rpc.getProxy( 101 ClientRMProtocol.class, rmAddress, getConfig()); 102 if (LOG.isDebugEnabled()) { 103 LOG.debug("Connecting to ResourceManager at " + rmAddress); 104 } 105 super.start(); 106 } 107 108 @Override 109 public synchronized void stop() { 110 if (this.rmClient != null) { 111 RPC.stopProxy(this.rmClient); 112 } 113 super.stop(); 114 } 115 116 @Override 117 public GetNewApplicationResponse getNewApplication() 118 throws YarnRemoteException { 119 GetNewApplicationRequest request = 120 Records.newRecord(GetNewApplicationRequest.class); 121 return rmClient.getNewApplication(request); 122 } 123 124 @Override 125 public ApplicationId 126 submitApplication(ApplicationSubmissionContext appContext) 127 throws YarnRemoteException { 128 ApplicationId applicationId = appContext.getApplicationId(); 129 appContext.setApplicationId(applicationId); 130 SubmitApplicationRequest request = 131 Records.newRecord(SubmitApplicationRequest.class); 132 request.setApplicationSubmissionContext(appContext); 133 rmClient.submitApplication(request); 134 LOG.info("Submitted application " + applicationId + " to ResourceManager" 135 + " at " + rmAddress); 136 return applicationId; 137 } 138 139 @Override 140 public void killApplication(ApplicationId applicationId) 141 throws YarnRemoteException { 142 LOG.info("Killing application " + applicationId); 143 KillApplicationRequest request = 144 Records.newRecord(KillApplicationRequest.class); 145 request.setApplicationId(applicationId); 146 rmClient.forceKillApplication(request); 147 } 148 149 @Override 150 public ApplicationReport getApplicationReport(ApplicationId appId) 151 throws YarnRemoteException { 152 GetApplicationReportRequest request = 153 Records.newRecord(GetApplicationReportRequest.class); 154 request.setApplicationId(appId); 155 GetApplicationReportResponse response = 156 rmClient.getApplicationReport(request); 157 return response.getApplicationReport(); 158 } 159 160 @Override 161 public List<ApplicationReport> getApplicationList() 162 throws YarnRemoteException { 163 GetAllApplicationsRequest request = 164 Records.newRecord(GetAllApplicationsRequest.class); 165 GetAllApplicationsResponse response = rmClient.getAllApplications(request); 166 return response.getApplicationList(); 167 } 168 169 @Override 170 public YarnClusterMetrics getYarnClusterMetrics() throws YarnRemoteException { 171 GetClusterMetricsRequest request = 172 Records.newRecord(GetClusterMetricsRequest.class); 173 GetClusterMetricsResponse response = rmClient.getClusterMetrics(request); 174 return response.getClusterMetrics(); 175 } 176 177 @Override 178 public List<NodeReport> getNodeReports() throws YarnRemoteException { 179 GetClusterNodesRequest request = 180 Records.newRecord(GetClusterNodesRequest.class); 181 GetClusterNodesResponse response = rmClient.getClusterNodes(request); 182 return response.getNodeReports(); 183 } 184 185 @Override 186 public DelegationToken getRMDelegationToken(Text renewer) 187 throws YarnRemoteException { 188 /* get the token from RM */ 189 GetDelegationTokenRequest rmDTRequest = 190 Records.newRecord(GetDelegationTokenRequest.class); 191 rmDTRequest.setRenewer(renewer.toString()); 192 GetDelegationTokenResponse response = 193 rmClient.getDelegationToken(rmDTRequest); 194 return response.getRMDelegationToken(); 195 } 196 197 198 private GetQueueInfoRequest 199 getQueueInfoRequest(String queueName, boolean includeApplications, 200 boolean includeChildQueues, boolean recursive) { 201 GetQueueInfoRequest request = Records.newRecord(GetQueueInfoRequest.class); 202 request.setQueueName(queueName); 203 request.setIncludeApplications(includeApplications); 204 request.setIncludeChildQueues(includeChildQueues); 205 request.setRecursive(recursive); 206 return request; 207 } 208 209 @Override 210 public QueueInfo getQueueInfo(String queueName) throws YarnRemoteException { 211 GetQueueInfoRequest request = 212 getQueueInfoRequest(queueName, true, false, false); 213 Records.newRecord(GetQueueInfoRequest.class); 214 return rmClient.getQueueInfo(request).getQueueInfo(); 215 } 216 217 @Override 218 public List<QueueUserACLInfo> getQueueAclsInfo() throws YarnRemoteException { 219 GetQueueUserAclsInfoRequest request = 220 Records.newRecord(GetQueueUserAclsInfoRequest.class); 221 return rmClient.getQueueUserAcls(request).getUserAclsInfoList(); 222 } 223 224 @Override 225 public List<QueueInfo> getAllQueues() throws YarnRemoteException { 226 List<QueueInfo> queues = new ArrayList<QueueInfo>(); 227 228 QueueInfo rootQueue = 229 rmClient.getQueueInfo(getQueueInfoRequest(ROOT, false, true, true)) 230 .getQueueInfo(); 231 getChildQueues(rootQueue, queues, true); 232 return queues; 233 } 234 235 @Override 236 public List<QueueInfo> getRootQueueInfos() throws YarnRemoteException { 237 List<QueueInfo> queues = new ArrayList<QueueInfo>(); 238 239 QueueInfo rootQueue = 240 rmClient.getQueueInfo(getQueueInfoRequest(ROOT, false, true, true)) 241 .getQueueInfo(); 242 getChildQueues(rootQueue, queues, false); 243 return queues; 244 } 245 246 @Override 247 public List<QueueInfo> getChildQueueInfos(String parent) 248 throws YarnRemoteException { 249 List<QueueInfo> queues = new ArrayList<QueueInfo>(); 250 251 QueueInfo parentQueue = 252 rmClient.getQueueInfo(getQueueInfoRequest(parent, false, true, false)) 253 .getQueueInfo(); 254 getChildQueues(parentQueue, queues, true); 255 return queues; 256 } 257 258 private void getChildQueues(QueueInfo parent, List<QueueInfo> queues, 259 boolean recursive) { 260 List<QueueInfo> childQueues = parent.getChildQueues(); 261 262 for (QueueInfo child : childQueues) { 263 queues.add(child); 264 if (recursive) { 265 getChildQueues(child, queues, recursive); 266 } 267 } 268 } 269 }