Skip to content

Commit

Permalink
Merge branch 'master' into pointmap
Browse files Browse the repository at this point in the history
  • Loading branch information
grafnu committed Jan 9, 2025
2 parents 6838ac5 + 0e7d933 commit 208bc63
Show file tree
Hide file tree
Showing 10 changed files with 182 additions and 94 deletions.
54 changes: 53 additions & 1 deletion .github/workflows/testing.yml
Original file line number Diff line number Diff line change
Expand Up @@ -243,13 +243,17 @@ jobs:
bin/clone_model
bin/registrar sites/udmi_site_model
- name: local setup
run: bin/start_local sites/udmi_site_model //mqtt/localhost
run: |
bin/start_local sites/udmi_site_model //mqtt/localhost
bin/pull_messages //mqtt/localhost > out/message_capture.log 2>&1 &
- name: bin/test_etcd
run: bin/test_etcd
- name: bin/test_mosquitto
run: bin/test_mosquitto
- name: bin/test_regclean
run: bin/test_regclean //mqtt/localhost
- name: bin/test_runlocal
run: bin/test_runlocal
- name: bin/test_sequencer
run: bin/test_sequencer local full //mqtt/localhost $(< etc/local_tests.txt)
- name: bin/test_udmis
Expand Down Expand Up @@ -352,3 +356,51 @@ jobs:
- name: Itemized validation
if: ${{ !cancelled() }}
run: bin/test_itemcheck

discovery:
name: Discovery Tests
runs-on: ubuntu-24.04
timeout-minutes: 15
steps:
- uses: actions/checkout@v4
with:
fetch-depth: 0
fetch-tags: true
- uses: actions/setup-java@v4
with:
distribution: 'temurin'
java-version: '17'
- name: Setup prerequisites
run: |
bin/setup_base
bin/clone_model
ln -s sites/udmi_site_model/ site_model
(cd site_model; git log -n 1)
docker network create udminet --subnet 192.168.99.0/24
- name: Build UDMIS container
run: |
udmis/bin/build check
bin/container udmis build --no-check latest
echo Built local UDMIS
- name: Start UDMIS container
run: |
docker run -d --net udminet --name udmis -p 8883:8883 \
-v $PWD/site_model:/root/site_model \
-v $PWD/var/tmp:/tmp \
-v $PWD/var/etcd:/root/udmi/var/etcd \
-v $PWD/var/mosquitto:/etc/mosquitto \
udmis udmi/bin/start_local block site_model/cloud_iot_config.json
for count in `seq 0 30`; do
echo Waiting for UDMIS startup $((30 - count))
[[ ! -f var/tmp/pod_ready.txt ]] || break
(docker ps | fgrep -q udmis) || break
sleep 1
done
ls -l var/tmp/pod_ready.txt 2>&1
- name: Run Tests
# This runs as sudo because docker containers in Github CI run as root.
# So, when the UDMIS container runs `keygen`, the CSR and SRL files are
# owned by root, and cannot be ovewritten by system calls to keygen,
# unless it too is root, or the behaviour is changed, or the permisions
# are fixed.
run: sudo misc/discoverynode/testing/e2e/test_local site_model
23 changes: 12 additions & 11 deletions bin/pull_mqtt
Original file line number Diff line number Diff line change
Expand Up @@ -31,24 +31,25 @@ sudo mosquitto_sub $SERVER_OPTS -R -F "%j" -t $topic_filter |
json=$(jq . <<< "$payload" 2> /dev/null) || json=$payload

readarray -d '/' -t array <<< "${topic}/"
registryId=${array[2]}
deviceId=${array[4]}
subType=${array[5]}
subFolder=${array[6]}

# Trim whitespace
subFolder=$(echo $subFolder | xargs)
registryId=${array[2]:-} # Use :- to substute default empty to prevent script termination-on-undefined
deviceId=${array[4]:-}
subType=${array[5]:-}
subFolder=${array[6]:-}


subFolder=$(echo $subFolder | xargs) # Trim whitespace
[[ -n $subFolder ]] || subFolder=update

subType=$(echo $subType | xargs)
[[ -n $subType ]] || subType=events
[[ $subType != "null" ]] || subType=events

[[ -n ${subFolder% } ]] || subFolder=update

if [[ $deviceId == "" ]]; then
deviceId=empty
fi

if [[ $subType == null ]]; then
subType=events
fi

timepath=$(echo ${timestamp%:*} | tr T: //) # Bucket messages by minute
usetime=$(echo $timestamp | tr : x) # Colon is not allowed on Windows!
out_path=$OUT_BASE/$registryId/devices/$deviceId/${timepath}/${usetime}_${subFolder}_${subType}
Expand Down
2 changes: 1 addition & 1 deletion bin/test_redirect
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ log bin/reset_config $site_path $project_spec $device_id shutdown_config.json
bin/reset_config $site_path $project_spec $device_id shutdown_config.json

log And let it settle for last start termination...
sleep 120
sleep 125

tail out/pubber.log.2

Expand Down
24 changes: 24 additions & 0 deletions bin/test_runlocal
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#!/bin/bash -e
#
# This doesn't actually run the local stuff, it's just used post-execution to validate results.
#

UDMI_ROOT=$(dirname $0)/..
cd $UDMI_ROOT

source etc/shell_common.sh

cat out/message_capture.log

echo Captured $(wc -l out/message_capture.log) messages.

systems=$(find out/registries/ -name \*update_model.json | xargs jq .system | fgrep -v null | wc -l)
[[ $systems == 206 ]] || fail Expected 206 system operations, found $systems

deletes=$(find out/registries/ -name \*update_model.json | xargs jq .cloud.operation | fgrep DELETE | wc -l)
[[ $deletes == 1 ]] || fail Expected 1 delete operations, found $deletes

binds=$(find out/registries/ -name \*update_model.json | xargs jq .cloud.operation | fgrep BIND | wc -l)
[[ $binds == 2 ]] || fail Expected 2 bind operations, found $binds

echo Done with successful runlocal validation.
41 changes: 12 additions & 29 deletions pubber/src/main/java/daq/pubber/Pubber.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,9 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.http.ConnectionClosedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import udmi.lib.base.MqttDevice;
import udmi.lib.base.MqttPublisher.PublisherException;
import udmi.lib.client.DeviceManager;
import udmi.lib.client.SystemManager;
import udmi.lib.intf.FamilyProvider;
Expand Down Expand Up @@ -103,7 +101,6 @@ public class Pubber extends PubberManager implements PubberUdmiPublisher {
private SchemaVersion targetSchema;
private int deviceUpdateCount = -1;
private PubberDeviceManager deviceManager;
private boolean isConnected;
private boolean isGatewayDevice;

/**
Expand Down Expand Up @@ -393,7 +390,7 @@ private void processDeviceMetadata(Metadata metadata) {
}

@Override
public void periodicUpdate() {
public synchronized void periodicUpdate() {
try {
deviceUpdateCount++;
checkSmokyFailure();
Expand All @@ -403,6 +400,7 @@ public void periodicUpdate() {
flushDirtyState();
} catch (Exception e) {
error("Fatal error during execution", e);
resetConnection(getWorkingEndpoint());
}
}

Expand All @@ -427,18 +425,13 @@ public void startConnection(Function<String, Boolean> connectionDone) {

private boolean attemptConnection() {
try {
isConnected = false;
deviceManager.stop();
super.stop();
if (deviceTarget == null || !deviceTarget.isActive()) {
error("Mqtt publisher not active");
disconnectMqtt();
initializeMqtt();
}
disconnectMqtt();
initializeMqtt();
registerMessageHandlers();
connect();
configLatchWait();
isConnected = true;
deviceManager.activate();
return true;
} catch (Exception e) {
Expand Down Expand Up @@ -515,22 +508,14 @@ public byte[] ensureKeyBytes() {
}

@Override
public void publisherException(Exception toReport) {
if (toReport instanceof PublisherException report) {
publisherHandler(report.getType(), report.getPhase(), report.getCause(),
report.getDeviceId());
} else if (toReport instanceof ConnectionClosedException) {
error("Connection closed, attempting reconnect...");
while (retriesRemaining.getAndDecrement() > 0) {
if (attemptConnection()) {
return;
}
public synchronized void reconnect() {
while (retriesRemaining.getAndDecrement() > 0) {
if (attemptConnection()) {
return;
}
error("Connection retry failed, giving up.");
deviceManager.systemLifecycle(SystemMode.TERMINATE);
} else {
error("Unknown exception type " + toReport.getClass(), toReport);
}
error("Connection retry failed, giving up.");
deviceManager.systemLifecycle(SystemMode.TERMINATE);
}

@Override
Expand All @@ -541,12 +526,10 @@ public void persistEndpoint(EndpointConfiguration endpoint) {
}

@Override
public void resetConnection(String targetEndpoint) {
public synchronized void resetConnection(String targetEndpoint) {
try {
config.endpoint = fromJsonString(targetEndpoint,
EndpointConfiguration.class);
disconnectMqtt();
initializeMqtt();
retriesRemaining.set(CONNECT_RETRIES);
startConnection(connectionDone);
} catch (Exception e) {
Expand Down Expand Up @@ -700,7 +683,7 @@ public void setConfigLatch(CountDownLatch countDownLatch) {

@Override
public boolean isConnected() {
return isConnected;
return deviceTarget != null && deviceTarget.isActive();
}

@Override
Expand Down
6 changes: 4 additions & 2 deletions pubber/src/main/java/daq/pubber/PubberGatewayManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import udmi.lib.ProtocolFamily;
import udmi.lib.client.GatewayManager;
import udmi.lib.client.ProxyDeviceHost;
Expand Down Expand Up @@ -47,8 +48,9 @@ public void setMetadata(Metadata metadata) {

@Override
public void activate() {
ifNotNullThen(proxyDevices, p -> p.values()
.parallelStream().forEach(ProxyDeviceHost::activate));
ifNotNullThen(proxyDevices, p -> CompletableFuture.runAsync(() -> p.values()
.parallelStream()
.forEach(ProxyDeviceHost::activate)));
}

@Override
Expand Down
39 changes: 27 additions & 12 deletions pubber/src/main/java/daq/pubber/PubberUdmiPublisher.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,13 @@
import java.util.concurrent.locks.Lock;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.http.ConnectionClosedException;
import udmi.lib.base.GatewayError;
import udmi.lib.base.MqttDevice;
import udmi.lib.base.MqttPublisher.FakeTopic;
import udmi.lib.base.MqttPublisher.InjectedMessage;
import udmi.lib.base.MqttPublisher.InjectedState;
import udmi.lib.base.MqttPublisher.PublisherException;
import udmi.lib.client.DeviceManager;
import udmi.lib.client.PointsetManager;
import udmi.lib.client.PointsetManager.ExtraPointsetEvent;
Expand Down Expand Up @@ -242,9 +244,9 @@ default void captureExceptions(String action, Runnable runnable) {
*/
default void disconnectMqtt() {
if (getDeviceTarget() != null) {
captureExceptions("closing mqtt publisher", () -> getDeviceTarget().close());
captureExceptions("shutting down mqtt publisher executor",
captureExceptions("Shutting down MQTT publisher executor",
() -> getDeviceTarget().shutdown());
captureExceptions("Closing MQTT publisher", () -> getDeviceTarget().close());
setDeviceTarget(null);
}
}
Expand Down Expand Up @@ -805,16 +807,12 @@ default void publishSynchronousState() {
}
}

default boolean publisherActive() {
return getDeviceTarget() != null && getDeviceTarget().isActive();
}

/**
* Publishes the current device state as a message to the publisher if the publisher is active. If
* the publisher is not active, it marks the state as dirty and returns without publishing.
*/
default void publishStateMessage() {
if (!publisherActive()) {
if (!isConnected()) {
markStateDirty(-1);
return;
}
Expand Down Expand Up @@ -898,8 +896,8 @@ private void publishDeviceMessage(String targetId, Object message) {
* configured.
*/
default void publishDeviceMessage(String targetId, Object message, Runnable callback) {
if (getDeviceTarget() == null) {
error("publisher not active");
if (!isConnected()) {
error(format("Publisher not active (%s)", targetId));
return;
}
String topicSuffix = MESSAGE_TOPIC_SUFFIX_MAP.get(message.getClass());
Expand Down Expand Up @@ -989,6 +987,10 @@ default void debug(String message, String detail) {

void startConnection(Function<String, Boolean> connectionDone);

void reconnect();

void resetConnection(String targetEndpoint);

/**
* Flushes the dirty state by publishing an asynchronous state change.
*/
Expand All @@ -1000,12 +1002,25 @@ default void flushDirtyState() {

byte[] ensureKeyBytes();

void publisherException(Exception toReport);
/**
* Handles exceptions related to the publisher and
* takes appropriate actions based on the exception type.
*
* @param toReport the exception to be handled;
*/
default void publisherException(Exception toReport) {
if (toReport instanceof PublisherException r) {
publisherHandler(r.getType(), r.getPhase(), r.getCause(), r.getDeviceId());
} else if (toReport instanceof ConnectionClosedException) {
warn("Connection closed, attempting reconnect...");
reconnect();
} else {
error("Unknown exception type " + toReport.getClass(), toReport);
}
}

void persistEndpoint(EndpointConfiguration endpoint);

void resetConnection(String targetEndpoint);

String traceTimestamp(String messageBase);

/**
Expand Down
Loading

0 comments on commit 208bc63

Please sign in to comment.