diff --git a/build.gradle b/build.gradle index 569cd91..fbf82f2 100644 --- a/build.gradle +++ b/build.gradle @@ -223,31 +223,31 @@ gemPush { } test { - jvmArgs '-XX:MaxPermSize=128M', '-Xmx2048m' + jvmArgs "-XX:MaxPermSize=128M", "-Xmx2048m" testLogging { - outputs.upToDateWhen { false } + events "passed", "skipped", "failed", "standardOut", "standardError" exceptionFormat = org.gradle.api.tasks.testing.logging.TestExceptionFormat.FULL showCauses = true showExceptions = true showStackTraces = true showStandardStreams = true - events "passed", "skipped", "failed", "standardOut", "standardError" + outputs.upToDateWhen { false } } } -checkstyle { - configFile = file("${project.rootDir}/config/checkstyle/checkstyle.xml") - toolVersion = '6.14.1' -} -checkstyleMain { - configFile = file("${project.rootDir}/config/checkstyle/default.xml") - ignoreFailures = false +tasks.withType(Checkstyle) { + reports { + // Not to skip up-to-date checkstyles. + outputs.upToDateWhen { false } + } } -checkstyleTest { - configFile = file("${project.rootDir}/config/checkstyle/default.xml") + +checkstyle { + toolVersion = libs.versions.checkstyle.get() + configFile = file("${rootProject.projectDir}/config/checkstyle/checkstyle.xml") + configProperties = [ + "org.checkstyle.google.suppressionfilter.config": file("${rootProject.projectDir}/config/checkstyle/checkstyle-suppressions.xml"), + ] ignoreFailures = false -} -task checkstyle(type: Checkstyle) { - classpath = sourceSets.main.output + sourceSets.test.output - source = sourceSets.main.allJava + sourceSets.test.allJava + maxWarnings = 0 } diff --git a/config/checkstyle/README.md b/config/checkstyle/README.md new file mode 100644 index 0000000..403c736 --- /dev/null +++ b/config/checkstyle/README.md @@ -0,0 +1,11 @@ +Checkstyle for the Embulk project +================================== + +* google_check.xml: Downloaded from: https://github.com/checkstyle/checkstyle/blob/checkstyle-9.3/src/main/resources/google_checks.xml + * Commit: 5c1903792f8432243cc8ae5cd79a03a004d3c09c +* checkstyle.xml: Customized from google_check.xml. + * To enable suppressions through checkstyle-suppressions.xml. + * To enable suppressions with @SuppressWarnings. + * To indent with 4-column spaces. + * To limit columns to 180 characters. + * To reject unused imports. diff --git a/config/checkstyle/checkstyle-suppressions.xml b/config/checkstyle/checkstyle-suppressions.xml new file mode 100644 index 0000000..aefd4d6 --- /dev/null +++ b/config/checkstyle/checkstyle-suppressions.xml @@ -0,0 +1,14 @@ + + + + + + + + + + + + diff --git a/config/checkstyle/checkstyle.xml b/config/checkstyle/checkstyle.xml index 3c8d926..c70b0c4 100644 --- a/config/checkstyle/checkstyle.xml +++ b/config/checkstyle/checkstyle.xml @@ -1,130 +1,368 @@ - + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + "-//Checkstyle//DTD Checkstyle Configuration 1.3//EN" + "https://checkstyle.org/dtds/configuration_1_3.dtd"> + + + + + + + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + + + + + + + + + + + - - - - - - - - - - - - + - - - - - - - - - - - - - - - + + + + + - - - - - - - - - + + + + + - - - - - - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/config/checkstyle/default.xml b/config/checkstyle/default.xml deleted file mode 100644 index 6367c22..0000000 --- a/config/checkstyle/default.xml +++ /dev/null @@ -1,110 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/config/checkstyle/google_checks.xml b/config/checkstyle/google_checks.xml new file mode 100644 index 0000000..515a844 --- /dev/null +++ b/config/checkstyle/google_checks.xml @@ -0,0 +1,364 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 47dedf5..9824d0a 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -23,6 +23,8 @@ embulk-formatter-csv = "0.10.36" embulk-output-file = "0.10.36" embulk-parser-csv = "0.10.36" +checkstyle = "9.3" + [libraries] embulk-spi = { group = "org.embulk", name = "embulk-spi", version.ref = "embulk-spi" } diff --git a/src/main/java/org/embulk/input/gcs/AuthUtils.java b/src/main/java/org/embulk/input/gcs/AuthUtils.java index 913f22f..14655da 100644 --- a/src/main/java/org/embulk/input/gcs/AuthUtils.java +++ b/src/main/java/org/embulk/input/gcs/AuthUtils.java @@ -7,11 +7,6 @@ import com.google.cloud.storage.Storage; import com.google.cloud.storage.StorageException; import com.google.cloud.storage.StorageOptions; -import org.embulk.config.ConfigException; -import org.embulk.util.config.Config; -import org.embulk.util.config.ConfigDefault; -import org.embulk.util.config.units.LocalFile; - import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; @@ -20,16 +15,17 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Optional; +import org.embulk.config.ConfigException; +import org.embulk.util.config.Config; +import org.embulk.util.config.ConfigDefault; +import org.embulk.util.config.units.LocalFile; -class AuthUtils -{ - public enum AuthMethod - { +class AuthUtils { + public enum AuthMethod { private_key, compute_engine, json_key } - interface Task - { + interface Task { @Config("auth_method") @ConfigDefault("\"private_key\"") AuthUtils.AuthMethod getAuthMethod(); @@ -46,6 +42,7 @@ interface Task @Config("p12_keyfile") @ConfigDefault("null") Optional getP12Keyfile(); + void setP12Keyfile(Optional p12Keyfile); @Config("json_keyfile") @@ -53,12 +50,10 @@ interface Task Optional getJsonKeyfile(); } - private AuthUtils() - { + private AuthUtils() { } - static Storage newClient(final PluginTask task) - { + static Storage newClient(final PluginTask task) { try { final StorageOptions.Builder builder = StorageOptions.newBuilder(); switch (task.getAuthMethod()) { @@ -76,14 +71,12 @@ static Storage newClient(final PluginTask task) final Storage client = builder.build().getService(); client.list(task.getBucket(), Storage.BlobListOption.pageSize(1)); return client; - } - catch (StorageException | IOException | GeneralSecurityException e) { + } catch (final StorageException | IOException | GeneralSecurityException e) { throw new ConfigException(e); } } - static Credentials fromP12(final Task task) throws IOException, GeneralSecurityException - { + static Credentials fromP12(final Task task) throws IOException, GeneralSecurityException { final String path = task.getP12Keyfile().get().getPath().toString(); try (final InputStream p12InputStream = new FileInputStream(path)) { final PrivateKey pk = SecurityUtils.loadPrivateKeyFromKeyStore( @@ -99,8 +92,7 @@ static Credentials fromP12(final Task task) throws IOException, GeneralSecurityE } } - static Credentials fromJson(final Task task) throws IOException - { + static Credentials fromJson(final Task task) throws IOException { final String path = task.getJsonKeyfile().map(f -> f.getPath().toString()).get(); final InputStream jsonStream = new FileInputStream(path); return ServiceAccountCredentials.fromStream(jsonStream); diff --git a/src/main/java/org/embulk/input/gcs/FileList.java b/src/main/java/org/embulk/input/gcs/FileList.java index b6290e2..4dec82a 100644 --- a/src/main/java/org/embulk/input/gcs/FileList.java +++ b/src/main/java/org/embulk/input/gcs/FileList.java @@ -3,10 +3,6 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; -import org.embulk.config.ConfigSource; -import org.embulk.util.config.Config; -import org.embulk.util.config.ConfigDefault; - import java.io.BufferedInputStream; import java.io.BufferedOutputStream; import java.io.ByteArrayInputStream; @@ -23,11 +19,12 @@ import java.util.regex.Pattern; import java.util.zip.GZIPInputStream; import java.util.zip.GZIPOutputStream; +import org.embulk.config.ConfigSource; +import org.embulk.util.config.Config; +import org.embulk.util.config.ConfigDefault; -public class FileList -{ - public interface Task - { +public class FileList { + public interface Task { @Config("path_match_pattern") @ConfigDefault("\".*\"") String getPathMatchPattern(); @@ -42,35 +39,30 @@ public interface Task long getMinTaskSize(); } - public static class Entry - { + public static class Entry { private int index; private long size; @JsonCreator public Entry( - @JsonProperty("index") int index, - @JsonProperty("size") long size) - { + @JsonProperty("index") final int index, + @JsonProperty("size") final long size) { this.index = index; this.size = size; } @JsonProperty("index") - public int getIndex() - { + public int getIndex() { return index; } @JsonProperty("size") - public long getSize() - { + public long getSize() { return size; } } - public static class Builder - { + public static class Builder { private final ByteArrayOutputStream binary; private final OutputStream stream; private final List entries = new ArrayList<>(); @@ -82,64 +74,54 @@ public static class Builder private final ByteBuffer castBuffer = ByteBuffer.allocate(4); - public Builder(Task task) - { + public Builder(final Task task) { this(); this.pathMatchPattern = Pattern.compile(task.getPathMatchPattern()); this.limitCount = task.getTotalFileCountLimit(); this.minTaskSize = task.getMinTaskSize(); } - public Builder(ConfigSource config) - { + public Builder(final ConfigSource config) { this(); this.pathMatchPattern = Pattern.compile(config.get(String.class, "path_match_pattern", ".*")); this.limitCount = config.get(int.class, "total_file_count_limit", Integer.MAX_VALUE); this.minTaskSize = config.get(long.class, "min_task_size", 0L); } - public Builder() - { + public Builder() { binary = new ByteArrayOutputStream(); try { stream = new BufferedOutputStream(new GZIPOutputStream(binary)); - } - catch (IOException ex) { + } catch (final IOException ex) { throw new RuntimeException(ex); } } - public Builder limitTotalFileCount(int limitCount) - { + public Builder limitTotalFileCount(final int limitCount) { this.limitCount = limitCount; return this; } - public Builder minTaskSize(long bytes) - { + public Builder minTaskSize(final long bytes) { this.minTaskSize = bytes; return this; } - public synchronized Builder pathMatchPattern(String pattern) - { + public synchronized Builder pathMatchPattern(final String pattern) { this.pathMatchPattern = Pattern.compile(pattern); return this; } - public int size() - { + public int size() { return entries.size(); } - public boolean needsMore() - { + public boolean needsMore() { return size() < limitCount; } // returns true if this file is used - public synchronized boolean add(String path, long size) - { + public synchronized boolean add(final String path, final long size) { // TODO throw IllegalStateException if stream is already closed if (!needsMore()) { @@ -158,8 +140,7 @@ public synchronized boolean add(String path, long size) try { stream.write(castBuffer.array()); stream.write(data); - } - catch (IOException ex) { + } catch (final IOException ex) { throw new RuntimeException(ex); } @@ -167,19 +148,16 @@ public synchronized boolean add(String path, long size) return true; } - public synchronized FileList build() - { + public synchronized FileList build() { try { stream.close(); - } - catch (IOException ex) { + } catch (final IOException ex) { throw new RuntimeException(ex); } return new FileList(binary.toByteArray(), getSplits(entries), Optional.ofNullable(last)); } - private List> getSplits(List all) - { + private List> getSplits(final List all) { List> tasks = new ArrayList<>(); long currentTaskSize = 0; List currentTask = new ArrayList<>(); @@ -206,18 +184,16 @@ private List> getSplits(List all) @JsonCreator @Deprecated public FileList( - @JsonProperty("data") byte[] data, - @JsonProperty("tasks") List> tasks, - @JsonProperty("last") Optional last) - { + @JsonProperty("data") final byte[] data, + @JsonProperty("tasks") final List> tasks, + @JsonProperty("last") final Optional last) { this.data = data.clone(); this.tasks = tasks; this.last = last; } @JsonIgnore - public Optional getLastPath(Optional lastLastPath) - { + public Optional getLastPath(final Optional lastLastPath) { if (last.isPresent()) { return last; } @@ -225,41 +201,34 @@ public Optional getLastPath(Optional lastLastPath) } @JsonIgnore - public int getTaskCount() - { + public int getTaskCount() { return tasks.size(); } @JsonIgnore - public List get(int i) - { + public List get(final int i) { return new EntryList(data, tasks.get(i)); } @JsonProperty("data") @Deprecated - public byte[] getData() - { + public byte[] getData() { return data.clone(); } @JsonProperty("tasks") @Deprecated - public List> getTasks() - { + public List> getTasks() { return tasks; } @JsonProperty("last") @Deprecated - public Optional getLast() - { + public Optional getLast() { return last; } - private static class EntryList - extends AbstractList - { + private static class EntryList extends AbstractList { private final byte[] data; private final List entries; private InputStream stream; @@ -267,30 +236,26 @@ private static class EntryList private final ByteBuffer castBuffer = ByteBuffer.allocate(4); - public EntryList(byte[] data, List entries) - { + public EntryList(final byte[] data, final List entries) { this.data = data; this.entries = entries; try { this.stream = new BufferedInputStream(new GZIPInputStream(new ByteArrayInputStream(data))); - } - catch (IOException ex) { + } catch (final IOException ex) { throw new RuntimeException(ex); } this.current = 0; } @Override - public synchronized String get(int i) - { + public synchronized String get(final int i) { Entry e = entries.get(i); if (e.getIndex() < current) { // rewind to the head try { stream.close(); stream = new BufferedInputStream(new GZIPInputStream(new ByteArrayInputStream(data))); - } - catch (IOException ex) { + } catch (final IOException ex) { throw new RuntimeException(ex); } current = 0; @@ -304,13 +269,11 @@ public synchronized String get(int i) } @Override - public int size() - { + public int size() { return entries.size(); } - private byte[] readNext() - { + private byte[] readNext() { try { int n = stream.read(castBuffer.array()); if (n != castBuffer.capacity()) { @@ -328,14 +291,12 @@ private byte[] readNext() current++; return b; - } - catch (IOException ex) { + } catch (final IOException ex) { throw new RuntimeException(ex); } } - private String readNextString() - { + private String readNextString() { return new String(readNext(), StandardCharsets.UTF_8); } } diff --git a/src/main/java/org/embulk/input/gcs/GcsFileInput.java b/src/main/java/org/embulk/input/gcs/GcsFileInput.java index 60ef7ab..5466617 100644 --- a/src/main/java/org/embulk/input/gcs/GcsFileInput.java +++ b/src/main/java/org/embulk/input/gcs/GcsFileInput.java @@ -1,5 +1,7 @@ package org.embulk.input.gcs; +import static org.embulk.input.gcs.GcsFileInputPlugin.CONFIG_MAPPER_FACTORY; + import com.google.api.gax.paging.Page; import com.google.cloud.storage.Blob; import com.google.cloud.storage.Storage; @@ -14,31 +16,22 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.embulk.input.gcs.GcsFileInputPlugin.CONFIG_MAPPER_FACTORY; - -public class GcsFileInput - extends InputStreamFileInput - implements TransactionalFileInput -{ +public class GcsFileInput extends InputStreamFileInput implements TransactionalFileInput { private static final Logger LOG = LoggerFactory.getLogger(org.embulk.input.gcs.GcsFileInput.class); - GcsFileInput(PluginTask task, int taskIndex) - { + GcsFileInput(final PluginTask task, final int taskIndex) { super(Exec.getBufferAllocator(), new SingleFileProvider(task, taskIndex)); } - public void abort() - { + public void abort() { } - public TaskReport commit() - { + public TaskReport commit() { return CONFIG_MAPPER_FACTORY.newTaskReport(); } @Override - public void close() - { + public void close() { } /** @@ -46,8 +39,7 @@ public void close() * * The resulting list does not include the file that's size == 0. */ - static FileList listFiles(PluginTask task) - { + static FileList listFiles(final PluginTask task) { Storage client = AuthUtils.newClient(task); String bucket = task.getBucket(); @@ -70,8 +62,7 @@ static FileList listFiles(PluginTask task) LOG.debug("filename: {}", blob.getName()); LOG.debug("updated: {}", blob.getUpdateTime()); } - } - catch (RuntimeException e) { + } catch (final RuntimeException e) { if ((e instanceof StorageException) && ((StorageException) e).getCode() == 400) { throw new ConfigException(String.format("Files listing failed: bucket:%s, prefix:%s, last_path:%s", bucket, prefix, lastKey), e); } @@ -83,8 +74,7 @@ static FileList listFiles(PluginTask task) } // String nextToken = base64Encode(0x0a + ASCII character according to utf8EncodeLength position+ filePath); - static String base64Encode(String path) - { + static String base64Encode(final String path) { byte[] encoding; byte[] utf8 = path.getBytes(StandardCharsets.UTF_8); LOG.debug("path string: {} ,path length:{} \" + ", path, utf8.length); @@ -107,8 +97,7 @@ static String base64Encode(String path) return s; } - private static void printBucketInfo(Storage client, String bucket) - { + private static void printBucketInfo(final Storage client, final String bucket) { // get Bucket Storage.BucketGetOption fields = Storage.BucketGetOption.fields( Storage.BucketField.LOCATION, diff --git a/src/main/java/org/embulk/input/gcs/GcsFileInputPlugin.java b/src/main/java/org/embulk/input/gcs/GcsFileInputPlugin.java index faf5266..d74cb2c 100644 --- a/src/main/java/org/embulk/input/gcs/GcsFileInputPlugin.java +++ b/src/main/java/org/embulk/input/gcs/GcsFileInputPlugin.java @@ -1,5 +1,8 @@ package org.embulk.input.gcs; +import java.io.IOException; +import java.util.List; +import java.util.Optional; import org.embulk.config.ConfigDiff; import org.embulk.config.ConfigException; import org.embulk.config.ConfigSource; @@ -14,22 +17,14 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.List; -import java.util.Optional; - -public class GcsFileInputPlugin - implements FileInputPlugin -{ - public static final ConfigMapperFactory CONFIG_MAPPER_FACTORY = ConfigMapperFactory.builder() - .addDefaultModules().build(); +public class GcsFileInputPlugin implements FileInputPlugin { + public static final ConfigMapperFactory CONFIG_MAPPER_FACTORY = ConfigMapperFactory.builder().addDefaultModules().build(); public static final ConfigMapper CONFIG_MAPPER = CONFIG_MAPPER_FACTORY.createConfigMapper(); public static final TaskMapper TASK_MAPPER = CONFIG_MAPPER_FACTORY.createTaskMapper(); private static final Logger logger = LoggerFactory.getLogger(GcsFileInputPlugin.class); + @Override - public ConfigDiff transaction(ConfigSource config, - FileInputPlugin.Control control) - { + public ConfigDiff transaction(final ConfigSource config, final FileInputPlugin.Control control) { PluginTask task = CONFIG_MAPPER.map(config, PluginTask.class); if (task.getP12KeyfileFullpath().isPresent()) { @@ -38,8 +33,7 @@ public ConfigDiff transaction(ConfigSource config, } try { task.setP12Keyfile(Optional.of(LocalFile.of(task.getP12KeyfileFullpath().get()))); - } - catch (IOException ex) { + } catch (final IOException ex) { throw new RuntimeException(ex); } } @@ -48,8 +42,7 @@ public ConfigDiff transaction(ConfigSource config, if (!task.getJsonKeyfile().isPresent()) { throw new ConfigException("If auth_method is json_key, you have to set json_keyfile"); } - } - else if (AuthUtils.AuthMethod.private_key.equals(task.getAuthMethod())) { + } else if (AuthUtils.AuthMethod.private_key.equals(task.getAuthMethod())) { if (!task.getP12Keyfile().isPresent() || !task.getServiceAccountEmail().isPresent()) { throw new ConfigException("If auth_method is private_key, you have to set both service_account_email and p12_keyfile"); } @@ -68,8 +61,7 @@ else if (AuthUtils.AuthMethod.private_key.equals(task.getAuthMethod())) { if (task.getFiles().getTaskCount() == 0) { logger.info("No file is found in the path(s) identified by path_prefix"); } - } - else { + } else { if (task.getPathFiles().isEmpty()) { throw new ConfigException("No file is found. Confirm paths option isn't empty"); } @@ -84,10 +76,10 @@ else if (AuthUtils.AuthMethod.private_key.equals(task.getAuthMethod())) { } @Override - public ConfigDiff resume(TaskSource taskSource, - int taskCount, - FileInputPlugin.Control control) - { + public ConfigDiff resume( + final TaskSource taskSource, + final int taskCount, + final FileInputPlugin.Control control) { PluginTask task = TASK_MAPPER.map(taskSource, PluginTask.class); control.run(taskSource, taskCount); @@ -102,15 +94,14 @@ public ConfigDiff resume(TaskSource taskSource, } @Override - public void cleanup(TaskSource taskSource, - int taskCount, - List successTaskReports) - { + public void cleanup( + final TaskSource taskSource, + final int taskCount, + final List successTaskReports) { } @Override - public TransactionalFileInput open(TaskSource taskSource, int taskIndex) - { + public TransactionalFileInput open(final TaskSource taskSource, final int taskIndex) { PluginTask task = TASK_MAPPER.map(taskSource, PluginTask.class); return new GcsFileInput(task, taskIndex); } diff --git a/src/main/java/org/embulk/input/gcs/PluginTask.java b/src/main/java/org/embulk/input/gcs/PluginTask.java index a064e10..3d3265a 100644 --- a/src/main/java/org/embulk/input/gcs/PluginTask.java +++ b/src/main/java/org/embulk/input/gcs/PluginTask.java @@ -1,15 +1,12 @@ package org.embulk.input.gcs; +import java.util.List; +import java.util.Optional; import org.embulk.util.config.Config; import org.embulk.util.config.ConfigDefault; import org.embulk.util.config.Task; -import java.util.List; -import java.util.Optional; - -public interface PluginTask - extends Task, AuthUtils.Task, FileList.Task, RetryUtils.Task -{ +public interface PluginTask extends Task, AuthUtils.Task, FileList.Task, RetryUtils.Task { @Config("bucket") String getBucket(); @@ -34,5 +31,6 @@ public interface PluginTask List getPathFiles(); FileList getFiles(); + void setFiles(FileList files); } diff --git a/src/main/java/org/embulk/input/gcs/RetryUtils.java b/src/main/java/org/embulk/input/gcs/RetryUtils.java index 082ea09..b7918b9 100644 --- a/src/main/java/org/embulk/input/gcs/RetryUtils.java +++ b/src/main/java/org/embulk/input/gcs/RetryUtils.java @@ -5,6 +5,8 @@ import com.google.api.client.googleapis.json.GoogleJsonResponseException; import com.google.cloud.storage.Blob; import com.google.cloud.storage.Storage; +import java.util.Optional; +import java.util.function.Predicate; import org.embulk.util.config.Config; import org.embulk.util.config.ConfigDefault; import org.embulk.util.retryhelper.RetryExecutor; @@ -13,13 +15,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Optional; -import java.util.function.Predicate; - -class RetryUtils -{ - interface Task extends org.embulk.util.config.Task - { +class RetryUtils { + interface Task extends org.embulk.util.config.Task { @Config("max_connection_retry") @ConfigDefault("10") // 10 times retry to connect GCS server if failed. int getMaxConnectionRetry(); @@ -33,8 +30,7 @@ interface Task extends org.embulk.util.config.Task int getMaximumRetryIntervalMillis(); } - private RetryUtils() - { + private RetryUtils() { } private static final Logger LOG = LoggerFactory.getLogger(RetryUtils.class); @@ -75,57 +71,41 @@ private RetryUtils() /** * A default (abstract) retryable impl, which makes use of above 2 predicates * With default behaviors onRetry, etc. - * - * @param */ - public abstract static class DefaultRetryable implements Retryable - { + public abstract static class DefaultRetryable implements Retryable { @Override - public boolean isRetryableException(Exception exception) - { + public boolean isRetryableException(final Exception exception) { if (exception instanceof GoogleJsonResponseException) { return API_ERROR_NOT_RETRY_4XX.test((GoogleJsonResponseException) exception); - } - else if (exception instanceof TokenResponseException) { + } else if (exception instanceof TokenResponseException) { return TOKEN_ERROR_NOT_RETRY_4XX.test((TokenResponseException) exception); } return true; } @Override - public void onRetry(Exception exception, int retryCount, int retryLimit, int retryWait) - { + public void onRetry(final Exception exception, final int retryCount, final int retryLimit, final int retryWait) { String message = String.format("GCS GET request failed. Retrying %d/%d after %d seconds. Message: %s: %s", retryCount, retryLimit, retryWait / 1000, exception.getClass(), exception.getMessage()); if (retryCount % 3 == 0) { LOG.warn(message, exception); - } - else { + } else { LOG.warn(message); } } @Override - public void onGiveup(Exception firstException, Exception lastException) - { + public void onGiveup(final Exception firstException, final Exception lastException) { } } /** * Return Blob GET op that is ready for {@code withRetry} - * - * @param client - * @param bucket - * @param key - * @return */ - static DefaultRetryable get(Storage client, String bucket, String key) - { - return new DefaultRetryable() - { + static DefaultRetryable get(final Storage client, final String bucket, final String key) { + return new DefaultRetryable() { @Override - public Blob call() - { + public Blob call() { return client.get(bucket, key); } }; @@ -133,14 +113,8 @@ public Blob call() /** * Utility method - * - * @param task - * @param op - * @param - * @return */ - static T withRetry(Task task, Retryable op) - { + static T withRetry(final Task task, final Retryable op) { try { return RetryExecutor.builder() .withInitialRetryWaitMillis(task.getInitialRetryIntervalMillis()) @@ -148,8 +122,7 @@ static T withRetry(Task task, Retryable op) .withRetryLimit(task.getMaxConnectionRetry()) .build() .runInterruptible(op); - } - catch (RetryGiveupException | InterruptedException e) { + } catch (final RetryGiveupException | InterruptedException e) { throw new RuntimeException(e); } } diff --git a/src/main/java/org/embulk/input/gcs/SingleFileProvider.java b/src/main/java/org/embulk/input/gcs/SingleFileProvider.java index cdaea03..7d55096 100644 --- a/src/main/java/org/embulk/input/gcs/SingleFileProvider.java +++ b/src/main/java/org/embulk/input/gcs/SingleFileProvider.java @@ -2,36 +2,29 @@ import com.google.cloud.ReadChannel; import com.google.cloud.storage.Storage; -import org.embulk.util.file.InputStreamFileInput; -import org.embulk.util.file.ResumableInputStream; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.IOException; import java.io.InputStream; import java.nio.channels.Channels; import java.util.Iterator; +import org.embulk.util.file.InputStreamFileInput; +import org.embulk.util.file.ResumableInputStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import static java.lang.String.format; - -public class SingleFileProvider - implements InputStreamFileInput.Provider -{ +public class SingleFileProvider implements InputStreamFileInput.Provider { private final Storage client; private final String bucket; private final Iterator iterator; private boolean opened = false; - SingleFileProvider(PluginTask task, int taskIndex) - { + SingleFileProvider(final PluginTask task, final int taskIndex) { this.client = AuthUtils.newClient(task); this.bucket = task.getBucket(); this.iterator = task.getFiles().get(taskIndex).iterator(); } @Override - public InputStreamFileInput.InputStreamWithHints openNextWithHints() - { + public InputStreamFileInput.InputStreamWithHints openNextWithHints() { if (opened) { return null; } @@ -48,29 +41,24 @@ public InputStreamFileInput.InputStreamWithHints openNextWithHints() } @Override - public void close() - { + public void close() { } - static class InputStreamReopener - implements ResumableInputStream.Reopener - { + static class InputStreamReopener implements ResumableInputStream.Reopener { private Logger logger = LoggerFactory.getLogger(getClass()); private final Storage client; private final String bucket; private final String key; - InputStreamReopener(Storage client, String bucket, String key) - { + InputStreamReopener(final Storage client, final String bucket, final String key) { this.client = client; this.bucket = bucket; this.key = key; } @Override - public InputStream reopen(long offset, Exception closedCause) throws IOException - { - logger.warn(format("GCS read failed. Retrying GET request with %,d bytes offset", offset), closedCause); + public InputStream reopen(final long offset, final Exception closedCause) throws IOException { + logger.warn(String.format("GCS read failed. Retrying GET request with %,d bytes offset", offset), closedCause); ReadChannel ch = client.get(bucket, key).reader(); ch.seek(offset); return Channels.newInputStream(ch); diff --git a/src/test/java/org/embulk/input/gcs/TestAuthUtils.java b/src/test/java/org/embulk/input/gcs/TestAuthUtils.java index 3423ff9..0792b60 100644 --- a/src/test/java/org/embulk/input/gcs/TestAuthUtils.java +++ b/src/test/java/org/embulk/input/gcs/TestAuthUtils.java @@ -1,6 +1,17 @@ package org.embulk.input.gcs; +import static org.embulk.input.gcs.GcsFileInputPlugin.CONFIG_MAPPER; +import static org.embulk.input.gcs.GcsFileInputPlugin.CONFIG_MAPPER_FACTORY; +import static org.junit.Assert.assertTrue; +import static org.junit.Assume.assumeNotNull; + import com.google.auth.Credentials; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.nio.file.Path; +import java.security.GeneralSecurityException; +import java.util.Base64; +import java.util.Optional; import org.embulk.config.ConfigException; import org.embulk.config.ConfigSource; import org.embulk.test.EmbulkTestRuntime; @@ -10,20 +21,8 @@ import org.junit.Rule; import org.junit.Test; import org.mockito.Mockito; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.nio.file.Path; -import java.security.GeneralSecurityException; -import java.util.Base64; -import java.util.Optional; - -import static org.embulk.input.gcs.GcsFileInputPlugin.CONFIG_MAPPER; -import static org.embulk.input.gcs.GcsFileInputPlugin.CONFIG_MAPPER_FACTORY; -import static org.junit.Assert.assertTrue; -import static org.junit.Assume.assumeNotNull; -public class TestAuthUtils -{ +public class TestAuthUtils { private static Optional GCP_EMAIL; private static Optional GCP_P12_KEYFILE; private static Optional GCP_JSON_KEYFILE; @@ -38,8 +37,7 @@ public class TestAuthUtils * GCP_BUCKET */ @BeforeClass - public static void initializeConstant() - { + public static void initializeConstant() { String gcpEmail = System.getenv("GCP_EMAIL"); String gcpP12KeyFile = System.getenv("GCP_PRIVATE_KEYFILE"); String gcpJsonKeyFile = System.getenv("GCP_JSON_KEYFILE"); @@ -59,22 +57,18 @@ public static void initializeConstant() private ConfigSource config; @Before - public void setUp() - { + public void setUp() { config = config(); } @Test - public void testGetServiceAccountCredentialSuccess() throws IOException, GeneralSecurityException - { + public void testGetServiceAccountCredentialSuccess() throws IOException, GeneralSecurityException { PluginTask task = CONFIG_MAPPER.map(config, PluginTask.class); assertTrue(AuthUtils.fromP12(task) instanceof Credentials); } @Test(expected = FileNotFoundException.class) - public void testGetServiceAccountCredentialThrowFileNotFoundException() - throws IOException, GeneralSecurityException - { + public void testGetServiceAccountCredentialThrowFileNotFoundException() throws IOException, GeneralSecurityException { Path mockNotFound = Mockito.mock(Path.class); Mockito.when(mockNotFound.toString()).thenReturn("/path/to/notfound.p12"); LocalFile p12File = Mockito.mock(LocalFile.class); @@ -86,31 +80,25 @@ public void testGetServiceAccountCredentialThrowFileNotFoundException() } @Test - public void testGetGcsClientUsingServiceAccountCredentialSuccess() - { + public void testGetGcsClientUsingServiceAccountCredentialSuccess() { PluginTask task = CONFIG_MAPPER.map(config, PluginTask.class); assertTrue(AuthUtils.newClient(task) instanceof com.google.cloud.storage.Storage); } @Test(expected = ConfigException.class) - public void testGetGcsClientUsingServiceAccountCredentialThrowJsonResponseException() - { + public void testGetGcsClientUsingServiceAccountCredentialThrowJsonResponseException() { PluginTask task = CONFIG_MAPPER.map(config.set("bucket", "non-exists-bucket"), PluginTask.class); AuthUtils.newClient(task); } @Test - public void testGetServiceAccountCredentialFromJsonFileSuccess() - throws IOException - { + public void testGetServiceAccountCredentialFromJsonFileSuccess() throws IOException { PluginTask task = CONFIG_MAPPER.map(config, PluginTask.class); assertTrue(AuthUtils.fromJson(task) instanceof Credentials); } @Test(expected = FileNotFoundException.class) - public void testGetServiceAccountCredentialFromJsonThrowFileFileNotFoundException() - throws IOException - { + public void testGetServiceAccountCredentialFromJsonThrowFileFileNotFoundException() throws IOException { Path mockNotFound = Mockito.mock(Path.class); Mockito.when(mockNotFound.toString()).thenReturn("/path/to/notfound.json"); LocalFile jsonFile = Mockito.mock(LocalFile.class); @@ -122,22 +110,19 @@ public void testGetServiceAccountCredentialFromJsonThrowFileFileNotFoundExceptio } @Test - public void testGetServiceAccountCredentialFromJsonSuccess() - { + public void testGetServiceAccountCredentialFromJsonSuccess() { PluginTask task = CONFIG_MAPPER.map(config.set("auth_method", AuthUtils.AuthMethod.json_key), PluginTask.class); assertTrue(AuthUtils.newClient(task) instanceof com.google.cloud.storage.Storage); } @Test(expected = ConfigException.class) - public void testGetServiceAccountCredentialFromJsonThrowGoogleJsonResponseException() - { + public void testGetServiceAccountCredentialFromJsonThrowGoogleJsonResponseException() { PluginTask task = CONFIG_MAPPER.map(config.set("auth_method", AuthUtils.AuthMethod.json_key) .set("bucket", "non-exists-bucket"), PluginTask.class); AuthUtils.newClient(task); } - private ConfigSource config() - { + private ConfigSource config() { byte[] keyBytes = Base64.getDecoder().decode(GCP_P12_KEYFILE.get()); Optional p12Key = Optional.of(LocalFile.ofContent(keyBytes)); Optional jsonKey = Optional.of(LocalFile.ofContent(GCP_JSON_KEYFILE.get().getBytes())); diff --git a/src/test/java/org/embulk/input/gcs/TestGcsFileInputPlugin.java b/src/test/java/org/embulk/input/gcs/TestGcsFileInputPlugin.java index 62830bf..a57776c 100644 --- a/src/test/java/org/embulk/input/gcs/TestGcsFileInputPlugin.java +++ b/src/test/java/org/embulk/input/gcs/TestGcsFileInputPlugin.java @@ -1,8 +1,26 @@ package org.embulk.input.gcs; +import static org.embulk.input.gcs.GcsFileInputPlugin.CONFIG_MAPPER; +import static org.embulk.input.gcs.GcsFileInputPlugin.CONFIG_MAPPER_FACTORY; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assume.assumeNotNull; + import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; +import java.io.BufferedReader; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Base64; +import java.util.List; +import java.util.Optional; +import java.util.Properties; import org.embulk.EmbulkSystemProperties; import org.embulk.config.ConfigDiff; import org.embulk.config.ConfigException; @@ -25,27 +43,7 @@ import org.junit.Rule; import org.junit.Test; -import java.io.BufferedReader; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.nio.file.Path; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Base64; -import java.util.List; -import java.util.Optional; -import java.util.Properties; - -import static org.embulk.input.gcs.GcsFileInputPlugin.CONFIG_MAPPER; -import static org.embulk.input.gcs.GcsFileInputPlugin.CONFIG_MAPPER_FACTORY; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assume.assumeNotNull; - -public class TestGcsFileInputPlugin -{ +public class TestGcsFileInputPlugin { private static Optional GCP_EMAIL; private static Optional GCP_P12_KEYFILE; private static Optional GCP_JSON_KEYFILE; @@ -60,6 +58,7 @@ public class TestGcsFileInputPlugin properties.setProperty("default_guess_plugins", "gzip,bzip2,json,csv"); EMBULK_SYSTEM_PROPERTIES = EmbulkSystemProperties.of(properties); } + /* * This test case requires environment variables * GCP_EMAIL @@ -68,8 +67,7 @@ public class TestGcsFileInputPlugin * GCP_BUCKET */ @BeforeClass - public static void initializeConstant() - { + public static void initializeConstant() { String gcpEmail = System.getenv("GCP_EMAIL"); String gcpP12KeyFile = System.getenv("GCP_PRIVATE_KEYFILE"); String gcpJsonKeyFile = System.getenv("GCP_JSON_KEYFILE"); @@ -102,15 +100,13 @@ public static void initializeConstant() private GcsFileInputPlugin plugin; @Before - public void createResources() - { + public void createResources() { config = config(); plugin = new GcsFileInputPlugin(); } @Test - public void checkDefaultValues() - { + public void checkDefaultValues() { ConfigSource config = CONFIG_MAPPER_FACTORY.newConfigSource() .set("bucket", GCP_BUCKET) .set("path_prefix", "my-prefix"); @@ -123,8 +119,7 @@ public void checkDefaultValues() // paths are set @Test - public void checkDefaultValuesPathsSpecified() - { + public void checkDefaultValuesPathsSpecified() { ConfigSource config = CONFIG_MAPPER_FACTORY.newConfigSource() .set("bucket", GCP_BUCKET) .set("paths", Arrays.asList("object1", "object2")) @@ -138,8 +133,7 @@ public void checkDefaultValuesPathsSpecified() // both path_prefix and paths are not set @Test(expected = ConfigException.class) - public void checkDefaultValuesNoPathSpecified() - { + public void checkDefaultValuesNoPathSpecified() { ConfigSource config = CONFIG_MAPPER_FACTORY.newConfigSource() .set("bucket", GCP_BUCKET) .set("auth_method", "private_key") @@ -153,8 +147,7 @@ public void checkDefaultValuesNoPathSpecified() // p12_keyfile is null when auth_method is private_key @Test(expected = ConfigException.class) - public void checkDefaultValuesP12keyNull() - { + public void checkDefaultValuesP12keyNull() { ConfigSource config = CONFIG_MAPPER_FACTORY.newConfigSource() .set("bucket", GCP_BUCKET) .set("path_prefix", "my-prefix") @@ -167,8 +160,7 @@ public void checkDefaultValuesP12keyNull() // both p12_keyfile and p12_keyfile_fullpath set @Test(expected = ConfigException.class) - public void checkDefaultValuesConflictSetting() - { + public void checkDefaultValuesConflictSetting() { ConfigSource config = CONFIG_MAPPER_FACTORY.newConfigSource() .set("bucket", GCP_BUCKET) .set("path_prefix", "my-prefix") @@ -182,8 +174,7 @@ public void checkDefaultValuesConflictSetting() // invalid p12keyfile when auth_method is private_key @Test(expected = ConfigException.class) - public void checkDefaultValuesInvalidPrivateKey() - { + public void checkDefaultValuesInvalidPrivateKey() { ConfigSource config = CONFIG_MAPPER_FACTORY.newConfigSource() .set("bucket", GCP_BUCKET) .set("path_prefix", "my-prefix") @@ -197,8 +188,7 @@ public void checkDefaultValuesInvalidPrivateKey() // json_keyfile is null when auth_method is json_key @Test(expected = ConfigException.class) - public void checkDefaultValuesJsonKeyfileNull() - { + public void checkDefaultValuesJsonKeyfileNull() { ConfigSource config = CONFIG_MAPPER_FACTORY.newConfigSource() .set("bucket", GCP_BUCKET) .set("path_prefix", "my-prefix") @@ -212,8 +202,7 @@ public void checkDefaultValuesJsonKeyfileNull() // last_path length is too long @Test(expected = ConfigException.class) - public void checkDefaultValuesLongLastPath() - { + public void checkDefaultValuesLongLastPath() { ConfigSource config = CONFIG_MAPPER_FACTORY.newConfigSource() .set("bucket", GCP_BUCKET) .set("path_prefix", "my-prefix") @@ -227,15 +216,13 @@ public void checkDefaultValuesLongLastPath() } @Test - public void testGcsClientCreateSuccessfully() - { + public void testGcsClientCreateSuccessfully() { PluginTask task = CONFIG_MAPPER.map(config(), PluginTask.class); AuthUtils.newClient(task); } @Test(expected = ConfigException.class) - public void testGcsClientCreateThrowConfigException() - { + public void testGcsClientCreateThrowConfigException() { // verify AuthUtils#newClient() to throws ConfigException for non-exists-bucket ConfigSource config = CONFIG_MAPPER_FACTORY.newConfigSource() .set("bucket", "non-exists-bucket") @@ -249,8 +236,7 @@ public void testGcsClientCreateThrowConfigException() } @Test - public void testResume() - { + public void testResume() { PluginTask task = CONFIG_MAPPER.map(config, PluginTask.class); FileList.Builder builder = new FileList.Builder(config); builder.add("in/aa/a", 1); @@ -260,15 +246,13 @@ public void testResume() } @Test - public void testCleanup() - { + public void testCleanup() { PluginTask task = CONFIG_MAPPER.map(config, PluginTask.class); plugin.cleanup(task.toTaskSource(), 0, Lists.newArrayList()); // no errors happens } @Test - public void testListFilesByPrefix() - { + public void testListFilesByPrefix() { List expected = Arrays.asList( GCP_BUCKET_DIRECTORY + "sample_01.csv", GCP_BUCKET_DIRECTORY + "sample_02.csv" @@ -287,8 +271,7 @@ public void testListFilesByPrefix() } @Test - public void testListFilesByPrefixWithPattern() - { + public void testListFilesByPrefixWithPattern() { List expected = Arrays.asList( GCP_BUCKET_DIRECTORY + "sample_01.csv" ); @@ -306,8 +289,7 @@ public void testListFilesByPrefixWithPattern() } @Test - public void testListFilesByPrefixIncrementalFalse() - { + public void testListFilesByPrefixIncrementalFalse() { ConfigSource config = config().deepCopy() .set("incremental", false); @@ -317,8 +299,7 @@ public void testListFilesByPrefixIncrementalFalse() } @Test(expected = ConfigException.class) - public void testListFilesByPrefixNonExistsBucket() - { + public void testListFilesByPrefixNonExistsBucket() { PluginTask task = CONFIG_MAPPER.map(config .set("bucket", "non-exists-bucket") .set("path_prefix", "prefix"), PluginTask.class); @@ -330,8 +311,7 @@ public void testListFilesByPrefixNonExistsBucket() } @Test - public void testNonExistingFilesWithPathPrefix() - { + public void testNonExistingFilesWithPathPrefix() { ConfigSource config = CONFIG_MAPPER_FACTORY.newConfigSource() .set("bucket", GCP_BUCKET) .set("path_prefix", "/path/to/notfound") @@ -347,8 +327,7 @@ public void testNonExistingFilesWithPathPrefix() } @Test(expected = ConfigException.class) - public void testNonExistingFilesWithPaths() throws Exception - { + public void testNonExistingFilesWithPaths() throws Exception { ConfigSource config = CONFIG_MAPPER_FACTORY.newConfigSource() .set("bucket", GCP_BUCKET) .set("paths", Arrays.asList()) @@ -362,8 +341,7 @@ public void testNonExistingFilesWithPaths() throws Exception } @Test(expected = ConfigException.class) - public void testLastPathTooLong() throws Exception - { + public void testLastPathTooLong() throws Exception { ConfigSource config = CONFIG_MAPPER_FACTORY.newConfigSource() .set("bucket", GCP_BUCKET) .set("paths", Arrays.asList()) @@ -393,9 +371,9 @@ public void testGcsFileInputByOpen() throws IOException { assertRecords(tempFile); } + @SuppressWarnings("checkstyle:LineLength") @Test - public void testBase64() - { + public void testBase64() { assertEquals("CgFj", GcsFileInput.base64Encode("c")); assertEquals("CgJjMg==", GcsFileInput.base64Encode("c2")); assertEquals("Cgh0ZXN0LmNzdg==", GcsFileInput.base64Encode("test.csv")); @@ -409,8 +387,7 @@ public void testBase64() assertEquals(expected, GcsFileInput.base64Encode(params)); } - private ConfigSource config() - { + private ConfigSource config() { ConfigSource config = CONFIG_MAPPER_FACTORY.newConfigSource() .set("bucket", GCP_BUCKET) .set("path_prefix", GCP_PATH_PREFIX) @@ -422,8 +399,7 @@ private ConfigSource config() return config; } - private static List emptyTaskReports(int taskCount) - { + private static List emptyTaskReports(final int taskCount) { ImmutableList.Builder reports = new ImmutableList.Builder<>(); for (int i = 0; i < taskCount; i++) { reports.add(CONFIG_MAPPER_FACTORY.newTaskReport()); @@ -431,17 +407,14 @@ private static List emptyTaskReports(int taskCount) return reports.build(); } - private class Control - implements FileInputPlugin.Control - { + private class Control implements FileInputPlugin.Control { @Override public List run(TaskSource taskSource, int taskCount) { return ImmutableList.of(CONFIG_MAPPER_FACTORY.newTaskReport()); } } - private ImmutableMap parserConfig(ImmutableList schemaConfig) - { + private ImmutableMap parserConfig(ImmutableList schemaConfig) { ImmutableMap.Builder builder = new ImmutableMap.Builder<>(); builder.put("type", "csv"); builder.put("newline", "CRLF"); @@ -456,8 +429,7 @@ private ImmutableMap parserConfig(ImmutableList schemaCo return builder.build(); } - private ImmutableList schemaConfig() - { + private ImmutableList schemaConfig() { ImmutableList.Builder builder = new ImmutableList.Builder<>(); builder.add(ImmutableMap.of("name", "id", "type", "long")); builder.add(ImmutableMap.of("name", "account", "type", "long")); @@ -477,27 +449,27 @@ private void assertRecords(Path tempFile) throws IOException { records.add(record); } assertEquals(8, records.size()); - { - String[] record = records.get(0); - assertEquals("1", record[0]); - assertEquals("32864", record[1]); - assertEquals("2015-01-27 19:23:49.000000 +0000", record[2]); - assertEquals("2015-01-27 00:00:00.000000 +0000", record[3]); - assertEquals("embulk", record[4]); - } - { - Object[] record = records.get(1); - assertEquals("2", record[0]); - assertEquals("14824", record[1]); - assertEquals("2015-01-27 19:01:23.000000 +0000", record[2].toString()); - assertEquals("2015-01-27 00:00:00.000000 +0000", record[3].toString()); - assertEquals("embulk jruby", record[4]); - } + { + final String[] record = records.get(0); + assertEquals("1", record[0]); + assertEquals("32864", record[1]); + assertEquals("2015-01-27 19:23:49.000000 +0000", record[2]); + assertEquals("2015-01-27 00:00:00.000000 +0000", record[3]); + assertEquals("embulk", record[4]); + } + + { + final Object[] record = records.get(1); + assertEquals("2", record[0]); + assertEquals("14824", record[1]); + assertEquals("2015-01-27 19:01:23.000000 +0000", record[2].toString()); + assertEquals("2015-01-27 00:00:00.000000 +0000", record[3].toString()); + assertEquals("embulk jruby", record[4]); + } } - private static String getDirectory(String dir) - { + private static String getDirectory(String dir) { if (dir != null) { if (!dir.endsWith("/")) { dir = dir + "/"; @@ -509,8 +481,7 @@ private static String getDirectory(String dir) return dir; } - private ConfigSource setKeys(ConfigSource configSource) - { + private ConfigSource setKeys(final ConfigSource configSource) { byte[] keyBytes = Base64.getDecoder().decode(GCP_P12_KEYFILE.get()); Optional p12Key = Optional.of(LocalFile.ofContent(keyBytes)); Optional jsonKey = Optional.of(LocalFile.ofContent(GCP_JSON_KEYFILE.get().getBytes())); diff --git a/src/test/java/org/embulk/input/gcs/TestInputStreamReopener.java b/src/test/java/org/embulk/input/gcs/TestInputStreamReopener.java index 3e9fab0..09a592b 100644 --- a/src/test/java/org/embulk/input/gcs/TestInputStreamReopener.java +++ b/src/test/java/org/embulk/input/gcs/TestInputStreamReopener.java @@ -1,77 +1,67 @@ package org.embulk.input.gcs; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; + import com.google.cloud.ReadChannel; import com.google.cloud.RestorableState; import com.google.cloud.storage.Blob; import com.google.cloud.storage.Storage; import com.google.common.base.Charsets; import com.google.common.io.Files; -import org.embulk.test.EmbulkTestRuntime; -import org.embulk.util.file.ResumableInputStream; -import org.junit.Rule; -import org.junit.Test; -import org.mockito.Mockito; - import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.nio.file.Paths; +import org.embulk.test.EmbulkTestRuntime; +import org.embulk.util.file.ResumableInputStream; +import org.junit.Rule; +import org.junit.Test; +import org.mockito.Mockito; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; - -public class TestInputStreamReopener -{ - private static class MockReadChannel implements ReadChannel - { +public class TestInputStreamReopener { + private static class MockReadChannel implements ReadChannel { private FileChannel ch; - MockReadChannel(FileChannel ch) - { + MockReadChannel(final FileChannel ch) { this.ch = ch; } @Override - public boolean isOpen() - { + public boolean isOpen() { return this.ch.isOpen(); } @Override - public void close() - { + public void close() { try { this.ch.close(); - } - catch (IOException ignored) { + } catch (final IOException ignored) { + // no-op } } @Override - public void seek(long position) throws IOException - { + public void seek(final long position) throws IOException { this.ch.position(position); } @Override - public void setChunkSize(int chunkSize) - { + public void setChunkSize(final int chunkSize) { // no-op } @Override - public RestorableState capture() - { + public RestorableState capture() { return null; } @Override - public int read(ByteBuffer dst) throws IOException - { + public int read(final ByteBuffer dst) throws IOException { return this.ch.read(dst); } } @@ -82,8 +72,7 @@ public int read(ByteBuffer dst) throws IOException public EmbulkTestRuntime runtime = new EmbulkTestRuntime(); @Test - public void testResume() - { + public void testResume() { final String bucket = "any_bucket"; final String key = "any_file"; @@ -101,15 +90,13 @@ public void testResume() String content = out.toString("UTF-8"); // assert content assertString(content); - } - catch (IOException e) { + } catch (final IOException e) { e.printStackTrace(); fail("Should not throw"); } } - private Storage mockStorage() - { + private Storage mockStorage() { Blob blob = Mockito.mock(Blob.class); // mock Storage to return ReadChannel Storage client = Mockito.mock(Storage.class); @@ -121,12 +108,8 @@ private Storage mockStorage() /** * Return a mock FileChannel, with simulated error during reads - * - * @return - * @throws IOException */ - private static FileChannel mockChannel() throws IOException - { + private static FileChannel mockChannel() throws IOException { FileChannel ch = Mockito.spy(FileChannel.open(Paths.get(SAMPLE_PATH))); // success -> error -> success -> error... Mockito.doCallRealMethod() @@ -135,8 +118,7 @@ private static FileChannel mockChannel() throws IOException return ch; } - private static void assertString(final String actual) throws IOException - { + private static void assertString(final String actual) throws IOException { final String expected = Files.asCharSource(new File(SAMPLE_PATH), Charsets.UTF_8).read(); assertEquals(expected, actual); } diff --git a/src/test/java/org/embulk/input/gcs/TestRetryUtils.java b/src/test/java/org/embulk/input/gcs/TestRetryUtils.java index 32367ec..1b5077e 100644 --- a/src/test/java/org/embulk/input/gcs/TestRetryUtils.java +++ b/src/test/java/org/embulk/input/gcs/TestRetryUtils.java @@ -1,5 +1,13 @@ package org.embulk.input.gcs; +import static org.embulk.input.gcs.GcsFileInputPlugin.CONFIG_MAPPER; +import static org.embulk.input.gcs.GcsFileInputPlugin.CONFIG_MAPPER_FACTORY; +import static org.embulk.input.gcs.RetryUtils.withRetry; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + import com.google.api.client.auth.oauth2.TokenResponseException; import com.google.api.client.googleapis.json.GoogleJsonError; import com.google.api.client.googleapis.json.GoogleJsonResponseException; @@ -15,45 +23,31 @@ import com.google.api.client.testing.http.MockLowLevelHttpRequest; import com.google.api.client.testing.http.MockLowLevelHttpResponse; import com.google.cloud.storage.StorageException; +import java.io.IOException; import org.embulk.test.EmbulkTestRuntime; import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.mockito.Mockito; -import java.io.IOException; - -import static org.embulk.input.gcs.GcsFileInputPlugin.CONFIG_MAPPER; -import static org.embulk.input.gcs.GcsFileInputPlugin.CONFIG_MAPPER_FACTORY; -import static org.embulk.input.gcs.RetryUtils.withRetry; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - -public class TestRetryUtils -{ +public class TestRetryUtils { @Rule public EmbulkTestRuntime runtime = new EmbulkTestRuntime(); private RetryUtils.DefaultRetryable mock; @Before - public void setUp() - { - mock = new RetryUtils.DefaultRetryable() - { + public void setUp() { + mock = new RetryUtils.DefaultRetryable() { @Override - public Object call() - { + public Object call() { return null; } }; } @Test - public void testRetryable() throws IOException - { + public void testRetryable() throws IOException { // verify that #isRetryable() returns false for below cases: // - GoogleJsonResponseException && details.code == 4xx assertFalse(mock.isRetryableException(fakeJsonException(400, "fake_400_ex", null))); @@ -73,8 +67,7 @@ public void testRetryable() throws IOException } @Test - public void testWithRetry() throws Exception - { + public void testWithRetry() throws Exception { mock = Mockito.spy(mock); Exception ex = new StorageException(403, "Fake Exception"); Mockito.doThrow(ex).doThrow(ex).doReturn(null).when(mock).call(); @@ -85,21 +78,17 @@ public void testWithRetry() throws Exception } @Test - public void testWithRetryGiveUp() - { + public void testWithRetryGiveUp() { final String expectMsg = "Will retry and give up"; - mock = new RetryUtils.DefaultRetryable() - { + mock = new RetryUtils.DefaultRetryable() { @Override - public Object call() - { + public Object call() { throw new IllegalStateException(expectMsg); } }; try { withRetry(params(), mock); - } - catch (RuntimeException e) { + } catch (final RuntimeException e) { // root cause -> RetryGiveUpException -> RuntimeException Throwable rootCause = e.getCause().getCause(); assertEquals(expectMsg, rootCause.getMessage()); @@ -107,43 +96,36 @@ public Object call() } } - private static RetryUtils.Task params() - { + private static RetryUtils.Task params() { return CONFIG_MAPPER.map(CONFIG_MAPPER_FACTORY.newConfigSource().set("initial_retry_interval_millis", 1), RetryUtils.Task.class); } - private static GoogleJsonResponseException fakeJsonException(final int code, final String message, final String content) - { + private static GoogleJsonResponseException fakeJsonException(final int code, final String message, final String content) { GoogleJsonResponseException.Builder builder = new GoogleJsonResponseException.Builder(code, message, new HttpHeaders()); builder.setContent(content); return new GoogleJsonResponseException(builder, fakeJsonError(code, message)); } - private static GoogleJsonResponseException fakeJsonExceptionWithoutDetails(final int code, final String message, final String content) - { + private static GoogleJsonResponseException fakeJsonExceptionWithoutDetails(final int code, final String message, final String content) { GoogleJsonResponseException.Builder builder = new GoogleJsonResponseException.Builder(code, message, new HttpHeaders()); builder.setContent(content); return new GoogleJsonResponseException(builder, null); } - private static GoogleJsonError fakeJsonError(final int code, final String message) - { + private static GoogleJsonError fakeJsonError(final int code, final String message) { GoogleJsonError error = new GoogleJsonError(); error.setCode(code); error.setMessage(message); return error; } - private static TokenResponseException fakeTokenException(final int code, final String content) throws IOException - { + private static TokenResponseException fakeTokenException(final int code, final String content) throws IOException { HttpTransport transport = new MockHttpTransport() { @Override - public LowLevelHttpRequest buildRequest(String method, String url) - { + public LowLevelHttpRequest buildRequest(final String method, final String url) { return new MockLowLevelHttpRequest() { @Override - public LowLevelHttpResponse execute() - { + public LowLevelHttpResponse execute() { MockLowLevelHttpResponse response = new MockLowLevelHttpResponse(); response.addHeader("custom_header", "value"); response.setStatusCode(code);