001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.hadoop.fs.s3; 019 020 import java.io.IOException; 021 import java.io.InputStream; 022 import java.io.UnsupportedEncodingException; 023 import java.net.URI; 024 import java.net.URLDecoder; 025 import java.net.URLEncoder; 026 import java.util.Set; 027 import java.util.TreeSet; 028 029 import org.apache.hadoop.classification.InterfaceAudience; 030 import org.apache.hadoop.classification.InterfaceStability; 031 import org.apache.hadoop.conf.Configured; 032 import org.apache.hadoop.fs.Path; 033 import org.apache.hadoop.util.Tool; 034 import org.apache.hadoop.util.ToolRunner; 035 import org.jets3t.service.S3Service; 036 import org.jets3t.service.S3ServiceException; 037 import org.jets3t.service.impl.rest.httpclient.RestS3Service; 038 import org.jets3t.service.model.S3Bucket; 039 import org.jets3t.service.model.S3Object; 040 import org.jets3t.service.security.AWSCredentials; 041 042 /** 043 * <p> 044 * This class is a tool for migrating data from an older to a newer version 045 * of an S3 filesystem. 046 * </p> 047 * <p> 048 * All files in the filesystem are migrated by re-writing the block metadata 049 * - no datafiles are touched. 050 * </p> 051 */ 052 @InterfaceAudience.Public 053 @InterfaceStability.Unstable 054 public class MigrationTool extends Configured implements Tool { 055 056 private S3Service s3Service; 057 private S3Bucket bucket; 058 059 public static void main(String[] args) throws Exception { 060 int res = ToolRunner.run(new MigrationTool(), args); 061 System.exit(res); 062 } 063 064 @Override 065 public int run(String[] args) throws Exception { 066 067 if (args.length == 0) { 068 System.err.println("Usage: MigrationTool <S3 file system URI>"); 069 System.err.println("\t<S3 file system URI>\tfilesystem to migrate"); 070 ToolRunner.printGenericCommandUsage(System.err); 071 return -1; 072 } 073 074 URI uri = URI.create(args[0]); 075 076 initialize(uri); 077 078 FileSystemStore newStore = new Jets3tFileSystemStore(); 079 newStore.initialize(uri, getConf()); 080 081 if (get("%2F") != null) { 082 System.err.println("Current version number is [unversioned]."); 083 System.err.println("Target version number is " + 084 newStore.getVersion() + "."); 085 Store oldStore = new UnversionedStore(); 086 migrate(oldStore, newStore); 087 return 0; 088 } else { 089 S3Object root = get("/"); 090 if (root != null) { 091 String version = (String) root.getMetadata("fs-version"); 092 if (version == null) { 093 System.err.println("Can't detect version - exiting."); 094 } else { 095 String newVersion = newStore.getVersion(); 096 System.err.println("Current version number is " + version + "."); 097 System.err.println("Target version number is " + newVersion + "."); 098 if (version.equals(newStore.getVersion())) { 099 System.err.println("No migration required."); 100 return 0; 101 } 102 // use version number to create Store 103 //Store oldStore = ... 104 //migrate(oldStore, newStore); 105 System.err.println("Not currently implemented."); 106 return 0; 107 } 108 } 109 System.err.println("Can't detect version - exiting."); 110 return 0; 111 } 112 113 } 114 115 public void initialize(URI uri) throws IOException { 116 117 118 119 try { 120 String accessKey = null; 121 String secretAccessKey = null; 122 String userInfo = uri.getUserInfo(); 123 if (userInfo != null) { 124 int index = userInfo.indexOf(':'); 125 if (index != -1) { 126 accessKey = userInfo.substring(0, index); 127 secretAccessKey = userInfo.substring(index + 1); 128 } else { 129 accessKey = userInfo; 130 } 131 } 132 if (accessKey == null) { 133 accessKey = getConf().get("fs.s3.awsAccessKeyId"); 134 } 135 if (secretAccessKey == null) { 136 secretAccessKey = getConf().get("fs.s3.awsSecretAccessKey"); 137 } 138 if (accessKey == null && secretAccessKey == null) { 139 throw new IllegalArgumentException("AWS " + 140 "Access Key ID and Secret Access Key " + 141 "must be specified as the username " + 142 "or password (respectively) of a s3 URL, " + 143 "or by setting the " + 144 "fs.s3.awsAccessKeyId or " + 145 "fs.s3.awsSecretAccessKey properties (respectively)."); 146 } else if (accessKey == null) { 147 throw new IllegalArgumentException("AWS " + 148 "Access Key ID must be specified " + 149 "as the username of a s3 URL, or by setting the " + 150 "fs.s3.awsAccessKeyId property."); 151 } else if (secretAccessKey == null) { 152 throw new IllegalArgumentException("AWS " + 153 "Secret Access Key must be specified " + 154 "as the password of a s3 URL, or by setting the " + 155 "fs.s3.awsSecretAccessKey property."); 156 } 157 AWSCredentials awsCredentials = 158 new AWSCredentials(accessKey, secretAccessKey); 159 this.s3Service = new RestS3Service(awsCredentials); 160 } catch (S3ServiceException e) { 161 if (e.getCause() instanceof IOException) { 162 throw (IOException) e.getCause(); 163 } 164 throw new S3Exception(e); 165 } 166 bucket = new S3Bucket(uri.getHost()); 167 } 168 169 private void migrate(Store oldStore, FileSystemStore newStore) 170 throws IOException { 171 for (Path path : oldStore.listAllPaths()) { 172 INode inode = oldStore.retrieveINode(path); 173 oldStore.deleteINode(path); 174 newStore.storeINode(path, inode); 175 } 176 } 177 178 private S3Object get(String key) { 179 try { 180 return s3Service.getObject(bucket, key); 181 } catch (S3ServiceException e) { 182 if ("NoSuchKey".equals(e.getS3ErrorCode())) { 183 return null; 184 } 185 } 186 return null; 187 } 188 189 interface Store { 190 191 Set<Path> listAllPaths() throws IOException; 192 INode retrieveINode(Path path) throws IOException; 193 void deleteINode(Path path) throws IOException; 194 195 } 196 197 class UnversionedStore implements Store { 198 199 @Override 200 public Set<Path> listAllPaths() throws IOException { 201 try { 202 String prefix = urlEncode(Path.SEPARATOR); 203 S3Object[] objects = s3Service.listObjects(bucket, prefix, null); 204 Set<Path> prefixes = new TreeSet<Path>(); 205 for (int i = 0; i < objects.length; i++) { 206 prefixes.add(keyToPath(objects[i].getKey())); 207 } 208 return prefixes; 209 } catch (S3ServiceException e) { 210 if (e.getCause() instanceof IOException) { 211 throw (IOException) e.getCause(); 212 } 213 throw new S3Exception(e); 214 } 215 } 216 217 @Override 218 public void deleteINode(Path path) throws IOException { 219 delete(pathToKey(path)); 220 } 221 222 private void delete(String key) throws IOException { 223 try { 224 s3Service.deleteObject(bucket, key); 225 } catch (S3ServiceException e) { 226 if (e.getCause() instanceof IOException) { 227 throw (IOException) e.getCause(); 228 } 229 throw new S3Exception(e); 230 } 231 } 232 233 @Override 234 public INode retrieveINode(Path path) throws IOException { 235 return INode.deserialize(get(pathToKey(path))); 236 } 237 238 private InputStream get(String key) throws IOException { 239 try { 240 S3Object object = s3Service.getObject(bucket, key); 241 return object.getDataInputStream(); 242 } catch (S3ServiceException e) { 243 if ("NoSuchKey".equals(e.getS3ErrorCode())) { 244 return null; 245 } 246 if (e.getCause() instanceof IOException) { 247 throw (IOException) e.getCause(); 248 } 249 throw new S3Exception(e); 250 } 251 } 252 253 private String pathToKey(Path path) { 254 if (!path.isAbsolute()) { 255 throw new IllegalArgumentException("Path must be absolute: " + path); 256 } 257 return urlEncode(path.toUri().getPath()); 258 } 259 260 private Path keyToPath(String key) { 261 return new Path(urlDecode(key)); 262 } 263 264 private String urlEncode(String s) { 265 try { 266 return URLEncoder.encode(s, "UTF-8"); 267 } catch (UnsupportedEncodingException e) { 268 // Should never happen since every implementation of the Java Platform 269 // is required to support UTF-8. 270 // See http://java.sun.com/j2se/1.5.0/docs/api/java/nio/charset/Charset.html 271 throw new IllegalStateException(e); 272 } 273 } 274 275 private String urlDecode(String s) { 276 try { 277 return URLDecoder.decode(s, "UTF-8"); 278 } catch (UnsupportedEncodingException e) { 279 // Should never happen since every implementation of the Java Platform 280 // is required to support UTF-8. 281 // See http://java.sun.com/j2se/1.5.0/docs/api/java/nio/charset/Charset.html 282 throw new IllegalStateException(e); 283 } 284 } 285 286 } 287 288 }