Skip to content
This repository has been archived by the owner on Jul 7, 2022. It is now read-only.

Crail file/directory permissions can be set from the config file #10

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 85 additions & 79 deletions hdfs/src/main/java/org/apache/crail/hdfs/CrailHDFS.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,65 +19,57 @@

package org.apache.crail.hdfs;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.Iterator;

import org.slf4j.Logger;
import org.apache.crail.CrailBlockLocation;
import org.apache.crail.CrailBufferedInputStream;
import org.apache.crail.CrailBufferedOutputStream;
import org.apache.crail.CrailDirectory;
import org.apache.crail.CrailStore;
import org.apache.crail.CrailFile;
import org.apache.crail.CrailLocationClass;
import org.apache.crail.CrailNode;
import org.apache.crail.CrailNodeType;
import org.apache.crail.CrailStorageClass;
import org.apache.crail.*;
import org.apache.crail.conf.CrailConfiguration;
import org.apache.crail.conf.CrailConstants;
import org.apache.crail.rpc.RpcErrors;
import org.apache.crail.utils.CrailUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.AbstractFileSystem;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.FsStatus;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.Options.ChecksumOpt;
import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Progressable;
import org.slf4j.Logger;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.Iterator;

public class CrailHDFS extends AbstractFileSystem {
private static final Logger LOG = CrailUtils.getLogger();
private CrailStore dfs;
private Path workingDir;

private FsPermission fileperm;
private FsPermission dirperm;

/**
 * Creates a Crail-backed {@code AbstractFileSystem} instance.
 *
 * <p>The permissions reported for files and directories can be configured via the Crail
 * configuration keys {@code crail.hdfs.fileperm} and {@code crail.hdfs.dirperm}. Values are
 * chmod-style octal strings (e.g. "644", "755"); when a key is absent the Hadoop defaults
 * apply.
 *
 * @param uri  file system URI (scheme "crail", default port 9000)
 * @param conf Hadoop configuration, required by the AbstractFileSystem contract
 * @throws IOException if the Crail configuration or store cannot be initialized
 */
public CrailHDFS(final URI uri, final Configuration conf) throws IOException, URISyntaxException {
    super(uri, "crail", true, 9000);

    try {
        CrailConfiguration crailConf = new CrailConfiguration();
        // Permission strings are octal (chmod notation). Plain Integer.parseInt() would
        // read "644" as decimal 644 and yield wrong permission bits.
        fileperm = parsePermission(crailConf.get("crail.hdfs.fileperm"), FsPermission.getFileDefault());
        dirperm = parsePermission(crailConf.get("crail.hdfs.dirperm"), FsPermission.getDirDefault());
        this.dfs = CrailStore.newInstance(crailConf);
        Path _workingDir = new Path("/user/" + CrailConstants.USER);
        this.workingDir = new Path("/user/" + CrailConstants.USER).makeQualified(uri, _workingDir);
        LOG.info("CrailHDFS initialization done..");
    } catch (Exception e) {
        throw new IOException(e);
    }
}

/** Parses an octal permission string, falling back to {@code fallback} when unset. */
private static FsPermission parsePermission(String perm, FsPermission fallback) {
    if (perm == null) {
        return fallback;
    }
    return new FsPermission(Short.parseShort(perm, 8));
}
Expand All @@ -89,77 +81,77 @@ public int getUriDefaultPort() {

/**
 * Returns static server defaults: Crail block size, 512-byte sectors, 64 KiB write packets,
 * replication 1, 4 KiB stream buffers, no encryption, no trash, CRC32 checksums.
 */
@Override
public FsServerDefaults getServerDefaults() throws IOException {
    return new FsServerDefaults(CrailConstants.BLOCK_SIZE, 512, 64 * 1024, (short) 1, 4096, false, (long) 0, DataChecksum.Type.CRC32);
}

@Override
public FSDataOutputStream createInternal(Path path, EnumSet<CreateFlag> flag, FsPermission absolutePermission, int bufferSize, short replication, long blockSize, Progressable progress, ChecksumOpt checksumOpt, boolean createParent) throws AccessControlException, FileAlreadyExistsException, FileNotFoundException, ParentNotDirectoryException, UnsupportedFileSystemException, UnresolvedLinkException, IOException {
CrailFile fileInfo = null;
try {
fileInfo = dfs.create(path.toUri().getRawPath(), CrailNodeType.DATAFILE, CrailStorageClass.PARENT, CrailLocationClass.PARENT, true).get().asFile();
} catch(Exception e){
if (e.getMessage().contains(RpcErrors.messages[RpcErrors.ERR_PARENT_MISSING])){
} catch (Exception e) {
if (e.getMessage().contains(RpcErrors.messages[RpcErrors.ERR_PARENT_MISSING])) {
fileInfo = null;
} else {
throw new IOException(e);
}
}
if (fileInfo == null){

if (fileInfo == null) {
Path parent = path.getParent();
this.mkdir(parent, FsPermission.getDirDefault(), true);
this.mkdir(parent, dirperm, true);
try {
fileInfo = dfs.create(path.toUri().getRawPath(), CrailNodeType.DATAFILE, CrailStorageClass.PARENT, CrailLocationClass.PARENT, true).get().asFile();
} catch(Exception e){
} catch (Exception e) {
throw new IOException(e);
}
}

CrailBufferedOutputStream outputStream = null;
if (fileInfo != null){
if (fileInfo != null) {
try {
fileInfo.syncDir();
outputStream = fileInfo.getBufferedOutputStream(Integer.MAX_VALUE);
} catch (Exception e) {
throw new IOException(e);
}
}
} else {
throw new IOException("Failed to create file, path " + path.toString());
}
if (outputStream != null){
return new CrailHDFSOutputStream(outputStream, statistics);

if (outputStream != null) {
return new CrailHDFSOutputStream(outputStream, statistics);
} else {
throw new IOException("Failed to create file, path " + path.toString());
}
}
}

@Override
public void mkdir(Path path, FsPermission permission, boolean createParent) throws AccessControlException, FileAlreadyExistsException, FileNotFoundException, UnresolvedLinkException, IOException {
try {
CrailDirectory file = dfs.create(path.toUri().getRawPath(), CrailNodeType.DIRECTORY, CrailStorageClass.PARENT, CrailLocationClass.DEFAULT, true).get().asDirectory();
file.syncDir();
} catch(Exception e){
if (e.getMessage().contains(RpcErrors.messages[RpcErrors.ERR_PARENT_MISSING])){
} catch (Exception e) {
if (e.getMessage().contains(RpcErrors.messages[RpcErrors.ERR_PARENT_MISSING])) {
Path parent = path.getParent();
mkdir(parent, permission, createParent);
mkdir(path, permission, createParent);
} else if (e.getMessage().contains(RpcErrors.messages[RpcErrors.ERR_FILE_EXISTS])){
} else if (e.getMessage().contains(RpcErrors.messages[RpcErrors.ERR_FILE_EXISTS])) {
} else {
throw new IOException(e);
}
}
}
}

@Override
public boolean delete(Path path, boolean recursive) throws AccessControlException, FileNotFoundException, UnresolvedLinkException, IOException {
try {
CrailNode file = dfs.delete(path.toUri().getRawPath(), recursive).get();
if (file != null){
if (file != null) {
file.syncDir();
}
return file != null;
} catch(Exception e){
} catch (Exception e) {
throw new IOException(e);
}
}
Expand All @@ -169,20 +161,20 @@ public FSDataInputStream open(Path path, int bufferSize) throws AccessControlExc
CrailFile fileInfo = null;
try {
fileInfo = dfs.lookup(path.toUri().getRawPath()).get().asFile();
} catch(Exception e){
} catch (Exception e) {
throw new IOException(e);
}

CrailBufferedInputStream inputStream = null;
if (fileInfo != null){
if (fileInfo != null) {
try {
inputStream = fileInfo.getBufferedInputStream(fileInfo.getCapacity());
} catch(Exception e){
} catch (Exception e) {
throw new IOException(e);
}
}
if (inputStream != null){

if (inputStream != null) {
return new CrailHDFSInputStream(inputStream);
} else {
throw new IOException("Failed to open file, path " + path.toString());
Expand All @@ -198,24 +190,37 @@ public boolean setReplication(Path f, short replication) throws AccessControlExc
public void renameInternal(Path src, Path dst) throws AccessControlException, FileAlreadyExistsException, FileNotFoundException, ParentNotDirectoryException, UnresolvedLinkException, IOException {
try {
CrailNode file = dfs.rename(src.toUri().getRawPath(), dst.toUri().getRawPath()).get();
if (file != null){
if (file != null) {
file.syncDir();
}
} catch(Exception e){
} catch (Exception e) {
throw new IOException(e);
}
}

/**
 * Crail does not persist per-node permissions; all nodes report the globally configured
 * file/directory permission. A setPermission() request is therefore only logged (and
 * ignored) when it differs from the configured permission for the node's type.
 *
 * @throws FileNotFoundException if no node exists at {@code f}
 * @throws IOException if the Crail lookup fails
 */
@Override
public void setPermission(Path f, FsPermission permission) throws AccessControlException, FileNotFoundException, UnresolvedLinkException, IOException {
    CrailNode directFile = null;
    try {
        directFile = dfs.lookup(f.toUri().getRawPath()).get();
    } catch (Exception e) {
        throw new IOException(e);
    }
    if (directFile == null) {
        // Consistent with getFileStatus(): a missing path is reported, not an NPE.
        throw new FileNotFoundException("filename " + f);
    }
    // Compare against the permission class that applies to this node type. (The previous
    // else-if chain fell through to the *file* permission for directories whose requested
    // bits matched dirperm, producing spurious "file" warnings.)
    if (directFile.getType().isDirectory()) {
        if (permission.toShort() != dirperm.toShort()) {
            LOG.warn("Ignoring setPermission() for directory {} to {}", f, permission.toShort());
        }
    } else if (permission.toShort() != fileperm.toShort()) {
        LOG.warn("Ignoring setPermission() for file {} to {}", f, permission.toShort());
    }
}

@Override
public void setOwner(Path f, String username, String groupname) throws AccessControlException, FileNotFoundException, UnresolvedLinkException, IOException {
    // Crail has no notion of per-node ownership; the request is accepted but has no effect.
    LOG.warn("Ignoring setOwner() for file {} (username = {}, groupname = {})", f, username, groupname);
}

@Override
public void setTimes(Path f, long mtime, long atime) throws AccessControlException, FileNotFoundException, UnresolvedLinkException, IOException {
    // Crail only tracks a modification time internally; explicit time updates are ignored.
    LOG.warn("Ignoring setTimes() for file {} (mtime = {}, atime = {})", f, mtime, atime);
}

@Override
Expand All @@ -228,17 +233,19 @@ public FileStatus getFileStatus(Path path) throws AccessControlException, FileNo
CrailNode directFile = null;
try {
directFile = dfs.lookup(path.toUri().getRawPath()).get();
} catch(Exception e){
} catch (Exception e) {
throw new IOException(e);
}
if (directFile == null){
if (directFile == null) {
throw new FileNotFoundException("filename " + path);
}
FsPermission permission = FsPermission.getFileDefault();

FsPermission permission;
if (directFile.getType().isDirectory()) {
permission = FsPermission.getDirDefault();
}
permission = new FsPermission(dirperm);
} else {
permission = new FsPermission(fileperm);
}
FileStatus status = new FileStatus(directFile.getCapacity(), directFile.getType().isContainer(), CrailConstants.SHADOW_REPLICATION, CrailConstants.BLOCK_SIZE, directFile.getModificationTime(), directFile.getModificationTime(), permission, CrailConstants.USER, CrailConstants.USER, path.makeQualified(this.getUri(), this.workingDir));
return status;
}
Expand All @@ -248,17 +255,16 @@ public BlockLocation[] getFileBlockLocations(Path path, long start, long len) th
try {
CrailBlockLocation[] _locations = dfs.lookup(path.toUri().getRawPath()).get().asFile().getBlockLocations(start, len);
BlockLocation[] locations = new BlockLocation[_locations.length];
for (int i = 0; i < locations.length; i++){
for (int i = 0; i < locations.length; i++) {
locations[i] = new BlockLocation();
locations[i].setOffset(_locations[i].getOffset());
locations[i].setLength(_locations[i].getLength());
locations[i].setNames(_locations[i].getNames());
locations[i].setHosts(_locations[i].getHosts());
locations[i].setTopologyPaths(_locations[i].getTopology());

}
return locations;
} catch(Exception e){
} catch (Exception e) {
throw new IOException(e);
}
}
Expand All @@ -274,22 +280,22 @@ public FileStatus[] listStatus(Path path) throws AccessControlException, FileNot
CrailNode node = dfs.lookup(path.toUri().getRawPath()).get();
Iterator<String> iter = node.asContainer().listEntries();
ArrayList<FileStatus> statusList = new ArrayList<FileStatus>();
while(iter.hasNext()){
while (iter.hasNext()) {
String filepath = iter.next();
CrailNode directFile = dfs.lookup(filepath).get();
if (directFile != null){
FsPermission permission = FsPermission.getFileDefault();
if (directFile != null) {
FsPermission permission = new FsPermission(fileperm);
if (directFile.getType().isDirectory()) {
permission = FsPermission.getDirDefault();
permission = new FsPermission(dirperm);
}
FileStatus status = new FileStatus(directFile.getCapacity(), directFile.getType().isContainer(), CrailConstants.SHADOW_REPLICATION, CrailConstants.BLOCK_SIZE, directFile.getModificationTime(), directFile.getModificationTime(), permission, CrailConstants.USER, CrailConstants.USER, new Path(filepath).makeQualified(this.getUri(), workingDir));
FileStatus status = new FileStatus(directFile.getCapacity(), directFile.getType().isContainer(), CrailConstants.SHADOW_REPLICATION, CrailConstants.BLOCK_SIZE, directFile.getModificationTime(), directFile.getModificationTime(), permission, CrailConstants.USER, CrailConstants.USER, new Path(filepath).makeQualified(this.getUri(), workingDir));
statusList.add(status);
}
}
FileStatus[] list = new FileStatus[statusList.size()];
statusList.toArray(list);
return list;
} catch(Exception e){
} catch (Exception e) {
throw new FileNotFoundException(path.toUri().getRawPath());
}
}
Expand Down