From 982b30a8dc606f74ec444c728878ee186cacde69 Mon Sep 17 00:00:00 2001 From: Trevor Date: Thu, 7 Dec 2023 09:47:51 -0500 Subject: [PATCH] Add multi-subscription message shunt capability (#783) --- udmis/bin/run | 3 +- udmis/etc/prod_pod.json | 2 +- .../service/messaging/impl/MessageBase.java | 17 +++++-- .../service/messaging/impl/PubSubPipe.java | 27 ++++++++---- .../bos/udmi/service/pod/ContainerBase.java | 44 ++++++++++++++++--- .../messaging/impl/MessageTestCore.java | 3 +- .../udmi/service/pod/ContainerBaseTest.java | 33 ++++++++++++++ 7 files changed, 110 insertions(+), 19 deletions(-) create mode 100644 udmis/src/test/java/com/google/bos/udmi/service/pod/ContainerBaseTest.java diff --git a/udmis/bin/run b/udmis/bin/run index ea30a5400e..319250feb3 100755 --- a/udmis/bin/run +++ b/udmis/bin/run @@ -51,8 +51,9 @@ fi if [[ -n $SHUNT_CONFIGURATION && -s $SHUNT_CONFIGURATION ]]; then export SHUNT_NAME=${UDMI_PREFIX}$(jq -r .name $SHUNT_CONFIGURATION) export SHUNT_FROM=$(jq -r .from $SHUNT_CONFIGURATION) + export SHUNT_RECV=$(jq -r .recv $SHUNT_CONFIGURATION) export SHUNT_MORF=$(jq -r .morf $SHUNT_CONFIGURATION) - echo Configured udmi shunt $SHUNT_NAME between $SHUNT_FROM and $SHUNT_MORF + echo Configured udmi shunt $SHUNT_NAME between $SHUNT_FROM $SHUNT_RECV and $SHUNT_MORF else echo No SHUNT_CONFIGURATION defined, so no SHUNT_NAME configured. fi diff --git a/udmis/etc/prod_pod.json b/udmis/etc/prod_pod.json index e771e298d0..b04b691a76 100644 --- a/udmis/etc/prod_pod.json +++ b/udmis/etc/prod_pod.json @@ -54,7 +54,7 @@ "enabled": "${SHUNT_NAME}", "from": { "hostname": "${SHUNT_FROM}", - "recv_id": "${SHUNT_NAME}-take", + "recv_id": "${SHUNT_NAME}-${SHUNT_RECV}", "send_id": "${SHUNT_NAME}-put" }, "morf": { diff --git a/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/MessageBase.java b/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/MessageBase.java index da7e032125..767023657e 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/MessageBase.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/MessageBase.java @@ -21,6 +21,7 @@ import com.google.bos.udmi.service.messaging.MessagePipe; import com.google.bos.udmi.service.pod.ContainerBase; import com.google.bos.udmi.service.pod.UdmiServicePod; +import com.google.common.base.Strings; import java.time.Duration; import java.time.Instant; import java.util.Date; @@ -256,7 +257,12 @@ private void receiveException(Map attributesMap, String messageS private void sanitizeAttributeMap(Map attributesMap) { String subFolderRaw = attributesMap.get(SUBFOLDER_PROPERTY_KEY); - if (subFolderRaw != null) { + if (subFolderRaw == null) { + // Do nothing! + } else if (subFolderRaw.equals("")) { + debug("Coerced empty subFolder to undefined"); + attributesMap.remove(SUBFOLDER_PROPERTY_KEY); + } else { SubFolder subFolder = catchToElse(() -> SubFolder.fromValue(subFolderRaw), SubFolder.INVALID); if (!subFolder.value().equals(subFolderRaw)) { debug("Coerced subFolder " + subFolderRaw + " to " + subFolder.value()); @@ -265,10 +271,15 @@ private void sanitizeAttributeMap(Map attributesMap) { } String subTypeRaw = attributesMap.get(SUBTYPE_PROPERTY_KEY); - if (subTypeRaw != null) { + if (subTypeRaw == null) { + // Do nothing! + } else if (subTypeRaw.equals("")) { + debug("Coerced empty subType to undefined"); + attributesMap.remove(SUBTYPE_PROPERTY_KEY); + } else if (!Strings.isNullOrEmpty(subTypeRaw)) { SubType subType = catchToElse(() -> SubType.fromValue(subTypeRaw), SubType.INVALID); - debug("Coerced subFolder " + subTypeRaw + " to " + subType.value()); if (!subType.value().equals(subTypeRaw)) { + debug("Coerced subFolder " + subTypeRaw + " to " + subType.value()); attributesMap.put(SUBTYPE_PROPERTY_KEY, subType.value()); } } diff --git a/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/PubSubPipe.java b/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/PubSubPipe.java index ebf5dead56..0dc24a86d5 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/PubSubPipe.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/PubSubPipe.java @@ -2,6 +2,7 @@ import static com.google.udmi.util.Common.SUBFOLDER_PROPERTY_KEY; import static com.google.udmi.util.Common.SUBTYPE_PROPERTY_KEY; +import static com.google.udmi.util.GeneralUtils.CSV_JOINER; import static com.google.udmi.util.GeneralUtils.friendlyStackTrace; import static com.google.udmi.util.GeneralUtils.ifNotNullGet; import static com.google.udmi.util.GeneralUtils.ifNotNullThen; @@ -35,9 +36,11 @@ import java.time.Duration; import java.time.Instant; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Optional; +import java.util.Set; import java.util.concurrent.Executors; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -54,7 +57,7 @@ public class PubSubPipe extends MessageBase implements MessageReceiver { public static final String EMULATOR_HOST = System.getenv(EMULATOR_HOST_ENV); public static final String GCP_HOST = "gcp"; public static final String PS_TXN_PREFIX = "PS:"; - private final Subscriber subscriber; + private final List subscribers; private final Publisher publisher; private final String projectId; @@ -67,15 +70,20 @@ public PubSubPipe(EndpointConfiguration configuration) { "no project id defined in configuration as 'hostname'"); publisher = ifNotNullGet(variableSubstitution(configuration.send_id), this::getPublisher); ifNotNullThen(publisher, this::checkPublisher); - subscriber = ifNotNullGet(variableSubstitution(configuration.recv_id), this::getSubscriber); - String subscriptionName = ifNotNullGet(subscriber, Subscriber::getSubscriptionNameString); + subscribers = ifNotNullGet(multiSubstitution(configuration.recv_id), this::getSubscribers); + String subscriptionNames = subscribers.stream().map(Subscriber::getSubscriptionNameString) + .collect(Collectors.joining(", ")); String topicName = ifNotNullGet(publisher, Publisher::getTopicNameString); - debug("PubSub %s s -> %s", super.toString(), subscriptionName, topicName); + debug("PubSub %s -> %s", super.toString(), subscriptionNames, topicName); } catch (Exception e) { throw new RuntimeException("While creating PubSub pipe", e); } } + private List getSubscribers(Set names) { + return names.stream().map(this::getSubscriber).toList(); + } + private static void checkSubscription(ProjectSubscriptionName subscriptionName) { try (SubscriptionAdminClient client = SubscriptionAdminClient.create()) { client.getSubscription(subscriptionName).getAckDeadlineSeconds(); @@ -118,7 +126,7 @@ private void checkPublisher() { @Override public void activate(Consumer bundleConsumer) { super.activate(bundleConsumer); - subscriber.startAsync(); + subscribers.forEach(Subscriber::startAsync); } @Override @@ -163,9 +171,13 @@ public void receiveMessage(PubsubMessage message, AckReplyConsumer reply) { } } + private void stopAndWait(Subscriber subscriber) { + subscriber.stopAsync().awaitTerminated(); + } + @Override public void shutdown() { - subscriber.stopAsync().awaitTerminated(); + subscribers.forEach(this::stopAndWait); super.shutdown(); } @@ -210,8 +222,7 @@ public void failed(State from, Throwable failure) { void resetForTest() { super.resetForTest(); try { - subscriber.stopAsync(); - subscriber.awaitTerminated(); + subscribers.forEach(this::stopAndWait); publisher.shutdown(); } catch (Exception e) { throw new RuntimeException("While shutting down connections", e); diff --git a/udmis/src/main/java/com/google/bos/udmi/service/pod/ContainerBase.java b/udmis/src/main/java/com/google/bos/udmi/service/pod/ContainerBase.java index 321fdafa3f..e3a087e4bc 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/pod/ContainerBase.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/pod/ContainerBase.java @@ -1,17 +1,23 @@ package com.google.bos.udmi.service.pod; -import static com.google.common.base.Preconditions.checkState; import static com.google.udmi.util.GeneralUtils.ifNotTrueThen; import static java.lang.String.format; import static java.util.Objects.requireNonNull; import static java.util.Optional.ofNullable; +import static java.util.stream.Collectors.toSet; import com.google.bos.udmi.service.core.ComponentName; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import com.google.udmi.util.JsonUtil; import java.io.PrintStream; +import java.util.Arrays; +import java.util.List; +import java.util.Set; import java.util.regex.MatchResult; import java.util.regex.Matcher; import java.util.regex.Pattern; +import java.util.stream.Collectors; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.TestOnly; import udmi.schema.BasePodConfiguration; @@ -31,7 +37,8 @@ public abstract class ContainerBase { public static final String EMPTY_JSON = "{}"; public static final String REFLECT_BASE = "UDMI-REFLECT"; private static final ThreadLocal executionContext = new ThreadLocal<>(); - private static final Pattern VARIABLE_PATTERN = Pattern.compile("\\$\\{([A-Z_]+)\\}"); + private static final Pattern VARIABLE_PATTERN = Pattern.compile("\\$\\{([A-Z_]+)}"); + private static final Pattern MULTI_PATTERN = Pattern.compile("!\\{([,a-zA-Z_]+)}"); private static BasePodConfiguration basePodConfig = new BasePodConfiguration(); protected static String reflectRegistry = REFLECT_BASE; protected final PodConfiguration podConfiguration; @@ -55,8 +62,16 @@ public ContainerBase(PodConfiguration config) { info("Configured with reflect registry " + reflectRegistry); } - private static String environmentReplacer(MatchResult match) { - return ofNullable(System.getenv(match.group(1))).orElse(""); + private String environmentReplacer(MatchResult match) { + String replacement = ofNullable(getEnv(match.group(1))).orElse(""); + if (replacement.startsWith("!")) { + return format("!{%s}", replacement.substring(1)); + } + return replacement; + } + + protected String getEnv(String group) { + return System.getenv(group); } @TestOnly @@ -84,6 +99,25 @@ protected synchronized String grabExecutionContext() { return previous; } + protected Set multiSubstitution(String value) { + String raw = variableSubstitution(value); + if (raw == null) { + return ImmutableSet.of(); + } + Matcher matcher = MULTI_PATTERN.matcher(raw); + if (!matcher.find()) { + return ImmutableSet.of(raw); + } + String group = matcher.group(1); + if (matcher.find()) { + throw new RuntimeException(format("Multi multi-expansions not supported: %s", raw)); + } + String[] parts = group.split(","); + Set expanded = Arrays.stream(parts).map(matcher::replaceFirst).collect(toSet()); + expanded.forEach(set -> debug("Expanded intermediate %s with '%s'", raw, set)); + return expanded; + } + protected String variableSubstitution(String value) { if (value == null) { return null; @@ -94,7 +128,7 @@ protected String variableSubstitution(String value) { protected String variableSubstitution(String value, @NotNull String nullMessage) { requireNonNull(value, requireNonNull(nullMessage, "null message not defined")); Matcher matcher = VARIABLE_PATTERN.matcher(value); - String out = matcher.replaceAll(ContainerBase::environmentReplacer); + String out = matcher.replaceAll(this::environmentReplacer); ifNotTrueThen(value.equals(out), () -> debug("Replaced value %s with '%s'", value, out)); return out; } diff --git a/udmis/src/test/java/com/google/bos/udmi/service/messaging/impl/MessageTestCore.java b/udmis/src/test/java/com/google/bos/udmi/service/messaging/impl/MessageTestCore.java index 47002b14b7..299b09ec03 100644 --- a/udmis/src/test/java/com/google/bos/udmi/service/messaging/impl/MessageTestCore.java +++ b/udmis/src/test/java/com/google/bos/udmi/service/messaging/impl/MessageTestCore.java @@ -6,6 +6,7 @@ import com.google.bos.udmi.service.core.ProcessorTestBase; import com.google.bos.udmi.service.pod.UdmiServicePod; import java.io.File; +import org.jetbrains.annotations.NotNull; import udmi.schema.EndpointConfiguration; import udmi.schema.EndpointConfiguration.Protocol; import udmi.schema.SetupUdmiConfig; @@ -51,7 +52,7 @@ public static void writeVersionDeployFile() { } } - protected void augmentConfig(EndpointConfiguration configuration) { + protected void augmentConfig(@NotNull EndpointConfiguration configuration) { configuration.protocol = Protocol.LOCAL; } diff --git a/udmis/src/test/java/com/google/bos/udmi/service/pod/ContainerBaseTest.java b/udmis/src/test/java/com/google/bos/udmi/service/pod/ContainerBaseTest.java new file mode 100644 index 0000000000..e2ed31d4d3 --- /dev/null +++ b/udmis/src/test/java/com/google/bos/udmi/service/pod/ContainerBaseTest.java @@ -0,0 +1,33 @@ +package com.google.bos.udmi.service.pod; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import java.util.Map; +import java.util.Set; +import org.junit.jupiter.api.Test; + +class ContainerBaseTest { + + private final Map mockEnvMap = ImmutableMap.of( + "B", "X", + "C", "!Y,Z"); + + private ContainerBase getMockContainer() { + return spy(ContainerBase.class); + } + + @Test + public void multiVariable() { + ContainerBase testContainer = getMockContainer(); + when(testContainer.getEnv(anyString())).thenAnswer( + i -> mockEnvMap.get((String) i.getArgument(0))); + Set strings = testContainer.multiSubstitution("A${A} B${B} C${C}"); + ImmutableSet expected = ImmutableSet.of("A BX CY", "A BX CZ"); + assertEquals(expected, strings, "expanded multi-variable"); + } +} \ No newline at end of file