diff --git a/.github/workflows/testing.yml b/.github/workflows/testing.yml index 0dbb63d026..b3512d7ccf 100644 --- a/.github/workflows/testing.yml +++ b/.github/workflows/testing.yml @@ -243,13 +243,17 @@ jobs: bin/clone_model bin/registrar sites/udmi_site_model - name: local setup - run: bin/start_local sites/udmi_site_model //mqtt/localhost + run: | + bin/start_local sites/udmi_site_model //mqtt/localhost + bin/pull_messages //mqtt/localhost > out/message_capture.log 2>&1 & - name: bin/test_etcd run: bin/test_etcd - name: bin/test_mosquitto run: bin/test_mosquitto - name: bin/test_regclean run: bin/test_regclean //mqtt/localhost + - name: bin/test_runlocal + run: bin/test_runlocal - name: bin/test_sequencer run: bin/test_sequencer local full //mqtt/localhost $(< etc/local_tests.txt) - name: bin/test_udmis @@ -352,3 +356,51 @@ jobs: - name: Itemized validation if: ${{ !cancelled() }} run: bin/test_itemcheck + + discovery: + name: Discovery Tests + runs-on: ubuntu-24.04 + timeout-minutes: 15 + steps: + - uses: actions/checkout@v4 + with: + fetch-depth: 0 + fetch-tags: true + - uses: actions/setup-java@v4 + with: + distribution: 'temurin' + java-version: '17' + - name: Setup prerequisites + run: | + bin/setup_base + bin/clone_model + ln -s sites/udmi_site_model/ site_model + (cd site_model; git log -n 1) + docker network create udminet --subnet 192.168.99.0/24 + - name: Build UDMIS container + run: | + udmis/bin/build check + bin/container udmis build --no-check latest + echo Built local UDMIS + - name: Start UDMIS container + run: | + docker run -d --net udminet --name udmis -p 8883:8883 \ + -v $PWD/site_model:/root/site_model \ + -v $PWD/var/tmp:/tmp \ + -v $PWD/var/etcd:/root/udmi/var/etcd \ + -v $PWD/var/mosquitto:/etc/mosquitto \ + udmis udmi/bin/start_local block site_model/cloud_iot_config.json + for count in `seq 0 30`; do + echo Waiting for UDMIS startup $((30 - count)) + [[ ! 
-f var/tmp/pod_ready.txt ]] || break + (docker ps | fgrep -q udmis) || break + sleep 1 + done + ls -l var/tmp/pod_ready.txt 2>&1 + - name: Run Tests + # This runs as sudo because docker containers in Github CI run as root. + # So, when the UDMIS container runs `keygen`, the CSR and SRL files are + # owned by root, and cannot be overwritten by system calls to keygen, + # unless it too is root, or the behaviour is changed, or the permissions + # are fixed. + run: sudo misc/discoverynode/testing/e2e/test_local site_model diff --git a/bin/pull_mqtt b/bin/pull_mqtt index 26452f2549..400b5bda9e 100755 --- a/bin/pull_mqtt +++ b/bin/pull_mqtt @@ -31,24 +31,25 @@ sudo mosquitto_sub $SERVER_OPTS -R -F "%j" -t $topic_filter | json=$(jq . <<< "$payload" 2> /dev/null) || json=$payload readarray -d '/' -t array <<< "${topic}/" - registryId=${array[2]} - deviceId=${array[4]} - subType=${array[5]} - subFolder=${array[6]} - # Trim whitespace - subFolder=$(echo $subFolder | xargs) + registryId=${array[2]:-} # Use :- to substitute default empty to prevent script termination-on-undefined + deviceId=${array[4]:-} + subType=${array[5]:-} + subFolder=${array[6]:-} + + + subFolder=$(echo $subFolder | xargs) # Trim whitespace + [[ -n $subFolder ]] || subFolder=update + + subType=$(echo $subType | xargs) + [[ -n $subType ]] || subType=events + [[ $subType != "null" ]] || subType=events - [[ -n ${subFolder% } ]] || subFolder=update if [[ $deviceId == "" ]]; then deviceId=empty fi - if [[ $subType == null ]]; then - subType=events - fi - timepath=$(echo ${timestamp%:*} | tr T: //) # Bucket messages by minute usetime=$(echo $timestamp | tr : x) # Colon is not allowed on Windows! 
out_path=$OUT_BASE/$registryId/devices/$deviceId/${timepath}/${usetime}_${subFolder}_${subType} diff --git a/bin/test_redirect b/bin/test_redirect index 90fc55de5e..79c772b22d 100755 --- a/bin/test_redirect +++ b/bin/test_redirect @@ -137,7 +137,7 @@ log bin/reset_config $site_path $project_spec $device_id shutdown_config.json bin/reset_config $site_path $project_spec $device_id shutdown_config.json log And let it settle for last start termination... -sleep 120 +sleep 125 tail out/pubber.log.2 diff --git a/bin/test_runlocal b/bin/test_runlocal new file mode 100755 index 0000000000..271149c1f0 --- /dev/null +++ b/bin/test_runlocal @@ -0,0 +1,24 @@ +#!/bin/bash -e +# +# This doesn't actually run the local stuff, it's just used post-execution to validate results. +# + +UDMI_ROOT=$(dirname $0)/.. +cd $UDMI_ROOT + +source etc/shell_common.sh + +cat out/message_capture.log + +echo Captured $(wc -l < out/message_capture.log) messages. + +systems=$(find out/registries/ -name \*update_model.json | xargs jq .system | fgrep -v null | wc -l) +[[ $systems == 206 ]] || fail Expected 206 system operations, found $systems + +deletes=$(find out/registries/ -name \*update_model.json | xargs jq .cloud.operation | fgrep DELETE | wc -l) +[[ $deletes == 1 ]] || fail Expected 1 delete operations, found $deletes + +binds=$(find out/registries/ -name \*update_model.json | xargs jq .cloud.operation | fgrep BIND | wc -l) +[[ $binds == 2 ]] || fail Expected 2 bind operations, found $binds + +echo Done with successful runlocal validation.
diff --git a/pubber/src/main/java/daq/pubber/Pubber.java b/pubber/src/main/java/daq/pubber/Pubber.java index f0d9f1fd78..6b06dee3fc 100644 --- a/pubber/src/main/java/daq/pubber/Pubber.java +++ b/pubber/src/main/java/daq/pubber/Pubber.java @@ -43,11 +43,9 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; import java.util.function.Function; -import org.apache.http.ConnectionClosedException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import udmi.lib.base.MqttDevice; -import udmi.lib.base.MqttPublisher.PublisherException; import udmi.lib.client.DeviceManager; import udmi.lib.client.SystemManager; import udmi.lib.intf.FamilyProvider; @@ -103,7 +101,6 @@ public class Pubber extends PubberManager implements PubberUdmiPublisher { private SchemaVersion targetSchema; private int deviceUpdateCount = -1; private PubberDeviceManager deviceManager; - private boolean isConnected; private boolean isGatewayDevice; /** @@ -393,7 +390,7 @@ private void processDeviceMetadata(Metadata metadata) { } @Override - public void periodicUpdate() { + public synchronized void periodicUpdate() { try { deviceUpdateCount++; checkSmokyFailure(); @@ -403,6 +400,7 @@ public void periodicUpdate() { flushDirtyState(); } catch (Exception e) { error("Fatal error during execution", e); + resetConnection(getWorkingEndpoint()); } } @@ -427,18 +425,13 @@ public void startConnection(Function connectionDone) { private boolean attemptConnection() { try { - isConnected = false; deviceManager.stop(); super.stop(); - if (deviceTarget == null || !deviceTarget.isActive()) { - error("Mqtt publisher not active"); - disconnectMqtt(); - initializeMqtt(); - } + disconnectMqtt(); + initializeMqtt(); registerMessageHandlers(); connect(); configLatchWait(); - isConnected = true; deviceManager.activate(); return true; } catch (Exception e) { @@ -515,22 +508,14 @@ public byte[] ensureKeyBytes() { } @Override - public void publisherException(Exception toReport) { - if 
(toReport instanceof PublisherException report) { - publisherHandler(report.getType(), report.getPhase(), report.getCause(), - report.getDeviceId()); - } else if (toReport instanceof ConnectionClosedException) { - error("Connection closed, attempting reconnect..."); - while (retriesRemaining.getAndDecrement() > 0) { - if (attemptConnection()) { - return; - } + public synchronized void reconnect() { + while (retriesRemaining.getAndDecrement() > 0) { + if (attemptConnection()) { + return; } - error("Connection retry failed, giving up."); - deviceManager.systemLifecycle(SystemMode.TERMINATE); - } else { - error("Unknown exception type " + toReport.getClass(), toReport); } + error("Connection retry failed, giving up."); + deviceManager.systemLifecycle(SystemMode.TERMINATE); } @Override @@ -541,12 +526,10 @@ public void persistEndpoint(EndpointConfiguration endpoint) { } @Override - public void resetConnection(String targetEndpoint) { + public synchronized void resetConnection(String targetEndpoint) { try { config.endpoint = fromJsonString(targetEndpoint, EndpointConfiguration.class); - disconnectMqtt(); - initializeMqtt(); retriesRemaining.set(CONNECT_RETRIES); startConnection(connectionDone); } catch (Exception e) { @@ -700,7 +683,7 @@ public void setConfigLatch(CountDownLatch countDownLatch) { @Override public boolean isConnected() { - return isConnected; + return deviceTarget != null && deviceTarget.isActive(); } @Override diff --git a/pubber/src/main/java/daq/pubber/PubberGatewayManager.java b/pubber/src/main/java/daq/pubber/PubberGatewayManager.java index e0225ec02b..fe9a14df06 100644 --- a/pubber/src/main/java/daq/pubber/PubberGatewayManager.java +++ b/pubber/src/main/java/daq/pubber/PubberGatewayManager.java @@ -16,6 +16,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; import udmi.lib.ProtocolFamily; import udmi.lib.client.GatewayManager; import udmi.lib.client.ProxyDeviceHost; @@ -47,8 
+48,9 @@ public void setMetadata(Metadata metadata) { @Override public void activate() { - ifNotNullThen(proxyDevices, p -> p.values() - .parallelStream().forEach(ProxyDeviceHost::activate)); + ifNotNullThen(proxyDevices, p -> CompletableFuture.runAsync(() -> p.values() + .parallelStream() + .forEach(ProxyDeviceHost::activate))); } @Override diff --git a/pubber/src/main/java/daq/pubber/PubberUdmiPublisher.java b/pubber/src/main/java/daq/pubber/PubberUdmiPublisher.java index a2573c9fd5..ca42f9a0d6 100644 --- a/pubber/src/main/java/daq/pubber/PubberUdmiPublisher.java +++ b/pubber/src/main/java/daq/pubber/PubberUdmiPublisher.java @@ -52,11 +52,13 @@ import java.util.concurrent.locks.Lock; import java.util.function.Consumer; import java.util.function.Function; +import org.apache.http.ConnectionClosedException; import udmi.lib.base.GatewayError; import udmi.lib.base.MqttDevice; import udmi.lib.base.MqttPublisher.FakeTopic; import udmi.lib.base.MqttPublisher.InjectedMessage; import udmi.lib.base.MqttPublisher.InjectedState; +import udmi.lib.base.MqttPublisher.PublisherException; import udmi.lib.client.DeviceManager; import udmi.lib.client.PointsetManager; import udmi.lib.client.PointsetManager.ExtraPointsetEvent; @@ -242,9 +244,9 @@ default void captureExceptions(String action, Runnable runnable) { */ default void disconnectMqtt() { if (getDeviceTarget() != null) { - captureExceptions("closing mqtt publisher", () -> getDeviceTarget().close()); - captureExceptions("shutting down mqtt publisher executor", + captureExceptions("Shutting down MQTT publisher executor", () -> getDeviceTarget().shutdown()); + captureExceptions("Closing MQTT publisher", () -> getDeviceTarget().close()); setDeviceTarget(null); } } @@ -805,16 +807,12 @@ default void publishSynchronousState() { } } - default boolean publisherActive() { - return getDeviceTarget() != null && getDeviceTarget().isActive(); - } - /** * Publishes the current device state as a message to the publisher if the publisher is 
active. If * the publisher is not active, it marks the state as dirty and returns without publishing. */ default void publishStateMessage() { - if (!publisherActive()) { + if (!isConnected()) { markStateDirty(-1); return; } @@ -898,8 +896,8 @@ private void publishDeviceMessage(String targetId, Object message) { * configured. */ default void publishDeviceMessage(String targetId, Object message, Runnable callback) { - if (getDeviceTarget() == null) { - error("publisher not active"); + if (!isConnected()) { + error(format("Publisher not active (%s)", targetId)); return; } String topicSuffix = MESSAGE_TOPIC_SUFFIX_MAP.get(message.getClass()); @@ -989,6 +987,10 @@ default void debug(String message, String detail) { void startConnection(Function connectionDone); + void reconnect(); + + void resetConnection(String targetEndpoint); + /** * Flushes the dirty state by publishing an asynchronous state change. */ @@ -1000,12 +1002,25 @@ default void flushDirtyState() { byte[] ensureKeyBytes(); - void publisherException(Exception toReport); + /** + * Handles exceptions related to the publisher and + * takes appropriate actions based on the exception type. 
+ * + * @param toReport the exception to be handled + */ + default void publisherException(Exception toReport) { + if (toReport instanceof PublisherException r) { + publisherHandler(r.getType(), r.getPhase(), r.getCause(), r.getDeviceId()); + } else if (toReport instanceof ConnectionClosedException) { + warn("Connection closed, attempting reconnect..."); + reconnect(); + } else { + error("Unknown exception type " + toReport.getClass(), toReport); + } + } void persistEndpoint(EndpointConfiguration endpoint); - void resetConnection(String targetEndpoint); - String traceTimestamp(String messageBase); /** diff --git a/pubber/src/main/java/udmi/lib/base/MqttPublisher.java b/pubber/src/main/java/udmi/lib/base/MqttPublisher.java index 2879cc9139..5d808fa677 100644 --- a/pubber/src/main/java/udmi/lib/base/MqttPublisher.java +++ b/pubber/src/main/java/udmi/lib/base/MqttPublisher.java @@ -41,6 +41,7 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; import javax.net.SocketFactory; import javax.net.ssl.SSLSocketFactory; @@ -100,6 +101,7 @@ public class MqttPublisher implements Publisher { private final Map mqttClients = new ConcurrentHashMap<>(); private final Map reauthTimes = new ConcurrentHashMap<>(); + private final ReentrantLock reconnectLock = new ReentrantLock(); private final ExecutorService publisherExecutor = Executors.newFixedThreadPool(PUBLISH_THREAD_COUNT); @@ -215,22 +217,32 @@ private void publishCore(String deviceId, String topicSuffix, Object data, Runna callback.run(); } } catch (Exception e) { + if (!isActive()) { + return; + } errorCounter.incrementAndGet(); warn(format("Publish %s failed for %s: %s", topicSuffix, deviceId, e)); if (getGatewayId() == null) { closeMqttClient(deviceId); if (mqttClients.isEmpty()) { warn("Last client closed, shutting down connection."); - close(); - shutdown(); + reconnect(); 
+ } + } else if (getGatewayId().equals(deviceId)) { + reconnect(); + } + } + } + + private synchronized void reconnect() { + if (isActive()) { + if (reconnectLock.tryLock()) { + try { // Force reconnect to address potential bad states onError.accept(new ConnectionClosedException()); + } finally { + reconnectLock.unlock(); } - } else if (getGatewayId().equals(deviceId)) { - close(); - shutdown(); - // Force reconnect to address potential bad states - onError.accept(new ConnectionClosedException()); } } } @@ -268,7 +280,7 @@ private void closeMqttClient(String deviceId) { if (removed != null) { try { if (removed.isConnected()) { - removed.disconnect(); + removed.disconnectForcibly(); } removed.close(); } catch (Exception e) { @@ -298,7 +310,7 @@ public void close() { @Override public void shutdown() { if (isActive()) { - publisherExecutor.shutdown(); + publisherExecutor.shutdownNow(); } } @@ -532,7 +544,7 @@ private String getDeviceId(String topic) { return topic.split("/")[splitIndex]; } - public void connect(String targetId, boolean clean) { + public synchronized void connect(String targetId, boolean clean) { ifTrueThen(clean, () -> closeMqttClient(targetId)); getConnectedClient(targetId); } @@ -569,8 +581,10 @@ private boolean sendMessage(String deviceId, String mqttTopic, return true; } - private MqttClient getActiveClient(String targetId) { - checkAuthentication(targetId); + private synchronized MqttClient getActiveClient(String targetId) { + if (!checkAuthentication(targetId)) { + return null; + } MqttClient client = getConnectedClient(targetId); if (client.isConnected()) { return client; @@ -586,24 +600,16 @@ private void safeSleep(long timeoutMs) { } } - private void checkAuthentication(String targetId) { + private boolean checkAuthentication(String targetId) { String authId = ofNullable(getGatewayId()).orElse(targetId); Instant reAuthTime = reauthTimes.get(authId); if (reAuthTime == null || Instant.now().isBefore(reAuthTime)) { - return; + return true; } 
warn("Authentication retry time reached for " + authId); reauthTimes.remove(authId); - synchronized (mqttClients) { - try { - close(); - shutdown(); - // Force reconnect to address potential bad states - onError.accept(new ConnectionClosedException()); - } catch (Exception e) { - throw new RuntimeException("While trying to reconnect mqtt client", e); - } - } + reconnect(); + return false; } private MqttClient getConnectedClient(String deviceId) { @@ -721,12 +727,11 @@ private class MqttCallbackHandler implements MqttCallback { @Override public void connectionLost(Throwable cause) { - boolean connected = cleanClients(deviceId).isConnected(); - warn("MQTT Connection Lost: " + connected + cause); - close(); - shutdown(); - // Force reconnect to address potential bad states - onError.accept(new ConnectionClosedException()); + if (isActive()) { + boolean connected = cleanClients(deviceId).isConnected(); + warn(format("MQTT Connection Lost: %s %s", connected, cause)); + reconnect(); + } } @Override @@ -734,13 +739,11 @@ public void deliveryComplete(IMqttDeliveryToken token) { } @Override - public void messageArrived(String topic, MqttMessage message) { - synchronized (MqttPublisher.this) { - try { - messageArrivedCore(topic, message); - } catch (Exception e) { - error("While processing message", deviceId, null, "handle", e); - } + public synchronized void messageArrived(String topic, MqttMessage message) { + try { + messageArrivedCore(topic, message); + } catch (Exception e) { + error("While processing message", deviceId, null, "handle", e); } } diff --git a/pubber/src/main/java/udmi/lib/client/DiscoveryManager.java b/pubber/src/main/java/udmi/lib/client/DiscoveryManager.java index 35cb427588..36b955c2cf 100644 --- a/pubber/src/main/java/udmi/lib/client/DiscoveryManager.java +++ b/pubber/src/main/java/udmi/lib/client/DiscoveryManager.java @@ -78,6 +78,7 @@ default void scheduleDiscoveryScan(String family) { int interval = getScanInterval(family); if (rawGeneration == 
null && interval == 0) { cancelDiscoveryScan(family, null); + removeDiscoveryScan(family); return; } diff --git a/udmis/src/main/java/com/google/bos/udmi/service/core/ReflectProcessor.java b/udmis/src/main/java/com/google/bos/udmi/service/core/ReflectProcessor.java index 01b0e2a58d..d46f2761fb 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/core/ReflectProcessor.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/core/ReflectProcessor.java @@ -152,7 +152,8 @@ private void reflectUdmiLog(Envelope attributes, String message) { private Object extractModel(CloudModel request) { String metadata = catchToNull(() -> request.metadata.get(MetadataMapKeys.UDMI_METADATA)); if (metadata == null) { - return null; + // Cover the cases for DELETE and BIND operations where there is no actual model. + return asModelUpdate(request); } else if (request.resource_type == REGISTRY) { return asSiteMetadataUpdate(metadata); } else { @@ -302,6 +303,12 @@ private Envelope makeTargetEnvelope(Envelope attributes) { return target; } + private ModelUpdate asModelUpdate(CloudModel request) { + ModelUpdate modelUpdate = new ModelUpdate(); + modelUpdate.cloud = request; + return modelUpdate; + } + private ModelUpdate asModelUpdate(String modelString) { // If it's not a valid JSON object, then fall back to a string description alternate. if (modelString == null || !modelString.startsWith(JsonUtil.JSON_OBJECT_LEADER)) {