/**
 *
 * Licensed under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied. See the License for the specific language governing
 * permissions and limitations under the License.
 *
 *
 * Implements the Hadoop FS interfaces to allow applications to store
 * files in Kosmos File System (KFS).
 */

package org.apache.hadoop.fs.kfs;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Progressable;

/**
 * A FileSystem backed by KFS.
 *
 * <p>Every operation resolves the supplied {@link Path} against the current
 * working directory, extracts the path component of its URI and delegates to
 * an {@link IFSImpl}.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class KosmosFileSystem extends FileSystem {

  private FileSystem localFs;
  private IFSImpl kfsImpl = null;
  private URI uri;
  private Path workingDir = new Path("/");

  /**
   * Creates an uninitialized instance; {@link #initialize(URI, Configuration)}
   * creates the KFS implementation.
   */
  public KosmosFileSystem() {

  }

  /**
   * Creates an instance that uses the supplied implementation instead of
   * constructing a {@code KFSImpl} during initialization.
   *
   * @param fsimpl the implementation every operation delegates to
   */
  KosmosFileSystem(IFSImpl fsimpl) {
    this.kfsImpl = fsimpl;
  }

  /**
   * Return the protocol scheme for the FileSystem.
   *
   * @return <code>kfs</code>
   */
  @Override
  public String getScheme() {
    return "kfs";
  }

  /**
   * @return the <code>scheme://authority</code> URI recorded by
   *         {@link #initialize(URI, Configuration)}
   */
  @Override
  public URI getUri() {
    return uri;
  }

  /**
   * Initializes this file system: creates the KFS implementation if none was
   * injected, obtains the local file system, and sets the working directory
   * to <code>/user/&lt;user.name&gt;</code>.
   *
   * <p>When the URI has no host, the meta server host and port are read from
   * the <code>fs.kfs.metaServerHost</code> and
   * <code>fs.kfs.metaServerPort</code> configuration keys.
   *
   * @param uri  the URI identifying this file system
   * @param conf the configuration to use
   * @throws IOException if initialization fails for any reason; the original
   *         failure is attached as the cause
   */
  @Override
  public void initialize(URI uri, Configuration conf) throws IOException {
    super.initialize(uri, conf);
    try {
      if (kfsImpl == null) {
        if (uri.getHost() == null) {
          kfsImpl = new KFSImpl(conf.get("fs.kfs.metaServerHost", ""),
                                conf.getInt("fs.kfs.metaServerPort", -1),
                                statistics);
        } else {
          kfsImpl = new KFSImpl(uri.getHost(), uri.getPort(), statistics);
        }
      }

      this.localFs = FileSystem.getLocal(conf);
      this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
      // makeQualified(this) reads this.uri, so the URI must be assigned first.
      this.workingDir = new Path("/user", System.getProperty("user.name")
                                 ).makeQualified(this);
      setConf(conf);

    } catch (IOException e) {
      throw e;
    } catch (Exception e) {
      // A library must never terminate the host JVM; report the failure to
      // the caller through the exception type the signature already declares.
      throw new IOException("Unable to initialize KFS", e);
    }
  }

  /** @return the current working directory */
  @Override
  public Path getWorkingDirectory() {
    return workingDir;
  }

  /**
   * Sets the working directory; a relative {@code dir} is resolved against
   * the current working directory.
   *
   * @param dir the new working directory
   */
  @Override
  public void setWorkingDirectory(Path dir) {
    workingDir = makeAbsolute(dir);
  }

  /**
   * Resolves a path against the working directory.
   *
   * @param path the path to resolve
   * @return {@code path} itself when it is absolute, otherwise {@code path}
   *         resolved under the working directory
   */
  private Path makeAbsolute(Path path) {
    if (path.isAbsolute()) {
      return path;
    }
    return new Path(workingDir, path);
  }

  /**
   * Creates the directory by delegating to the KFS implementation.
   *
   * @param path       the directory to create
   * @param permission not used by this implementation
   * @return true if the underlying call reported status 0
   * @throws IOException if the underlying call fails
   */
  @Override
  public boolean mkdirs(Path path, FsPermission permission
      ) throws IOException {
    String srep = makeAbsolute(path).toUri().getPath();
    return kfsImpl.mkdirs(srep) == 0;
  }

  /**
   * @param path the path to test
   * @return whether the KFS implementation reports the path as a directory
   * @throws IOException if the underlying call fails
   */
  @Override
  public boolean isDirectory(Path path) throws IOException {
    String srep = makeAbsolute(path).toUri().getPath();
    return kfsImpl.isDirectory(srep);
  }

  /**
   * @param path the path to test
   * @return whether the KFS implementation reports the path as a file
   * @throws IOException if the underlying call fails
   */
  @Override
  public boolean isFile(Path path) throws IOException {
    String srep = makeAbsolute(path).toUri().getPath();
    return kfsImpl.isFile(srep);
  }

  /**
   * Lists the status of a path.
   *
   * @param path the file or directory to list
   * @return a single-element array for a file, otherwise the result of
   *         <code>readdirplus</code> on the directory
   * @throws FileNotFoundException if the path does not exist
   * @throws IOException if the underlying call fails
   */
  @Override
  public FileStatus[] listStatus(Path path) throws IOException {
    Path absolute = makeAbsolute(path);
    String srep = absolute.toUri().getPath();

    if (!kfsImpl.exists(srep)) {
      throw new FileNotFoundException("File " + path + " does not exist.");
    }

    if (kfsImpl.isFile(srep)) {
      return new FileStatus[] { getFileStatus(path) };
    }

    return kfsImpl.readdirplus(absolute);
  }

  /**
   * Builds the status of a single path.
   *
   * <p>Directories are reported with length 0, replication 1 and block size
   * 0; files are reported with their KFS size and replication and the default
   * block size.
   *
   * @param path the path to describe
   * @return the status of the path, with a qualified path
   * @throws FileNotFoundException if the path does not exist
   * @throws IOException if the underlying call fails
   */
  @Override
  public FileStatus getFileStatus(Path path) throws IOException {
    String srep = makeAbsolute(path).toUri().getPath();
    if (!kfsImpl.exists(srep)) {
      throw new FileNotFoundException("File " + path + " does not exist.");
    }
    if (kfsImpl.isDirectory(srep)) {
      return new FileStatus(0, true, 1, 0, kfsImpl.getModificationTime(srep),
                            path.makeQualified(this));
    }
    return new FileStatus(kfsImpl.filesize(srep), false,
                          kfsImpl.getReplication(srep),
                          getDefaultBlockSize(),
                          kfsImpl.getModificationTime(srep),
                          path.makeQualified(this));
  }

  /**
   * Opens a file for appending, creating its parent directory first when the
   * path has one.
   *
   * @param f          the file to append to
   * @param bufferSize the buffer size passed to the KFS implementation
   * @param progress   the progress reporter passed to the KFS implementation
   * @return the output stream returned by the KFS implementation
   * @throws IOException if the parent directory cannot be created or the
   *         underlying call fails
   */
  @Override
  public FSDataOutputStream append(Path f, int bufferSize,
      Progressable progress) throws IOException {
    Path parent = f.getParent();
    if (parent != null && !mkdirs(parent)) {
      throw new IOException("Mkdirs failed to create " + parent);
    }

    String srep = makeAbsolute(f).toUri().getPath();
    return kfsImpl.append(srep, bufferSize, progress);
  }

  /**
   * Creates a file, deleting an existing one first when {@code overwrite} is
   * set, and creating the parent directory when the path has one.
   *
   * @param file        the file to create
   * @param permission  not used by this implementation
   * @param overwrite   whether an existing path may be replaced
   * @param bufferSize  the buffer size passed to the KFS implementation
   * @param replication the replication passed to the KFS implementation
   * @param blockSize   not used by this implementation
   * @param progress    the progress reporter passed to the KFS implementation
   * @return the output stream returned by the KFS implementation
   * @throws IOException if the path exists and {@code overwrite} is false,
   *         the parent directory cannot be created, or the underlying call
   *         fails
   */
  @Override
  public FSDataOutputStream create(Path file, FsPermission permission,
                                   boolean overwrite, int bufferSize,
                                   short replication, long blockSize,
                                   Progressable progress)
      throws IOException {

    if (exists(file)) {
      if (overwrite) {
        delete(file, true);
      } else {
        throw new IOException("File already exists: " + file);
      }
    }

    Path parent = file.getParent();
    if (parent != null && !mkdirs(parent)) {
      throw new IOException("Mkdirs failed to create " + parent);
    }

    String srep = makeAbsolute(file).toUri().getPath();
    return kfsImpl.create(srep, replication, bufferSize, progress);
  }

  /**
   * Opens a file for reading.
   *
   * @param path       the file to open
   * @param bufferSize the buffer size passed to the KFS implementation
   * @return the input stream returned by the KFS implementation
   * @throws FileNotFoundException if the path does not exist
   * @throws IOException if the underlying call fails
   */
  @Override
  public FSDataInputStream open(Path path, int bufferSize) throws IOException {
    if (!exists(path)) {
      // FileNotFoundException is an IOException, so existing handlers still
      // match; this aligns open() with listStatus() and getFileStatus().
      throw new FileNotFoundException("File does not exist: " + path);
    }

    String srep = makeAbsolute(path).toUri().getPath();
    return kfsImpl.open(srep, bufferSize);
  }

  /**
   * Renames a path by delegating to the KFS implementation.
   *
   * @param src the existing path
   * @param dst the new path
   * @return true if the underlying call reported status 0
   * @throws IOException if the underlying call fails
   */
  @Override
  public boolean rename(Path src, Path dst) throws IOException {
    String srepS = makeAbsolute(src).toUri().getPath();
    String srepD = makeAbsolute(dst).toUri().getPath();
    return kfsImpl.rename(srepS, srepD) == 0;
  }

  /**
   * Deletes a file, or a directory together with its contents.
   *
   * <p>The results of deleting the individual directory entries are not
   * checked; only the status of the final <code>rmdir</code> is reported.
   *
   * @param path      the path to delete
   * @param recursive whether a non-empty directory may be deleted
   * @return true if the final remove/rmdir call reported status 0
   * @throws IOException if the directory is not empty and {@code recursive}
   *         is false, or the underlying call fails
   */
  @Override
  public boolean delete(Path path, boolean recursive) throws IOException {
    Path absolute = makeAbsolute(path);
    String srep = absolute.toUri().getPath();
    if (kfsImpl.isFile(srep)) {
      return kfsImpl.remove(srep) == 0;
    }

    FileStatus[] dirEntries = listStatus(absolute);
    if (!recursive && (dirEntries.length != 0)) {
      throw new IOException("Directory " + path.toString() +
                            " is not empty.");
    }

    for (FileStatus entry : dirEntries) {
      delete(new Path(absolute, entry.getPath()), recursive);
    }
    return kfsImpl.rmdir(srep) == 0;
  }

  /** @return the default replication factor, 3 */
  @Override
  public short getDefaultReplication() {
    return 3;
  }

  /**
   * Sets the replication of a path by delegating to the KFS implementation.
   *
   * @param path        the path to update
   * @param replication the requested replication
   * @return true if the underlying call returned a non-negative value
   * @throws IOException if the underlying call fails
   */
  @Override
  public boolean setReplication(Path path, short replication)
      throws IOException {
    String srep = makeAbsolute(path).toUri().getPath();
    return kfsImpl.setReplication(srep, replication) >= 0;
  }

  /**
   * 64MB is the KFS block size.
   *
   * @return the default block size in bytes
   */
  @Override
  public long getDefaultBlockSize() {
    return 1 << 26;
  }

  /**
   * No-op.
   *
   * @param path   ignored
   * @param shared ignored
   * @throws IOException never thrown by this implementation
   */
  @Deprecated
  public void lock(Path path, boolean shared) throws IOException {

  }

  /**
   * No-op.
   *
   * @param path ignored
   * @throws IOException never thrown by this implementation
   */
  @Deprecated
  public void release(Path path) throws IOException {

  }

  /**
   * Gets the locations of the chunks of a file from KFS.
   *
   * <p>One {@link BlockLocation} is produced per hint entry returned by the
   * KFS implementation. Offsets advance from {@code start} in steps of the
   * default block size, and each length is the remaining part of {@code len}
   * capped at the block size.
   *
   * @param file  the file to look up
   * @param start the offset of the requested range
   * @param len   the length of the requested range
   * @return the block locations, or null if {@code file} is null or the KFS
   *         implementation returns no hints
   * @throws IOException if the underlying call fails
   */
  @Override
  public BlockLocation[] getFileBlockLocations(FileStatus file, long start,
      long len) throws IOException {

    if (file == null) {
      return null;
    }
    String srep = makeAbsolute(file.getPath()).toUri().getPath();
    String[][] hints = kfsImpl.getDataLocation(srep, start, len);
    if (hints == null) {
      return null;
    }
    BlockLocation[] result = new BlockLocation[hints.length];
    long blockSize = getDefaultBlockSize();
    long remaining = len;
    long blockStart = start;
    for (int i = 0; i < result.length; ++i) {
      // Clamp at zero: when there are more hints than whole blocks in len,
      // the remaining length would otherwise go negative.
      long blockLength = Math.max(0L, Math.min(remaining, blockSize));
      result[i] = new BlockLocation(null, hints[i], blockStart, blockLength);
      blockStart += blockSize;
      remaining -= blockSize;
    }
    return result;
  }

  /**
   * Copies a path from the local file system into this file system.
   *
   * @param delSrc whether to delete the source after copying
   * @param src    the local source path
   * @param dst    the destination path in this file system
   * @throws IOException if the copy fails
   */
  @Override
  public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
      throws IOException {
    FileUtil.copy(localFs, src, this, dst, delSrc, getConf());
  }

  /**
   * Copies a path from this file system to the local file system.
   *
   * @param delSrc whether to delete the source after copying
   * @param src    the source path in this file system
   * @param dst    the local destination path
   * @throws IOException if the copy fails
   */
  @Override
  public void copyToLocalFile(boolean delSrc, Path src, Path dst)
      throws IOException {
    FileUtil.copy(this, src, localFs, dst, delSrc, getConf());
  }

  /**
   * Returns the local file to write output to.
   *
   * @param fsOutputFile the eventual destination in this file system
   * @param tmpLocalFile the local temporary file
   * @return {@code tmpLocalFile}, unchanged
   * @throws IOException never thrown by this implementation
   */
  @Override
  public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
      throws IOException {
    return tmpLocalFile;
  }

  /**
   * Moves the finished local temporary file to its destination in this file
   * system.
   *
   * @param fsOutputFile the destination in this file system
   * @param tmpLocalFile the local temporary file
   * @throws IOException if the move fails
   */
  @Override
  public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
      throws IOException {
    moveFromLocalFile(tmpLocalFile, fsOutputFile);
  }
}