Skip to content

Commit

Permalink
Fix state query startup delay for ClearBlade (#722)
Browse files Browse the repository at this point in the history
  • Loading branch information
grafnu authored Aug 27, 2023
1 parent 5535dcf commit a2243f9
Show file tree
Hide file tree
Showing 14 changed files with 109 additions and 37 deletions.
16 changes: 15 additions & 1 deletion bin/reset_config
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,25 @@ echo Setting config timestamp ${now_date}
jq < ${src_config} .timestamp=\"${now_date}\" |\
jq .system.testing.sequence_name=\"${config_file%.json}\" > ${dst_config}

if [[ -n $project_id ]]; then
quoted_id=\"${project_id##*/}\"
else
quoted_id=null
fi

if [[ $project_id =~ ^// ]]; then
sans_project=${project_id%/*}
iot_provider=\"${sans_project#//}\"
else
iot_provider=null
fi

echo Resetting device ${device_id} config...
echo Writing config to $REFLECTOR_CONFIG
cat <<EOF > $REFLECTOR_CONFIG
{
"project_id": "$project_id",
"iot_provider": $iot_provider,
"project_id": $quoted_id,
"site_model": "$site_dir",
"device_id": "$device_id",
"registry_suffix": $registry_suffix
Expand Down
2 changes: 1 addition & 1 deletion tests/traces/simple/devices/AHU-22/003_event_system.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"logentries" : [ {
"message" : "2022-07-19T05:04:39Z sent message #70",
"category" : "device.log_message",
"timestamp" : "2022-07-19T05:04:39Z",
"timestamp" : "2022-07-19T05:04:32Z",
"level" : 200
} ],
"timestamp" : "2022-07-19T05:04:39Z",
Expand Down
4 changes: 2 additions & 2 deletions tests/traces/simple/devices/AHU-22/004_event_pointset.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@
"present_value" : 21
}
},
"timestamp" : "2022-07-19T05:04:39Z",
"timestamp" : "2022-07-19T05:04:49Z",
"version" : "1.3.14"
}
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 23 additions & 0 deletions udmis/bin/pod_logs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#!/bin/bash -e

ROOT=$(dirname $0)/..
cd $ROOT
mkdir -p out
cd out

podprefix=udmis-pods-

pods=$(kubectl get pods | fgrep $podprefix | awk '{print $1}')
echo Getting logs for pods $pods

rm -f ${podprefix}*.log

for pod in $pods; do
kubectl logs $pod > $pod.log &
done

wait

cat ${podprefix}*.log | fgrep "Z " | sort -k 1 > pods.log

echo Combined logs available in $(realpath pods.log)
Original file line number Diff line number Diff line change
Expand Up @@ -183,10 +183,11 @@ protected String updateConfig(String registryId, String deviceId, String config,
.setName(DeviceName.of(projectId, location, registryId, deviceId).toString())
.setBinaryData(binaryData).setVersionToUpdate(updateVersion).build();
DeviceConfig response = deviceManagerClient.modifyCloudToDeviceConfig(request);
System.err.println("Config modified version " + response.getVersion());
debug("Modified %s/%s config version %s", registryId, deviceId, response.getVersion());
return config;
} catch (Exception e) {
throw new RuntimeException("While modifying device config", e);
throw new RuntimeException(
format("While modifying device config %s/%s", registryId, deviceId), e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,8 +280,7 @@ public final void sendCommand(String registryId, String deviceId, SubFolder fold
backoffKey, getTimestamp(until)));
}
} else {
debug("Dropping message because registry backoff for %s",
backoffKey);
debug("Dropping message because registry backoff for %s", backoffKey);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import static com.google.udmi.util.JsonUtil.convertTo;
import static com.google.udmi.util.JsonUtil.convertToStrict;
import static com.google.udmi.util.JsonUtil.fromStringStrict;
import static com.google.udmi.util.JsonUtil.getTimestamp;
import static com.google.udmi.util.JsonUtil.loadFileStrictRequired;
import static com.google.udmi.util.JsonUtil.stringify;
import static com.google.udmi.util.JsonUtil.stringifyTerse;
Expand Down Expand Up @@ -138,8 +139,8 @@ private void processException(Envelope reflection, Exception e) {

private void processReflection(Envelope reflection, Envelope envelope,
Map<String, Object> payload) {
debug("Processing reflection %s/%s %s", envelope.subType, envelope.subFolder,
envelope.transactionId);
debug("Processing reflection %s/%s %s %s", envelope.subType, envelope.subFolder,
getTimestamp(envelope.publishTime), envelope.transactionId);
updateProviderAffinity(envelope, reflection.source);
CloudModel result = getReflectionResult(envelope, payload);
ifNotNullThen(result,
Expand Down Expand Up @@ -231,7 +232,7 @@ private void reflectStateHandler(Envelope envelope, UdmiState toolState) {

Map<String, Object> configMap = new HashMap<>();
configMap.put(SubFolder.UDMI.value(), udmiConfig);
String contents = stringify(configMap);
String contents = stringifyTerse(configMap);
debug("Setting reflector config %s %s: %s", registryId, deviceId, contents);
iotAccess.modifyConfig(registryId, deviceId, previous -> contents);
}
Expand Down Expand Up @@ -264,7 +265,7 @@ private void updateRegistryRegions(Map<String, String> regions) {

@Override
public void activate() {
debug("Deployment configuration: " + stringify(DEPLOYED_CONFIG));
debug("Deployment configuration: " + stringifyTerse(DEPLOYED_CONFIG));
super.activate();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,9 @@ public void receiveMessage(PubsubMessage message, AckReplyConsumer reply) {
receiveMessage(attributesMap, messageString);
Instant end = Instant.now();
long seconds = Duration.between(start, end).getSeconds();
debug("Receive message took %ss", seconds);
if (seconds > 1) {
warn("Receive message took %ss", seconds);
}
}

Publisher getPublisher(String topicName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ private String getSimpleName() {

private void output(Level level, String message) {
PrintStream printStream = level.value() >= Level.WARNING.value() ? System.err : System.out;
printStream.printf("%s %s %s: %s %s%n", getExecutionContext(), JsonUtil.getTimestamp(),
printStream.printf("%s %s %s: %s %s%n", JsonUtil.getTimestamp(), getExecutionContext(),
level.name().charAt(0), getSimpleName(), message);
printStream.flush();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,7 @@ public String publish(String deviceId, String topic, String data) {
envelope.payload = GeneralUtils.encodeBase64(data);
String transactionId = getNextTransactionId();
envelope.transactionId = transactionId;
envelope.publishTime = new Date();
mqttPublisher.publish(registryId, UDMI_TOPIC, JsonUtil.stringify(envelope));
return transactionId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Strings.isNullOrEmpty;
import static com.google.daq.mqtt.sequencer.semantic.SemanticValue.actualize;
import static com.google.daq.mqtt.util.IotReflectorClient.REFLECTOR_PREFIX;
import static com.google.daq.mqtt.validator.Validator.CONFIG_PREFIX;
import static com.google.daq.mqtt.validator.Validator.STATE_PREFIX;
import static com.google.udmi.util.CleanDateFormat.cleanDate;
Expand Down Expand Up @@ -44,6 +45,7 @@
import com.google.bos.iot.core.proxy.IotReflectorClient;
import com.google.bos.iot.core.proxy.MockPublisher;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.BiMap;
import com.google.common.collect.ImmutableBiMap;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -83,7 +85,9 @@
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
Expand Down Expand Up @@ -190,6 +194,7 @@ public class SequenceBase {
private static final ObjectDiffEngine RECV_CONFIG_DIFFERNATOR = new ObjectDiffEngine();
private static final ObjectDiffEngine RECV_STATE_DIFFERNATOR = new ObjectDiffEngine();
private static final Set<String> configTransactions = new ConcurrentSkipListSet<>();
private static final AtomicReference<String> stateTransaction = new AtomicReference<>();
private static final int MINIMUM_TEST_SEC = 15;
private static final Date RESET_LAST_START = new Date(73642);
protected static Metadata deviceMetadata;
Expand Down Expand Up @@ -937,9 +942,19 @@ private String writeLogEntry(Entry logEntry, PrintWriter printWriter) {
return messageStr;
}

private boolean stateTransactionPending() {
return stateTransaction.get() != null;
}

protected void queryState() {
debug("Sending device state query");
reflector().publish(getDeviceId(), Common.STATE_QUERY_TOPIC, EMPTY_MESSAGE);
assertConfigIsNotPending();
Preconditions.checkState(!stateTransactionPending(), "state transaction already pending");
String txnId = reflector().publish(getDeviceId(), Common.STATE_QUERY_TOPIC, EMPTY_MESSAGE);
stateTransaction.set(txnId);
debug(format("Waiting for device stateTransaction %s", txnId));
whileDoing("state query", () -> messageEvaluateLoop(this::stateTransactionPending),
e -> debug(
format("While waiting for stateTransaction %s: %s", txnId, friendlyStackTrace(e))));
}

/**
Expand Down Expand Up @@ -1184,11 +1199,20 @@ private void processLogMessages() {
}

protected void whileDoing(String condition, Runnable action) {
whileDoing(condition, action, e -> debug("Caught " + friendlyStackTrace(e)));
}

protected void whileDoing(String condition, Runnable action, Consumer<Exception> catcher) {
final Instant startTime = Instant.now();

waitingConditionPush(condition);

action.run();
try {
action.run();
} catch (Exception e) {
catcher.accept(e);
throw e;
}

waitingConditionPop(startTime);
}
Expand Down Expand Up @@ -1386,11 +1410,21 @@ private synchronized void handleUpdateMessage(String subTypeRaw,
try {
debug(format("Handling update message %s_update %s", subTypeRaw, txnId));

// Do this first to handle all cases of a Config payload, including exceptions.
if (CONFIG_SUBTYPE.equals(subTypeRaw) && txnId != null) {
debug("Removing configTransaction " + txnId);
configTransactions.remove(txnId);
// Do this first to handle all cases of update payloads, including exceptions.
if (txnId != null) {
if (CONFIG_SUBTYPE.equals(subTypeRaw)) {
ifTrueThen(configTransactions.remove(txnId),
() -> debug("Removed configTransaction " + txnId));
} else if (STATE_SUBTYPE.equals(subTypeRaw) && txnId.startsWith(REFLECTOR_PREFIX)) {
String expected = stateTransaction.getAndSet(null);
if (txnId.equals(expected)) {
debug("Removed stateTransaction " + txnId);
} else {
debug(format("Received unexpected stateTransaction %s, dropping %s", txnId, expected));
}
}
}

if (message.containsKey(EXCEPTION_KEY)) {
debug("Ignoring reflector exception:\n" + message.get(EXCEPTION_KEY).toString());
configExceptionTimestamp = (String) message.get(TIMESTAMP_KEY);
Expand Down Expand Up @@ -1686,28 +1720,22 @@ private boolean hasInterestingSystemStatus() {
}

protected void checkThatHasInterestingSystemStatus(boolean isInteresting) {
checkThatHasInterestingSystemStatusTodo(isInteresting);
}

protected void checkThatHasInterestingSystemStatusTodo(boolean isInteresting) {
BiConsumer<String, Supplier<Boolean>> check =
isInteresting ? this::checkThat : this::checkNotThat;
check.accept(SYSTEM_STATUS_MESSAGE, this::hasInterestingSystemStatus);
}

protected void untilHasInterestingSystemStatus(boolean isInteresting) {
expectedSystemStatus = null;
untilHasInterestingSystemStatusTodo(isInteresting);
BiConsumer<String, Supplier<Boolean>> until =
isInteresting ? this::untilTrue : this::untilFalse;
String message =
(isInteresting ? HAS_STATUS_PREFIX : NOT_STATUS_PREFIX) + SYSTEM_STATUS_MESSAGE;
until.accept(message, this::hasInterestingSystemStatus);
expectedSystemStatus = isInteresting;
checkThatHasInterestingSystemStatus(isInteresting);
}

protected void untilHasInterestingSystemStatusTodo(boolean isSet) {
BiConsumer<String, Supplier<Boolean>> until = isSet ? this::untilTrue : this::untilFalse;
String message = (isSet ? HAS_STATUS_PREFIX : NOT_STATUS_PREFIX) + SYSTEM_STATUS_MESSAGE;
until.accept(message, this::hasInterestingSystemStatus);
}

private void putSequencerResult(Description description, SequenceResult result) {
String resultId = getDeviceId() + "/" + description.getMethodName();
SequenceRunner.getAllTests().put(resultId, result);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public class IotReflectorClient implements IotProvider {
// Requires functions that support cloud device manager support.
private static final int REQUIRED_FUNCTION_VER = 9;
private static final String UPDATE_CONFIG_TOPIC = "update/config";
private static final String REFLECTOR_PREFIX = "RC:";
public static final String REFLECTOR_PREFIX = "RC:";
private final com.google.bos.iot.core.proxy.IotReflectorClient messageClient;
private final Map<String, CompletableFuture<Map<String, Object>>> futures =
new ConcurrentHashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.google.udmi.util.JsonUtil;
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
Expand Down Expand Up @@ -318,6 +319,7 @@ private Envelope makeReflectorMessage(String deviceId, String topic, String data
envelope.subFolder = SubFolder.fromValue(parts[0]);
envelope.subType = SubType.fromValue(parts[1]);
envelope.transactionId = IotReflectorClient.getNextTransactionId();
envelope.publishTime = new Date();
return envelope;
}

Expand Down

0 comments on commit a2243f9

Please sign in to comment.