Skip to content

Commit

Permalink
Add multi-subscription message shunt capability (#783)
Browse files Browse the repository at this point in the history
  • Loading branch information
grafnu authored Dec 7, 2023
1 parent 3a5f29e commit 982b30a
Show file tree
Hide file tree
Showing 7 changed files with 110 additions and 19 deletions.
3 changes: 2 additions & 1 deletion udmis/bin/run
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,9 @@ fi
if [[ -n $SHUNT_CONFIGURATION && -s $SHUNT_CONFIGURATION ]]; then
export SHUNT_NAME=${UDMI_PREFIX}$(jq -r .name $SHUNT_CONFIGURATION)
export SHUNT_FROM=$(jq -r .from $SHUNT_CONFIGURATION)
export SHUNT_RECV=$(jq -r .recv $SHUNT_CONFIGURATION)
export SHUNT_MORF=$(jq -r .morf $SHUNT_CONFIGURATION)
echo Configured udmi shunt $SHUNT_NAME between $SHUNT_FROM and $SHUNT_MORF
echo Configured udmi shunt $SHUNT_NAME between $SHUNT_FROM $SHUNT_RECV and $SHUNT_MORF
else
echo No SHUNT_CONFIGURATION defined, so no SHUNT_NAME configured.
fi
Expand Down
2 changes: 1 addition & 1 deletion udmis/etc/prod_pod.json
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
"enabled": "${SHUNT_NAME}",
"from": {
"hostname": "${SHUNT_FROM}",
"recv_id": "${SHUNT_NAME}-take",
"recv_id": "${SHUNT_NAME}-${SHUNT_RECV}",
"send_id": "${SHUNT_NAME}-put"
},
"morf": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.bos.udmi.service.messaging.MessagePipe;
import com.google.bos.udmi.service.pod.ContainerBase;
import com.google.bos.udmi.service.pod.UdmiServicePod;
import com.google.common.base.Strings;
import java.time.Duration;
import java.time.Instant;
import java.util.Date;
Expand Down Expand Up @@ -256,7 +257,12 @@ private void receiveException(Map<String, String> attributesMap, String messageS

private void sanitizeAttributeMap(Map<String, String> attributesMap) {
String subFolderRaw = attributesMap.get(SUBFOLDER_PROPERTY_KEY);
if (subFolderRaw != null) {
if (subFolderRaw == null) {
// Do nothing!
} else if (subFolderRaw.equals("")) {
debug("Coerced empty subFolder to undefined");
attributesMap.remove(SUBFOLDER_PROPERTY_KEY);
} else {
SubFolder subFolder = catchToElse(() -> SubFolder.fromValue(subFolderRaw), SubFolder.INVALID);
if (!subFolder.value().equals(subFolderRaw)) {
debug("Coerced subFolder " + subFolderRaw + " to " + subFolder.value());
Expand All @@ -265,10 +271,15 @@ private void sanitizeAttributeMap(Map<String, String> attributesMap) {
}

String subTypeRaw = attributesMap.get(SUBTYPE_PROPERTY_KEY);
if (subTypeRaw != null) {
if (subTypeRaw == null) {
// Do nothing!
} else if (subTypeRaw.equals("")) {
debug("Coerced empty subType to undefined");
attributesMap.remove(SUBTYPE_PROPERTY_KEY);
} else if (!Strings.isNullOrEmpty(subTypeRaw)) {
SubType subType = catchToElse(() -> SubType.fromValue(subTypeRaw), SubType.INVALID);
debug("Coerced subFolder " + subTypeRaw + " to " + subType.value());
if (!subType.value().equals(subTypeRaw)) {
debug("Coerced subFolder " + subTypeRaw + " to " + subType.value());
attributesMap.put(SUBTYPE_PROPERTY_KEY, subType.value());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static com.google.udmi.util.Common.SUBFOLDER_PROPERTY_KEY;
import static com.google.udmi.util.Common.SUBTYPE_PROPERTY_KEY;
import static com.google.udmi.util.GeneralUtils.CSV_JOINER;
import static com.google.udmi.util.GeneralUtils.friendlyStackTrace;
import static com.google.udmi.util.GeneralUtils.ifNotNullGet;
import static com.google.udmi.util.GeneralUtils.ifNotNullThen;
Expand Down Expand Up @@ -35,9 +36,11 @@
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.stream.Collectors;
Expand All @@ -54,7 +57,7 @@ public class PubSubPipe extends MessageBase implements MessageReceiver {
public static final String EMULATOR_HOST = System.getenv(EMULATOR_HOST_ENV);
public static final String GCP_HOST = "gcp";
public static final String PS_TXN_PREFIX = "PS:";
private final Subscriber subscriber;
private final List<Subscriber> subscribers;
private final Publisher publisher;
private final String projectId;

Expand All @@ -67,15 +70,20 @@ public PubSubPipe(EndpointConfiguration configuration) {
"no project id defined in configuration as 'hostname'");
publisher = ifNotNullGet(variableSubstitution(configuration.send_id), this::getPublisher);
ifNotNullThen(publisher, this::checkPublisher);
subscriber = ifNotNullGet(variableSubstitution(configuration.recv_id), this::getSubscriber);
String subscriptionName = ifNotNullGet(subscriber, Subscriber::getSubscriptionNameString);
subscribers = ifNotNullGet(multiSubstitution(configuration.recv_id), this::getSubscribers);
String subscriptionNames = subscribers.stream().map(Subscriber::getSubscriptionNameString)
.collect(Collectors.joining(", "));
String topicName = ifNotNullGet(publisher, Publisher::getTopicNameString);
debug("PubSub %s s -> %s", super.toString(), subscriptionName, topicName);
debug("PubSub %s -> %s", super.toString(), subscriptionNames, topicName);
} catch (Exception e) {
throw new RuntimeException("While creating PubSub pipe", e);
}
}

private List<Subscriber> getSubscribers(Set<String> names) {
return names.stream().map(this::getSubscriber).toList();
}

private static void checkSubscription(ProjectSubscriptionName subscriptionName) {
try (SubscriptionAdminClient client = SubscriptionAdminClient.create()) {
client.getSubscription(subscriptionName).getAckDeadlineSeconds();
Expand Down Expand Up @@ -118,7 +126,7 @@ private void checkPublisher() {
@Override
public void activate(Consumer<Bundle> bundleConsumer) {
super.activate(bundleConsumer);
subscriber.startAsync();
subscribers.forEach(Subscriber::startAsync);
}

@Override
Expand Down Expand Up @@ -163,9 +171,13 @@ public void receiveMessage(PubsubMessage message, AckReplyConsumer reply) {
}
}

private void stopAndWait(Subscriber subscriber) {
subscriber.stopAsync().awaitTerminated();
}

@Override
public void shutdown() {
subscriber.stopAsync().awaitTerminated();
subscribers.forEach(this::stopAndWait);
super.shutdown();
}

Expand Down Expand Up @@ -210,8 +222,7 @@ public void failed(State from, Throwable failure) {
void resetForTest() {
super.resetForTest();
try {
subscriber.stopAsync();
subscriber.awaitTerminated();
subscribers.forEach(this::stopAndWait);
publisher.shutdown();
} catch (Exception e) {
throw new RuntimeException("While shutting down connections", e);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,23 @@
package com.google.bos.udmi.service.pod;

import static com.google.common.base.Preconditions.checkState;
import static com.google.udmi.util.GeneralUtils.ifNotTrueThen;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.Optional.ofNullable;
import static java.util.stream.Collectors.toSet;

import com.google.bos.udmi.service.core.ComponentName;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.udmi.util.JsonUtil;
import java.io.PrintStream;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.regex.MatchResult;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.TestOnly;
import udmi.schema.BasePodConfiguration;
Expand All @@ -31,7 +37,8 @@ public abstract class ContainerBase {
public static final String EMPTY_JSON = "{}";
public static final String REFLECT_BASE = "UDMI-REFLECT";
private static final ThreadLocal<String> executionContext = new ThreadLocal<>();
private static final Pattern VARIABLE_PATTERN = Pattern.compile("\\$\\{([A-Z_]+)\\}");
private static final Pattern VARIABLE_PATTERN = Pattern.compile("\\$\\{([A-Z_]+)}");
private static final Pattern MULTI_PATTERN = Pattern.compile("!\\{([,a-zA-Z_]+)}");
private static BasePodConfiguration basePodConfig = new BasePodConfiguration();
protected static String reflectRegistry = REFLECT_BASE;
protected final PodConfiguration podConfiguration;
Expand All @@ -55,8 +62,16 @@ public ContainerBase(PodConfiguration config) {
info("Configured with reflect registry " + reflectRegistry);
}

private static String environmentReplacer(MatchResult match) {
return ofNullable(System.getenv(match.group(1))).orElse("");
private String environmentReplacer(MatchResult match) {
String replacement = ofNullable(getEnv(match.group(1))).orElse("");
if (replacement.startsWith("!")) {
return format("!{%s}", replacement.substring(1));
}
return replacement;
}

protected String getEnv(String group) {
return System.getenv(group);
}

@TestOnly
Expand Down Expand Up @@ -84,6 +99,25 @@ protected synchronized String grabExecutionContext() {
return previous;
}

protected Set<String> multiSubstitution(String value) {
String raw = variableSubstitution(value);
if (raw == null) {
return ImmutableSet.of();
}
Matcher matcher = MULTI_PATTERN.matcher(raw);
if (!matcher.find()) {
return ImmutableSet.of(raw);
}
String group = matcher.group(1);
if (matcher.find()) {
throw new RuntimeException(format("Multi multi-expansions not supported: %s", raw));
}
String[] parts = group.split(",");
Set<String> expanded = Arrays.stream(parts).map(matcher::replaceFirst).collect(toSet());
expanded.forEach(set -> debug("Expanded intermediate %s with '%s'", raw, set));
return expanded;
}

protected String variableSubstitution(String value) {
if (value == null) {
return null;
Expand All @@ -94,7 +128,7 @@ protected String variableSubstitution(String value) {
protected String variableSubstitution(String value, @NotNull String nullMessage) {
requireNonNull(value, requireNonNull(nullMessage, "null message not defined"));
Matcher matcher = VARIABLE_PATTERN.matcher(value);
String out = matcher.replaceAll(ContainerBase::environmentReplacer);
String out = matcher.replaceAll(this::environmentReplacer);
ifNotTrueThen(value.equals(out), () -> debug("Replaced value %s with '%s'", value, out));
return out;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.google.bos.udmi.service.core.ProcessorTestBase;
import com.google.bos.udmi.service.pod.UdmiServicePod;
import java.io.File;
import org.jetbrains.annotations.NotNull;
import udmi.schema.EndpointConfiguration;
import udmi.schema.EndpointConfiguration.Protocol;
import udmi.schema.SetupUdmiConfig;
Expand Down Expand Up @@ -51,7 +52,7 @@ public static void writeVersionDeployFile() {
}
}

protected void augmentConfig(EndpointConfiguration configuration) {
protected void augmentConfig(@NotNull EndpointConfiguration configuration) {
configuration.protocol = Protocol.LOCAL;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package com.google.bos.udmi.service.pod;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import java.util.Map;
import java.util.Set;
import org.junit.jupiter.api.Test;

class ContainerBaseTest {

private final Map<String, String> mockEnvMap = ImmutableMap.of(
"B", "X",
"C", "!Y,Z");

private ContainerBase getMockContainer() {
return spy(ContainerBase.class);
}

@Test
public void multiVariable() {
ContainerBase testContainer = getMockContainer();
when(testContainer.getEnv(anyString())).thenAnswer(
i -> mockEnvMap.get((String) i.getArgument(0)));
Set<String> strings = testContainer.multiSubstitution("A${A} B${B} C${C}");
ImmutableSet<String> expected = ImmutableSet.of("A BX CY", "A BX CZ");
assertEquals(expected, strings, "expanded multi-variable");
}
}

0 comments on commit 982b30a

Please sign in to comment.