From 39750e17a87631e961cddf8789136d86c151d10e Mon Sep 17 00:00:00 2001 From: Trevor Date: Thu, 7 Dec 2023 09:48:06 -0500 Subject: [PATCH] Refactoring DiscoveryManager (#782) --- bin/sequencer | 2 +- .../com/google/udmi/util/GeneralUtils.java | 5 + .../java/com/google/udmi/util/JsonUtil.java | 64 ++--- .../main/java/daq/pubber/DeviceManager.java | 10 +- .../java/daq/pubber/DiscoveryManager.java | 269 ++++++++++++++++++ .../main/java/daq/pubber/PointsetManager.java | 2 +- pubber/src/main/java/daq/pubber/Pubber.java | 267 +---------------- .../main/java/daq/pubber/SystemManager.java | 15 +- .../access/DynamicIotAccessProvider.java | 4 +- .../udmi/service/access/IotAccessBase.java | 4 +- .../bos/udmi/service/core/ProcessorBase.java | 14 +- .../udmi/service/core/ReflectProcessor.java | 8 +- .../service/messaging/impl/PubSubPipe.java | 4 +- .../messaging/impl/TraceMessagePipe.java | 12 +- .../bos/udmi/service/pod/ContainerBase.java | 2 +- .../iot/core/proxy/IotReflectorClient.java | 13 +- .../bos/iot/core/proxy/MockPublisher.java | 4 +- .../google/daq/mqtt/mapping/MappingAgent.java | 4 +- .../daq/mqtt/mapping/MappingEngine.java | 8 +- .../daq/mqtt/sequencer/SequenceBase.java | 31 +- .../sequencer/sequences/ConfigSequences.java | 10 +- .../sequences/DiscoverySequences.java | 7 +- .../sequences/PointsetSequences.java | 3 +- .../google/daq/mqtt/util/IotCoreProvider.java | 4 +- .../daq/mqtt/util/ObjectDiffEngine.java | 4 +- .../google/daq/mqtt/util/PubSubClient.java | 10 +- .../google/daq/mqtt/validator/Validator.java | 1 - .../com/google/udmi/util/PubSubReflector.java | 6 +- .../google/daq/mqtt/validator/BasicTest.java | 11 +- .../google/daq/mqtt/validator/TestBase.java | 3 +- 30 files changed, 416 insertions(+), 385 deletions(-) create mode 100644 pubber/src/main/java/daq/pubber/DiscoveryManager.java diff --git a/bin/sequencer b/bin/sequencer index 00436a4ede..82a776f640 100755 --- a/bin/sequencer +++ b/bin/sequencer @@ -102,7 +102,7 @@ elif [[ -f $site_path ]]; then serial_no=`jq -r .serial_no $site_path` [[ $serial_no == null ]] && serial_no=$DEFAULT_SERIAL endpoint=`jq .reflector_endpoint $site_path` - alt_registry=null + alt_registry=`jq .alt_registry $site_path` else echo Site model $site_path not found. false diff --git a/common/src/main/java/com/google/udmi/util/GeneralUtils.java b/common/src/main/java/com/google/udmi/util/GeneralUtils.java index cb152c033b..f95d87c79d 100644 --- a/common/src/main/java/com/google/udmi/util/GeneralUtils.java +++ b/common/src/main/java/com/google/udmi/util/GeneralUtils.java @@ -1,5 +1,6 @@ package com.google.udmi.util; +import static com.google.udmi.util.JsonUtil.isoConvert; import static com.google.udmi.util.ProperPrinter.OutputFormat.COMPRESSED; import static com.google.udmi.util.ProperPrinter.OutputFormat.VERBOSE; import static java.lang.String.format; @@ -490,4 +491,8 @@ public static void setClockSkew(Duration skew) { public static Date getNow() { return Date.from(Instant.now().plus(clockSkew)); } + + public static String getTimestamp() { + return isoConvert(getNow()); + } } diff --git a/common/src/main/java/com/google/udmi/util/JsonUtil.java b/common/src/main/java/com/google/udmi/util/JsonUtil.java index 7b86a445e7..da8185c653 100644 --- a/common/src/main/java/com/google/udmi/util/JsonUtil.java +++ b/common/src/main/java/com/google/udmi/util/JsonUtil.java @@ -1,10 +1,11 @@ package com.google.udmi.util; import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.udmi.util.GeneralUtils.fromJsonString; +import static com.google.udmi.util.GeneralUtils.toJsonString; import static java.util.Objects.requireNonNull; import com.fasterxml.jackson.annotation.JsonInclude.Include; -import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.JsonParser.Feature; import com.fasterxml.jackson.core.json.JsonReadFeature; import com.fasterxml.jackson.databind.DeserializationFeature; @@ -12,11 +13,9 @@ import com.fasterxml.jackson.databind.SerializationFeature; import java.io.File; import java.nio.file.Files; -import java.nio.file.Paths; import java.time.Instant; import java.util.Date; import java.util.Map; -import java.util.Objects; import java.util.TreeMap; /** @@ -145,42 +144,13 @@ public static Instant getInstant(String timestamp) { return timestamp == null ? null : Instant.parse(replaced); } - /** - * Get a proper JSON string representation of the given Date. - * - * @param date thing to convert - * @return converted to a string - */ - public static String getTimestamp(Date date) { - try { - if (date == null) { - return "null"; - } - String dateString = stringify(date); - // Remove the encapsulating quotes included because it's a JSON string-in-a-string. - return dateString.substring(1, dateString.length() - 1); - } catch (Exception e) { - throw new RuntimeException("Creating timestamp", e); - } - } - - /** - * Get a proper JSON string representation of the given Instant. - * - * @param timestamp thing to convert - * @return converted to string - */ - public static String getTimestamp(Instant timestamp) { - return getTimestamp(Date.from(timestamp)); - } - /** * Get a current timestamp string. * * @return current ISO timestamp */ - public static String getTimestamp() { - return getTimestamp(CleanDateFormat.cleanDate()); + public static String isoConvert() { + return isoConvert(CleanDateFormat.cleanDate()); } /** @@ -402,4 +372,30 @@ public static void writeFile(Object target, File file) { throw new RuntimeException("While writing " + file.getAbsolutePath(), e); } } + + private static Date isoConvert(String timestamp) { + try { + String wrappedString = "\"" + timestamp + "\""; + return fromJsonString(wrappedString, Date.class); + } catch (Exception e) { + throw new RuntimeException("Creating date", e); + } + } + + public static String isoConvert(Instant timestamp) { + return isoConvert(Date.from(timestamp)); + } + + public static String isoConvert(Date timestamp) { + try { + if (timestamp == null) { + return "null"; + } + String dateString = toJsonString(timestamp); + // Strip off the leading and trailing quotes from the JSON string-as-string representation. + return dateString.substring(1, dateString.length() - 1); + } catch (Exception e) { + throw new RuntimeException("Creating timestamp", e); + } + } } diff --git a/pubber/src/main/java/daq/pubber/DeviceManager.java b/pubber/src/main/java/daq/pubber/DeviceManager.java index e166121e99..cdccda6555 100644 --- a/pubber/src/main/java/daq/pubber/DeviceManager.java +++ b/pubber/src/main/java/daq/pubber/DeviceManager.java @@ -1,5 +1,6 @@ package daq.pubber; +import com.google.udmi.util.SiteModel; import java.util.Map; import udmi.schema.Config; import udmi.schema.DevicePersistent; @@ -19,6 +20,7 @@ public class DeviceManager extends ManagerBase { private final SystemManager systemManager; private final LocalnetManager localnetManager; private final GatewayManager gatewayManager; + private final DiscoveryManager discoveryManager; /** @@ -30,6 +32,7 @@ public DeviceManager(ManagerHost host, PubberConfiguration configuration) { pointsetManager = new PointsetManager(host, configuration); localnetManager = new LocalnetManager(host, configuration); gatewayManager = new GatewayManager(host, configuration); + discoveryManager = new DiscoveryManager(host, configuration, this); } public void setPersistentData(DevicePersistent persistentData) { @@ -62,7 +65,7 @@ public void localLog(Entry report) { } public void localLog(String message, Level trace, String timestamp, String detail) { - systemManager.localLog(message, trace, timestamp, detail); + SystemManager.localLog(message, trace, timestamp, detail); } public String getTestingTag() { @@ -76,6 +79,7 @@ public void updateConfig(Config config) { pointsetManager.updateConfig(config.pointset); systemManager.updateConfig(config.system, config.timestamp); gatewayManager.updateConfig(config.gateway); + discoveryManager.updateConfig(config.discovery); } public void publishLogMessage(Entry logEntry) { @@ -99,4 +103,8 @@ public void shutdown() { public Map enumerateFamilies() { return localnetManager.enumerateFamilies(); } + + public void setSiteModel(SiteModel siteModel) { + discoveryManager.setSiteModel(siteModel); + } } diff --git a/pubber/src/main/java/daq/pubber/DiscoveryManager.java b/pubber/src/main/java/daq/pubber/DiscoveryManager.java new file mode 100644 index 0000000000..17f5def829 --- /dev/null +++ b/pubber/src/main/java/daq/pubber/DiscoveryManager.java @@ -0,0 +1,269 @@ +package daq.pubber; + +import static com.google.udmi.util.GeneralUtils.isGetTrue; +import static com.google.udmi.util.JsonUtil.isoConvert; +import static daq.pubber.Pubber.DEVICE_START_TIME; +import static java.lang.String.format; +import static java.util.Optional.ofNullable; +import static java.util.stream.Collectors.toMap; + +import com.google.udmi.util.SiteModel; +import java.time.Instant; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import udmi.schema.DiscoveryConfig; +import udmi.schema.DiscoveryEvent; +import udmi.schema.DiscoveryState; +import udmi.schema.Enumerate; +import udmi.schema.FamilyDiscoveryConfig; +import udmi.schema.FamilyDiscoveryEvent; +import udmi.schema.FamilyDiscoveryState; +import udmi.schema.FamilyLocalnetModel; +import udmi.schema.Metadata; +import udmi.schema.PointEnumerationEvent; +import udmi.schema.PointPointsetModel; +import udmi.schema.PointsetState; +import udmi.schema.PubberConfiguration; + +/** + * Manager wrapper for discovery functionality in pubber. + */ +public class DiscoveryManager extends ManagerBase { + + public static final int SCAN_DURATION_SEC = 10; + + private final DeviceManager deviceManager; + private DiscoveryState discoveryState; + private DiscoveryConfig discoveryConfig; + private SiteModel siteModel; + + public DiscoveryManager(ManagerHost host, PubberConfiguration configuration, + DeviceManager deviceManager) { + super(host, configuration); + this.deviceManager = deviceManager; + } + + private void updateDiscoveryEnumeration(DiscoveryConfig config) { + Date enumerationGeneration = config.generation; + if (enumerationGeneration == null) { + discoveryState.generation = null; + return; + } + if (discoveryState.generation != null + && !enumerationGeneration.after(discoveryState.generation)) { + return; + } + discoveryState.generation = enumerationGeneration; + info("Discovery enumeration at " + isoConvert(enumerationGeneration)); + DiscoveryEvent discoveryEvent = new DiscoveryEvent(); + discoveryEvent.generation = enumerationGeneration; + Enumerate enumerate = config.enumerate; + discoveryEvent.uniqs = ifTrue(enumerate.uniqs, () -> enumeratePoints(deviceId)); + discoveryEvent.features = ifTrue(enumerate.features, SupportedFeatures::getFeatures); + discoveryEvent.families = ifTrue(enumerate.families, deviceManager::enumerateFamilies); + host.publish(discoveryEvent); + } + + private void updateDiscoveryScan(HashMap familiesRaw) { + HashMap families = + familiesRaw == null ? new HashMap<>() : familiesRaw; + if (discoveryState.families == null) { + discoveryState.families = new HashMap<>(); + } + + discoveryState.families.keySet().forEach(family -> { + if (!families.containsKey(family)) { + FamilyDiscoveryState familyDiscoveryState = discoveryState.families.get(family); + if (familyDiscoveryState.generation != null) { + info("Clearing scheduled discovery family " + family); + familyDiscoveryState.generation = null; + familyDiscoveryState.active = null; + } + } + }); + families.keySet().forEach(family -> { + FamilyDiscoveryConfig familyDiscoveryConfig = families.get(family); + Date configGeneration = familyDiscoveryConfig.generation; + if (configGeneration == null) { + discoveryState.families.remove(family); + return; + } + + Date previousGeneration = getFamilyDiscoveryState(family).generation; + Date baseGeneration = previousGeneration == null ? DEVICE_START_TIME : previousGeneration; + final Date startGeneration; + if (configGeneration.before(baseGeneration)) { + int interval = getScanInterval(family); + if (interval > 0) { + long deltaSec = (baseGeneration.getTime() - configGeneration.getTime() + 999) / 1000; + long intervals = (deltaSec + interval - 1) / interval; + startGeneration = Date.from( + configGeneration.toInstant().plusSeconds(intervals * interval)); + } else { + return; + } + } else { + startGeneration = configGeneration; + } + + info("Discovery scan generation " + family + " is " + isoConvert(startGeneration)); + scheduleFuture(startGeneration, () -> checkDiscoveryScan(family, startGeneration)); + }); + + if (discoveryState.families.isEmpty()) { + discoveryState.families = null; + } + } + + private FamilyDiscoveryState getFamilyDiscoveryState(String family) { + return discoveryState.families.computeIfAbsent( + family, key -> new FamilyDiscoveryState()); + } + + private void checkDiscoveryScan(String family, Date scanGeneration) { + try { + FamilyDiscoveryState familyDiscoveryState = getFamilyDiscoveryState(family); + if (familyDiscoveryState.generation == null + || familyDiscoveryState.generation.before(scanGeneration)) { + scheduleDiscoveryScan(family, scanGeneration); + } + } catch (Exception e) { + throw new RuntimeException("While checking for discovery scan start", e); + } + } + + private void scheduleDiscoveryScan(String family, Date scanGeneration) { + info("Discovery scan starting " + family + " as " + isoConvert(scanGeneration)); + Date stopTime = Date.from(Instant.now().plusSeconds(SCAN_DURATION_SEC)); + FamilyDiscoveryState familyDiscoveryState = getFamilyDiscoveryState(family); + scheduleFuture(stopTime, () -> discoveryScanComplete(family, scanGeneration)); + familyDiscoveryState.generation = scanGeneration; + familyDiscoveryState.active = true; + updateState(); + Date sendTime = Date.from(Instant.now().plusSeconds(SCAN_DURATION_SEC / 2)); + scheduleFuture(sendTime, () -> sendDiscoveryEvent(family, scanGeneration)); + } + + private void sendDiscoveryEvent(String family, Date scanGeneration) { + FamilyDiscoveryState familyDiscoveryState = getFamilyDiscoveryState(family); + if (scanGeneration.equals(familyDiscoveryState.generation) + && familyDiscoveryState.active) { + AtomicInteger sentEvents = new AtomicInteger(); + siteModel.forEachMetadata((deviceId, targetMetadata) -> { + FamilyLocalnetModel familyLocalnetModel = getFamilyLocalnetModel(family, targetMetadata); + if (familyLocalnetModel != null && familyLocalnetModel.addr != null) { + DiscoveryEvent discoveryEvent = new DiscoveryEvent(); + discoveryEvent.generation = scanGeneration; + discoveryEvent.scan_family = family; + discoveryEvent.scan_addr = deviceId; + discoveryEvent.families = targetMetadata.localnet.families.entrySet().stream() + .collect(toMap(Map.Entry::getKey, this::eventForTarget)); + discoveryEvent.families.computeIfAbsent("iot", + key -> new FamilyDiscoveryEvent()).addr = deviceId; + if (isGetTrue(() -> discoveryConfig.families.get(family).enumerate)) { + discoveryEvent.uniqs = enumeratePoints(deviceId); + } + host.publish(discoveryEvent); + sentEvents.incrementAndGet(); + } + }); + info("Sent " + sentEvents.get() + " discovery events from " + family + " for " + + scanGeneration); + } + } + + private FamilyDiscoveryEvent eventForTarget(Map.Entry target) { + FamilyDiscoveryEvent event = new FamilyDiscoveryEvent(); + event.addr = target.getValue().addr; + return event; + } + + private FamilyLocalnetModel getFamilyLocalnetModel(String family, Metadata targetMetadata) { + try { + return targetMetadata.localnet.families.get(family); + } catch (Exception e) { + return null; + } + } + + private void discoveryScanComplete(String family, Date scanGeneration) { + try { + FamilyDiscoveryState familyDiscoveryState = getFamilyDiscoveryState(family); + if (scanGeneration.equals(familyDiscoveryState.generation)) { + int interval = getScanInterval(family); + if (interval > 0) { + Date newGeneration = Date.from(scanGeneration.toInstant().plusSeconds(interval)); + scheduleFuture(newGeneration, () -> checkDiscoveryScan(family, newGeneration)); + } else { + info("Discovery scan stopping " + family + " from " + isoConvert(scanGeneration)); + familyDiscoveryState.active = false; + updateState(); + } + } + } catch (Exception e) { + throw new RuntimeException("While checking for discovery scan complete", e); + } + } + + private int getScanInterval(String family) { + try { + return discoveryConfig.families.get(family).scan_interval_sec; + } catch (Exception e) { + return 0; + } + } + + private T ifTrue(Boolean condition, Supplier supplier) { + return isGetTrue(() -> condition) ? supplier.get() : null; + } + + private Map enumeratePoints(String deviceId) { + return siteModel.getMetadata(deviceId).pointset.points.entrySet().stream().collect( + Collectors.toMap(this::getPointUniqKey, this::getPointEnumerationEvent)); + } + + private String getPointUniqKey(Map.Entry entry) { + return format("%08x", entry.getKey().hashCode()); + } + + private PointEnumerationEvent getPointEnumerationEvent( + Map.Entry entry) { + PointEnumerationEvent pointEnumerationEvent = new PointEnumerationEvent(); + PointPointsetModel model = entry.getValue(); + pointEnumerationEvent.writable = model.writable; + pointEnumerationEvent.units = model.units; + pointEnumerationEvent.ref = model.ref; + pointEnumerationEvent.name = entry.getKey(); + return pointEnumerationEvent; + } + + private void updateState() { + updateState(ofNullable((Object) discoveryState).orElse(DiscoveryState.class)); + } + + /** + * Update the discovery config. + */ + public void updateConfig(DiscoveryConfig discovery) { + discoveryConfig = discovery; + if (discovery == null) { + discoveryState = null; + updateState(); + return; + } + if (discoveryState == null) { + discoveryState = new DiscoveryState(); + } + updateDiscoveryEnumeration(discovery); + updateDiscoveryScan(discovery.families); + updateState(); + } + + public void setSiteModel(SiteModel siteModel) { + this.siteModel = siteModel; + } +} diff --git a/pubber/src/main/java/daq/pubber/PointsetManager.java b/pubber/src/main/java/daq/pubber/PointsetManager.java index a2eff7712a..b66a5c4f33 100644 --- a/pubber/src/main/java/daq/pubber/PointsetManager.java +++ b/pubber/src/main/java/daq/pubber/PointsetManager.java @@ -1,12 +1,12 @@ package daq.pubber; import static com.google.udmi.util.GeneralUtils.getNow; +import static com.google.udmi.util.GeneralUtils.getTimestamp; import static com.google.udmi.util.GeneralUtils.ifNotNullGet; import static com.google.udmi.util.GeneralUtils.ifNotNullThen; import static com.google.udmi.util.GeneralUtils.ifNotTrueThen; import static com.google.udmi.util.GeneralUtils.ifNullThen; import static com.google.udmi.util.GeneralUtils.ifTrueGet; -import static com.google.udmi.util.JsonUtil.getTimestamp; import static java.lang.String.format; import static java.util.Objects.requireNonNull; import static java.util.Optional.ofNullable; diff --git a/pubber/src/main/java/daq/pubber/Pubber.java b/pubber/src/main/java/daq/pubber/Pubber.java index ea9ddb7a4a..d050911213 100644 --- a/pubber/src/main/java/daq/pubber/Pubber.java +++ b/pubber/src/main/java/daq/pubber/Pubber.java @@ -10,6 +10,7 @@ import static com.google.udmi.util.GeneralUtils.fromJsonFile; import static com.google.udmi.util.GeneralUtils.fromJsonString; import static com.google.udmi.util.GeneralUtils.getNow; +import static com.google.udmi.util.GeneralUtils.getTimestamp; import static com.google.udmi.util.GeneralUtils.ifNotNullGet; import static com.google.udmi.util.GeneralUtils.ifNotNullThen; import static com.google.udmi.util.GeneralUtils.ifTrueThen; @@ -17,8 +18,10 @@ import static com.google.udmi.util.GeneralUtils.isTrue; import static com.google.udmi.util.GeneralUtils.optionsString; import static com.google.udmi.util.GeneralUtils.setClockSkew; +import static com.google.udmi.util.GeneralUtils.stackTraceString; import static com.google.udmi.util.GeneralUtils.toJsonFile; import static com.google.udmi.util.GeneralUtils.toJsonString; +import static com.google.udmi.util.JsonUtil.isoConvert; import static com.google.udmi.util.JsonUtil.safeSleep; import static com.google.udmi.util.JsonUtil.stringifyTerse; import static daq.pubber.MqttDevice.CONFIG_TOPIC; @@ -29,7 +32,6 @@ import static java.util.Objects.requireNonNull; import static java.util.Objects.requireNonNullElse; import static java.util.Optional.ofNullable; -import static java.util.stream.Collectors.toMap; import static udmi.schema.BlobsetConfig.SystemBlobsets.IOT_ENDPOINT_CONFIG; import static udmi.schema.EndpointConfiguration.Protocol.MQTT; @@ -47,10 +49,7 @@ import daq.pubber.PointsetManager.ExtraPointsetEvent; import daq.pubber.PubSubClient.Bundle; import daq.pubber.SystemManager.ExtraSystemState; -import java.io.ByteArrayOutputStream; import java.io.File; -import java.io.OutputStream; -import java.io.PrintStream; import java.lang.reflect.Field; import java.nio.file.Files; import java.nio.file.Path; @@ -71,8 +70,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; -import java.util.function.Supplier; -import java.util.stream.Collectors; import org.apache.http.ConnectionClosedException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -85,27 +82,19 @@ import udmi.schema.CloudModel.Auth_type; import udmi.schema.Config; import udmi.schema.DevicePersistent; -import udmi.schema.DiscoveryConfig; import udmi.schema.DiscoveryEvent; import udmi.schema.DiscoveryState; import udmi.schema.EndpointConfiguration; import udmi.schema.EndpointConfiguration.Protocol; import udmi.schema.Entry; -import udmi.schema.Enumerate; import udmi.schema.Envelope; import udmi.schema.Envelope.SubFolder; import udmi.schema.Envelope.SubType; -import udmi.schema.FamilyDiscoveryConfig; -import udmi.schema.FamilyDiscoveryEvent; -import udmi.schema.FamilyDiscoveryState; -import udmi.schema.FamilyLocalnetModel; import udmi.schema.GatewayState; import udmi.schema.Level; import udmi.schema.LocalnetState; import udmi.schema.Metadata; import udmi.schema.Operation.SystemMode; -import udmi.schema.PointEnumerationEvent; -import udmi.schema.PointPointsetModel; import udmi.schema.PointsetEvent; import udmi.schema.PointsetState; import udmi.schema.PubberConfiguration; @@ -119,14 +108,13 @@ */ public class Pubber extends ManagerBase implements ManagerHost { - public static final int SCAN_DURATION_SEC = 10; public static final String PUBBER_OUT = "pubber/out"; public static final String PERSISTENT_STORE_FILE = "persistent_data.json"; public static final String PERSISTENT_TMP_FORMAT = "/tmp/pubber_%s_" + PERSISTENT_STORE_FILE; public static final String DATA_URL_JSON_BASE64 = "data:application/json;base64,"; static final String UDMI_VERSION = SchemaVersion.CURRENT.key(); static final Logger LOG = LoggerFactory.getLogger(Pubber.class); - static final Date deviceStartTime = getRoundedStartTime(); + static final Date DEVICE_START_TIME = getRoundedStartTime(); static final int MESSAGE_REPORT_INTERVAL = 10; private static final String BROKEN_VERSION = "1.4."; private static final String HOSTNAME = System.getenv("HOSTNAME"); @@ -183,6 +171,7 @@ public class Pubber extends ManagerBase implements ManagerHost { private int deviceUpdateCount = -1; private DeviceManager deviceManager; private boolean isConnected; + private boolean isGatewayDevice; /** @@ -379,6 +368,7 @@ private void initializeDevice() { if (configuration.sitePath != null) { siteModel = new SiteModel(configuration.sitePath); siteModel.initialize(); + deviceManager.setSiteModel(siteModel); if (configuration.endpoint == null) { configuration.endpoint = siteModel.makeEndpointConfig(configuration.iotProject, deviceId); } @@ -497,6 +487,8 @@ public void update(Object update) { deviceState.localnet = (LocalnetState) checkValue; } else if (checkTarget instanceof GatewayState) { deviceState.gateway = (GatewayState) checkValue; + } else if (checkTarget instanceof DiscoveryState) { + deviceState.discovery = (DiscoveryState) checkValue; } else { throw new RuntimeException( "Unrecognized update type " + checkTarget.getClass().getSimpleName()); @@ -591,7 +583,7 @@ public void periodicUpdate() { private void checkSmokyFailure() { if (isTrue(configuration.options.smokeCheck) - && Instant.now().minus(SMOKE_CHECK_TIME).isAfter(deviceStartTime.toInstant())) { + && Instant.now().minus(SMOKE_CHECK_TIME).isAfter(DEVICE_START_TIME.toInstant())) { error(format("Smoke check failed after %sm, terminating run.", SMOKE_CHECK_TIME.getSeconds() / 60)); deviceManager.systemLifecycle(SystemMode.TERMINATE); @@ -941,7 +933,6 @@ private void processConfigUpdate(Config config) { GeneralUtils.copyFields(config, deviceConfig, true); info(format("%s received config %s", getTimestamp(), isoConvert(config.timestamp))); deviceManager.updateConfig(config); - updateDiscoveryConfig(config.discovery); extractEndpointBlobConfig(); } else { info(getTimestamp() + " defaulting empty config"); @@ -1116,246 +1107,6 @@ private String extractConfigBlob(String blobName) { } } - private void updateDiscoveryConfig(DiscoveryConfig config) { - if (config == null) { - deviceState.discovery = null; - return; - } - if (deviceState.discovery == null) { - deviceState.discovery = new DiscoveryState(); - } - updateDiscoveryEnumeration(config); - updateDiscoveryScan(config.families); - } - - private void updateDiscoveryEnumeration(DiscoveryConfig config) { - Date enumerationGeneration = config.generation; - if (enumerationGeneration == null) { - deviceState.discovery.generation = null; - return; - } - if (deviceState.discovery.generation != null - && !enumerationGeneration.after(deviceState.discovery.generation)) { - return; - } - deviceState.discovery.generation = enumerationGeneration; - info("Discovery enumeration at " + isoConvert(enumerationGeneration)); - DiscoveryEvent discoveryEvent = new DiscoveryEvent(); - discoveryEvent.generation = enumerationGeneration; - Enumerate enumerate = config.enumerate; - discoveryEvent.uniqs = ifTrue(enumerate.uniqs, () -> enumeratePoints(configuration.deviceId)); - discoveryEvent.features = ifTrue(enumerate.features, SupportedFeatures::getFeatures); - discoveryEvent.families = ifTrue(enumerate.families, () -> deviceManager.enumerateFamilies()); - publishDeviceMessage(discoveryEvent); - } - - private T ifTrue(Boolean condition, Supplier supplier) { - return isGetTrue(() -> condition) ? supplier.get() : null; - } - - private Map enumeratePoints(String deviceId) { - return siteModel.getMetadata(deviceId).pointset.points.entrySet().stream().collect( - Collectors.toMap(this::getPointUniqKey, this::getPointEnumerationEvent)); - } - - private String getPointUniqKey(Map.Entry entry) { - return format("%08x", entry.getKey().hashCode()); - } - - private PointEnumerationEvent getPointEnumerationEvent( - Map.Entry entry) { - PointEnumerationEvent pointEnumerationEvent = new PointEnumerationEvent(); - PointPointsetModel model = entry.getValue(); - pointEnumerationEvent.writable = model.writable; - pointEnumerationEvent.units = model.units; - pointEnumerationEvent.ref = model.ref; - pointEnumerationEvent.name = entry.getKey(); - return pointEnumerationEvent; - } - - private void updateDiscoveryScan(HashMap familiesRaw) { - HashMap families = - familiesRaw == null ? new HashMap<>() : familiesRaw; - if (deviceState.discovery.families == null) { - deviceState.discovery.families = new HashMap<>(); - } - - deviceState.discovery.families.keySet().forEach(family -> { - if (!families.containsKey(family)) { - FamilyDiscoveryState familyDiscoveryState = deviceState.discovery.families.get(family); - if (familyDiscoveryState.generation != null) { - info("Clearing scheduled discovery family " + family); - familyDiscoveryState.generation = null; - familyDiscoveryState.active = null; - } - } - }); - families.keySet().forEach(family -> { - FamilyDiscoveryConfig familyDiscoveryConfig = families.get(family); - Date configGeneration = familyDiscoveryConfig.generation; - if (configGeneration == null) { - deviceState.discovery.families.remove(family); - return; - } - - Date previousGeneration = getFamilyDiscoveryState(family).generation; - Date baseGeneration = previousGeneration == null ? deviceStartTime : previousGeneration; - final Date startGeneration; - if (configGeneration.before(baseGeneration)) { - int interval = getScanInterval(family); - if (interval > 0) { - long deltaSec = (baseGeneration.getTime() - configGeneration.getTime() + 999) / 1000; - long intervals = (deltaSec + interval - 1) / interval; - startGeneration = Date.from( - configGeneration.toInstant().plusSeconds(intervals * interval)); - } else { - return; - } - } else { - startGeneration = configGeneration; - } - - info("Discovery scan generation " + family + " is " + isoConvert(startGeneration)); - scheduleFuture(startGeneration, () -> checkDiscoveryScan(family, startGeneration)); - }); - - if (deviceState.discovery.families.isEmpty()) { - deviceState.discovery.families = null; - } - } - - private FamilyDiscoveryState getFamilyDiscoveryState(String family) { - return deviceState.discovery.families.computeIfAbsent( - family, key -> new FamilyDiscoveryState()); - } - - private void checkDiscoveryScan(String family, Date scanGeneration) { - try { - FamilyDiscoveryState familyDiscoveryState = getFamilyDiscoveryState(family); - if (familyDiscoveryState.generation == null - || familyDiscoveryState.generation.before(scanGeneration)) { - scheduleDiscoveryScan(family, scanGeneration); - } - } catch (Exception e) { - throw new RuntimeException("While checking for discovery scan start", e); - } - } - - private void scheduleDiscoveryScan(String family, Date scanGeneration) { - info("Discovery scan starting " + family + " as " + isoConvert(scanGeneration)); - Date stopTime = Date.from(Instant.now().plusSeconds(SCAN_DURATION_SEC)); - FamilyDiscoveryState familyDiscoveryState = getFamilyDiscoveryState(family); - scheduleFuture(stopTime, () -> discoveryScanComplete(family, scanGeneration)); - familyDiscoveryState.generation = scanGeneration; - familyDiscoveryState.active = true; - publishAsynchronousState(); - Date sendTime = Date.from(Instant.now().plusSeconds(SCAN_DURATION_SEC / 2)); - scheduleFuture(sendTime, () -> sendDiscoveryEvent(family, scanGeneration)); - } - - private void sendDiscoveryEvent(String family, Date scanGeneration) { - FamilyDiscoveryState familyDiscoveryState = getFamilyDiscoveryState(family); - if (scanGeneration.equals(familyDiscoveryState.generation) - && familyDiscoveryState.active) { - AtomicInteger sentEvents = new AtomicInteger(); - siteModel.forEachMetadata((deviceId, targetMetadata) -> { - FamilyLocalnetModel familyLocalnetModel = getFamilyLocalnetModel(family, targetMetadata); - if (familyLocalnetModel != null && familyLocalnetModel.addr != null) { - DiscoveryEvent discoveryEvent = new DiscoveryEvent(); - discoveryEvent.generation = scanGeneration; - discoveryEvent.scan_family = family; - discoveryEvent.scan_addr = deviceId; - discoveryEvent.families = targetMetadata.localnet.families.entrySet().stream() - .collect(toMap(Map.Entry::getKey, this::eventForTarget)); - discoveryEvent.families.computeIfAbsent("iot", - key -> new FamilyDiscoveryEvent()).addr = deviceId; - if (isGetTrue(() -> deviceConfig.discovery.families.get(family).enumerate)) { - discoveryEvent.uniqs = enumeratePoints(deviceId); - } - publishDeviceMessage(discoveryEvent); - sentEvents.incrementAndGet(); - } - }); - info("Sent " + sentEvents.get() + " discovery events from " + family + " for " - + scanGeneration); - } - } - - private FamilyDiscoveryEvent eventForTarget(Map.Entry target) { - FamilyDiscoveryEvent event = new FamilyDiscoveryEvent(); - event.addr = target.getValue().addr; - return event; - } - - private FamilyLocalnetModel getFamilyLocalnetModel(String family, Metadata targetMetadata) { - try { - return targetMetadata.localnet.families.get(family); - } catch (Exception e) { - return null; - } - } - - private void discoveryScanComplete(String family, Date scanGeneration) { - try { - FamilyDiscoveryState familyDiscoveryState = getFamilyDiscoveryState(family); - if (scanGeneration.equals(familyDiscoveryState.generation)) { - int interval = getScanInterval(family); - if (interval > 0) { - Date newGeneration = Date.from(scanGeneration.toInstant().plusSeconds(interval)); - scheduleFuture(newGeneration, () -> checkDiscoveryScan(family, newGeneration)); - } else { - info("Discovery scan stopping " + family + " from " + isoConvert(scanGeneration)); - familyDiscoveryState.active = false; - publishAsynchronousState(); - } - } - } catch (Exception e) { - throw new RuntimeException("While checking for discovery scan complete", e); - } - } - - private int getScanInterval(String family) { - try { - return deviceConfig.discovery.families.get(family).scan_interval_sec; - } catch (Exception e) { - return 0; - } - } - - private String stackTraceString(Throwable e) { - OutputStream outputStream = new ByteArrayOutputStream(); - try (PrintStream ps = new PrintStream(outputStream)) { - e.printStackTrace(ps); - } - return outputStream.toString(); - } - - private String getTimestamp() { - return isoConvert(getNow()); - } - - private Date isoConvert(String timestamp) { - try { - String wrappedString = "\"" + timestamp + "\""; - return fromJsonString(wrappedString, Date.class); - } catch (Exception e) { - throw new RuntimeException("Creating date", e); - } - } - - private String isoConvert(Date timestamp) { - try { - if (timestamp == null) { - return "null"; - } - String dateString = toJsonString(timestamp); - // Strip off the leading and trailing quotes from the JSON string-as-string representation. - return dateString.substring(1, dateString.length() - 1); - } catch (Exception e) { - throw new RuntimeException("Creating timestamp", e); - } - } - private byte[] getFileBytes(String dataFile) { Path dataPath = Paths.get(dataFile); try { diff --git a/pubber/src/main/java/daq/pubber/SystemManager.java b/pubber/src/main/java/daq/pubber/SystemManager.java index 66e6924051..82762ab169 100644 --- a/pubber/src/main/java/daq/pubber/SystemManager.java +++ b/pubber/src/main/java/daq/pubber/SystemManager.java @@ -2,11 +2,12 @@ import static com.google.udmi.util.GeneralUtils.catchOrElse; import static com.google.udmi.util.GeneralUtils.catchToNull; +import static com.google.udmi.util.GeneralUtils.getTimestamp; import static com.google.udmi.util.GeneralUtils.ifNotNullGet; import static com.google.udmi.util.GeneralUtils.ifNotNullThen; import static com.google.udmi.util.GeneralUtils.ifNotTrueThen; import static com.google.udmi.util.GeneralUtils.isTrue; -import static com.google.udmi.util.JsonUtil.getTimestamp; +import static com.google.udmi.util.JsonUtil.isoConvert; import static java.lang.String.format; import static java.util.Objects.requireNonNullElse; import static java.util.Optional.ofNullable; @@ -50,7 +51,7 @@ public class SystemManager extends ManagerBase { private static final String DEFAULT_MODEL = "pubber"; private static final String DEFAULT_SOFTWARE_KEY = "firmware"; private static final String DEFAULT_SOFTWARE_VALUE = "v1"; - private static final Date DEVICE_START_TIME = Pubber.deviceStartTime; + private static final Date DEVICE_START_TIME = Pubber.DEVICE_START_TIME; private static final Map EXIT_CODE_MAP = ImmutableMap.of( SystemMode.SHUTDOWN, 0, // Indicates expected clean shutdown (success). SystemMode.RESTART, 192, // Indicate process to be explicitly restarted. @@ -93,7 +94,7 @@ public SystemManager(ManagerHost host, PubberConfiguration configuration) { super(host, configuration); this.host = host; - info("Device start time is " + getTimestamp(DEVICE_START_TIME)); + info("Device start time is " + isoConvert(DEVICE_START_TIME)); systemState = new ExtraSystemState(); systemState.operation = new StateSystemOperation(); @@ -168,12 +169,12 @@ void maybeRestartSystem() { if (configLastStart != null) { if (DEVICE_START_TIME.before(configLastStart)) { error(format("Device start time %s before last config start %s, terminating.", - getTimestamp(DEVICE_START_TIME), getTimestamp(configLastStart))); + isoConvert(DEVICE_START_TIME), isoConvert(configLastStart))); systemLifecycle(SystemMode.TERMINATE); } else if (isTrue(options.smokeCheck) && CleanDateFormat.dateEquals(DEVICE_START_TIME, configLastStart)) { error(format("Device start time %s matches, smoke check indicating success!", - getTimestamp(configLastStart))); + isoConvert(configLastStart))); systemLifecycle(SystemMode.SHUTDOWN); } } @@ -271,8 +272,8 @@ String getTestingTag() { void localLog(Entry entry) { String message = format("Log %s%s %s %s %s%s", Level.fromValue(entry.level).name(), shouldLogLevel(entry.level) ? "" : "*", - entry.category, entry.message, getTimestamp(entry.timestamp), getTestingTag()); - localLog(message, Level.fromValue(entry.level), getTimestamp(entry.timestamp), null); + entry.category, entry.message, isoConvert(entry.timestamp), getTestingTag()); + localLog(message, Level.fromValue(entry.level), isoConvert(entry.timestamp), null); } static void localLog(String message, Level level, String timestamp, String detail) { diff --git a/udmis/src/main/java/com/google/bos/udmi/service/access/DynamicIotAccessProvider.java b/udmis/src/main/java/com/google/bos/udmi/service/access/DynamicIotAccessProvider.java index 413e578d49..d71f5ddf84 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/access/DynamicIotAccessProvider.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/access/DynamicIotAccessProvider.java @@ -3,7 +3,7 @@ import static com.google.bos.udmi.service.pod.UdmiServicePod.getComponent; import static com.google.udmi.util.GeneralUtils.ifTrueThen; import static com.google.udmi.util.GeneralUtils.sortedMapCollector; -import static com.google.udmi.util.JsonUtil.getTimestamp; +import static com.google.udmi.util.JsonUtil.isoConvert; import static java.lang.String.format; import static java.util.Objects.requireNonNull; import static java.util.Optional.ofNullable; @@ -83,7 +83,7 @@ private String registryPriority(String registryId, Entry int providerIndex = providerList.size() - providerList.indexOf(provider.getKey()); String provisionedAt = ofNullable( provider.getValue().fetchRegistryMetadata(registryId, "udmi_provisioned")).orElse( - getTimestamp(new Date(providerIndex * INDEX_ORDERING_MULTIPLIER_MS))); + isoConvert(new Date(providerIndex * INDEX_ORDERING_MULTIPLIER_MS))); debug(format("Registry %s provider %s provisioned %s", registryId, provider.getKey(), provisionedAt)); return provisionedAt; diff --git a/udmis/src/main/java/com/google/bos/udmi/service/access/IotAccessBase.java b/udmis/src/main/java/com/google/bos/udmi/service/access/IotAccessBase.java index e878f7c3cb..2715a01cb7 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/access/IotAccessBase.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/access/IotAccessBase.java @@ -3,7 +3,7 @@ import static com.google.udmi.util.GeneralUtils.friendlyStackTrace; import static com.google.udmi.util.GeneralUtils.ifNotNullGet; import static com.google.udmi.util.GeneralUtils.ifNotNullThen; -import static com.google.udmi.util.JsonUtil.getTimestamp; +import static com.google.udmi.util.JsonUtil.isoConvert; import static com.google.udmi.util.JsonUtil.safeSleep; import static com.google.udmi.util.JsonUtil.toMap; import static java.lang.String.format; @@ -289,7 +289,7 @@ public final void sendCommand(String registryId, String deviceId, SubFolder fold error("Exception sending command to %s: %s", backoffKey, friendlyStackTrace(e)); ifNotNullThen(registryBackoffInhibit(registryId, deviceId), until -> debug("Setting registry backoff for %s until %s", - backoffKey, getTimestamp(until))); + backoffKey, isoConvert(until))); } } else { debug("Dropping message because registry backoff for %s", backoffKey); diff --git a/udmis/src/main/java/com/google/bos/udmi/service/core/ProcessorBase.java b/udmis/src/main/java/com/google/bos/udmi/service/core/ProcessorBase.java index ced9218ae5..a9c2ca6f50 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/core/ProcessorBase.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/core/ProcessorBase.java @@ -20,7 +20,7 @@ import static com.google.udmi.util.GeneralUtils.ifNotNullThen; import static com.google.udmi.util.JsonUtil.asMap; import static com.google.udmi.util.JsonUtil.getDate; -import static com.google.udmi.util.JsonUtil.getTimestamp; +import static com.google.udmi.util.JsonUtil.isoConvert; import static com.google.udmi.util.JsonUtil.stringify; import static com.google.udmi.util.JsonUtil.toStringMap; import static java.lang.String.format; @@ -130,7 +130,7 @@ protected void processConfigChange(Envelope envelope, Map payloa useAttributes.subFolder = UPDATE; checkState(useAttributes.subType == SubType.CONFIG); debug("Acknowledging config/%s %s %s", subFolder, useAttributes.transactionId, - getTimestamp(newLastStart)); + isoConvert(newLastStart)); reflectMessage(useAttributes, configUpdate); } @@ -156,7 +156,7 @@ protected void reflectError(SubType subType, BundleException bundleException) { errorMessage.error = (String) bundle.message; errorMessage.data = encodeBase64(bundle.payload); errorMessage.version = UdmiServicePod.getDeployedConfig().udmi_version; - errorMessage.timestamp = getTimestamp(); + errorMessage.timestamp = isoConvert(); errorMap.put("payload", encodeBase64(stringify(errorMessage))); error(format("Reflecting error %s/%s for %s", errorMap.get(SUBTYPE_PROPERTY_KEY), errorMap.get(SUBFOLDER_PROPERTY_KEY), @@ -257,7 +257,7 @@ private String updateConfig(String previous, Envelope attributes, payload.put(attributes.subFolder.value(), updatePayload); } - String updateTimestamp = getTimestamp(); + String updateTimestamp = isoConvert(); payload.put(TIMESTAMP_KEY, updateTimestamp); payload.put(VERSION_KEY, UDMI_VERSION); @@ -274,8 +274,8 @@ private String updateWithLastStart(Map oldPayload, Date newLastS Date oldLastStart = getDate((String) oldOperation.get("last_start")); boolean shouldUpdate = oldLastStart == null || oldLastStart.before(newLastStart); - debug("Last start was %s, now %s, updating %s", getTimestamp(oldLastStart), - getTimestamp(newLastStart), shouldUpdate); + debug("Last start was %s, now %s, updating %s", isoConvert(oldLastStart), + isoConvert(newLastStart), shouldUpdate); if (!shouldUpdate) { return null; } @@ -360,7 +360,7 @@ void updateLastStart(Envelope envelope, StateUpdate message) { String serialNo = message.system.serial_no; Date newLastStart = message.system.operation.last_start; debug("Checking config last_start for %s/%s sn:%s against state last_start %s", - envelope.deviceRegistryId, envelope.deviceId, serialNo, getTimestamp(newLastStart)); + envelope.deviceRegistryId, envelope.deviceId, serialNo, isoConvert(newLastStart)); processConfigChange(envelope, new HashMap<>(), newLastStart); } catch (Exception e) { debug("Could not process config last_state update, skipping: " diff --git a/udmis/src/main/java/com/google/bos/udmi/service/core/ReflectProcessor.java b/udmis/src/main/java/com/google/bos/udmi/service/core/ReflectProcessor.java index 03043e912e..f53fbb1d3b 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/core/ReflectProcessor.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/core/ReflectProcessor.java @@ -19,7 +19,7 @@ import static com.google.udmi.util.JsonUtil.convertTo; import static com.google.udmi.util.JsonUtil.convertToStrict; import static com.google.udmi.util.JsonUtil.fromString; -import static com.google.udmi.util.JsonUtil.getTimestamp; +import static com.google.udmi.util.JsonUtil.isoConvert; import static com.google.udmi.util.JsonUtil.stringify; import static com.google.udmi.util.JsonUtil.stringifyTerse; import static com.google.udmi.util.JsonUtil.toMap; @@ -82,7 +82,7 @@ private Boolean checkConfigAckTime(Envelope attributes, StateUpdate stateUpdate) CloudModel cloudModel = iotAccess.fetchDevice(attributes.deviceRegistryId, attributes.deviceId); Date lastConfigAck = cleanDate(cloudModel.last_config_ack); Date lastConfig = cleanDate(ifNotNullGet(stateUpdate.system, system -> system.last_config)); - debug("Check last config ack %s >= %s", getTimestamp(lastConfigAck), getTimestamp(lastConfig)); + debug("Check last config ack %s >= %s", isoConvert(lastConfigAck), isoConvert(lastConfig)); if (lastConfig == null || lastConfigAck == null) { return false; } @@ -140,7 +140,7 @@ private void processException(Envelope reflection, Map objectMap private void processReflection(Envelope reflection, Envelope envelope, Map payload) { debug("Processing reflection %s/%s %s %s", envelope.subType, envelope.subFolder, - getTimestamp(envelope.publishTime), envelope.transactionId); + isoConvert(envelope.publishTime), envelope.transactionId); CloudModel result = getReflectionResult(envelope, payload); ifNotNullThen(result, v -> debug("Reflection result %s: %s", envelope.transactionId, envelope.subType)); @@ -222,7 +222,7 @@ private void reflectStateHandler(Envelope envelope, UdmiState toolState) { Map configMap = new HashMap<>(); configMap.put(SubFolder.UDMI.value(), udmiConfig); - configMap.put(TIMESTAMP_KEY, getTimestamp()); + configMap.put(TIMESTAMP_KEY, isoConvert()); String contents = stringifyTerse(configMap); debug("Setting reflector config %s %s: %s", registryId, deviceId, contents); iotAccess.modifyConfig(registryId, deviceId, previous -> contents); diff --git a/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/PubSubPipe.java b/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/PubSubPipe.java index 0dc24a86d5..d64ff871df 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/PubSubPipe.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/PubSubPipe.java @@ -7,7 +7,7 @@ import static com.google.udmi.util.GeneralUtils.ifNotNullGet; import static com.google.udmi.util.GeneralUtils.ifNotNullThen; import static com.google.udmi.util.GeneralUtils.ifNullThen; -import static com.google.udmi.util.JsonUtil.getTimestamp; +import static com.google.udmi.util.JsonUtil.isoConvert; import static com.google.udmi.util.JsonUtil.stringify; import static com.google.udmi.util.JsonUtil.toMap; import static java.lang.String.format; @@ -161,7 +161,7 @@ public void receiveMessage(PubsubMessage message, AckReplyConsumer reply) { reply.ack(); String messageId = message.getMessageId(); attributesMap.computeIfAbsent("publishTime", - key -> getTimestamp(ofEpochSecond(message.getPublishTime().getSeconds()))); + key -> isoConvert(ofEpochSecond(message.getPublishTime().getSeconds()))); attributesMap.computeIfAbsent(Common.TRANSACTION_KEY, key -> PS_TXN_PREFIX + messageId); receiveMessage(attributesMap, message.getData().toStringUtf8()); Instant end = Instant.now(); diff --git a/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/TraceMessagePipe.java b/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/TraceMessagePipe.java index 45da361165..622a14fe27 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/TraceMessagePipe.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/TraceMessagePipe.java @@ -1,13 +1,12 @@ package com.google.bos.udmi.service.messaging.impl; -import static com.google.udmi.util.Common.DEVICE_ID_KEY; import static com.google.udmi.util.Common.PUBLISH_TIME_KEY; import static com.google.udmi.util.GeneralUtils.decodeBase64; import static com.google.udmi.util.GeneralUtils.encodeBase64; -import static com.google.udmi.util.GeneralUtils.ifNotNullGet; +import static com.google.udmi.util.GeneralUtils.getTimestamp; import static com.google.udmi.util.GeneralUtils.ifNotNullThen; import static com.google.udmi.util.JsonUtil.asMap; -import static com.google.udmi.util.JsonUtil.getTimestamp; +import static com.google.udmi.util.JsonUtil.isoConvert; import static com.google.udmi.util.JsonUtil.stringify; import static com.google.udmi.util.JsonUtil.writeFile; import static java.lang.String.format; @@ -15,7 +14,7 @@ import com.google.bos.udmi.service.messaging.MessagePipe; import com.google.common.collect.ImmutableMap; -import com.google.udmi.util.Common; +import com.google.udmi.util.GeneralUtils; import com.google.udmi.util.JsonUtil; import java.io.File; import java.util.HashMap; @@ -23,11 +22,8 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; -import org.jetbrains.annotations.Nullable; import udmi.schema.EndpointConfiguration; import udmi.schema.Envelope; -import udmi.schema.Envelope.SubFolder; -import udmi.schema.Envelope.SubType; /** * Basic file message pipe that reads from simple json files encoded with type/folder in the @@ -96,7 +92,7 @@ public void publish(Bundle bundle) { } Envelope envelope = bundle.envelope; String publishTime = - envelope.publishTime == null ? getTimestamp() : getTimestamp(envelope.publishTime); + envelope.publishTime == null ? getTimestamp() : isoConvert(envelope.publishTime); int endMark = publishTime.lastIndexOf("."); String useTime = publishTime.substring(0, endMark >= 0 ? endMark : publishTime.length()); String timePath = useTime.replaceAll("[T:Z]", "/"); diff --git a/udmis/src/main/java/com/google/bos/udmi/service/pod/ContainerBase.java b/udmis/src/main/java/com/google/bos/udmi/service/pod/ContainerBase.java index e3a087e4bc..13916ad150 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/pod/ContainerBase.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/pod/ContainerBase.java @@ -162,7 +162,7 @@ private String getSimpleName() { private void output(Level level, String message) { PrintStream printStream = level.value() >= Level.WARNING.value() ? System.err : System.out; - printStream.printf("%s %s %s: %s %s%n", JsonUtil.getTimestamp(), getExecutionContext(), + printStream.printf("%s %s %s: %s %s%n", JsonUtil.isoConvert(), getExecutionContext(), level.name().charAt(0), getSimpleName(), message); printStream.flush(); } diff --git a/validator/src/main/java/com/google/bos/iot/core/proxy/IotReflectorClient.java b/validator/src/main/java/com/google/bos/iot/core/proxy/IotReflectorClient.java index 351e0f088c..b9e31df243 100644 --- a/validator/src/main/java/com/google/bos/iot/core/proxy/IotReflectorClient.java +++ b/validator/src/main/java/com/google/bos/iot/core/proxy/IotReflectorClient.java @@ -13,11 +13,12 @@ import static com.google.udmi.util.Common.TRANSACTION_KEY; import static com.google.udmi.util.Common.VERSION_KEY; import static com.google.udmi.util.Common.getNamespacePrefix; +import static com.google.udmi.util.GeneralUtils.getTimestamp; import static com.google.udmi.util.GeneralUtils.ifNotNullGet; import static com.google.udmi.util.JsonUtil.asMap; import static com.google.udmi.util.JsonUtil.convertTo; import static com.google.udmi.util.JsonUtil.getDate; -import static com.google.udmi.util.JsonUtil.getTimestamp; +import static com.google.udmi.util.JsonUtil.isoConvert; import static com.google.udmi.util.JsonUtil.stringify; import static com.google.udmi.util.JsonUtil.toMap; import static java.lang.String.format; @@ -194,7 +195,7 @@ private void initializeReflectorState() { try { reflectorStateTimestamp = new Date(); System.err.printf("Setting state version %s timestamp %s%n", - udmiVersion, getTimestamp(reflectorStateTimestamp)); + udmiVersion, isoConvert(reflectorStateTimestamp)); setReflectorState(udmiState); } catch (Exception e) { throw new RuntimeException("Could not set reflector state", e); @@ -243,7 +244,7 @@ private void messageHandler(String topic, String payload) { } catch (Exception e) { if (isInstallValid) { handleReceivedMessage(extractAttributes(messageMap), - new ErrorContainer(e, payload, JsonUtil.getTimestamp())); + new ErrorContainer(e, payload, getTimestamp())); } else { throw e; } @@ -306,8 +307,8 @@ private boolean ensureCloudSync(Map message) { } System.err.println("UDMI received reflectorConfig: " + stringify(reflectorConfig)); Date lastState = reflectorConfig.last_state; - System.err.println("UDMI matching against expected state timestamp " + getTimestamp( - reflectorStateTimestamp)); + System.err.println("UDMI matching against expected state timestamp " + + isoConvert(reflectorStateTimestamp)); udmiInfo = reflectorConfig.setup; boolean timestampMatch = dateEquals(lastState, reflectorStateTimestamp); @@ -337,7 +338,7 @@ private boolean ensureCloudSync(Map message) { } else if (!versionMatch) { System.err.println("UDMI update version mismatch... waiting for retry..."); } else { - System.err.println("UDMI ignoring mismatching timestamp " + getTimestamp(lastState)); + System.err.println("UDMI ignoring mismatching timestamp " + isoConvert(lastState)); } } catch (Exception e) { syncFailure = e; diff --git a/validator/src/main/java/com/google/bos/iot/core/proxy/MockPublisher.java b/validator/src/main/java/com/google/bos/iot/core/proxy/MockPublisher.java index f76e14ea3b..23a96acd83 100644 --- a/validator/src/main/java/com/google/bos/iot/core/proxy/MockPublisher.java +++ b/validator/src/main/java/com/google/bos/iot/core/proxy/MockPublisher.java @@ -1,9 +1,11 @@ package com.google.bos.iot.core.proxy; +import static com.google.udmi.util.GeneralUtils.getTimestamp; import static java.lang.String.format; import com.google.daq.mqtt.util.MessagePublisher; import com.google.daq.mqtt.validator.Validator; +import com.google.udmi.util.GeneralUtils; import com.google.udmi.util.JsonUtil; import com.google.udmi.util.SiteModel; import java.util.HashMap; @@ -29,7 +31,7 @@ public String publish(String deviceId, String topic, String message) { Validator.MessageBundle bundle = new Validator.MessageBundle(); bundle.message = JsonUtil.asMap(message); bundle.attributes = new HashMap<>(); - bundle.timestamp = JsonUtil.getTimestamp(); + bundle.timestamp = getTimestamp(); messages.put(bundle); return format("%08x", System.currentTimeMillis()); } catch (Exception e) { diff --git a/validator/src/main/java/com/google/daq/mqtt/mapping/MappingAgent.java b/validator/src/main/java/com/google/daq/mqtt/mapping/MappingAgent.java index 22b36c8bf6..f999bd05ac 100644 --- a/validator/src/main/java/com/google/daq/mqtt/mapping/MappingAgent.java +++ b/validator/src/main/java/com/google/daq/mqtt/mapping/MappingAgent.java @@ -1,5 +1,7 @@ package com.google.daq.mqtt.mapping; +import static com.google.udmi.util.JsonUtil.isoConvert; + import com.google.common.collect.ImmutableList; import com.google.daq.mqtt.util.MessageHandler; import com.google.daq.mqtt.util.MessageHandler.HandlerSpecification; @@ -72,7 +74,7 @@ private void processFamilyState(String family, FamilyDiscoveryState state) { FamilyDiscoveryState previous = familyStates.put(family, state); if (previous == null || !Objects.equals(previous.generation, state.generation)) { System.err.printf("Received family %s generation %s active %s%n", family, - JsonUtil.getTimestamp(state.generation), state.active); + isoConvert(state.generation), state.active); } } diff --git a/validator/src/main/java/com/google/daq/mqtt/mapping/MappingEngine.java b/validator/src/main/java/com/google/daq/mqtt/mapping/MappingEngine.java index 484b0cb9d6..235f1c7ad0 100644 --- a/validator/src/main/java/com/google/daq/mqtt/mapping/MappingEngine.java +++ b/validator/src/main/java/com/google/daq/mqtt/mapping/MappingEngine.java @@ -1,5 +1,7 @@ package com.google.daq.mqtt.mapping; +import static com.google.udmi.util.JsonUtil.isoConvert; + import com.google.common.collect.ImmutableList; import com.google.daq.mqtt.util.MessageHandler; import com.google.daq.mqtt.util.MessageHandler.HandlerSpecification; @@ -59,7 +61,7 @@ private void mappingConfigHandler(Envelope envelope, MappingConfig mappingConfig private void discoveryEventHandler(Envelope envelope, DiscoveryEvent message) { String deviceId = message.scan_addr; System.err.printf("Processing device %s generation %s%n", deviceId, - JsonUtil.getTimestamp(message.generation)); + isoConvert(message.generation)); getDeviceState(deviceId).discovered = message.timestamp; updateTranslation(deviceId, message.uniqs); @@ -78,8 +80,8 @@ private void updateTranslation(String deviceId, Map(); final MappingEventEntity entity = new MappingEventEntity(); entity.translation = uniqs.entrySet() - .stream().map(this::makeTranslation).collect(Collectors.toMap(SimpleEntry::getKey, - SimpleEntry::getValue, (existing, replacement) -> replacement, HashMap::new)); + .stream().map(this::makeTranslation).collect(Collectors.toMap(SimpleEntry::getKey, + SimpleEntry::getValue, (existing, replacement) -> replacement, HashMap::new)); result.entities.put(deviceGuid(deviceId), entity); result.timestamp = new Date(); publishMessage(deviceId, result); diff --git a/validator/src/main/java/com/google/daq/mqtt/sequencer/SequenceBase.java b/validator/src/main/java/com/google/daq/mqtt/sequencer/SequenceBase.java index 6e82c600bc..5a410c32f2 100644 --- a/validator/src/main/java/com/google/daq/mqtt/sequencer/SequenceBase.java +++ b/validator/src/main/java/com/google/daq/mqtt/sequencer/SequenceBase.java @@ -16,6 +16,7 @@ import static com.google.udmi.util.Common.TIMESTAMP_KEY; import static com.google.udmi.util.GeneralUtils.changedLines; import static com.google.udmi.util.GeneralUtils.friendlyStackTrace; +import static com.google.udmi.util.GeneralUtils.getTimestamp; import static com.google.udmi.util.GeneralUtils.ifNotNullGet; import static com.google.udmi.util.GeneralUtils.ifNotNullThen; import static com.google.udmi.util.GeneralUtils.ifNullThen; @@ -24,7 +25,7 @@ import static com.google.udmi.util.GeneralUtils.isTrue; import static com.google.udmi.util.GeneralUtils.stackTraceString; import static com.google.udmi.util.JsonUtil.convertTo; -import static com.google.udmi.util.JsonUtil.getTimestamp; +import static com.google.udmi.util.JsonUtil.isoConvert; import static com.google.udmi.util.JsonUtil.loadFileRequired; import static com.google.udmi.util.JsonUtil.safeSleep; import static com.google.udmi.util.JsonUtil.stringify; @@ -607,7 +608,7 @@ public void setExtraField(String extraField) { */ public void setLastStart(Date use) { boolean changed = !stringify(deviceConfig.system.operation.last_start).equals(stringify(use)); - debug("last_start changed " + changed + ", last_start " + getTimestamp(use)); + debug("last_start changed " + changed + ", last_start " + isoConvert(use)); deviceConfig.system.operation.last_start = use; } @@ -647,7 +648,7 @@ private void resetDeviceConfig(boolean clean) { deviceConfig.system.min_loglevel = Level.INFO.value(); Date resetDate = ofNullable(catchToNull(() -> deviceState.system.operation.last_start)) .orElse(RESET_LAST_START); - debug("Configuring device last_start to be " + getTimestamp(resetDate)); + debug("Configuring device last_start to be " + isoConvert(resetDate)); setLastStart(SemanticDate.describe("device reported", resetDate)); setExtraField(null); } @@ -1005,7 +1006,7 @@ private void logSystemEvent(String messageBase, Map message) { } private String entryMessage(Entry logEntry) { - return format("%s %s %s: %s", getTimestamp(logEntry.timestamp), + return format("%s %s %s: %s", isoConvert(logEntry.timestamp), Level.fromValue(logEntry.level).name(), logEntry.category, logEntry.message); } @@ -1026,7 +1027,7 @@ private String writeLogEntry(Entry logEntry, PrintWriter printWriter) { if (logEntry.timestamp == null) { throw new RuntimeException("log entry timestamp is null"); } - String messageStr = format("%s %s %s", getTimestamp(logEntry.timestamp), + String messageStr = format("%s %s %s", isoConvert(logEntry.timestamp), Level.fromValue(logEntry.level), logEntry.message); printWriter.println(messageStr); @@ -1136,7 +1137,7 @@ private void captureConfigChange(String reason) { try { String suffix = reason == null ? "" : (" " + reason); String header = format("Update config%s: ", suffix); - debug(header + getTimestamp(deviceConfig.timestamp)); + debug(header + isoConvert(deviceConfig.timestamp)); recordRawMessage(deviceConfig, LOCAL_CONFIG_UPDATE); List allDiffs = SENT_CONFIG_DIFFERNATOR.computeChanges(deviceConfig); List filteredDiffs = filterTesting(allDiffs); @@ -1280,8 +1281,8 @@ protected void checkNotLogged(String category, Level minLevel) { entry -> category.equals(entry.category) && entry.level >= minLevel.value()); checkThat(format("log category `%s` level `%s` not logged", category, minLevel), () -> { if (!entries.isEmpty()) { - warning(format("Filtered config between %s and %s", getTimestamp(lastConfigUpdate), - getTimestamp(endTime))); + warning(format("Filtered config between %s and %s", isoConvert(lastConfigUpdate), + isoConvert(endTime))); entries.forEach(entry -> error("undesirable " + entryMessage(entry))); } return entries.isEmpty(); @@ -1609,14 +1610,14 @@ private synchronized void handleUpdateMessage(String subTypeRaw, return; } List changes = updateDeviceConfig(config); - debug(format("Updated config %s %s", getTimestamp(config.timestamp), txnId)); + debug(format("Updated config %s %s", isoConvert(config.timestamp), txnId)); if (updateCount == CAPABILITY_SCORE) { info(format("Initial config #%03d", updateCount), stringify(deviceConfig)); } else { info(format("Updated config #%03d", updateCount), changedLines(changes)); } } else if (converted instanceof State convertedState) { - String timestamp = getTimestamp(convertedState.timestamp); + String timestamp = isoConvert(convertedState.timestamp); if (convertedState.timestamp == null) { warning("No timestamp in state message, rejecting."); return; @@ -1626,10 +1627,10 @@ private synchronized void handleUpdateMessage(String subTypeRaw, return; } if (deviceState == null && convertedState.timestamp.before(stateCutoffThreshold)) { - String lastStart = getTimestamp( + String lastStart = isoConvert( catchToNull(() -> convertedState.system.operation.last_start)); warning(format("Ignoring stale state update %s %s %s %s", timestamp, - getTimestamp(stateCutoffThreshold), lastStart, txnId)); + isoConvert(stateCutoffThreshold), lastStart, txnId)); return; } checkState(deviceSupportsState(), "Received state update with no-state device"); @@ -1646,8 +1647,8 @@ private synchronized void handleUpdateMessage(String subTypeRaw, deviceState = convertedState; validSerialNo(); debug(format("Updated state has last_config %s (expecting %s)", - getTimestamp((Date) ifNotNullGet(deviceState.system, x -> x.last_config)), - getTimestamp((Date) ifNotNullGet(deviceConfig, config -> config.timestamp)))); + isoConvert((Date) ifNotNullGet(deviceState.system, x -> x.last_config)), + isoConvert((Date) ifNotNullGet(deviceConfig, config -> config.timestamp)))); } else { error("Unknown update type " + converted.getClass().getSimpleName()); } @@ -1717,7 +1718,7 @@ private boolean configIsPending(boolean debugOut) { boolean pending = !(synced && configTransactions.isEmpty()); if (debugOut && pending) { notice(format("last_start synchronized %s: state/%s =? config/%s", synced, - getTimestamp(stateLast), getTimestamp(configLast))); + isoConvert(stateLast), isoConvert(configLast))); notice(format("pending configTransactions: %s", configTransactionsListString())); } return pending; diff --git a/validator/src/main/java/com/google/daq/mqtt/sequencer/sequences/ConfigSequences.java b/validator/src/main/java/com/google/daq/mqtt/sequencer/sequences/ConfigSequences.java index 38d232d2bc..01de95408a 100644 --- a/validator/src/main/java/com/google/daq/mqtt/sequencer/sequences/ConfigSequences.java +++ b/validator/src/main/java/com/google/daq/mqtt/sequencer/sequences/ConfigSequences.java @@ -7,7 +7,7 @@ import static com.google.daq.mqtt.util.TimePeriodConstants.THREE_MINUTES_MS; import static com.google.daq.mqtt.util.TimePeriodConstants.TWO_MINUTES_MS; import static com.google.udmi.util.CleanDateFormat.dateEquals; -import static com.google.udmi.util.JsonUtil.getTimestamp; +import static com.google.udmi.util.JsonUtil.isoConvert; import static com.google.udmi.util.JsonUtil.safeSleep; import static java.lang.String.format; import static org.junit.Assert.assertEquals; @@ -123,10 +123,10 @@ public void broken_config() { deviceConfig.system.min_loglevel = Level.DEBUG.value(); updateConfig("starting broken_config"); Date stableConfig = deviceConfig.timestamp; - info("initial stable_config " + getTimestamp(stableConfig)); + info("initial stable_config " + isoConvert(stableConfig)); untilTrue("initial state synchronized", () -> dateEquals(stableConfig, deviceState.system.last_config)); - info("initial last_config " + getTimestamp(deviceState.system.last_config)); + info("initial last_config " + isoConvert(deviceState.system.last_config)); checkThat("initial stable_config matches last_config", () -> dateEquals(stableConfig, deviceState.system.last_config)); @@ -140,8 +140,8 @@ public void broken_config() { debug("Error detail: " + stateStatus.detail); assertEquals(SYSTEM_CONFIG_PARSE, stateStatus.category); assertEquals(Level.ERROR.value(), (int) stateStatus.level); - info("following stable_config " + getTimestamp(stableConfig)); - info("following last_config " + getTimestamp(deviceState.system.last_config)); + info("following stable_config " + isoConvert(stableConfig)); + info("following last_config " + isoConvert(deviceState.system.last_config)); // The last_config should not be updated to not reflect the broken config. assertTrue("following stable_config matches last_config", dateEquals(stableConfig, deviceState.system.last_config)); diff --git a/validator/src/main/java/com/google/daq/mqtt/sequencer/sequences/DiscoverySequences.java b/validator/src/main/java/com/google/daq/mqtt/sequencer/sequences/DiscoverySequences.java index 49a14e7fa8..a8ec6b6f20 100644 --- a/validator/src/main/java/com/google/daq/mqtt/sequencer/sequences/DiscoverySequences.java +++ b/validator/src/main/java/com/google/daq/mqtt/sequencer/sequences/DiscoverySequences.java @@ -2,6 +2,7 @@ import static com.google.daq.mqtt.util.TimePeriodConstants.TWO_MINUTES_MS; import static com.google.udmi.util.GeneralUtils.CSV_JOINER; +import static com.google.udmi.util.JsonUtil.isoConvert; import static java.lang.String.format; import static java.util.Objects.requireNonNull; import static org.junit.Assert.assertEquals; @@ -25,7 +26,6 @@ import com.google.daq.mqtt.sequencer.Summary; import com.google.daq.mqtt.sequencer.semantic.SemanticDate; import com.google.udmi.util.CleanDateFormat; -import com.google.udmi.util.JsonUtil; import java.time.Instant; import java.util.Date; import java.util.HashMap; @@ -37,7 +37,6 @@ import java.util.Set; import java.util.function.Predicate; import java.util.stream.Collectors; -import org.junit.AssumptionViolatedException; import org.junit.Test; import udmi.schema.Bucket; import udmi.schema.DiscoveryConfig; @@ -68,7 +67,7 @@ private DiscoveryEvent runEnumeration(Enumerate enumerate) { Date startTime = SemanticDate.describe("generation start time", CleanDateFormat.cleanDate()); deviceConfig.discovery.generation = startTime; - info("Starting empty enumeration at " + JsonUtil.getTimestamp(startTime)); + info("Starting empty enumeration at " + isoConvert(startTime)); untilTrue("matching enumeration generation", () -> deviceState.discovery.generation.equals(startTime)); @@ -81,7 +80,7 @@ private DiscoveryEvent runEnumeration(Enumerate enumerate) { .collect(Collectors.toList()); assertEquals("a single discovery event received", 1, enumEvents.size()); DiscoveryEvent event = enumEvents.get(0); - info("Received discovery generation " + JsonUtil.getTimestamp(event.generation)); + info("Received discovery generation " + isoConvert(event.generation)); assertEquals("matching event generation", startTime, event.generation); return event; } diff --git a/validator/src/main/java/com/google/daq/mqtt/sequencer/sequences/PointsetSequences.java b/validator/src/main/java/com/google/daq/mqtt/sequencer/sequences/PointsetSequences.java index d0eee6365a..51484a0840 100644 --- a/validator/src/main/java/com/google/daq/mqtt/sequencer/sequences/PointsetSequences.java +++ b/validator/src/main/java/com/google/daq/mqtt/sequencer/sequences/PointsetSequences.java @@ -52,7 +52,8 @@ private void untilPointsetSanity() { untilTrue("pointset event contains correct points with present_value", () -> { List pointsetEvents = popReceivedEvents(PointsetEvent.class); - return pointsetEvents.get(pointsetEvents.size() - 1).points.entrySet().stream() + return !pointsetEvents.isEmpty() + && pointsetEvents.get(pointsetEvents.size() - 1).points.entrySet().stream() .filter(this::validPointEntry).map(Entry::getKey).collect(Collectors.toSet()) .equals(deviceConfig.pointset.points.keySet()); } diff --git a/validator/src/main/java/com/google/daq/mqtt/util/IotCoreProvider.java b/validator/src/main/java/com/google/daq/mqtt/util/IotCoreProvider.java index ae9e740905..ab5e845d38 100644 --- a/validator/src/main/java/com/google/daq/mqtt/util/IotCoreProvider.java +++ b/validator/src/main/java/com/google/daq/mqtt/util/IotCoreProvider.java @@ -1,6 +1,6 @@ package com.google.daq.mqtt.util; -import static com.google.udmi.util.JsonUtil.getTimestamp; +import static com.google.udmi.util.JsonUtil.isoConvert; import static java.lang.Boolean.TRUE; import static java.util.Optional.ofNullable; import static udmi.schema.CloudModel.Resource_type.DEVICE; @@ -174,7 +174,7 @@ static Device convert(CloudModel device) { .map(credentials -> credentials.stream() .map(IotCoreProvider::convert).collect(Collectors.toList())).orElse(null); BigInteger numId = device.num_id == null ? null : new BigInteger(device.num_id); - String timestamp = device.last_event_time == null ? null : getTimestamp(device.last_event_time); + String timestamp = device.last_event_time == null ? null : isoConvert(device.last_event_time); return new Device() .setNumId(numId) .setBlocked(device.blocked) diff --git a/validator/src/main/java/com/google/daq/mqtt/util/ObjectDiffEngine.java b/validator/src/main/java/com/google/daq/mqtt/util/ObjectDiffEngine.java index 22270355e4..c44774cc78 100644 --- a/validator/src/main/java/com/google/daq/mqtt/util/ObjectDiffEngine.java +++ b/validator/src/main/java/com/google/daq/mqtt/util/ObjectDiffEngine.java @@ -1,5 +1,7 @@ package com.google.daq.mqtt.util; +import static com.google.udmi.util.JsonUtil.isoConvert; + import com.google.common.base.Joiner; import com.google.common.collect.ImmutableMap; import com.google.daq.mqtt.sequencer.semantic.SemanticValue; @@ -170,7 +172,7 @@ private String semanticValue(Object value) { String wrapper = isSemantic ? "_" : "`"; if (value instanceof Date && !isSemantic) { if (ignoreSemantics) { - return wrapper + JsonUtil.getTimestamp((Date) value) + wrapper; + return wrapper + isoConvert((Date) value) + wrapper; } else { throw new IllegalArgumentException( "Unexpected non-semantic Date in semantic value calculation"); diff --git a/validator/src/main/java/com/google/daq/mqtt/util/PubSubClient.java b/validator/src/main/java/com/google/daq/mqtt/util/PubSubClient.java index eccc8f3762..ce5d271dab 100644 --- a/validator/src/main/java/com/google/daq/mqtt/util/PubSubClient.java +++ b/validator/src/main/java/com/google/daq/mqtt/util/PubSubClient.java @@ -4,7 +4,8 @@ import static com.google.bos.iot.core.proxy.IotReflectorClient.UDMI_FOLDER; import static com.google.udmi.util.Common.PUBLISH_TIME_KEY; import static com.google.udmi.util.GeneralUtils.encodeBase64; -import static com.google.udmi.util.JsonUtil.getTimestamp; +import static com.google.udmi.util.GeneralUtils.getTimestamp; +import static com.google.udmi.util.JsonUtil.isoConvert; import static com.google.udmi.util.JsonUtil.stringify; import static java.time.Instant.ofEpochSecond; @@ -16,7 +17,6 @@ import com.google.api.core.ApiFuture; import com.google.api.gax.rpc.NotFoundException; import com.google.bos.iot.core.proxy.IotReflectorClient; -import com.google.bos.iot.core.proxy.MqttPublisher; import com.google.cloud.pubsub.v1.AckReplyConsumer; import com.google.cloud.pubsub.v1.MessageReceiver; import com.google.cloud.pubsub.v1.Publisher; @@ -45,11 +45,9 @@ import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; -import java.util.function.Consumer; import udmi.schema.Envelope; import udmi.schema.Envelope.SubFolder; import udmi.schema.Envelope.SubType; -import udmi.schema.ExecutionConfiguration; import udmi.schema.SetupUdmiConfig; import udmi.schema.SystemState; @@ -217,12 +215,12 @@ public Validator.MessageBundle takeNextMessage(QuerySpeed speed) { Map dataMap = OBJECT_MAPPER.readValue(data, TreeMap.class); asMap = dataMap; } catch (JsonProcessingException e) { - asMap = new ErrorContainer(e, getSubscriptionId(), JsonUtil.getTimestamp()); + asMap = new ErrorContainer(e, getSubscriptionId(), getTimestamp()); } HashMap attributes = new HashMap<>(message.getAttributesMap()); attributes.computeIfAbsent(PUBLISH_TIME_KEY, - key -> getTimestamp(ofEpochSecond(message.getPublishTime().getSeconds()))); + key -> isoConvert(ofEpochSecond(message.getPublishTime().getSeconds()))); attributes.put(WAS_BASE_64, "" + base64); MessageBundle bundle = new MessageBundle(); diff --git a/validator/src/main/java/com/google/daq/mqtt/validator/Validator.java b/validator/src/main/java/com/google/daq/mqtt/validator/Validator.java index 510f42c1ca..1fe896f1b0 100644 --- a/validator/src/main/java/com/google/daq/mqtt/validator/Validator.java +++ b/validator/src/main/java/com/google/daq/mqtt/validator/Validator.java @@ -26,7 +26,6 @@ import static com.google.udmi.util.JsonUtil.JSON_SUFFIX; import static com.google.udmi.util.JsonUtil.OBJECT_MAPPER; import static com.google.udmi.util.JsonUtil.getInstant; -import static com.google.udmi.util.JsonUtil.getTimestamp; import static java.lang.String.format; import static java.util.Optional.ofNullable; diff --git a/validator/src/main/java/com/google/udmi/util/PubSubReflector.java b/validator/src/main/java/com/google/udmi/util/PubSubReflector.java index fed6fa1d8f..fb5cfeb1aa 100644 --- a/validator/src/main/java/com/google/udmi/util/PubSubReflector.java +++ b/validator/src/main/java/com/google/udmi/util/PubSubReflector.java @@ -8,7 +8,7 @@ import static com.google.udmi.util.Common.PUBLISH_TIME_KEY; import static com.google.udmi.util.Common.SUBFOLDER_PROPERTY_KEY; import static com.google.udmi.util.Common.getNamespacePrefix; -import static com.google.udmi.util.JsonUtil.getTimestamp; +import static com.google.udmi.util.JsonUtil.isoConvert; import static com.google.udmi.util.JsonUtil.stringify; import static com.google.udmi.util.JsonUtil.toStringMap; import static java.lang.String.format; @@ -204,12 +204,12 @@ private MessageBundle processMessage(PubsubMessage message) { Map dataMap = OBJECT_MAPPER.readValue(data, TreeMap.class); asMap = dataMap; } catch (JsonProcessingException e) { - asMap = new ErrorContainer(e, getSubscriptionId(), JsonUtil.getTimestamp()); + asMap = new ErrorContainer(e, getSubscriptionId(), GeneralUtils.getTimestamp()); } HashMap attributes = new HashMap<>(message.getAttributesMap()); attributes.computeIfAbsent(PUBLISH_TIME_KEY, - key -> getTimestamp(ofEpochSecond(message.getPublishTime().getSeconds()))); + key -> isoConvert(ofEpochSecond(message.getPublishTime().getSeconds()))); attributes.put(WAS_BASE_64, "" + base64); MessageBundle bundle = new MessageBundle(); diff --git a/validator/src/test/java/com/google/daq/mqtt/validator/BasicTest.java b/validator/src/test/java/com/google/daq/mqtt/validator/BasicTest.java index 8126ec38ed..84532aca93 100644 --- a/validator/src/test/java/com/google/daq/mqtt/validator/BasicTest.java +++ b/validator/src/test/java/com/google/daq/mqtt/validator/BasicTest.java @@ -2,10 +2,8 @@ import static com.google.udmi.util.Common.TIMESTAMP_KEY; import static com.google.udmi.util.JsonUtil.getInstant; -import static com.google.udmi.util.JsonUtil.getTimestamp; -import static com.google.udmi.util.JsonUtil.safeSleep; +import static com.google.udmi.util.JsonUtil.isoConvert; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static udmi.schema.Level.INFO; @@ -20,7 +18,6 @@ import udmi.schema.Config; import udmi.schema.DeviceValidationEvent; import udmi.schema.DiscoveryEvent; -import udmi.schema.Level; import udmi.schema.PointPointsetEvent; import udmi.schema.PointsetEvent; import udmi.schema.PointsetState; @@ -78,8 +75,8 @@ public void validPointsetEvent() { DeviceValidationEvent deviceValidationEvent = report.devices.get(TestCommon.DEVICE_ID); assertEquals("report status level", (Object) INFO.value(), deviceValidationEvent.status.level); - String expected = getTimestamp(messageObject.timestamp); - String lastSeen = getTimestamp(deviceValidationEvent.last_seen); + String expected = isoConvert(messageObject.timestamp); + String lastSeen = isoConvert(deviceValidationEvent.last_seen); assertEquals("status last_seen", expected, lastSeen); } @@ -147,7 +144,7 @@ public void lastSeenUpdate() { DeviceValidationEvent deviceValidationEvent = report.devices.get(TestCommon.DEVICE_ID); Date lastSeen = deviceValidationEvent.last_seen; Instant parse = getInstant((String) eventBundle.message.get(TIMESTAMP_KEY)); - assertEquals("device last seen", getTimestamp(Date.from(parse)), getTimestamp(lastSeen)); + assertEquals("device last seen", isoConvert(Date.from(parse)), isoConvert(lastSeen)); } } diff --git a/validator/src/test/java/com/google/daq/mqtt/validator/TestBase.java b/validator/src/test/java/com/google/daq/mqtt/validator/TestBase.java index a9eb3d921c..1e875b1cfa 100644 --- a/validator/src/test/java/com/google/daq/mqtt/validator/TestBase.java +++ b/validator/src/test/java/com/google/daq/mqtt/validator/TestBase.java @@ -3,6 +3,7 @@ import static com.google.daq.mqtt.util.FileDataSink.REPORT_JSON_FILENAME; import static com.google.udmi.util.Common.PUBLISH_TIME_KEY; import static com.google.udmi.util.JsonUtil.getDate; +import static com.google.udmi.util.JsonUtil.isoConvert; import com.google.daq.mqtt.TestCommon; import com.google.daq.mqtt.validator.Validator.MessageBundle; @@ -122,6 +123,6 @@ private Map messageAttributes(String subType, String subFolder) @NotNull private String getTestTimestamp() { - return JsonUtil.getTimestamp(new Date(testTime.get())); + return isoConvert(new Date(testTime.get())); } }