diff --git a/pubber/config/checkstyle/checkstyle.xml b/etc/checkstyle.xml
similarity index 99%
rename from pubber/config/checkstyle/checkstyle.xml
rename to etc/checkstyle.xml
index df33c6b25e..f3f871c5b3 100644
--- a/pubber/config/checkstyle/checkstyle.xml
+++ b/etc/checkstyle.xml
@@ -48,6 +48,7 @@
+
diff --git a/pubber/build.gradle b/pubber/build.gradle
index 97f1ee2a0c..4a6b88dfbf 100644
--- a/pubber/build.gradle
+++ b/pubber/build.gradle
@@ -55,6 +55,7 @@ jacocoTestReport {
checkstyle {
ignoreFailures = false
maxWarnings = 0
+ configFile = file('../etc/checkstyle.xml')
}
checkstyleMain.source = 'src/main/java'
diff --git a/pubber/src/main/java/daq/pubber/Pubber.java b/pubber/src/main/java/daq/pubber/Pubber.java
index db835845ca..611457dae1 100644
--- a/pubber/src/main/java/daq/pubber/Pubber.java
+++ b/pubber/src/main/java/daq/pubber/Pubber.java
@@ -29,12 +29,12 @@
import com.google.udmi.util.CertManager;
import com.google.udmi.util.SiteModel;
import com.google.udmi.util.SiteModel.MetadataException;
-import daq.pubber.PubSubClient.Bundle;
+import daq.pubber.PubberPubSubClient.Bundle;
import java.io.File;
import java.io.PrintStream;
import java.time.Duration;
-import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -79,7 +79,7 @@ public class Pubber extends PubberManager implements PubberUdmiPublisher {
private static final String PUBSUB_SITE = "PubSub";
- private static final Map MESSAGE_COUNTS = new HashMap<>();
+ private static final Map MESSAGE_COUNTS = new ConcurrentHashMap<>();
private static final int CONNECT_RETRIES = 10;
private static final AtomicInteger retriesRemaining = new AtomicInteger(CONNECT_RETRIES);
private static final long RESTART_DELAY_MS = 1000;
@@ -94,7 +94,7 @@ public class Pubber extends PubberManager implements PubberUdmiPublisher {
private CountDownLatch configLatch;
private MqttDevice deviceTarget;
private long lastStateTimeMs;
- private PubSubClient pubSubClient;
+ private PubberPubSubClient pubSubClient;
private Function connectionDone;
private String workingEndpoint;
private String attemptedEndpoint;
@@ -136,7 +136,7 @@ public Pubber(String iotProject, String sitePath, String deviceId, String serial
checkState(outDir.mkdirs(), "could not make out dir " + outDir.getAbsolutePath());
}
if (PUBSUB_SITE.equals(sitePath)) {
- pubSubClient = new PubSubClient(iotProject, deviceId);
+ pubSubClient = new PubberPubSubClient(iotProject, deviceId);
}
}
@@ -331,11 +331,6 @@ private File getPersistentStore() {
new File(siteModel.getDeviceWorkingDir(deviceId), PERSISTENT_STORE_FILE);
}
- private void markStateDirty(Runnable action) {
- action.run();
- markStateDirty();
- }
-
@Override
public void markStateDirty(long delayMs) {
stateDirty.set(true);
@@ -379,9 +374,12 @@ private void processDeviceMetadata(Metadata metadata) {
targetSchema = ifNotNullGet(metadata.system.device_version, SchemaVersion::fromKey);
ifNotNullThen(targetSchema, version -> warn("Emulating UDMI version " + version.key()));
- config.serialNo = catchToNull(() -> metadata.system.serial_no);
-
- config.gatewayId = catchToNull(() -> metadata.gateway.gateway_id);
+ if (config.serialNo == null) {
+ config.serialNo = catchToNull(() -> metadata.system.serial_no);
+ }
+ if (config.gatewayId == null) {
+ config.gatewayId = catchToNull(() -> metadata.gateway.gateway_id);
+ }
config.algorithm = config.gatewayId == null
? catchToNull(() -> metadata.cloud.auth_type.value())
@@ -431,9 +429,11 @@ private boolean attemptConnection() {
try {
isConnected = false;
deviceManager.stop();
+ super.stop();
if (deviceTarget == null) {
throw new RuntimeException("Mqtt publisher not initialized");
}
+ registerMessageHandlers();
connect();
configLatchWait();
isConnected = true;
@@ -488,7 +488,6 @@ public void initializeMqtt() {
siteModel.getDeviceDir(targetDeviceId), endpoint.transport, keyPassword,
this::info);
deviceTarget = new MqttDevice(endpoint, this::publisherException, certManager);
- registerMessageHandlers();
publishDirtyState();
}
@@ -560,7 +559,6 @@ public String traceTimestamp(String messageBase) {
return messageBase + (isTrue(config.options.messageTrace) ? ("_" + timestamp) : "");
}
-
private void trace(String message) {
cloudLog(message, Level.TRACE);
}
@@ -712,7 +710,7 @@ public Map> getLogMap() {
return SystemManager.getLogMap().apply(LOG);
}
- public Metadata getMetadata(String id) {
- return siteModel.getMetadata(id);
+ public SiteModel getSiteModel() {
+ return siteModel;
}
}
diff --git a/pubber/src/main/java/daq/pubber/PubberDeviceManager.java b/pubber/src/main/java/daq/pubber/PubberDeviceManager.java
index b895fecd95..e5ebea35bf 100644
--- a/pubber/src/main/java/daq/pubber/PubberDeviceManager.java
+++ b/pubber/src/main/java/daq/pubber/PubberDeviceManager.java
@@ -1,12 +1,14 @@
package daq.pubber;
import com.google.udmi.util.SiteModel;
-import java.util.Date;
+import java.util.Arrays;
+import java.util.List;
import udmi.lib.client.DeviceManager;
import udmi.lib.client.DiscoveryManager;
import udmi.lib.client.GatewayManager;
import udmi.lib.client.LocalnetManager;
import udmi.lib.client.PointsetManager;
+import udmi.lib.client.SubBlockManager;
import udmi.lib.client.SystemManager;
import udmi.lib.intf.ManagerHost;
import udmi.schema.Config;
@@ -22,23 +24,28 @@ public class PubberDeviceManager extends PubberManager implements DeviceManager
private final PubberLocalnetManager localnetManager;
private final PubberGatewayManager gatewayManager;
private final PubberDiscoveryManager discoveryManager;
- private Date lastConfigTimestamp;
+ private final List subManagers;
/**
* Create a new instance.
+ * Managers are logically ordered to ensure proper initialization and shutdown.
+ * Stop/shutdown order is the reverse of the boot order.
+ * SystemManager should be created first b/c logging dependency.
+ * The remaining managers are placed in a logical boot/shutdown order.
*/
public PubberDeviceManager(ManagerHost host, PubberConfiguration configuration) {
super(host, configuration);
systemManager = new PubberSystemManager(host, configuration);
- pointsetManager = new PubberPointsetManager(host, configuration);
localnetManager = new PubberLocalnetManager(host, configuration);
- gatewayManager = new PubberGatewayManager(host, configuration);
+ pointsetManager = new PubberPointsetManager(host, configuration);
discoveryManager = new PubberDiscoveryManager(host, configuration, this);
+ gatewayManager = new PubberGatewayManager(host, configuration);
+ subManagers = Arrays.asList(
+ gatewayManager, discoveryManager, pointsetManager, localnetManager, systemManager);
}
@Override
public void updateConfig(Config config) {
- lastConfigTimestamp = config.timestamp;
DeviceManager.super.updateConfig(config);
}
@@ -72,22 +79,15 @@ public DiscoveryManager getDiscoveryManager() {
*/
@Override
public void shutdown() {
- getGatewayManager().shutdown();
- getLocalnetManager().shutdown();
- getPointsetManager().shutdown();
- getSystemManager().shutdown();
+ subManagers.forEach(SubBlockManager::shutdown);
}
-
/**
* Stop periodic senders.
*/
@Override
public void stop() {
- getGatewayManager().stop();
- getLocalnetManager().stop();
- getPointsetManager().stop();
- getSystemManager().stop();
+ subManagers.forEach(SubBlockManager::stop);
}
/**
@@ -98,5 +98,4 @@ protected void setSiteModel(SiteModel siteModel) {
gatewayManager.setSiteModel(siteModel);
localnetManager.setSiteModel(siteModel);
}
-
}
diff --git a/pubber/src/main/java/daq/pubber/PubberGatewayManager.java b/pubber/src/main/java/daq/pubber/PubberGatewayManager.java
index 302d574aea..27184a447c 100644
--- a/pubber/src/main/java/daq/pubber/PubberGatewayManager.java
+++ b/pubber/src/main/java/daq/pubber/PubberGatewayManager.java
@@ -52,7 +52,7 @@ public void activate() {
@Override
public ProxyDeviceHost makeExtraDevice() {
- return new ProxyDevice(getHost(), EXTRA_PROXY_DEVICE, config);
+ return new PubberProxyDevice(getHost(), EXTRA_PROXY_DEVICE, config);
}
/**
@@ -140,6 +140,6 @@ public Map createProxyDevices(List proxyIds) {
@Override
public ProxyDeviceHost createProxyDevice(ManagerHost host, String id) {
- return new ProxyDevice(host, id, config);
+ return new PubberProxyDevice(host, id, config);
}
}
diff --git a/pubber/src/main/java/daq/pubber/IpProvider.java b/pubber/src/main/java/daq/pubber/PubberIpProvider.java
similarity index 93%
rename from pubber/src/main/java/daq/pubber/IpProvider.java
rename to pubber/src/main/java/daq/pubber/PubberIpProvider.java
index fa8081f982..65d4567e20 100644
--- a/pubber/src/main/java/daq/pubber/IpProvider.java
+++ b/pubber/src/main/java/daq/pubber/PubberIpProvider.java
@@ -11,6 +11,7 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
@@ -24,7 +25,7 @@
/**
* Wrapper for family of IP-based protocols.
*/
-public class IpProvider extends ManagerBase implements FamilyProvider {
+public class PubberIpProvider extends ManagerBase implements FamilyProvider {
public static final int DEFAULT_METRIC = 0;
private static final List familyPatterns = ImmutableList.of(
@@ -39,13 +40,15 @@ public class IpProvider extends ManagerBase implements FamilyProvider {
);
private final LocalnetManager localnetHost;
+ private final String family;
/**
* Create a basic provider instance.
*/
- public IpProvider(ManagerHost host, String family, String deviceId) {
+ public PubberIpProvider(ManagerHost host, String family, String deviceId) {
super(host, deviceId);
localnetHost = (LocalnetManager) host;
+ this.family = family;
populateInterfaceAddresses();
}
@@ -127,16 +130,19 @@ private String getDefaultInterface() {
*/
private void populateInterfaceAddresses() {
String defaultInterface = getDefaultInterface();
- info("Using addresses from default interface " + defaultInterface);
+ info("Using addresses from default interface: " + defaultInterface + " for family: " + family);
Map interfaceAddresses = ofNullable(
getInterfaceAddresses(defaultInterface)).orElse(ImmutableMap.of());
interfaceAddresses.entrySet().forEach(this::addStateMapEntry);
}
private void addStateMapEntry(Entry entry) {
+ String family = entry.getKey();
+ if (!Objects.equals(this.family, family)) {
+ return;
+ }
FamilyLocalnetState stateEntry = new FamilyLocalnetState();
stateEntry.addr = entry.getValue();
- String family = entry.getKey();
info("Family " + family + " address is " + stateEntry.addr);
localnetHost.update(family, stateEntry);
}
diff --git a/pubber/src/main/java/daq/pubber/PubberLocalnetManager.java b/pubber/src/main/java/daq/pubber/PubberLocalnetManager.java
index 1cea667097..0cf72c0077 100644
--- a/pubber/src/main/java/daq/pubber/PubberLocalnetManager.java
+++ b/pubber/src/main/java/daq/pubber/PubberLocalnetManager.java
@@ -3,7 +3,6 @@
import com.google.udmi.util.SiteModel;
import java.util.HashMap;
import java.util.Map;
-import java.util.stream.Collectors;
import udmi.lib.ProtocolFamily;
import udmi.lib.client.LocalnetManager;
import udmi.lib.intf.FamilyProvider;
@@ -23,10 +22,10 @@ public class PubberLocalnetManager extends PubberManager implements LocalnetMana
static Map> LOCALNET_PROVIDERS =
Map.of(
- ProtocolFamily.VENDOR, VendorProvider.class,
- ProtocolFamily.IPV_4, IpProvider.class,
- ProtocolFamily.IPV_6, IpProvider.class,
- ProtocolFamily.ETHER, IpProvider.class);
+ ProtocolFamily.VENDOR, PubberVendorProvider.class,
+ ProtocolFamily.IPV_4, PubberIpProvider.class,
+ ProtocolFamily.IPV_6, PubberIpProvider.class,
+ ProtocolFamily.ETHER, PubberIpProvider.class);
/**
* Create a new container with the given host.
@@ -35,8 +34,13 @@ public PubberLocalnetManager(ManagerHost host, PubberConfiguration configuration
super(host, configuration);
localnetState = new LocalnetState();
localnetState.families = new HashMap<>();
- localnetProviders = LOCALNET_PROVIDERS
- .keySet().stream().collect(Collectors.toMap(family -> family, this::instantiateProvider));
+
+ localnetProviders = new HashMap<>();
+ LOCALNET_PROVIDERS.forEach((family, providerClass) -> {
+ if (host instanceof Pubber || providerClass != PubberIpProvider.class) {
+ localnetProviders.put(family, instantiateProvider(family));
+ }
+ });
}
/**
@@ -53,7 +57,8 @@ FamilyProvider instantiateProvider(String family) {
}
public void setSiteModel(SiteModel siteModel) {
- ((VendorProvider) getLocalnetProviders().get(ProtocolFamily.VENDOR)).setSiteModel(siteModel);
+ ((PubberVendorProvider) getLocalnetProviders().get(ProtocolFamily.VENDOR))
+ .setSiteModel(siteModel);
}
diff --git a/pubber/src/main/java/daq/pubber/PubberManager.java b/pubber/src/main/java/daq/pubber/PubberManager.java
index 9313d04c39..9ea103a893 100644
--- a/pubber/src/main/java/daq/pubber/PubberManager.java
+++ b/pubber/src/main/java/daq/pubber/PubberManager.java
@@ -26,6 +26,6 @@ public PubberManager(ManagerHost host, PubberConfiguration configuration) {
@Override
protected int getIntervalSec(Integer sampleRateSec) {
- return (int) ofNullable(options.fixedSampleRate).orElse(super.getIntervalSec(sampleRateSec));
+ return ofNullable(options.fixedSampleRate).orElse(super.getIntervalSec(sampleRateSec));
}
}
diff --git a/pubber/src/main/java/daq/pubber/PubberPointsetManager.java b/pubber/src/main/java/daq/pubber/PubberPointsetManager.java
index 83dc8d0121..917560602a 100644
--- a/pubber/src/main/java/daq/pubber/PubberPointsetManager.java
+++ b/pubber/src/main/java/daq/pubber/PubberPointsetManager.java
@@ -72,9 +72,9 @@ private static PointPointsetModel makePointPointsetModel(boolean writable) {
private AbstractPoint makePoint(String name, PointPointsetModel point) {
if (BOOLEAN_UNITS.contains(point.units)) {
- return new RandomBoolean(name, point);
+ return new PubberRandomBoolean(name, point);
} else {
- return new RandomPoint(name, point);
+ return new PubberRandomPoint(name, point);
}
}
diff --git a/pubber/src/main/java/daq/pubber/ProxyDevice.java b/pubber/src/main/java/daq/pubber/PubberProxyDevice.java
similarity index 84%
rename from pubber/src/main/java/daq/pubber/ProxyDevice.java
rename to pubber/src/main/java/daq/pubber/PubberProxyDevice.java
index 2662a688e9..45fef9ef42 100644
--- a/pubber/src/main/java/daq/pubber/ProxyDevice.java
+++ b/pubber/src/main/java/daq/pubber/PubberProxyDevice.java
@@ -2,23 +2,19 @@
import static com.google.udmi.util.GeneralUtils.catchToNull;
import static com.google.udmi.util.GeneralUtils.deepCopy;
-import static com.google.udmi.util.GeneralUtils.friendlyStackTrace;
-import static java.lang.String.format;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import udmi.lib.base.MqttDevice;
import udmi.lib.client.DeviceManager;
import udmi.lib.client.ProxyDeviceHost;
import udmi.lib.intf.ManagerHost;
-import udmi.schema.Config;
import udmi.schema.Metadata;
import udmi.schema.PubberConfiguration;
/**
* Wrapper for a complete device construct.
*/
-public class ProxyDevice extends PubberManager implements ProxyDeviceHost {
+public class PubberProxyDevice extends PubberManager implements ProxyDeviceHost {
private static final long STATE_INTERVAL_MS = 1000;
final PubberDeviceManager deviceManager;
@@ -28,12 +24,13 @@ public class ProxyDevice extends PubberManager implements ProxyDeviceHost {
/**
* New instance.
*/
- public ProxyDevice(ManagerHost host, String id, PubberConfiguration pubberConfig) {
+ public PubberProxyDevice(ManagerHost host, String id, PubberConfiguration pubberConfig) {
super(host, makeProxyConfiguration(host, id, pubberConfig));
// Simple shortcut to get access to some foundational mechanisms inside of Pubber.
pubberHost = (Pubber) host;
deviceManager = new PubberDeviceManager(this, makeProxyConfiguration(host, id,
pubberConfig));
+ deviceManager.setSiteModel(pubberHost.getSiteModel());
executor.scheduleAtFixedRate(this::publishDirtyState, STATE_INTERVAL_MS, STATE_INTERVAL_MS,
TimeUnit.MILLISECONDS);
}
@@ -42,7 +39,7 @@ private static PubberConfiguration makeProxyConfiguration(ManagerHost host, Stri
PubberConfiguration config) {
PubberConfiguration proxyConfiguration = deepCopy(config);
proxyConfiguration.deviceId = id;
- Metadata metadata = ((Pubber) host).getMetadata(id);
+ Metadata metadata = ((Pubber) host).getSiteModel().getMetadata(id);
proxyConfiguration.serialNo = catchToNull(() -> metadata.system.serial_no);
return proxyConfiguration;
}
diff --git a/pubber/src/main/java/daq/pubber/PubSubClient.java b/pubber/src/main/java/daq/pubber/PubberPubSubClient.java
similarity index 93%
rename from pubber/src/main/java/daq/pubber/PubSubClient.java
rename to pubber/src/main/java/daq/pubber/PubberPubSubClient.java
index fc074a1937..6817c365ea 100644
--- a/pubber/src/main/java/daq/pubber/PubSubClient.java
+++ b/pubber/src/main/java/daq/pubber/PubberPubSubClient.java
@@ -16,9 +16,9 @@
/**
* Wrapper class for a PubSub client.
*/
-public class PubSubClient {
+public class PubberPubSubClient {
- private static final Logger LOG = LoggerFactory.getLogger(PubSubClient.class);
+ private static final Logger LOG = LoggerFactory.getLogger(PubberPubSubClient.class);
private final String subscriptionName;
private final GrpcSubscriberStub subscriber;
@@ -29,7 +29,7 @@ public class PubSubClient {
* @param projectId GCP project id
* @param subscriptionId PubSub subscription
*/
- public PubSubClient(String projectId, String subscriptionId) {
+ public PubberPubSubClient(String projectId, String subscriptionId) {
subscriptionName = ProjectSubscriptionName.format(projectId, subscriptionId);
LOG.info("Using PubSub subscription " + subscriptionName);
try {
@@ -89,4 +89,4 @@ static class Bundle {
String body;
Map attributes;
}
-}
\ No newline at end of file
+}
diff --git a/pubber/src/main/java/daq/pubber/RandomBoolean.java b/pubber/src/main/java/daq/pubber/PubberRandomBoolean.java
similarity index 77%
rename from pubber/src/main/java/daq/pubber/RandomBoolean.java
rename to pubber/src/main/java/daq/pubber/PubberRandomBoolean.java
index b935cb1412..e042091121 100644
--- a/pubber/src/main/java/daq/pubber/RandomBoolean.java
+++ b/pubber/src/main/java/daq/pubber/PubberRandomBoolean.java
@@ -1,5 +1,6 @@
package daq.pubber;
+import udmi.lib.base.BasicPoint;
import udmi.lib.intf.AbstractPoint;
import udmi.schema.PointPointsetModel;
import udmi.schema.RefDiscovery;
@@ -7,9 +8,9 @@
/**
* Represents a random boolean point.
*/
-public class RandomBoolean extends BasicPoint implements AbstractPoint {
+public class PubberRandomBoolean extends BasicPoint implements AbstractPoint {
- public RandomBoolean(String name, PointPointsetModel pointModel) {
+ public PubberRandomBoolean(String name, PointPointsetModel pointModel) {
super(name, pointModel);
}
diff --git a/pubber/src/main/java/daq/pubber/RandomPoint.java b/pubber/src/main/java/daq/pubber/PubberRandomPoint.java
similarity index 90%
rename from pubber/src/main/java/daq/pubber/RandomPoint.java
rename to pubber/src/main/java/daq/pubber/PubberRandomPoint.java
index cb5ad79603..930cb31d94 100644
--- a/pubber/src/main/java/daq/pubber/RandomPoint.java
+++ b/pubber/src/main/java/daq/pubber/PubberRandomPoint.java
@@ -1,5 +1,6 @@
package daq.pubber;
+import udmi.lib.base.BasicPoint;
import udmi.lib.intf.AbstractPoint;
import udmi.schema.PointPointsetModel;
import udmi.schema.RefDiscovery;
@@ -7,7 +8,7 @@
/**
* Represents a randomly generated numerical point.
*/
-public class RandomPoint extends BasicPoint implements AbstractPoint {
+public class PubberRandomPoint extends BasicPoint implements AbstractPoint {
private static final double DEFAULT_BASELINE_VALUE = 50;
private final double min;
@@ -17,7 +18,7 @@ public class RandomPoint extends BasicPoint implements AbstractPoint {
/**
* Creates a random point generator for data simulation.
*/
- public RandomPoint(String name, PointPointsetModel pointModel) {
+ public PubberRandomPoint(String name, PointPointsetModel pointModel) {
super(name, pointModel);
double baselineValue = convertValue(pointModel.baseline_value, DEFAULT_BASELINE_VALUE);
double baselineTolerance = convertValue(pointModel.baseline_tolerance, baselineValue);
diff --git a/pubber/src/main/java/daq/pubber/PubberSystemManager.java b/pubber/src/main/java/daq/pubber/PubberSystemManager.java
index ab811d02e1..376f194c0e 100644
--- a/pubber/src/main/java/daq/pubber/PubberSystemManager.java
+++ b/pubber/src/main/java/daq/pubber/PubberSystemManager.java
@@ -20,7 +20,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.slf4j.Logger;
import udmi.lib.client.SystemManager;
import udmi.lib.intf.ManagerHost;
import udmi.schema.Entry;
@@ -48,10 +47,8 @@ public class PubberSystemManager extends PubberManager implements SystemManager
private final List logentries = new ArrayList<>();
private final ExtraSystemState systemState;
private final ManagerHost host;
- private int systemEventCount;
private SystemConfig systemConfig;
private boolean publishingLog;
- private static final Logger LOG = Pubber.LOG;
/**
* New instance.
@@ -62,10 +59,9 @@ public PubberSystemManager(ManagerHost host, PubberConfiguration configuration)
if (host instanceof Pubber pubberHost) {
initializeLogger(pubberHost);
+ info("Device start time is " + isoConvert(DEVICE_START_TIME));
}
- info("Device start time is " + isoConvert(DEVICE_START_TIME));
-
systemState = new ExtraSystemState();
systemState.operation = new StateSystemOperation();
@@ -141,12 +137,12 @@ public void systemLifecycle(SystemMode mode) {
public void localLog(String message, Level level, String timestamp, String detail) {
String detailPostfix = detail == null ? "" : ":\n" + detail;
String logMessage = format("%s %s%s", timestamp, message, detailPostfix);
- udmi.lib.client.SystemManager.getLogMap().apply(LOG).get(level).accept(logMessage);
+ SystemManager.getLogMap().apply(Pubber.LOG).get(level).accept(logMessage);
try {
PrintStream stream;
if (host instanceof Pubber pubberHost) {
stream = pubberHost.logPrintWriter;
- } else if (host instanceof ProxyDevice proxyHost) {
+ } else if (host instanceof PubberProxyDevice proxyHost) {
stream = proxyHost.pubberHost.logPrintWriter;
} else {
throw new RuntimeException("While writing log output file: Unknown host");
diff --git a/pubber/src/main/java/daq/pubber/PubberUdmiPublisher.java b/pubber/src/main/java/daq/pubber/PubberUdmiPublisher.java
index 8e7b38b34d..5e53858824 100644
--- a/pubber/src/main/java/daq/pubber/PubberUdmiPublisher.java
+++ b/pubber/src/main/java/daq/pubber/PubberUdmiPublisher.java
@@ -266,6 +266,7 @@ static String getGatewayId(String targetId, PubberConfiguration configuration) {
* whether the device is a gateway or a proxy device.
*/
default void registerMessageHandlers() {
+ getDeviceTarget().unregisterHandlers();
getDeviceTarget().registerHandler(CONFIG_TOPIC, this::configHandler, Config.class);
String gatewayId = getGatewayId(getDeviceId(), getConfig());
if (isGatewayDevice()) {
@@ -279,7 +280,6 @@ default void registerMessageHandlers() {
}
}
-
default MqttDevice getMqttDevice(String proxyId) {
return new MqttDevice(proxyId, getDeviceTarget());
}
@@ -329,7 +329,7 @@ default void publisherHandler(String type, String phase, Throwable cause, String
Entry report = entryFromException(category, cause);
getDeviceManager().localLog(report);
publishLogMessage(report, targetId);
- ifTrueThen(getDeviceId().equals(targetId), () -> registerSystemStatus(report));
+ registerSystemStatus(report, targetId);
}
void error(String s);
@@ -337,10 +337,9 @@ default void publisherHandler(String type, String phase, Throwable cause, String
/**
* Register a system status entry.
*/
- default void registerSystemStatus(Entry report) {
+ default void registerSystemStatus(Entry report, String targetId) {
if (isNotTrue(getOptions().noStatus)) {
- getDeviceState().system.status = report;
- markStateDirty();
+ getDeviceManager().setStatus(report, targetId);
}
}
@@ -405,11 +404,11 @@ private void configHandler(Config config) {
configPreprocess(getDeviceId(), config);
debug(format("Config update %s%s", getDeviceId(), getDeviceManager().getTestingTag()),
toJsonString(config));
- processConfigUpdate(config);
if (getConfigLatch().getCount() > 0) {
warn("Received config for config latch " + getDeviceId());
getConfigLatch().countDown();
}
+ processConfigUpdate(config);
publisherConfigLog("apply", null, getDeviceId());
} catch (Exception e) {
publisherConfigLog("apply", e, getDeviceId());
@@ -451,8 +450,8 @@ private void processConfigUpdate(Config configMsg) {
} catch (Exception e) {
throw new RuntimeException("While acquiring state lock", e);
}
-
try {
+ updateInterval(DEFAULT_REPORT_SEC);
if (configMsg != null) {
if (configMsg.system == null && isTrue(getConfig().options.barfConfig)) {
error("Empty config system block and configured to restart on bad config!");
@@ -465,7 +464,6 @@ private void processConfigUpdate(Config configMsg) {
} else {
info(getTimestamp() + " defaulting empty config");
}
- updateInterval(DEFAULT_REPORT_SEC);
} finally {
getStateLock().unlock();
}
@@ -895,7 +893,6 @@ default void publishDeviceMessage(String targetId, Object message, Runnable call
error("publisher not active");
return;
}
-
String topicSuffix = MESSAGE_TOPIC_SUFFIX_MAP.get(message.getClass());
if (topicSuffix == null) {
error("Unknown message class " + message.getClass());
@@ -907,11 +904,6 @@ default void publishDeviceMessage(String targetId, Object message, Runnable call
return;
}
- if (getDeviceTarget() == null) {
- error("publisher not active");
- return;
- }
-
if (isTrue(getOptions().noFolder) && topicSuffix.equals(SYSTEM_EVENT_TOPIC)) {
topicSuffix = RAW_EVENT_TOPIC;
}
diff --git a/pubber/src/main/java/daq/pubber/VendorProvider.java b/pubber/src/main/java/daq/pubber/PubberVendorProvider.java
similarity index 93%
rename from pubber/src/main/java/daq/pubber/VendorProvider.java
rename to pubber/src/main/java/daq/pubber/PubberVendorProvider.java
index d0711cfe7e..ab4d0e53d7 100644
--- a/pubber/src/main/java/daq/pubber/VendorProvider.java
+++ b/pubber/src/main/java/daq/pubber/PubberVendorProvider.java
@@ -24,13 +24,13 @@
/**
* Basic provider for the Vendor protocol family.
*/
-public class VendorProvider extends ManagerBase implements FamilyProvider {
+public class PubberVendorProvider extends ManagerBase implements FamilyProvider {
private final LocalnetManager localnetHost;
private SiteModel siteModel;
private String selfAddr;
- public VendorProvider(ManagerHost host, String family, String deviceId) {
+ public PubberVendorProvider(ManagerHost host, String family, String deviceId) {
super(host, deviceId);
localnetHost = (LocalnetManager) host;
}
diff --git a/pubber/src/main/java/daq/pubber/BasicPoint.java b/pubber/src/main/java/udmi/lib/base/BasicPoint.java
similarity index 99%
rename from pubber/src/main/java/daq/pubber/BasicPoint.java
rename to pubber/src/main/java/udmi/lib/base/BasicPoint.java
index 9bb8e52d6b..d347ad074b 100644
--- a/pubber/src/main/java/daq/pubber/BasicPoint.java
+++ b/pubber/src/main/java/udmi/lib/base/BasicPoint.java
@@ -1,4 +1,4 @@
-package daq.pubber;
+package udmi.lib.base;
import static com.google.udmi.util.GeneralUtils.deepCopy;
import static com.google.udmi.util.GeneralUtils.getNow;
diff --git a/pubber/src/main/java/udmi/lib/base/ListPublisher.java b/pubber/src/main/java/udmi/lib/base/ListPublisher.java
index 797e0167c8..310f78e78e 100644
--- a/pubber/src/main/java/udmi/lib/base/ListPublisher.java
+++ b/pubber/src/main/java/udmi/lib/base/ListPublisher.java
@@ -58,6 +58,11 @@ public void registerHandler(String deviceId, String topicSuffix,
handlers.put(topicSuffix, new SimpleEntry<>(foo, clazz));
}
+ @Override
+ public void unregisterHandlers() {
+ handlers.clear();
+ }
+
@Override
public void connect(String deviceId, boolean clean) {
Consumer