Skip to content

Commit

Permalink
Fix sequencer startup sequence with alternate registry (faucetsdn#990)
Browse files Browse the repository at this point in the history
  • Loading branch information
grafnu authored Oct 15, 2024
1 parent 0b0b15b commit 18517ad
Show file tree
Hide file tree
Showing 14 changed files with 194 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static com.google.udmi.util.GeneralUtils.getTimestamp;
import static com.google.udmi.util.GeneralUtils.ifNotNullGet;
import static com.google.udmi.util.GeneralUtils.ifNotNullThen;
import static com.google.udmi.util.GeneralUtils.ifTrueThen;
import static com.google.udmi.util.GeneralUtils.isTrue;
import static com.google.udmi.util.JsonUtil.asMap;
import static com.google.udmi.util.JsonUtil.convertTo;
Expand Down Expand Up @@ -51,6 +52,7 @@
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
Expand Down Expand Up @@ -88,15 +90,16 @@ public class IotReflectorClient implements MessagePublisher {
private static final int UPDATE_RETRIES = 6;
private static final Collection<String> COPY_IDS = ImmutableSet.of(DEVICE_ID_KEY, GATEWAY_ID_KEY,
SUBTYPE_PROPERTY_KEY, SUBFOLDER_PROPERTY_KEY, TRANSACTION_KEY, PUBLISH_TIME_KEY);
public static final String TRANSACTION_ID_PREFIX = "RC:";
private static final String sessionId = format("%06x", (int) (Math.random() * 0x1000000L));
private static final String TRANSACTION_ID_PREFIX = "RC:";
private static final String sessionPrefix = TRANSACTION_ID_PREFIX + sessionId + ".";
private static final AtomicInteger sessionCounter = new AtomicInteger();
private static final long RESYNC_INTERVAL_SEC = 30;
private static final Map<MessagePublisher, CountDownLatch> pubLatches = new ConcurrentHashMap<>();
private static final Map<MessagePublisher, AtomicInteger> pubCounts = new ConcurrentHashMap<>();
private final String udmiVersion;
private final CountDownLatch initialConfigReceived = new CountDownLatch(1);
private final CountDownLatch initializedStateSent = new CountDownLatch(1);
private final CountDownLatch validConfigReceived = new CountDownLatch(1);
private final ScheduledExecutorService syncThread = Executors.newSingleThreadScheduledExecutor();
private final int requiredVersion;
private final BlockingQueue<Validator.MessageBundle> messages = new LinkedBlockingQueue<>();
Expand Down Expand Up @@ -153,36 +156,12 @@ public IotReflectorClient(ExecutionConfiguration iotConfig, int requiredVersion,
try {
System.err.println("Instantiating reflector client " + clientId);
publisher = MessagePublisher.from(iotConfig, this::messageHandler, this::errorHandler);
pubCounts.computeIfAbsent(publisher, key -> new AtomicInteger());
pubLatches.computeIfAbsent(publisher, key -> new CountDownLatch(1));
} catch (Exception e) {
throw new RuntimeException("While creating reflector client " + clientId, e);
}
subscriptionId = publisher.getSubscriptionId();
System.err.println("Subscribed to " + subscriptionId);

try {
System.err.println("Starting initial UDMI setup process");
retries = updateVersion == null ? 1 : UPDATE_RETRIES;
while (validConfigReceived.getCount() > 0) {
setReflectorState();
initializedStateSent.countDown();
if (!validConfigReceived.await(CONFIG_TIMEOUT_SEC, TimeUnit.SECONDS)) {
retries--;
if (retries <= 0) {
throw new RuntimeException(
"Config sync timeout expired. Investigate UDMI cloud functions install.",
syncFailure);
}
}
}

syncThread.scheduleAtFixedRate(this::setReflectorState, RESYNC_INTERVAL_SEC,
RESYNC_INTERVAL_SEC, TimeUnit.SECONDS);

active = true;
} catch (Exception e) {
publisher.close();
throw new RuntimeException("Waiting for initial config", e);
}
}

private boolean userMessageFilter(Envelope envelope) {
Expand Down Expand Up @@ -219,13 +198,14 @@ public static ExecutionConfiguration makeReflectConfiguration(ExecutionConfigura
*
* @return new unique transaction id
*/
public static synchronized String getNextTransactionId() {
public static String getNextTransactionId() {
return format("%s%04d", sessionPrefix, sessionCounter.incrementAndGet());
}

private void setReflectorState() {
if (isInstallValid && expectedTxnId != null) {
System.err.println("Expected transaction still on the books, likely duplicate channel use");
System.err.printf("Unexpected reflector transaction %s, likely duplicate channel use%n",
expectedTxnId);
close();
throw new RuntimeException("Aborting due to missing transaction reply " + expectedTxnId);
}
Expand All @@ -249,7 +229,7 @@ private void setReflectorState() {
if (isInstallValid) {
debug("Sending UDMI reflector state: " + stringifyTerse(udmiState.setup));
} else {
System.err.println("Sending UDMI reflector state: " + stringify(map));
info("Sending UDMI reflector state: " + stringify(map));
}

publisher.publish(registryId, getReflectorTopic(), stringify(map));
Expand Down Expand Up @@ -388,14 +368,13 @@ private void ensureCloudSync(Map<String, Object> message) {
boolean doesReplyMatch = reflectorConfig.reply.transaction_id.equals(expectedTxnId);

if (!isInstallValid) {
System.err.println("Received UDMI reflector initial config: " + stringify(reflectorConfig));
info("Received UDMI reflector initial config: " + stringify(reflectorConfig));
} else if (!shouldConsiderReply) {
return;
} else if (doesReplyMatch) {
debug("Received UDMI reflector matching config reply.");
debug("Received UDMI reflector matching config reply " + expectedTxnId);
} else {
System.err.println(
"Received UDMI reflector mismatched config: " + stringifyTerse(reflectorConfig));
info("Received UDMI reflector mismatched config: " + stringifyTerse(reflectorConfig));
close();
throw new IllegalStateException("There can (should) be only one instance on a channel");
}
Expand Down Expand Up @@ -430,7 +409,7 @@ private void ensureCloudSync(Map<String, Object> message) {
}
isInstallValid = true;
expectedTxnId = null;
validConfigReceived.countDown();
pubLatches.get(publisher).countDown();
} else if (!updateMatch) {
System.err.println("UDMI update version mismatch... waiting for retry...");
} else if (!timestampMatch) {
Expand All @@ -450,6 +429,11 @@ private void debug(String message) {
// TODO: Implement some kind of actual log-level control.
}

/**
 * Write an informational message to standard error.
 *
 * @param message text to print
 */
private void info(String message) {
// TODO: Implement some kind of actual log-level control.
System.err.println(message);
}

private Envelope parseMessageTopic(String topic) {
List<String> parts = new ArrayList<>(Arrays.asList(topic.substring(1).split("/")));
String leader = parts.remove(0);
Expand Down Expand Up @@ -503,6 +487,49 @@ public String getSubscriptionId() {
return subscriptionId;
}

/**
 * Activate this reflector client.
 *
 * <p>The per-publisher use count decides which path is taken. The first caller for a given
 * publisher activates the publisher, repeatedly sends the reflector state until the publisher's
 * latch has counted down (or the retry budget is exhausted), and then schedules a periodic
 * re-send of the reflector state. Any later caller for the same publisher only waits on that
 * latch, for at most CONFIG_TIMEOUT_SEC seconds, and then marks itself active.
 *
 * @throws RuntimeException wrapping any exception raised during activation; close() is called
 *     before the exception is rethrown
 */
@Override
public void activate() {

try {
// Some publishers are shared, while others are unique, so handle accordingly.
// A non-zero previous count means another client already claimed this publisher.
if (pubCounts.get(publisher).getAndIncrement() > 0) {
ifTrueThen(pubLatches.get(publisher).getCount() > 0,
() -> System.err.println("Waiting for the other shoe to drop..."));
// NOTE(review): the boolean result of await() is ignored, so a timeout here still falls
// through and marks this client active and valid -- confirm that this is intended.
pubLatches.get(publisher).await(CONFIG_TIMEOUT_SEC, TimeUnit.SECONDS);
active = true;
isInstallValid = true;
return;
}

publisher.activate();

System.err.println("Starting initial UDMI setup process");
// Only a single attempt is made when no update version is set.
retries = updateVersion == null ? 1 : UPDATE_RETRIES;
while (pubLatches.get(publisher).getCount() > 0) {
setReflectorState();
initializedStateSent.countDown();
// Each timed-out wait consumes one retry; running out aborts activation.
if (!pubLatches.get(publisher).await(CONFIG_TIMEOUT_SEC, TimeUnit.SECONDS)) {
retries--;
if (retries <= 0) {
throw new RuntimeException(
"Config sync timeout expired. Investigate UDMI cloud functions install.",
syncFailure);
}
}
}

// Keep re-sending the reflector state every RESYNC_INTERVAL_SEC seconds from here on.
syncThread.scheduleAtFixedRate(this::setReflectorState, RESYNC_INTERVAL_SEC,
RESYNC_INTERVAL_SEC, TimeUnit.SECONDS);

System.err.println("Subscribed to " + subscriptionId);

active = true;
} catch (Exception e) {
// close() also decrements the use count that was incremented at the top of this method.
close();
throw new RuntimeException("Waiting for initial config", e);
}
}

@Override
public boolean isActive() {
return active;
Expand Down Expand Up @@ -544,9 +571,7 @@ public String publish(String deviceId, String topic, String data) {
public void close() {
active = false;
syncThread.shutdown();
if (publisher != null) {
publisher.close();
}
ifTrueThen(pubCounts.get(publisher).decrementAndGet() == 0, publisher::close);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ public String getSubscriptionId() {
return SiteModel.MOCK_PROJECT;
}

@Override
public void activate() {
// No-op: this implementation performs no work when activated.
}

@Override
public boolean isActive() {
return active;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,10 @@ public String getSubscriptionId() {
return mqttClient.getClientId();
}

@Override
public void activate() {
// No-op: this implementation performs no work when activated.
}

@Override
public boolean isActive() {
return mqttClient.isConnected();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ public String getSubscriptionId() {
return null;
}

@Override
public void activate() {
// No-op: this implementation performs no work when activated.
}

@Override
public boolean isActive() {
return false;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.google.bos.iot.core.proxy;

import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.udmi.util.GeneralUtils.ifNotNullThen;
import static com.google.udmi.util.MetadataMapKeys.UDMI_METADATA;

import com.fasterxml.jackson.annotation.JsonInclude.Include;
Expand All @@ -10,6 +11,7 @@
import com.google.api.services.cloudiot.v1.model.Device;
import com.google.cloud.ServiceOptions;
import com.google.daq.mqtt.util.MessagePublisher;
import com.google.udmi.util.GeneralUtils;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Base64;
Expand Down Expand Up @@ -132,9 +134,7 @@ boolean hasMqttPublisher(String deviceId) {
void clearMqttPublisher(String deviceId) {
info("Publishers remove " + deviceId);
MessagePublisher publisher = messagePublishers.remove(deviceId);
if (publisher != null) {
publisher.close();
}
ifNotNullThen(publisher, publisher::close);
}

private MessagePublisher newMqttPublisher(String deviceId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,9 +361,8 @@ private static void setupSequencer() {

System.err.printf("Loading reflector key file from %s%n", new File(key_file).getAbsolutePath());
System.err.printf("Validating against device %s serial %s%n", getDeviceId(), serialNo);
client = getPublisherClient();
client = checkNotNull(getPublisherClient(), "primary client not created");
sessionPrefix = client.getSessionPrefix();
ifNotNullThen(validationState, state -> state.cloud_version = client.getVersionInformation());

String udmiNamespace = exeConfig.udmi_namespace;
String altRegistryId = exeConfig.alt_registry;
Expand Down Expand Up @@ -743,9 +742,7 @@ private Config sanitizeConfig(Config config) {
*/
@Before
public void setUp() {
if (activeInstance == null) {
throw new IllegalStateException("Active sequencer instance not setup, aborting");
}
checkNotNull(activeInstance, "Active sequencer instance not setup, aborting");

assumeTrue(format("Feature bucket %s not enabled", testBucket.key()),
isBucketEnabled(testBucket));
Expand Down Expand Up @@ -1629,12 +1626,14 @@ MessageBundle nextMessageBundle() {
stashedBundle = null;
return bundle;
}
if (!reflector().isActive()) {
throw new RuntimeException("Trying to receive message from inactive client");
MessagePublisher reflector = reflector();
if (!reflector.isActive()) {
throw new RuntimeException(
"Trying to receive message from inactive client " + reflector.getSubscriptionId());
}
final MessageBundle bundle;
try {
bundle = reflector().takeNextMessage(QuerySpeed.SHORT);
bundle = reflector.takeNextMessage(QuerySpeed.SHORT);
} catch (Exception e) {
throw new AbortMessageLoop("Exception receiving message", e);
}
Expand Down Expand Up @@ -1963,7 +1962,7 @@ private String configIsPending(boolean debugOut) {
}
return ifNotTrueGet(synced,
() -> format("waiting for last_start %s, transactions %s, last_config %s",
!lastConfigSynced,
!lastStartSynced,
!transactionsClean, !lastConfigSynced));
}

Expand Down Expand Up @@ -2068,10 +2067,12 @@ protected void withAlternateClient(Runnable evaluator) {
checkState(deviceConfig.system.testing.endpoint_type == null, "endpoint type not null");
try {
useAlternateClient = true;
warning("Now using alternate connection client!");
deviceConfig.system.testing.endpoint_type = "alternate";
whileDoing("using alternate client", evaluator);
} finally {
useAlternateClient = false;
warning("Done with alternate connection client!");
catchToNull(() -> deviceConfig.system.testing.endpoint_type = null);
}
}
Expand Down Expand Up @@ -2424,8 +2425,16 @@ protected void starting(@NotNull Description description) {
sequenceMd = new PrintWriter(newOutputStream(new File(testDir, SEQUENCE_MD).toPath()));

putSequencerResult(description, SequenceResult.START);

client.activate();
ifNotNullThen(validationState,
state -> state.cloud_version = client.getVersionInformation());

ifNotNullThen(altClient, IotReflectorClient::activate);
checkState(reflector().isActive(), "Reflector is not currently active");

activeInstance = SequenceBase.this;

validateTestSpecification(description);

writeSequenceMdHeader();
Expand All @@ -2437,7 +2446,6 @@ protected void starting(@NotNull Description description) {
startTestTimeMs = System.currentTimeMillis();
notice("starting test " + testName + " " + START_END_MARKER);

activeInstance = SequenceBase.this;
} catch (IllegalArgumentException e) {
putSequencerResult(description, ERRR);
recordCompletion(ERRR, description, friendlyStackTrace(e));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ public String getSubscriptionId() {
return reportFile.getAbsolutePath();
}

@Override
public void activate() {
// No-op: this implementation performs no work when activated.
}

@Override
public boolean isActive() {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ public IotReflectorClient(ExecutionConfiguration executionConfiguration, String
executionConfiguration.key_file = siteModel.validatorKey();
messageClient = new com.google.bos.iot.core.proxy.IotReflectorClient(executionConfiguration,
Validator.TOOLS_FUNCTIONS_VERSION, toolName);
messageClient.activate();
sessionPrefix = messageClient.getSessionPrefix();
executor.execute(this::processReplies);
isSlow = siteModel.getDeviceIds().size() > SLOW_QUERY_THRESHOLD;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ static MessagePublisher from(ExecutionConfiguration iotConfig,

String getSubscriptionId();

void activate();

boolean isActive();

MessageBundle takeNextMessage(QuerySpeed querySpeed);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,10 @@ private SeekRequest getCurrentTimeSeekRequest(String subscription) {
return SeekRequest.newBuilder().setSubscription(subscription).setTime(timestamp).build();
}

@Override
public void activate() {
// No-op: this implementation performs no work when activated.
}

/**
* Check if the client is active.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,10 @@ public String getSubscriptionId() {
return messageDir.getAbsolutePath();
}

@Override
public void activate() {
// No-op: this implementation performs no work when activated.
}

@Override
public boolean isActive() {
return isActive;
Expand Down
Loading

0 comments on commit 18517ad

Please sign in to comment.