From 18517ad3cc77890e7bdaffe5637515c905ca7452 Mon Sep 17 00:00:00 2001 From: Trevor Date: Tue, 15 Oct 2024 01:22:56 -0700 Subject: [PATCH] Fix sequencer startup sequence with alternate registry (#990) --- .../iot/core/proxy/IotReflectorClient.java | 103 +++++++++------ .../bos/iot/core/proxy/MockPublisher.java | 4 + .../bos/iot/core/proxy/MqttPublisher.java | 4 + .../bos/iot/core/proxy/NullPublisher.java | 4 + .../bos/iot/core/proxy/ProxyTarget.java | 6 +- .../daq/mqtt/sequencer/SequenceBase.java | 28 ++-- .../google/daq/mqtt/util/FileDataSink.java | 4 + .../daq/mqtt/util/IotReflectorClient.java | 1 + .../daq/mqtt/util/MessagePublisher.java | 2 + .../google/daq/mqtt/util/PubSubClient.java | 4 + .../mqtt/validator/MessageReadingClient.java | 4 + .../google/daq/mqtt/validator/Reflector.java | 1 + .../google/daq/mqtt/validator/Validator.java | 1 + .../com/google/udmi/util/PubSubReflector.java | 123 ++++++++++++------ 14 files changed, 194 insertions(+), 95 deletions(-) diff --git a/validator/src/main/java/com/google/bos/iot/core/proxy/IotReflectorClient.java b/validator/src/main/java/com/google/bos/iot/core/proxy/IotReflectorClient.java index 9f4d04a67b..db17038ba2 100644 --- a/validator/src/main/java/com/google/bos/iot/core/proxy/IotReflectorClient.java +++ b/validator/src/main/java/com/google/bos/iot/core/proxy/IotReflectorClient.java @@ -20,6 +20,7 @@ import static com.google.udmi.util.GeneralUtils.getTimestamp; import static com.google.udmi.util.GeneralUtils.ifNotNullGet; import static com.google.udmi.util.GeneralUtils.ifNotNullThen; +import static com.google.udmi.util.GeneralUtils.ifTrueThen; import static com.google.udmi.util.GeneralUtils.isTrue; import static com.google.udmi.util.JsonUtil.asMap; import static com.google.udmi.util.JsonUtil.convertTo; @@ -51,6 +52,7 @@ import java.util.Map; import java.util.TreeMap; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; @@ -88,15 +90,16 @@ public class IotReflectorClient implements MessagePublisher { private static final int UPDATE_RETRIES = 6; private static final Collection COPY_IDS = ImmutableSet.of(DEVICE_ID_KEY, GATEWAY_ID_KEY, SUBTYPE_PROPERTY_KEY, SUBFOLDER_PROPERTY_KEY, TRANSACTION_KEY, PUBLISH_TIME_KEY); + public static final String TRANSACTION_ID_PREFIX = "RC:"; private static final String sessionId = format("%06x", (int) (Math.random() * 0x1000000L)); - private static final String TRANSACTION_ID_PREFIX = "RC:"; private static final String sessionPrefix = TRANSACTION_ID_PREFIX + sessionId + "."; private static final AtomicInteger sessionCounter = new AtomicInteger(); private static final long RESYNC_INTERVAL_SEC = 30; + private static final Map pubLatches = new ConcurrentHashMap<>(); + private static final Map pubCounts = new ConcurrentHashMap<>(); private final String udmiVersion; private final CountDownLatch initialConfigReceived = new CountDownLatch(1); private final CountDownLatch initializedStateSent = new CountDownLatch(1); - private final CountDownLatch validConfigReceived = new CountDownLatch(1); private final ScheduledExecutorService syncThread = Executors.newSingleThreadScheduledExecutor(); private final int requiredVersion; private final BlockingQueue messages = new LinkedBlockingQueue<>(); @@ -153,36 +156,12 @@ public IotReflectorClient(ExecutionConfiguration iotConfig, int requiredVersion, try { System.err.println("Instantiating reflector client " + clientId); publisher = MessagePublisher.from(iotConfig, this::messageHandler, this::errorHandler); + pubCounts.computeIfAbsent(publisher, key -> new AtomicInteger()); + pubLatches.computeIfAbsent(publisher, key -> new CountDownLatch(1)); } catch (Exception e) { throw new RuntimeException("While creating reflector client " + clientId, e); } subscriptionId = publisher.getSubscriptionId(); - System.err.println("Subscribed to " + subscriptionId); - - try { - System.err.println("Starting initial UDMI setup process"); - retries = updateVersion == null ? 1 : UPDATE_RETRIES; - while (validConfigReceived.getCount() > 0) { - setReflectorState(); - initializedStateSent.countDown(); - if (!validConfigReceived.await(CONFIG_TIMEOUT_SEC, TimeUnit.SECONDS)) { - retries--; - if (retries <= 0) { - throw new RuntimeException( - "Config sync timeout expired. Investigate UDMI cloud functions install.", - syncFailure); - } - } - } - - syncThread.scheduleAtFixedRate(this::setReflectorState, RESYNC_INTERVAL_SEC, - RESYNC_INTERVAL_SEC, TimeUnit.SECONDS); - - active = true; - } catch (Exception e) { - publisher.close(); - throw new RuntimeException("Waiting for initial config", e); - } } private boolean userMessageFilter(Envelope envelope) { @@ -219,13 +198,14 @@ public static ExecutionConfiguration makeReflectConfiguration(ExecutionConfigura * * @return new unique transaction id */ - public static synchronized String getNextTransactionId() { + public static String getNextTransactionId() { return format("%s%04d", sessionPrefix, sessionCounter.incrementAndGet()); } private void setReflectorState() { if (isInstallValid && expectedTxnId != null) { - System.err.println("Expected transaction still on the books, likely duplicate channel use"); + System.err.printf("Unexpected reflector transaction %s, likely duplicate channel use%n", + expectedTxnId); close(); throw new RuntimeException("Aborting due to missing transaction reply " + expectedTxnId); } @@ -249,7 +229,7 @@ private void setReflectorState() { if (isInstallValid) { debug("Sending UDMI reflector state: " + stringifyTerse(udmiState.setup)); } else { - System.err.println("Sending UDMI reflector state: " + stringify(map)); + info("Sending UDMI reflector state: " + stringify(map)); } publisher.publish(registryId, getReflectorTopic(), stringify(map)); @@ -388,14 +368,13 @@ private void ensureCloudSync(Map message) { boolean doesReplyMatch = reflectorConfig.reply.transaction_id.equals(expectedTxnId); if (!isInstallValid) { - System.err.println("Received UDMI reflector initial config: " + stringify(reflectorConfig)); + info("Received UDMI reflector initial config: " + stringify(reflectorConfig)); } else if (!shouldConsiderReply) { return; } else if (doesReplyMatch) { - debug("Received UDMI reflector matching config reply."); + debug("Received UDMI reflector matching config reply " + expectedTxnId); } else { - System.err.println( - "Received UDMI reflector mismatched config: " + stringifyTerse(reflectorConfig)); + info("Received UDMI reflector mismatched config: " + stringifyTerse(reflectorConfig)); close(); throw new IllegalStateException("There can (should) be only one instance on a channel"); } @@ -430,7 +409,7 @@ private void ensureCloudSync(Map message) { } isInstallValid = true; expectedTxnId = null; - validConfigReceived.countDown(); + pubLatches.get(publisher).countDown(); } else if (!updateMatch) { System.err.println("UDMI update version mismatch... waiting for retry..."); } else if (!timestampMatch) { @@ -450,6 +429,11 @@ private void debug(String message) { // TODO: Implement some kind of actual log-level control. } + private void info(String message) { + // TODO: Implement some kind of actual log-level control. + System.err.println(message); + } + private Envelope parseMessageTopic(String topic) { List parts = new ArrayList<>(Arrays.asList(topic.substring(1).split("/"))); String leader = parts.remove(0); @@ -503,6 +487,49 @@ public String getSubscriptionId() { return subscriptionId; } + @Override + public void activate() { + + try { + // Some publishers are shared, while others are unique, so handle accordingly. + if (pubCounts.get(publisher).getAndIncrement() > 0) { + ifTrueThen(pubLatches.get(publisher).getCount() > 0, + () -> System.err.println("Waiting for the other shoe to drop...")); + pubLatches.get(publisher).await(CONFIG_TIMEOUT_SEC, TimeUnit.SECONDS); + active = true; + isInstallValid = true; + return; + } + + publisher.activate(); + + System.err.println("Starting initial UDMI setup process"); + retries = updateVersion == null ? 1 : UPDATE_RETRIES; + while (pubLatches.get(publisher).getCount() > 0) { + setReflectorState(); + initializedStateSent.countDown(); + if (!pubLatches.get(publisher).await(CONFIG_TIMEOUT_SEC, TimeUnit.SECONDS)) { + retries--; + if (retries <= 0) { + throw new RuntimeException( + "Config sync timeout expired. Investigate UDMI cloud functions install.", + syncFailure); + } + } + } + + syncThread.scheduleAtFixedRate(this::setReflectorState, RESYNC_INTERVAL_SEC, + RESYNC_INTERVAL_SEC, TimeUnit.SECONDS); + + System.err.println("Subscribed to " + subscriptionId); + + active = true; + } catch (Exception e) { + close(); + throw new RuntimeException("Waiting for initial config", e); + } + } + @Override public boolean isActive() { return active; @@ -544,9 +571,7 @@ public String publish(String deviceId, String topic, String data) { public void close() { active = false; syncThread.shutdown(); - if (publisher != null) { - publisher.close(); - } + ifTrueThen(pubCounts.get(publisher).decrementAndGet() == 0, publisher::close); } @Override diff --git a/validator/src/main/java/com/google/bos/iot/core/proxy/MockPublisher.java b/validator/src/main/java/com/google/bos/iot/core/proxy/MockPublisher.java index 9161371066..c6096d5bc3 100644 --- a/validator/src/main/java/com/google/bos/iot/core/proxy/MockPublisher.java +++ b/validator/src/main/java/com/google/bos/iot/core/proxy/MockPublisher.java @@ -50,6 +50,10 @@ public String getSubscriptionId() { return SiteModel.MOCK_PROJECT; } + @Override + public void activate() { + } + @Override public boolean isActive() { return active; diff --git a/validator/src/main/java/com/google/bos/iot/core/proxy/MqttPublisher.java b/validator/src/main/java/com/google/bos/iot/core/proxy/MqttPublisher.java index 8bb70904b9..35fda00d75 100644 --- a/validator/src/main/java/com/google/bos/iot/core/proxy/MqttPublisher.java +++ b/validator/src/main/java/com/google/bos/iot/core/proxy/MqttPublisher.java @@ -354,6 +354,10 @@ public String getSubscriptionId() { return mqttClient.getClientId(); } + @Override + public void activate() { + } + @Override public boolean isActive() { return mqttClient.isConnected(); diff --git a/validator/src/main/java/com/google/bos/iot/core/proxy/NullPublisher.java b/validator/src/main/java/com/google/bos/iot/core/proxy/NullPublisher.java index 980e074e75..53094e3a64 100644 --- a/validator/src/main/java/com/google/bos/iot/core/proxy/NullPublisher.java +++ b/validator/src/main/java/com/google/bos/iot/core/proxy/NullPublisher.java @@ -29,6 +29,10 @@ public String getSubscriptionId() { return null; } + @Override + public void activate() { + } + @Override public boolean isActive() { return false; diff --git a/validator/src/main/java/com/google/bos/iot/core/proxy/ProxyTarget.java b/validator/src/main/java/com/google/bos/iot/core/proxy/ProxyTarget.java index 97d97356e3..5e26a218e1 100644 --- a/validator/src/main/java/com/google/bos/iot/core/proxy/ProxyTarget.java +++ b/validator/src/main/java/com/google/bos/iot/core/proxy/ProxyTarget.java @@ -1,6 +1,7 @@ package com.google.bos.iot.core.proxy; import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.udmi.util.GeneralUtils.ifNotNullThen; import static com.google.udmi.util.MetadataMapKeys.UDMI_METADATA; import com.fasterxml.jackson.annotation.JsonInclude.Include; @@ -10,6 +11,7 @@ import com.google.api.services.cloudiot.v1.model.Device; import com.google.cloud.ServiceOptions; import com.google.daq.mqtt.util.MessagePublisher; +import com.google.udmi.util.GeneralUtils; import java.time.Duration; import java.time.LocalDateTime; import java.util.Base64; @@ -132,9 +134,7 @@ boolean hasMqttPublisher(String deviceId) { void clearMqttPublisher(String deviceId) { info("Publishers remove " + deviceId); MessagePublisher publisher = messagePublishers.remove(deviceId); - if (publisher != null) { - publisher.close(); - } + ifNotNullThen(publisher, publisher::close); } private MessagePublisher newMqttPublisher(String deviceId) { diff --git a/validator/src/main/java/com/google/daq/mqtt/sequencer/SequenceBase.java b/validator/src/main/java/com/google/daq/mqtt/sequencer/SequenceBase.java index 36c00a4e5b..23f966b732 100644 --- a/validator/src/main/java/com/google/daq/mqtt/sequencer/SequenceBase.java +++ b/validator/src/main/java/com/google/daq/mqtt/sequencer/SequenceBase.java @@ -361,9 +361,8 @@ private static void setupSequencer() { System.err.printf("Loading reflector key file from %s%n", new File(key_file).getAbsolutePath()); System.err.printf("Validating against device %s serial %s%n", getDeviceId(), serialNo); - client = getPublisherClient(); + client = checkNotNull(getPublisherClient(), "primary client not created"); sessionPrefix = client.getSessionPrefix(); - ifNotNullThen(validationState, state -> state.cloud_version = client.getVersionInformation()); String udmiNamespace = exeConfig.udmi_namespace; String altRegistryId = exeConfig.alt_registry; @@ -743,9 +742,7 @@ private Config sanitizeConfig(Config config) { */ @Before public void setUp() { - if (activeInstance == null) { - throw new IllegalStateException("Active sequencer instance not setup, aborting"); - } + checkNotNull(activeInstance, "Active sequencer instance not setup, aborting"); assumeTrue(format("Feature bucket %s not enabled", testBucket.key()), isBucketEnabled(testBucket)); @@ -1629,12 +1626,14 @@ MessageBundle nextMessageBundle() { stashedBundle = null; return bundle; } - if (!reflector().isActive()) { - throw new RuntimeException("Trying to receive message from inactive client"); + MessagePublisher reflector = reflector(); + if (!reflector.isActive()) { + throw new RuntimeException( + "Trying to receive message from inactive client " + reflector.getSubscriptionId()); } final MessageBundle bundle; try { - bundle = reflector().takeNextMessage(QuerySpeed.SHORT); + bundle = reflector.takeNextMessage(QuerySpeed.SHORT); } catch (Exception e) { throw new AbortMessageLoop("Exception receiving message", e); } @@ -1963,7 +1962,7 @@ private String configIsPending(boolean debugOut) { } return ifNotTrueGet(synced, () -> format("waiting for last_start %s, transactions %s, last_config %s", - !lastConfigSynced, + !lastStartSynced, !transactionsClean, !lastConfigSynced)); } @@ -2068,10 +2067,12 @@ protected void withAlternateClient(Runnable evaluator) { checkState(deviceConfig.system.testing.endpoint_type == null, "endpoint type not null"); try { useAlternateClient = true; + warning("Now using alternate connection client!"); deviceConfig.system.testing.endpoint_type = "alternate"; whileDoing("using alternate client", evaluator); } finally { useAlternateClient = false; + warning("Done with alternate connection client!"); catchToNull(() -> deviceConfig.system.testing.endpoint_type = null); } } @@ -2424,8 +2425,16 @@ protected void starting(@NotNull Description description) { sequenceMd = new PrintWriter(newOutputStream(new File(testDir, SEQUENCE_MD).toPath())); putSequencerResult(description, SequenceResult.START); + + client.activate(); + ifNotNullThen(validationState, + state -> state.cloud_version = client.getVersionInformation()); + + ifNotNullThen(altClient, IotReflectorClient::activate); checkState(reflector().isActive(), "Reflector is not currently active"); + activeInstance = SequenceBase.this; + validateTestSpecification(description); writeSequenceMdHeader(); @@ -2437,7 +2446,6 @@ protected void starting(@NotNull Description description) { startTestTimeMs = System.currentTimeMillis(); notice("starting test " + testName + " " + START_END_MARKER); - activeInstance = SequenceBase.this; } catch (IllegalArgumentException e) { putSequencerResult(description, ERRR); recordCompletion(ERRR, description, friendlyStackTrace(e)); diff --git a/validator/src/main/java/com/google/daq/mqtt/util/FileDataSink.java b/validator/src/main/java/com/google/daq/mqtt/util/FileDataSink.java index 630581d23f..b5f0bf5d4f 100644 --- a/validator/src/main/java/com/google/daq/mqtt/util/FileDataSink.java +++ b/validator/src/main/java/com/google/daq/mqtt/util/FileDataSink.java @@ -106,6 +106,10 @@ public String getSubscriptionId() { return reportFile.getAbsolutePath(); } + @Override + public void activate() { + } + @Override public boolean isActive() { return true; diff --git a/validator/src/main/java/com/google/daq/mqtt/util/IotReflectorClient.java b/validator/src/main/java/com/google/daq/mqtt/util/IotReflectorClient.java index a6ddb4fb3e..4234dcc561 100644 --- a/validator/src/main/java/com/google/daq/mqtt/util/IotReflectorClient.java +++ b/validator/src/main/java/com/google/daq/mqtt/util/IotReflectorClient.java @@ -71,6 +71,7 @@ public IotReflectorClient(ExecutionConfiguration executionConfiguration, String executionConfiguration.key_file = siteModel.validatorKey(); messageClient = new com.google.bos.iot.core.proxy.IotReflectorClient(executionConfiguration, Validator.TOOLS_FUNCTIONS_VERSION, toolName); + messageClient.activate(); sessionPrefix = messageClient.getSessionPrefix(); executor.execute(this::processReplies); isSlow = siteModel.getDeviceIds().size() > SLOW_QUERY_THRESHOLD; diff --git a/validator/src/main/java/com/google/daq/mqtt/util/MessagePublisher.java b/validator/src/main/java/com/google/daq/mqtt/util/MessagePublisher.java index 87e7185849..568498d821 100644 --- a/validator/src/main/java/com/google/daq/mqtt/util/MessagePublisher.java +++ b/validator/src/main/java/com/google/daq/mqtt/util/MessagePublisher.java @@ -51,6 +51,8 @@ static MessagePublisher from(ExecutionConfiguration iotConfig, String getSubscriptionId(); + void activate(); + boolean isActive(); MessageBundle takeNextMessage(QuerySpeed querySpeed); diff --git a/validator/src/main/java/com/google/daq/mqtt/util/PubSubClient.java b/validator/src/main/java/com/google/daq/mqtt/util/PubSubClient.java index 015c82297d..8b8849e89e 100644 --- a/validator/src/main/java/com/google/daq/mqtt/util/PubSubClient.java +++ b/validator/src/main/java/com/google/daq/mqtt/util/PubSubClient.java @@ -210,6 +210,10 @@ private SeekRequest getCurrentTimeSeekRequest(String subscription) { return SeekRequest.newBuilder().setSubscription(subscription).setTime(timestamp).build(); } + @Override + public void activate() { + } + /** * Check if the client is active. * diff --git a/validator/src/main/java/com/google/daq/mqtt/validator/MessageReadingClient.java b/validator/src/main/java/com/google/daq/mqtt/validator/MessageReadingClient.java index 4ce344d688..3477517c36 100644 --- a/validator/src/main/java/com/google/daq/mqtt/validator/MessageReadingClient.java +++ b/validator/src/main/java/com/google/daq/mqtt/validator/MessageReadingClient.java @@ -170,6 +170,10 @@ public String getSubscriptionId() { return messageDir.getAbsolutePath(); } + @Override + public void activate() { + } + @Override public boolean isActive() { return isActive; diff --git a/validator/src/main/java/com/google/daq/mqtt/validator/Reflector.java b/validator/src/main/java/com/google/daq/mqtt/validator/Reflector.java index d34d5b12f6..cfc971e104 100644 --- a/validator/src/main/java/com/google/daq/mqtt/validator/Reflector.java +++ b/validator/src/main/java/com/google/daq/mqtt/validator/Reflector.java @@ -136,6 +136,7 @@ private void initialize() { executionConfiguration.udmi_version = Common.getUdmiVersion(); client = new IotReflectorClient(executionConfiguration, TOOLS_FUNCTIONS_VERSION, REFLECTOR_TOOL_NAME); + client.activate(); } private List parseArgs(List argsList) { diff --git a/validator/src/main/java/com/google/daq/mqtt/validator/Validator.java b/validator/src/main/java/com/google/daq/mqtt/validator/Validator.java index d8ce0ae30e..f6c2a54985 100644 --- a/validator/src/main/java/com/google/daq/mqtt/validator/Validator.java +++ b/validator/src/main/java/com/google/daq/mqtt/validator/Validator.java @@ -531,6 +531,7 @@ private void validateReflector() { client = new IotReflectorClient(config, TOOLS_FUNCTIONS_VERSION, VALIDATOR_TOOL_NAME, this::messageFilter); dataSinks.add(client); + client.activate(); } private boolean messageFilter(Envelope envelope) { diff --git a/validator/src/main/java/com/google/udmi/util/PubSubReflector.java b/validator/src/main/java/com/google/udmi/util/PubSubReflector.java index 1d682ecbb4..3e656e1603 100644 --- a/validator/src/main/java/com/google/udmi/util/PubSubReflector.java +++ b/validator/src/main/java/com/google/udmi/util/PubSubReflector.java @@ -2,9 +2,11 @@ import static com.google.api.client.util.Preconditions.checkNotNull; import static com.google.bos.iot.core.proxy.ProxyTarget.STATE_TOPIC; +import static com.google.common.base.Preconditions.checkState; import static com.google.udmi.util.Common.CATEGORY_PROPERTY_KEY; import static com.google.udmi.util.Common.DEVICE_ID_KEY; import static com.google.udmi.util.Common.PUBLISH_TIME_KEY; +import static com.google.udmi.util.Common.REGISTRY_ID_PROPERTY_KEY; import static com.google.udmi.util.Common.SOURCE_KEY; import static com.google.udmi.util.Common.SOURCE_SEPARATOR; import static com.google.udmi.util.Common.SOURCE_SEPARATOR_REGEX; @@ -12,9 +14,11 @@ import static com.google.udmi.util.Common.getNamespacePrefix; import static com.google.udmi.util.GeneralUtils.friendlyStackTrace; import static com.google.udmi.util.GeneralUtils.ifNotNullThen; +import static com.google.udmi.util.GeneralUtils.ifNullThen; import static com.google.udmi.util.JsonUtil.isoConvert; import static com.google.udmi.util.JsonUtil.stringify; import static com.google.udmi.util.JsonUtil.toStringMap; +import static com.google.udmi.util.SiteModel.DEFAULT_GBOS_HOSTNAME; import static java.lang.String.format; import static java.time.Instant.ofEpochSecond; import static java.util.Objects.requireNonNull; @@ -45,7 +49,9 @@ import java.util.HashMap; import java.util.Map; import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; import java.util.function.Consumer; import org.jetbrains.annotations.Nullable; @@ -61,17 +67,25 @@ public class PubSubReflector implements MessagePublisher { private static final String CONNECT_ERROR_FORMAT = "While connecting to project %s"; - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper() - .enable(SerializationFeature.INDENT_OUTPUT) - .setSerializationInclusion(Include.NON_NULL); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper().enable( + SerializationFeature.INDENT_OUTPUT).setSerializationInclusion(Include.NON_NULL); private static final String SUBSCRIPTION_ERROR_FORMAT = "While accessing subscription %s"; private static final long SUBSCRIPTION_RACE_DELAY_MS = 10000; private static final String WAS_BASE_64 = "wasBase64"; - public static final String UDMI_REFLECT_TOPIC = "udmi_reflect"; + private static final String UDMI_REFLECT_TOPIC = "udmi_reflect"; private static final String UDMI_REPLY_TOPIC = "udmi_reply"; public static final String USER_NAME_DEFAULT = "debug"; + private static final AtomicInteger sessionCount = new AtomicInteger(); + private static String subscriptionId; + private static PubSubReflector reflectorClient; + private static final Map> messageHandlers + = new ConcurrentHashMap<>(); + private static final Map> errorHandlers = new ConcurrentHashMap<>(); + private static BiConsumer defaultMessageHandler; + private static Consumer defaultErrorHandler; + private final AtomicBoolean activated = new AtomicBoolean(); private final AtomicBoolean active = new AtomicBoolean(); private final long startTimeSec = System.currentTimeMillis() / 1000; @@ -82,8 +96,6 @@ public class PubSubReflector implements MessagePublisher { private final Publisher publisher; private final boolean flushSubscription; private final String userName; - private BiConsumer messageHandler; - private Consumer errorHandler; /** * Create a new PubSub client. @@ -94,8 +106,8 @@ public class PubSubReflector implements MessagePublisher { * @param userName user id running this operation * @param subscriptionId target subscription name */ - public PubSubReflector(String projectId, String registryId, String updateTopic, - String userName, String subscriptionId) { + public PubSubReflector(String projectId, String registryId, String updateTopic, String userName, + String subscriptionId) { this(projectId, registryId, updateTopic, userName, subscriptionId, true); } @@ -109,9 +121,10 @@ public PubSubReflector(String projectId, String registryId, String updateTopic, * @param subscriptionId target subscription name * @param reset if the connection should be reset before use */ - public PubSubReflector(String projectId, String registryId, String updateTopic, - String userName, String subscriptionId, boolean reset) { + public PubSubReflector(String projectId, String registryId, String updateTopic, String userName, + String subscriptionId, boolean reset) { try { + checkState(sessionCount.incrementAndGet() == 1, "multiple internal sessions not supported"); this.projectId = checkNotNull(projectId, "project id not defined"); this.registryId = registryId; ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(projectId, @@ -144,31 +157,42 @@ public PubSubReflector(String projectId, String registryId, String updateTopic, */ public static MessagePublisher from(ExecutionConfiguration iotConfig, BiConsumer messageHandler, Consumer errorHandler) { - String projectId = requireNonNull(iotConfig.project_id, "missing project id"); String registryActual = SiteModel.getRegistryActual(iotConfig); - ExecutionConfiguration reflectorConfig = IotReflectorClient.makeReflectConfiguration(iotConfig, - registryActual); - String registryId = MessagePublisher.getRegistryId(reflectorConfig); String namespacePrefix = getNamespacePrefix(iotConfig.udmi_namespace); - String topicId = namespacePrefix + UDMI_REFLECT_TOPIC; String userName = ofNullable(iotConfig.user_name).orElse(USER_NAME_DEFAULT); - String subscriptionId = namespacePrefix + UDMI_REPLY_TOPIC + SOURCE_SEPARATOR + userName; + String newId = namespacePrefix + UDMI_REPLY_TOPIC + SOURCE_SEPARATOR + userName; + checkState(subscriptionId == null || subscriptionId.equals(newId), + "unsupported difference in subscription id"); + subscriptionId = newId; + checkState(messageHandlers.put(registryActual, messageHandler) == null, "duplicate handler"); + checkState(errorHandlers.put(registryActual, errorHandler) == null, "duplicate handler"); - PubSubReflector reflector = new PubSubReflector(projectId, registryId, topicId, userName, - subscriptionId); - reflector.activate(messageHandler, errorHandler); - return reflector; + ifNullThen(reflectorClient, () -> { + defaultMessageHandler = messageHandler; + defaultErrorHandler = errorHandler; + ExecutionConfiguration reflectorConfig = IotReflectorClient.makeReflectConfiguration( + iotConfig, registryActual); + String registryId = MessagePublisher.getRegistryId(reflectorConfig); + String projectId = requireNonNull(iotConfig.project_id, "missing project id"); + String topicId = namespacePrefix + UDMI_REFLECT_TOPIC; + reflectorClient = new PubSubReflector(projectId, registryId, topicId, userName, + subscriptionId); + }); + + return reflectorClient; } - private void activate(BiConsumer messageHandler, - Consumer errorHandler) { - this.messageHandler = messageHandler; - this.errorHandler = errorHandler; - subscriber.startAsync().awaitRunning(); - active.set(true); + /** + * Activate this instance if it hasn't already been activated. + */ + public void activate() { + if (!activated.getAndSet(true)) { + subscriber.startAsync().awaitRunning(); + active.set(true); - // TODO: Make this trigger both the config & state to be queried. - // publish(UDMI_REFLECT, UPDATE_QUERY_TOPIC, EMPTY_JSON); + // TODO: Make this trigger both the config & state to be queried. + // publish(UDMI_REFLECT, UPDATE_QUERY_TOPIC, EMPTY_JSON); + } } private SeekRequest getCurrentTimeSeekRequest(String subscription) { @@ -195,8 +219,7 @@ public MessageBundle takeNextMessage(QuerySpeed speed) { private MessageBundle processMessage(PubsubMessage message) { long seconds = message.getPublishTime().getSeconds(); if (flushSubscription && seconds < startTimeSec) { - System.err.printf("Flushing outdated message from %d seconds ago%n", - startTimeSec - seconds); + System.err.printf("Flushing outdated message from %d seconds ago%n", startTimeSec - seconds); return null; } byte[] rawData = message.getData().toByteArray(); @@ -209,8 +232,8 @@ private MessageBundle processMessage(PubsubMessage message) { } Map asMap; try { - @SuppressWarnings("unchecked") - Map dataMap = OBJECT_MAPPER.readValue(data, TreeMap.class); + @SuppressWarnings("unchecked") Map dataMap = OBJECT_MAPPER.readValue(data, + TreeMap.class); asMap = dataMap; } catch (JsonProcessingException e) { asMap = new ErrorContainer(e, getSubscriptionId(), GeneralUtils.getTimestamp()); @@ -244,10 +267,8 @@ public String publish(String deviceId, String topic, String data) { envelope.source = Common.SOURCE_SEPARATOR + userName; envelope.subFolder = STATE_TOPIC.equals(topic) ? null : SubFolder.UDMI; Map map = toStringMap(envelope); - PubsubMessage message = PubsubMessage.newBuilder() - .setData(ByteString.copyFromUtf8(data)) - .putAllAttributes(map) - .build(); + PubsubMessage message = PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(data)) + .putAllAttributes(map).build(); ApiFuture publish = publisher.publish(message); publish.get(); // Wait for publish to complete. } catch (Exception e) { @@ -284,8 +305,7 @@ private void resetSubscription(ProjectSubscriptionName subscriptionName) { } catch (NotFoundException e) { throw new RuntimeException("Missing subscription for " + subscriptionName); } catch (Exception e) { - throw new RuntimeException( - format(SUBSCRIPTION_ERROR_FORMAT, subscriptionName), e); + throw new RuntimeException(format(SUBSCRIPTION_ERROR_FORMAT, subscriptionName), e); } } @@ -296,26 +316,42 @@ public SetupUdmiConfig getVersionInformation() { return setupUdmiConfig; } + @Override + public String getBridgeHost() { + // TODO: Figure out how to properly determine the endpoint connection host. + return DEFAULT_GBOS_HOSTNAME; + } + private class MessageProcessor implements MessageReceiver { @Override public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) { + final MessageBundle messageBundle; + String deviceRegistryId; try { consumer.ack(); - MessageBundle messageBundle = processMessage(message); + messageBundle = processMessage(message); if (messageBundle == null) { return; } + // Since this is a reflector, pull the registry id from the device id field. + deviceRegistryId = requireNonNull(messageBundle.attributes.get(DEVICE_ID_KEY)); + } catch (Exception e) { + defaultErrorHandler.accept(e); + return; + } + + try { Map attributes = messageBundle.attributes; String subFolder = attributes.get(SUBFOLDER_PROPERTY_KEY); String suffix = ofNullable(subFolder).map(folder -> "/" + folder).orElse(""); String topic = format("/devices/%s/%s%s", attributes.get(DEVICE_ID_KEY), - attributes.get(CATEGORY_PROPERTY_KEY), - suffix); + attributes.get(CATEGORY_PROPERTY_KEY), suffix); String messageSource = attributes.remove(SOURCE_KEY); if (messageSource == null) { return; } + Object dstSource = messageBundle.message.remove(SOURCE_KEY); String[] source = messageSource.split(SOURCE_SEPARATOR_REGEX, 3); if (source.length == 1) { @@ -326,9 +362,10 @@ public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) { } else { System.err.println("Discarding message with malformed source: " + messageSource); } - messageHandler.accept(topic, stringify(messageBundle.message)); + ofNullable(messageHandlers.get(deviceRegistryId)).orElse(defaultMessageHandler) + .accept(topic, stringify(messageBundle.message)); } catch (Exception e) { - errorHandler.accept(e); + ofNullable(errorHandlers.get(deviceRegistryId)).orElse(defaultErrorHandler).accept(e); } } }