diff --git a/connector/src/main/java/org/astraea/connector/Definition.java b/connector/src/main/java/org/astraea/connector/Definition.java index 800b3a36c0..97cf2ce086 100644 --- a/connector/src/main/java/org/astraea/connector/Definition.java +++ b/connector/src/main/java/org/astraea/connector/Definition.java @@ -18,13 +18,32 @@ import java.util.Collection; import java.util.Objects; +import java.util.Optional; import java.util.function.BiConsumer; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigException; -public interface Definition { +public record Definition( + String name, + Optional defaultValue, + String documentation, + Type type, + BiConsumer validator) { + + @Override + public Optional defaultValue() { + // ConfigDef.NO_DEFAULT_VALUE is a placeholder used to represent the lack of a default value. + return defaultValue.filter(v -> v != ConfigDef.NO_DEFAULT_VALUE); + } + + /** + * @return true if the configuration is required, and it has no default value. + */ + public boolean required() { + return defaultValue.filter(v -> v == ConfigDef.NO_DEFAULT_VALUE).isPresent(); + } - static Builder builder() { + public static Builder builder() { return new Builder(); } @@ -35,32 +54,20 @@ static ConfigDef toConfigDef(Collection defs) { def.define( d.name(), ConfigDef.Type.valueOf(d.type().name()), - d.defaultValue(), - d.validator() == null - ? null - : (n, o) -> { - try { - d.validator().accept(n, o); - } catch (Exception e) { - throw new ConfigException(n, o, e.getMessage()); - } - }, + d.required() ? ConfigDef.NO_DEFAULT_VALUE : d.defaultValue().orElse(null), + (n, o) -> { + try { + d.validator().accept(n, o); + } catch (Exception e) { + throw new ConfigException(n, o, e.getMessage()); + } + }, ConfigDef.Importance.MEDIUM, d.documentation())); return def; } - String name(); - - Object defaultValue(); - - String documentation(); - - Type type(); - - BiConsumer validator(); - - enum Type { + public enum Type { BOOLEAN, STRING, INT, @@ -72,14 +79,14 @@ enum Type { PASSWORD } - class Builder { + public static class Builder { private String name; private Object defaultValue; private String documentation = ""; private Type type = Type.STRING; - private BiConsumer validator; + private BiConsumer validator = (l, h) -> {}; private Builder() {} @@ -113,39 +120,12 @@ public Builder validator(BiConsumer validator) { } public Definition build() { - return new Definition() { - private final String name = Objects.requireNonNull(Builder.this.name); - private final String documentation = Objects.requireNonNull(Builder.this.documentation); - private final Object defaultValue = Builder.this.defaultValue; - private final Type type = Objects.requireNonNull(Builder.this.type); - - private final BiConsumer validator = Builder.this.validator; - - @Override - public String name() { - return name; - } - - @Override - public Object defaultValue() { - return defaultValue; - } - - @Override - public String documentation() { - return documentation; - } - - @Override - public Type type() { - return type; - } - - @Override - public BiConsumer validator() { - return validator; - } - }; + return new Definition( + Objects.requireNonNull(name), + Optional.ofNullable(defaultValue), + Objects.requireNonNull(documentation), + Objects.requireNonNull(type), + validator); } } } diff --git a/connector/src/main/java/org/astraea/connector/backup/Exporter.java b/connector/src/main/java/org/astraea/connector/backup/Exporter.java index 6d4128d633..4448e750b9 100644 --- a/connector/src/main/java/org/astraea/connector/backup/Exporter.java +++ b/connector/src/main/java/org/astraea/connector/backup/Exporter.java @@ -16,6 +16,7 @@ */ package org.astraea.connector.backup; +import java.time.Duration; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -79,21 +80,25 @@ public class Exporter extends SinkConnector { .documentation("the path required for file storage.") .required() .build(); + + static DataSize SIZE_DEFAULT = DataSize.MB.of(100); static Definition SIZE_KEY = Definition.builder() .name("size") .type(Definition.Type.STRING) .validator((name, obj) -> DataSize.of(obj.toString())) - .defaultValue("100MB") + .defaultValue(SIZE_DEFAULT.toString()) .documentation("is the maximum number of the size will be included in each file.") .build(); + static Duration TIME_DEFAULT = Duration.ofSeconds(3); + static Definition TIME_KEY = Definition.builder() .name("roll.duration") .type(Definition.Type.STRING) .validator((name, obj) -> Utils.toDuration(obj.toString())) - .defaultValue("3s") + .defaultValue(TIME_DEFAULT.toSeconds() + "s") .documentation("the maximum time before a new archive file is rolling out.") .build(); @@ -104,6 +109,8 @@ public class Exporter extends SinkConnector { .documentation("a value that needs to be overridden in the file system.") .build(); + static DataSize BUFFER_SIZE_DEFAULT = DataSize.MB.of(300); + static Definition BUFFER_SIZE_KEY = Definition.builder() .name("writer.buffer.size") @@ -111,7 +118,7 @@ public class Exporter extends SinkConnector { .validator((name, obj) -> DataSize.of(obj.toString())) .documentation( "a value that represents the capacity of a blocking queue from which the writer can take records.") - .defaultValue("300MB") + .defaultValue(BUFFER_SIZE_DEFAULT.toString()) .build(); private Configuration configs; @@ -254,23 +261,20 @@ List> recordsFromBuffer() { protected void init(Configuration configuration) { this.topicName = configuration.requireString(TOPICS_KEY); this.path = configuration.requireString(PATH_KEY.name()); - this.size = - DataSize.of( - configuration.string(SIZE_KEY.name()).orElse(SIZE_KEY.defaultValue().toString())); + this.size = configuration.string(SIZE_KEY.name()).map(DataSize::of).orElse(SIZE_DEFAULT); this.interval = - Utils.toDuration( - configuration.string(TIME_KEY.name()).orElse(TIME_KEY.defaultValue().toString())) + configuration + .string(TIME_KEY.name()) + .map(Utils::toDuration) + .orElse(TIME_DEFAULT) .toMillis(); - this.bufferSize.reset(); - this.bufferSizeLimit = - DataSize.of( - configuration - .string(BUFFER_SIZE_KEY.name()) - .orElse(BUFFER_SIZE_KEY.defaultValue().toString())) + configuration + .string(BUFFER_SIZE_KEY.name()) + .map(DataSize::of) + .orElse(BUFFER_SIZE_DEFAULT) .bytes(); - this.fs = FileSystem.of(configuration.requireString(SCHEMA_KEY.name()), configuration); this.writerFuture = CompletableFuture.runAsync(createWriter()); } diff --git a/connector/src/main/java/org/astraea/connector/backup/Importer.java b/connector/src/main/java/org/astraea/connector/backup/Importer.java index f8d34cb8da..7f3741655a 100644 --- a/connector/src/main/java/org/astraea/connector/backup/Importer.java +++ b/connector/src/main/java/org/astraea/connector/backup/Importer.java @@ -78,11 +78,12 @@ public class Importer extends SourceConnector { .documentation("The root directory of the file that needs to be imported.") .required() .build(); + static String CLEAN_SOURCE_DEFAULT = "off"; static Definition CLEAN_SOURCE_KEY = Definition.builder() .name("clean.source") .type(Definition.Type.STRING) - .defaultValue("off") + .defaultValue(CLEAN_SOURCE_DEFAULT) .documentation( "Clean source policy. Available policies: \"off\", \"delete\", \"archive\". Default: off") .build(); @@ -149,10 +150,7 @@ protected void init(Configuration configuration, MetadataStorage storage) { this.rootDir = configuration.requireString(PATH_KEY.name()); this.tasksCount = configuration.requireInteger(TASKS_COUNT_KEY); this.paths = new LinkedList<>(); - this.cleanSource = - configuration - .string(CLEAN_SOURCE_KEY.name()) - .orElse(CLEAN_SOURCE_KEY.defaultValue().toString()); + this.cleanSource = configuration.string(CLEAN_SOURCE_KEY.name()).orElse(CLEAN_SOURCE_DEFAULT); this.archiveDir = configuration.string(ARCHIVE_DIR_KEY.name()); } diff --git a/connector/src/main/java/org/astraea/connector/perf/PerfSink.java b/connector/src/main/java/org/astraea/connector/perf/PerfSink.java index 6ad2760ed6..898b1b3f78 100644 --- a/connector/src/main/java/org/astraea/connector/perf/PerfSink.java +++ b/connector/src/main/java/org/astraea/connector/perf/PerfSink.java @@ -27,11 +27,14 @@ import org.astraea.connector.SinkTask; public class PerfSink extends SinkConnector { + + static Duration FREQUENCY_DEFAULT = Duration.ofMillis(300); + static Definition FREQUENCY_DEF = Definition.builder() .name("frequency") .type(Definition.Type.STRING) - .defaultValue("300ms") + .defaultValue(FREQUENCY_DEFAULT.toMillis() + "ms") .validator((name, value) -> Utils.toDuration(value.toString())) .build(); @@ -59,7 +62,7 @@ protected List definitions() { public static class Task extends SinkTask { - private Duration frequency = Utils.toDuration(FREQUENCY_DEF.defaultValue().toString()); + private Duration frequency = FREQUENCY_DEFAULT; private volatile long lastPut = System.currentTimeMillis(); diff --git a/connector/src/main/java/org/astraea/connector/perf/PerfSource.java b/connector/src/main/java/org/astraea/connector/perf/PerfSource.java index a788ad4b79..0a5ff86f00 100644 --- a/connector/src/main/java/org/astraea/connector/perf/PerfSource.java +++ b/connector/src/main/java/org/astraea/connector/perf/PerfSource.java @@ -37,79 +37,97 @@ import org.astraea.connector.SourceTask; public class PerfSource extends SourceConnector { + + static DataSize THROUGHPUT_DEFAULT = DataSize.GB.of(100); + static Definition THROUGHPUT_DEF = Definition.builder() .name("throughput") .type(Definition.Type.STRING) - .defaultValue("100GB") + .defaultValue(THROUGHPUT_DEFAULT.toString()) .validator((name, value) -> DataSize.of(value.toString())) .documentation("the data rate (in second) of sending records") .build(); + static DistributionType KEY_DISTRIBUTION_DEFAULT = DistributionType.UNIFORM; + static Definition KEY_DISTRIBUTION_DEF = Definition.builder() .name("key.distribution") .type(Definition.Type.STRING) .validator((name, obj) -> DistributionType.ofAlias(obj.toString())) - .defaultValue(DistributionType.UNIFORM.alias()) + .defaultValue(KEY_DISTRIBUTION_DEFAULT.alias()) .documentation( "Distribution name for key and key size. Available distribution names: \"fixed\" \"uniform\", \"zipfian\", \"latest\". Default: uniform") .build(); + + static DistributionType KEY_SIZE_DISTRIBUTION_DEFAULT = DistributionType.FIXED; static Definition KEY_SIZE_DISTRIBUTION_DEF = Definition.builder() .name("key.size.distribution") .type(Definition.Type.STRING) .validator((name, obj) -> DistributionType.ofAlias(obj.toString())) - .defaultValue(DistributionType.FIXED.alias()) + .defaultValue(KEY_SIZE_DISTRIBUTION_DEFAULT.alias()) .documentation( "Distribution name for key size. Available distribution names: \"fixed\" \"uniform\", \"zipfian\", \"latest\". Default: fixed") .build(); + + static DataSize KEY_SIZE_DEFAULT = DataSize.Byte.of(50); + static Definition KEY_SIZE_DEF = Definition.builder() .name("key.size") .type(Definition.Type.STRING) .validator((name, obj) -> DataSize.of(obj.toString())) - .defaultValue(DataSize.Byte.of(50).toString()) + .defaultValue(KEY_SIZE_DEFAULT.toString()) .documentation( "the max length of key. The distribution of length is defined by " + KEY_DISTRIBUTION_DEF.name()) .build(); + static DistributionType VALUE_DISTRIBUTION_DEFAULT = DistributionType.UNIFORM; + static Definition VALUE_DISTRIBUTION_DEF = Definition.builder() .name("value.distribution") .type(Definition.Type.STRING) .validator((name, obj) -> DistributionType.ofAlias(obj.toString())) - .defaultValue(DistributionType.UNIFORM.alias()) + .defaultValue(VALUE_DISTRIBUTION_DEFAULT.alias()) .documentation( "Distribution name for value and value size. Available distribution names: \"fixed\" \"uniform\", \"zipfian\", \"latest\". Default: uniform") .build(); + + static DataSize VALUE_SIZE_DEFAULT = DataSize.KB.of(1); + static Definition VALUE_SIZE_DEF = Definition.builder() .name("value.size") .type(Definition.Type.STRING) .validator((name, obj) -> DataSize.of(obj.toString())) - .defaultValue(DataSize.KB.of(1).toString()) + .defaultValue(VALUE_SIZE_DEFAULT.toString()) .documentation( "the max length of value. The distribution of length is defined by " + VALUE_DISTRIBUTION_DEF.name()) .build(); + static DistributionType VALUE_SIZE_DISTRIBUTION_DEFAULT = DistributionType.FIXED; static Definition VALUE_SIZE_DISTRIBUTION_DEF = Definition.builder() .name("value.size.distribution") .type(Definition.Type.STRING) .validator((name, obj) -> DistributionType.ofAlias(obj.toString())) - .defaultValue(DistributionType.FIXED.alias()) + .defaultValue(VALUE_SIZE_DISTRIBUTION_DEFAULT.alias()) .documentation( "Distribution name for value size. Available distribution names: \"fixed\" \"uniform\", \"zipfian\", \"latest\". Default: fixed") .build(); + static int BATCH_SIZE_DEFAULT = 1; static Definition BATCH_SIZE_DEF = Definition.builder() .name("batch.size") .type(Definition.Type.INT) - .defaultValue(1) + .defaultValue(BATCH_SIZE_DEFAULT) .documentation("the max length of batching messages.") .build(); + static Definition KEY_TABLE_SEED = Definition.builder() .name("key.table.seed") @@ -182,53 +200,40 @@ public static class Task extends SourceTask { @Override protected void init(Configuration configuration, MetadataStorage storage) { var throughput = - DataSize.of( - configuration - .string(THROUGHPUT_DEF.name()) - .orElse(THROUGHPUT_DEF.defaultValue().toString())); + configuration.string(THROUGHPUT_DEF.name()).map(DataSize::of).orElse(THROUGHPUT_DEFAULT); var KeySize = - DataSize.of( - configuration - .string(KEY_SIZE_DEF.name()) - .orElse(KEY_SIZE_DEF.defaultValue().toString())); + configuration.string(KEY_SIZE_DEF.name()).map(DataSize::of).orElse(KEY_SIZE_DEFAULT); var keyDistribution = - DistributionType.ofAlias( - configuration - .string(KEY_DISTRIBUTION_DEF.name()) - .orElse(KEY_DISTRIBUTION_DEF.defaultValue().toString())); + configuration + .string(KEY_DISTRIBUTION_DEF.name()) + .map(DistributionType::ofAlias) + .orElse(KEY_DISTRIBUTION_DEFAULT); var keySizeDistribution = - DistributionType.ofAlias( - configuration - .string(KEY_SIZE_DISTRIBUTION_DEF.name()) - .orElse(KEY_SIZE_DISTRIBUTION_DEF.defaultValue().toString())); + configuration + .string(KEY_SIZE_DISTRIBUTION_DEF.name()) + .map(DistributionType::ofAlias) + .orElse(KEY_SIZE_DISTRIBUTION_DEFAULT); var valueSize = - DataSize.of( - configuration - .string(VALUE_SIZE_DEF.name()) - .orElse(VALUE_SIZE_DEF.defaultValue().toString())); + configuration.string(VALUE_SIZE_DEF.name()).map(DataSize::of).orElse(VALUE_SIZE_DEFAULT); var valueDistribution = - DistributionType.ofAlias( - configuration - .string(VALUE_DISTRIBUTION_DEF.name()) - .orElse(VALUE_DISTRIBUTION_DEF.defaultValue().toString())); + configuration + .string(VALUE_DISTRIBUTION_DEF.name()) + .map(DistributionType::ofAlias) + .orElse(VALUE_DISTRIBUTION_DEFAULT); var valueSizeDistribution = - DistributionType.ofAlias( - configuration - .string(VALUE_SIZE_DISTRIBUTION_DEF.name()) - .orElse(VALUE_SIZE_DISTRIBUTION_DEF.defaultValue().toString())); - - var batchSize = configuration - .integer(BATCH_SIZE_DEF.name()) - .orElse((Integer) BATCH_SIZE_DEF.defaultValue()); + .string(VALUE_SIZE_DISTRIBUTION_DEF.name()) + .map(DistributionType::ofAlias) + .orElse(VALUE_SIZE_DISTRIBUTION_DEFAULT); + var batchSize = configuration.integer(BATCH_SIZE_DEF.name()).orElse(BATCH_SIZE_DEFAULT); var keyTableSeed = configuration .longInteger(KEY_TABLE_SEED.name()) - .orElse((Long) KEY_TABLE_SEED.defaultValue()); + .orElse(ThreadLocalRandom.current().nextLong()); var valueTableSeed = configuration .longInteger(VALUE_TABLE_SEED.name()) - .orElse((Long) VALUE_TABLE_SEED.defaultValue()); + .orElse(ThreadLocalRandom.current().nextLong()); specifyPartitions = configuration.list(SourceConnector.TOPICS_KEY, ",").stream() diff --git a/connector/src/test/java/org/astraea/connector/DefinitionTest.java b/connector/src/test/java/org/astraea/connector/DefinitionTest.java index 2f1d593265..ded4139054 100644 --- a/connector/src/test/java/org/astraea/connector/DefinitionTest.java +++ b/connector/src/test/java/org/astraea/connector/DefinitionTest.java @@ -29,6 +29,7 @@ void testRequired() { var kafkaConf = Definition.toConfigDef(List.of(def)); var kafkaDef = kafkaConf.configKeys().entrySet().iterator().next().getValue(); Assertions.assertEquals(ConfigDef.NO_DEFAULT_VALUE, kafkaDef.defaultValue); + Assertions.assertTrue(def.required()); } @Test