Skip to content

Commit

Permalink
DBZ-8431 Ability to access primary CR in component configurations
Browse files Browse the repository at this point in the history
  • Loading branch information
jcechace committed Nov 20, 2024
1 parent 9f065a1 commit dcda790
Show file tree
Hide file tree
Showing 26 changed files with 147 additions and 122 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
*/
package io.debezium.operator.api.config;

public interface ConfigMappable {
import io.fabric8.kubernetes.api.model.HasMetadata;

ConfigMapping asConfiguration();
public interface ConfigMappable<P extends HasMetadata> {

ConfigMapping<P> asConfiguration(P primary);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@
import java.util.stream.IntStream;
import java.util.stream.Stream;

import io.fabric8.kubernetes.api.model.HasMetadata;

/**
* Convenience wrapper used to build properties-like configuration from arbitrary map
*/
public final class ConfigMapping {
public final class ConfigMapping<P extends HasMetadata> {

private final P primary;

/**
* Represents a key in the configuration
Expand Down Expand Up @@ -68,41 +72,42 @@ public String toString() {
private final Map<Key, String> config;
private final String prefix;

public static ConfigMapping from(Map<String, ?> properties) {
var config = ConfigMapping.empty();
public static <P extends HasMetadata> ConfigMapping<P> from(P primary, Map<String, ?> properties) {
var config = ConfigMapping.empty(primary);
config.putAll(properties);
return config;
}

public static ConfigMapping empty() {
return new ConfigMapping(null);
public static <P extends HasMetadata> ConfigMapping<P> empty(P primary) {
return new ConfigMapping<>(primary, null);
}

public static ConfigMapping prefixed(String prefix) {
return new ConfigMapping(prefix);
public static <P extends HasMetadata> ConfigMapping<P> prefixed(P primary, String prefix) {
return new ConfigMapping<>(primary, prefix);
}

public ConfigMapping(String prefix) {
public ConfigMapping(P primary, String prefix) {
this.config = new HashMap<>();
this.prefix = prefix;
this.primary = primary;
}

/**
* Creates new ConfigMapping, with the same prefix as this one, but sets all keys as absolute
*
* @return new ConfigMapping with all keys as absolute
*/
public ConfigMapping asAbsolute() {
return ConfigMapping.prefixed(prefix).putAllAbs(this);
public ConfigMapping<P> asAbsolute() {
return ConfigMapping.prefixed(primary, prefix).putAllAbs(this);
}

/**
* Creates new ConfigMapping, with the same prefix as this one, but sets all keys as relative
*
* @return new ConfigMapping with all keys as relative
*/
public ConfigMapping asRelative() {
return ConfigMapping.prefixed(prefix).putAllRel(this);
public ConfigMapping<P> asRelative() {
return ConfigMapping.prefixed(primary, prefix).putAllRel(this);
}

public Map<String, String> getAsMapSimple() {
Expand All @@ -122,12 +127,12 @@ public String getAsString() {
.collect(Collectors.joining(System.lineSeparator()));
}

public ConfigMapping rootValue(Object value) {
public ConfigMapping<P> rootValue(Object value) {
putInternal(value, Key.root());
return this;
}

public ConfigMapping put(String key, Object value) {
public ConfigMapping<P> put(String key, Object value) {
putInternal(value, Key.rel(key));
return this;
}
Expand All @@ -139,30 +144,30 @@ public ConfigMapping put(String key, Object value) {
* @param value the value to put
* @return this mapping
*/
public ConfigMapping putAbs(String key, Object value) {
public ConfigMapping<P> putAbs(String key, Object value) {
putInternal(value, Key.abs(key));
return this;
}

public ConfigMapping putAll(ConfigMappable resource) {
return putAll(resource.asConfiguration());
public ConfigMapping<P> putAll(ConfigMappable<P> resource) {
return putAll(resource.asConfiguration(primary));
}

public ConfigMapping putAll(String key, ConfigMappable resource) {
return putAll(key, resource.asConfiguration());
public ConfigMapping<P> putAll(String key, ConfigMappable<P> resource) {
return putAll(key, resource.asConfiguration(primary));
}

public ConfigMapping putAll(ConfigMapping config) {
public ConfigMapping<P> putAll(ConfigMapping<?> config) {
config.getAsMap().forEach((key, value) -> putInternal(value, key));
return this;
}

public ConfigMapping putAll(String key, ConfigMapping config) {
public ConfigMapping<P> putAll(String key, ConfigMapping<?> config) {
config.getAsMap().forEach((subKey, value) -> putInternal(value, key, subKey));
return this;
}

public ConfigMapping putAll(Map<String, ?> props) {
public ConfigMapping<P> putAll(Map<String, ?> props) {
props.forEach((key, value) -> putInternal(value, Key.rel(key)));
return this;
}
Expand All @@ -172,7 +177,7 @@ public ConfigMapping putAll(Map<String, ?> props) {
* @param config the configuration to put
* @return this mapping
*/
public ConfigMapping putAllAbs(ConfigMapping config) {
public ConfigMapping<P> putAllAbs(ConfigMapping<?> config) {
config.getAsMap().forEach((key, value) -> putInternal(value, key.asAbs()));
return this;
}
Expand All @@ -182,17 +187,18 @@ public ConfigMapping putAllAbs(ConfigMapping config) {
* @param config the configuration to put
* @return this mapping
*/
public ConfigMapping putAllRel(ConfigMapping config) {
public ConfigMapping<P> putAllRel(ConfigMapping<?> config) {
config.getAsMap().forEach((key, value) -> putInternal(value, key.asRel()));
return this;
}

public <T extends ConfigMappable> ConfigMapping putList(String key, List<T> items, String name) {
@SuppressWarnings("unchecked")
public ConfigMapping<P> putList(String key, List<? extends ConfigMappable<P>> items, String name) {
if (items.isEmpty()) {
return this;
}

record NamedItem(String name, ConfigMappable item) {
record NamedItem(String name, ConfigMappable<?> item) {
}

var named = IntStream.
Expand All @@ -205,11 +211,11 @@ record NamedItem(String name, ConfigMappable item) {
.reduce((x, y) -> x + "," + y)
.ifPresent(names -> put(key, names));

named.forEach(item -> putAll(key + "." + item.name, item.item));
named.forEach(item -> putAll(key + "." + item.name, (ConfigMappable<P>) item.item));
return this;
}

public <T extends ConfigMappable> ConfigMapping putMap(String key, Map<String, T> items) {
public ConfigMapping<P> putMap(String key, Map<String, ? extends ConfigMappable<P>> items) {
items.keySet().stream()
.reduce((x, y) -> String.join(","))
.ifPresent(names -> put(key, names));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import io.debezium.operator.docs.annotations.Documented;

@Documented(hidden = true, name = "Map")
public class ConfigProperties implements ConfigMappable {
public class ConfigProperties implements ConfigMappable<DebeziumServer> {

private Map<String, Object> props = new HashMap<>(0);

Expand All @@ -37,7 +37,7 @@ public void setProps(String name, Object value) {
}

@Override
public ConfigMapping asConfiguration() {
return ConfigMapping.from(props);
public ConfigMapping<DebeziumServer> asConfiguration(DebeziumServer primary) {
return ConfigMapping.from(primary, props);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

import com.fasterxml.jackson.annotation.JsonIgnore;

import io.debezium.operator.api.config.ConfigMappable;
import io.debezium.operator.api.config.ConfigMapping;
import io.debezium.operator.api.model.status.Condition;
import io.debezium.operator.api.model.status.DebeziumServerStatus;
Expand Down Expand Up @@ -35,15 +34,15 @@
}, parent = false)
public class DebeziumServer
extends CustomResource<DebeziumServerSpec, DebeziumServerStatus>
implements Namespaced, ConfigMappable {
implements Namespaced {

@Override
protected DebeziumServerSpec initSpec() {
return new DebeziumServerSpec();
}

public ConfigMapping asConfiguration() {
return spec.asConfiguration();
public ConfigMapping<DebeziumServer> asConfiguration() {
return spec.asConfiguration(this);
}

@JsonIgnore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
@JsonPropertyOrder({
"image", "version", "image", "quarkus", "runtime", "format", "transforms", "predicates", "source", "sink"
})
public class DebeziumServerSpec implements ConfigMappable {
public class DebeziumServerSpec implements ConfigMappable<DebeziumServer> {

@JsonPropertyDescription("Image used for Debezium Server container. This property takes precedence over version.")
private String image;
Expand Down Expand Up @@ -138,16 +138,16 @@ public void setPredicates(Map<String, Predicate> predicates) {
}

@Override
public ConfigMapping asConfiguration() {
var dbzConfig = ConfigMapping.prefixed("debezium")
public ConfigMapping<DebeziumServer> asConfiguration(DebeziumServer primary) {
var dbzConfig = ConfigMapping.prefixed(primary, "debezium")
.putAll(runtime)
.putAll("source", source)
.putAll("sink", sink)
.putAll("format", format)
.putList("transforms", transforms, "t")
.putMap("predicates", predicates);

return ConfigMapping.empty()
return ConfigMapping.empty(primary)
.putAll("quarkus", quarkus)
.putAll(dbzConfig);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

@Documented
@Buildable(editableEnabled = false, builderPackage = "io.fabric8.kubernetes.api.builder", lazyCollectionInitEnabled = false)
public class Format implements ConfigMappable {
public class Format implements ConfigMappable<DebeziumServer> {

@JsonPropertyDescription("Message key format configuration.")
private FormatType key;
Expand Down Expand Up @@ -56,8 +56,8 @@ public void setHeader(FormatType header) {
}

@Override
public ConfigMapping asConfiguration() {
return ConfigMapping.empty()
public ConfigMapping<DebeziumServer> asConfiguration(DebeziumServer primary) {
return ConfigMapping.empty(primary)
.putAll("key", key)
.putAll("value", value)
.putAll("header", header);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

@Documented
@Buildable(editableEnabled = false, builderPackage = "io.fabric8.kubernetes.api.builder", lazyCollectionInitEnabled = false)
public class FormatType implements ConfigMappable {
public class FormatType implements ConfigMappable<DebeziumServer> {

@JsonPropertyDescription("Format type recognised by Debezium Server.")
@JsonProperty(defaultValue = "json")
Expand Down Expand Up @@ -45,8 +45,8 @@ public void setType(String type) {
}

@Override
public ConfigMapping asConfiguration() {
return ConfigMapping.empty()
public ConfigMapping<DebeziumServer> asConfiguration(DebeziumServer primary) {
return ConfigMapping.empty(primary)
.putAll(this.config)
.rootValue(type);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

@Documented
@Buildable(editableEnabled = false, builderPackage = "io.fabric8.kubernetes.api.builder", lazyCollectionInitEnabled = false)
public class Predicate implements ConfigMappable {
public class Predicate implements ConfigMappable<DebeziumServer> {

@JsonPropertyDescription("Fully qualified name of Java class implementing the predicate.")
@JsonProperty(required = true)
Expand Down Expand Up @@ -45,8 +45,8 @@ public void setConfig(ConfigProperties config) {
}

@Override
public ConfigMapping asConfiguration() {
return ConfigMapping.empty()
public ConfigMapping<DebeziumServer> asConfiguration(DebeziumServer primary) {
return ConfigMapping.empty(primary)
.put("type", type)
.putAll(this.config);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

@Documented
@Buildable(editableEnabled = false, builderPackage = "io.fabric8.kubernetes.api.builder", lazyCollectionInitEnabled = false)
public class Quarkus implements ConfigMappable {
public class Quarkus implements ConfigMappable<DebeziumServer> {

@JsonPropertyDescription("Quarkus configuration properties.")
private ConfigProperties config;
Expand All @@ -32,8 +32,8 @@ public void setConfig(ConfigProperties config) {
}

@Override
public ConfigMapping asConfiguration() {
return ConfigMapping.empty()
public ConfigMapping<DebeziumServer> asConfiguration(DebeziumServer primary) {
return ConfigMapping.empty(primary)
.putAll(this.config);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

@Documented
@Buildable(editableEnabled = false, builderPackage = "io.fabric8.kubernetes.api.builder", lazyCollectionInitEnabled = false)
public class Sink implements ConfigMappable {
public class Sink implements ConfigMappable<DebeziumServer> {

@JsonPropertyDescription("Sink type recognised by Debezium Server.")
@JsonProperty(required = true)
Expand Down Expand Up @@ -45,8 +45,8 @@ public void setConfig(ConfigProperties config) {
}

@Override
public ConfigMapping asConfiguration() {
return ConfigMapping.empty()
public ConfigMapping<DebeziumServer> asConfiguration(DebeziumServer primary) {
return ConfigMapping.empty(primary)
.put("type", type)
.putAll(type, this.config);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

@Documented
@Buildable(editableEnabled = false, builderPackage = "io.fabric8.kubernetes.api.builder", lazyCollectionInitEnabled = false)
public class Transformation implements ConfigMappable {
public class Transformation implements ConfigMappable<DebeziumServer> {

@JsonPropertyDescription("Fully qualified name of Java class implementing the transformation.")
@JsonProperty(required = true)
Expand Down Expand Up @@ -62,8 +62,8 @@ public void setConfig(ConfigProperties config) {
}

@Override
public ConfigMapping asConfiguration() {
return ConfigMapping.empty()
public ConfigMapping<DebeziumServer> asConfiguration(DebeziumServer primary) {
return ConfigMapping.empty(primary)
.put("type", type)
.put("predicate", predicate)
.put("negate", negate)
Expand Down
Loading

0 comments on commit dcda790

Please sign in to comment.