Skip to content
This repository has been archived by the owner on Feb 10, 2023. It is now read-only.

Commit

Permalink
Merge pull request #2 from Statflo/develop
Browse files Browse the repository at this point in the history
Merge develop into master
  • Loading branch information
FabioBatSilva authored Aug 15, 2018
2 parents 4fe1a64 + 51eb116 commit b513c8d
Show file tree
Hide file tree
Showing 16 changed files with 713 additions and 42 deletions.
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,18 @@ Map<String, ?> env = ImmutableMap.<String, Object> builder()
FileSystems.newFileSystem("s3:///", env, Thread.currentThread().getContextClassLoader());
```

##### Uploading Objects Using Multipart Upload API

By default s3fs will upload an object to s3 by calling `AmazonS3Client#putObject()`.
If you whant to upload file using the Multipart Upload API :

```java
Map<String, ?> env = ImmutableMap.<String, Object> builder()
.put(com.upplication.s3fs.AmazonS3Factory.MULTIPART_UPLOAD_ENABLED, "true")
.build()
FileSystems.newFileSystem("s3:///", env, Thread.currentThread().getContextClassLoader());
```

Complete settings lists:

* s3fs_access_key
Expand Down
8 changes: 7 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<groupId>com.upplication</groupId>
<artifactId>s3fs</artifactId>
<packaging>jar</packaging>
<version>2.2.2</version>
<version>2.3.0-SNAPSHOT</version>
<name>s3fs</name>
<description>S3 filesystem provider for Java 7</description>
<url>https://github.com/Upplication/Amazon-S3-FileSystem-NIO2</url>
Expand Down Expand Up @@ -62,6 +62,7 @@
<com.google.guava.guava.version>18.0</com.google.guava.guava.version>
<org.apache.tika.tika-core.version>1.5</org.apache.tika.tika-core.version>
<com.google.code.findbugs.jsr305.version>1.3.9</com.google.code.findbugs.jsr305.version>
<s3.stream.upload.version>1.0.1</s3.stream.upload.version>
</properties>

<dependencies>
Expand Down Expand Up @@ -101,6 +102,11 @@
<artifactId>jsr305</artifactId>
<version>${com.google.code.findbugs.jsr305.version}</version>
</dependency>
<dependency>
<groupId>com.github.alexmojaki</groupId>
<artifactId>s3-stream-upload</artifactId>
<version>${s3.stream.upload.version}</version>
</dependency>

<!-- for testing -->
<dependency>
Expand Down
1 change: 0 additions & 1 deletion src/main/java/com/upplication/s3fs/S3FileChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.util.IOUtils;
import org.apache.tika.Tika;

import java.io.*;
Expand Down
9 changes: 8 additions & 1 deletion src/main/java/com/upplication/s3fs/S3FileSystem.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.amazonaws.services.s3.model.Bucket;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import java.util.Properties;

/**
* S3FileSystem with a concrete client configured and ready to use.
Expand All @@ -24,16 +25,18 @@
public class S3FileSystem extends FileSystem implements Comparable<S3FileSystem> {

private final S3FileSystemProvider provider;
private final Properties properties;
private final String key;
private final AmazonS3 client;
private final String endpoint;
private int cache;

public S3FileSystem(S3FileSystemProvider provider, String key, AmazonS3 client, String endpoint) {
public S3FileSystem(S3FileSystemProvider provider, String key, AmazonS3 client, String endpoint, Properties props) {
this.provider = provider;
this.key = key;
this.client = client;
this.endpoint = endpoint;
this.properties = props;
this.cache = 60000; // 1 minute cache for the s3Path
}

Expand Down Expand Up @@ -176,4 +179,8 @@ public int compareTo(S3FileSystem o) {
public int getCache() {
return cache;
}

public Properties getProperties() {
return properties;
}
}
121 changes: 115 additions & 6 deletions src/main/java/com/upplication/s3fs/S3FileSystemProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.amazonaws.services.s3.model.Bucket;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectId;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
Expand All @@ -31,9 +32,13 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.google.common.collect.Sets.difference;
import static com.upplication.s3fs.AmazonS3Factory.*;
import static java.lang.String.format;
import static java.lang.String.format;

/**
* Spec:
Expand Down Expand Up @@ -65,13 +70,21 @@
*/
public class S3FileSystemProvider extends FileSystemProvider {

private static final Logger LOGGER = LoggerFactory.getLogger(S3FileSystemProvider.class);
public static final String CHARSET_KEY = "s3fs_charset";
public static final String AMAZON_S3_FACTORY_CLASS = "s3fs_amazon_s3_factory";
public static final String MULTIPART_UPLOAD_ENABLED = "s3fs_multipart_upload_enabled";
public static final String MULTIPART_UPLOAD_PART_SIZE = "s3fs_multipart_upload_part_size";
public static final String MULTIPART_UPLOAD_NUM_STREAMS = "s3fs_multipart_upload_num_streams";
public static final String MULTIPART_UPLOAD_QUEUE_CAPACITY = "s3fs_multipart_upload_queue_capacity";
public static final String MULTIPART_UPLOAD_NUM_UPLOAD_THREADS = "s3fs_multipart_upload_num_upload_threads";

private static final ConcurrentMap<String, S3FileSystem> fileSystems = new ConcurrentHashMap<>();
private static final List<String> PROPS_TO_OVERLOAD = Arrays.asList(ACCESS_KEY, SECRET_KEY, REQUEST_METRIC_COLLECTOR_CLASS, CONNECTION_TIMEOUT, MAX_CONNECTIONS, MAX_ERROR_RETRY, PROTOCOL, PROXY_DOMAIN,
PROXY_HOST, PROXY_PASSWORD, PROXY_PORT, PROXY_USERNAME, PROXY_WORKSTATION, SOCKET_SEND_BUFFER_SIZE_HINT, SOCKET_RECEIVE_BUFFER_SIZE_HINT, SOCKET_TIMEOUT,
USER_AGENT, AMAZON_S3_FACTORY_CLASS, SIGNER_OVERRIDE, PATH_STYLE_ACCESS);
USER_AGENT, AMAZON_S3_FACTORY_CLASS, SIGNER_OVERRIDE, PATH_STYLE_ACCESS,
MULTIPART_UPLOAD_ENABLED, MULTIPART_UPLOAD_PART_SIZE, MULTIPART_UPLOAD_NUM_STREAMS,
MULTIPART_UPLOAD_QUEUE_CAPACITY, MULTIPART_UPLOAD_NUM_UPLOAD_THREADS);

private S3Utils s3Utils = new S3Utils();
private Cache cache = new Cache();
Expand All @@ -95,6 +108,9 @@ public FileSystem newFileSystem(URI uri, Map<String, ?> env) {
// create the filesystem with the final properties, store and return
S3FileSystem fileSystem = createFileSystem(uri, props);
fileSystems.put(fileSystem.getKey(), fileSystem);

LOGGER.debug("New file system created. url:{}, props:{}", uri, props);

return fileSystem;
}

Expand Down Expand Up @@ -302,6 +318,8 @@ public Path getPath(URI uri) {

@Override
public DirectoryStream<Path> newDirectoryStream(Path dir, DirectoryStream.Filter<? super Path> filter) throws IOException {
LOGGER.debug("New directory stream. path:{}, filter:{}", dir, filter);

final S3Path s3Path = toS3Path(dir);
return new DirectoryStream<Path>() {
@Override
Expand All @@ -316,8 +334,47 @@ public Iterator<Path> iterator() {
};
}

private S3MultipartUploadOutputStream createMultipartUploadOutputStream(final S3Path s3Path, Set<? extends OpenOption> opts) throws IOException {
final S3ObjectId objectId = s3Path.toS3ObjectId();
final Set<OpenOption> options = Sets.newHashSet(opts);
final S3FileSystem fileSystem = s3Path.getFileSystem();
final Properties properties = fileSystem.getProperties();
final AmazonS3 client = s3Path.getFileSystem().getClient();
final boolean createOpt = options.remove(StandardOpenOption.CREATE);
final boolean createNewOpt = options.remove(StandardOpenOption.CREATE_NEW);
final S3MultipartUploadOutputStream stream = new S3MultipartUploadOutputStream(client, objectId, properties);

// validate options
if (options.isEmpty()) {
return stream;
}

// Remove irrelevant/ignored options
options.remove(StandardOpenOption.WRITE);
options.remove(StandardOpenOption.SPARSE);
options.remove(StandardOpenOption.TRUNCATE_EXISTING);

if (!options.isEmpty()) {
throw new UnsupportedOperationException(format("Unsupported operation: %s", options));
}

if (createNewOpt && fileSystem.provider().exists(s3Path)) {
fileSystem.provider().delete(s3Path);
}

if (!createOpt && fileSystem.provider().exists(s3Path)) {
throw new FileAlreadyExistsException(format("Target already exists: %s", s3Path));
}

return stream;
}

@Override
public InputStream newInputStream(Path path, OpenOption... options) throws IOException {
LOGGER.debug("New input stream. path:{}, options:{}", path, options);

System.out.println("newInputStream");

S3Path s3Path = toS3Path(path);
String key = s3Path.getKey();

Expand All @@ -342,14 +399,46 @@ public InputStream newInputStream(Path path, OpenOption... options) throws IOExc

@Override
public SeekableByteChannel newByteChannel(Path path, Set<? extends OpenOption> options, FileAttribute<?>... attrs) throws IOException {
S3Path s3Path = toS3Path(path);
return new S3SeekableByteChannel(s3Path, options);
LOGGER.debug("New byte channel. path:{}, options:{}", path, options);

final S3Path s3Path = toS3Path(path);
final boolean multipartEnabled = isMultipartUploadCapable(s3Path, options);

if (!multipartEnabled) {

LOGGER.debug("Using S3SeekableByteChannel");

return new S3SeekableByteChannel(s3Path, options);
}

LOGGER.debug("Using S3MultipartFileChannel");

final S3MultipartUploadOutputStream outputStream = createMultipartUploadOutputStream(s3Path, options);
final FileChannel channel = new S3MultipartUploadChannel(outputStream);

return channel;
}

@Override
public FileChannel newFileChannel(Path path, Set<? extends OpenOption> options, FileAttribute<?>... attrs) throws IOException {
S3Path s3Path = toS3Path(path);
return new S3FileChannel(s3Path, options);
LOGGER.debug("New file channel. path:{}, filter:{}", path, options);

final S3Path s3Path = toS3Path(path);
final boolean multipartEnabled = isMultipartUploadCapable(s3Path, options);

if (!multipartEnabled) {

LOGGER.debug("Using S3FileChannel");

return new S3FileChannel(s3Path, options);
}

LOGGER.debug("Using S3MultipartFileChannel");

final S3MultipartUploadOutputStream outputStream = createMultipartUploadOutputStream(s3Path, options);
final FileChannel channel = new S3MultipartUploadChannel(outputStream);

return channel;
}

/**
Expand All @@ -359,6 +448,8 @@ public FileChannel newFileChannel(Path path, Set<? extends OpenOption> options,
*/
@Override
public void createDirectory(Path dir, FileAttribute<?>... attrs) throws IOException {
LOGGER.debug("Create directory. path:{}, attrs:{}", dir, attrs);

S3Path s3Path = toS3Path(dir);
Preconditions.checkArgument(attrs.length == 0, "attrs not yet supported: %s", ImmutableList.copyOf(attrs)); // TODO
if (exists(s3Path))
Expand All @@ -378,6 +469,8 @@ public void createDirectory(Path dir, FileAttribute<?>... attrs) throws IOExcept

@Override
public void delete(Path path) throws IOException {
LOGGER.debug("Delete path:{}", path);

S3Path s3Path = toS3Path(path);
if (Files.notExists(s3Path))
throw new NoSuchFileException("the path: " + this + " not exists");
Expand All @@ -393,6 +486,8 @@ public void delete(Path path) throws IOException {

@Override
public void copy(Path source, Path target, CopyOption... options) throws IOException {
LOGGER.debug("Copy {} to target. options:{}", source, target, options);

if (isSameFile(source, target))
return;

Expand Down Expand Up @@ -424,6 +519,8 @@ public void copy(Path source, Path target, CopyOption... options) throws IOExcep

@Override
public void move(Path source, Path target, CopyOption... options) throws IOException {
LOGGER.debug("Move {} to target. options:{}", source, target, options);

if (options != null && Arrays.asList(options).contains(StandardCopyOption.ATOMIC_MOVE))
throw new AtomicMoveNotSupportedException(source.toString(), target.toString(), "Atomic not supported");
copy(source, target, options);
Expand Down Expand Up @@ -550,7 +647,7 @@ public void setAttribute(Path path, String attribute, Object value, LinkOption..
* @return S3FileSystem never null
*/
public S3FileSystem createFileSystem(URI uri, Properties props) {
return new S3FileSystem(this, getFileSystemKey(uri, props), getAmazonS3(uri, props), uri.getHost());
return new S3FileSystem(this, getFileSystemKey(uri, props), getAmazonS3(uri, props), uri.getHost(), props);
}

protected AmazonS3 getAmazonS3(URI uri, Properties props) {
Expand Down Expand Up @@ -634,4 +731,16 @@ public Cache getCache() {
public void setCache(Cache cache) {
this.cache = cache;
}

private boolean isMultipartUploadCapable(final S3Path s3Path, final Set<? extends OpenOption> options) {
// Not supported options
if (options.contains(StandardOpenOption.READ) || options.contains(StandardOpenOption.APPEND)) {
return false;
}

final S3FileSystem fileSystem = s3Path.getFileSystem();
final Properties properties = fileSystem.getProperties();

return Boolean.parseBoolean(properties.getProperty(MULTIPART_UPLOAD_ENABLED, "false"));
}
}
Loading

0 comments on commit b513c8d

Please sign in to comment.