Skip to content

Commit

Permalink
Fix Proxy Device Subscription Issue and Improve Stability (faucetsdn#…
Browse files Browse the repository at this point in the history
  • Loading branch information
MertCingoz authored Dec 9, 2024
1 parent 4d24be8 commit 3515783
Show file tree
Hide file tree
Showing 34 changed files with 190 additions and 129 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@

<module name="SuppressWarningsFilter"/>
<module name="TreeWalker">
<module name="UnusedImports"/>
<module name="SuppressWarningsHolder"/>
<module name="OuterTypeFilename"/>
<module name="IllegalTokenText">
Expand Down
1 change: 1 addition & 0 deletions pubber/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ jacocoTestReport {
checkstyle {
ignoreFailures = false
maxWarnings = 0
configFile = file('../etc/checkstyle.xml')
}
checkstyleMain.source = 'src/main/java'

Expand Down
32 changes: 15 additions & 17 deletions pubber/src/main/java/daq/pubber/Pubber.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@
import com.google.udmi.util.CertManager;
import com.google.udmi.util.SiteModel;
import com.google.udmi.util.SiteModel.MetadataException;
import daq.pubber.PubSubClient.Bundle;
import daq.pubber.PubberPubSubClient.Bundle;
import java.io.File;
import java.io.PrintStream;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -79,7 +79,7 @@ public class Pubber extends PubberManager implements PubberUdmiPublisher {

private static final String PUBSUB_SITE = "PubSub";

private static final Map<String, AtomicInteger> MESSAGE_COUNTS = new HashMap<>();
private static final Map<String, AtomicInteger> MESSAGE_COUNTS = new ConcurrentHashMap<>();
private static final int CONNECT_RETRIES = 10;
private static final AtomicInteger retriesRemaining = new AtomicInteger(CONNECT_RETRIES);
private static final long RESTART_DELAY_MS = 1000;
Expand All @@ -94,7 +94,7 @@ public class Pubber extends PubberManager implements PubberUdmiPublisher {
private CountDownLatch configLatch;
private MqttDevice deviceTarget;
private long lastStateTimeMs;
private PubSubClient pubSubClient;
private PubberPubSubClient pubSubClient;
private Function<String, Boolean> connectionDone;
private String workingEndpoint;
private String attemptedEndpoint;
Expand Down Expand Up @@ -136,7 +136,7 @@ public Pubber(String iotProject, String sitePath, String deviceId, String serial
checkState(outDir.mkdirs(), "could not make out dir " + outDir.getAbsolutePath());
}
if (PUBSUB_SITE.equals(sitePath)) {
pubSubClient = new PubSubClient(iotProject, deviceId);
pubSubClient = new PubberPubSubClient(iotProject, deviceId);
}
}

Expand Down Expand Up @@ -331,11 +331,6 @@ private File getPersistentStore() {
new File(siteModel.getDeviceWorkingDir(deviceId), PERSISTENT_STORE_FILE);
}

private void markStateDirty(Runnable action) {
action.run();
markStateDirty();
}

@Override
public void markStateDirty(long delayMs) {
stateDirty.set(true);
Expand Down Expand Up @@ -379,9 +374,12 @@ private void processDeviceMetadata(Metadata metadata) {
targetSchema = ifNotNullGet(metadata.system.device_version, SchemaVersion::fromKey);
ifNotNullThen(targetSchema, version -> warn("Emulating UDMI version " + version.key()));

config.serialNo = catchToNull(() -> metadata.system.serial_no);

config.gatewayId = catchToNull(() -> metadata.gateway.gateway_id);
if (config.serialNo == null) {
config.serialNo = catchToNull(() -> metadata.system.serial_no);
}
if (config.gatewayId == null) {
config.gatewayId = catchToNull(() -> metadata.gateway.gateway_id);
}

config.algorithm = config.gatewayId == null
? catchToNull(() -> metadata.cloud.auth_type.value())
Expand Down Expand Up @@ -431,9 +429,11 @@ private boolean attemptConnection() {
try {
isConnected = false;
deviceManager.stop();
super.stop();
if (deviceTarget == null) {
throw new RuntimeException("Mqtt publisher not initialized");
}
registerMessageHandlers();
connect();
configLatchWait();
isConnected = true;
Expand Down Expand Up @@ -488,7 +488,6 @@ public void initializeMqtt() {
siteModel.getDeviceDir(targetDeviceId), endpoint.transport, keyPassword,
this::info);
deviceTarget = new MqttDevice(endpoint, this::publisherException, certManager);
registerMessageHandlers();
publishDirtyState();
}

Expand Down Expand Up @@ -560,7 +559,6 @@ public String traceTimestamp(String messageBase) {
return messageBase + (isTrue(config.options.messageTrace) ? ("_" + timestamp) : "");
}


private void trace(String message) {
cloudLog(message, Level.TRACE);
}
Expand Down Expand Up @@ -712,7 +710,7 @@ public Map<Level, Consumer<String>> getLogMap() {
return SystemManager.getLogMap().apply(LOG);
}

public Metadata getMetadata(String id) {
return siteModel.getMetadata(id);
public SiteModel getSiteModel() {
return siteModel;
}
}
29 changes: 14 additions & 15 deletions pubber/src/main/java/daq/pubber/PubberDeviceManager.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package daq.pubber;

import com.google.udmi.util.SiteModel;
import java.util.Date;
import java.util.Arrays;
import java.util.List;
import udmi.lib.client.DeviceManager;
import udmi.lib.client.DiscoveryManager;
import udmi.lib.client.GatewayManager;
import udmi.lib.client.LocalnetManager;
import udmi.lib.client.PointsetManager;
import udmi.lib.client.SubBlockManager;
import udmi.lib.client.SystemManager;
import udmi.lib.intf.ManagerHost;
import udmi.schema.Config;
Expand All @@ -22,23 +24,28 @@ public class PubberDeviceManager extends PubberManager implements DeviceManager
private final PubberLocalnetManager localnetManager;
private final PubberGatewayManager gatewayManager;
private final PubberDiscoveryManager discoveryManager;
private Date lastConfigTimestamp;
private final List<SubBlockManager> subManagers;

/**
* Create a new instance.
* Managers are logically ordered to ensure proper initialization and shutdown.
* Stop/shutdown order is the reverse of the boot order.
* SystemManager should be created first b/c logging dependency.
* The remaining managers are placed in a logical boot/shutdown order.
*/
public PubberDeviceManager(ManagerHost host, PubberConfiguration configuration) {
super(host, configuration);
systemManager = new PubberSystemManager(host, configuration);
pointsetManager = new PubberPointsetManager(host, configuration);
localnetManager = new PubberLocalnetManager(host, configuration);
gatewayManager = new PubberGatewayManager(host, configuration);
pointsetManager = new PubberPointsetManager(host, configuration);
discoveryManager = new PubberDiscoveryManager(host, configuration, this);
gatewayManager = new PubberGatewayManager(host, configuration);
subManagers = Arrays.asList(
gatewayManager, discoveryManager, pointsetManager, localnetManager, systemManager);
}

@Override
public void updateConfig(Config config) {
lastConfigTimestamp = config.timestamp;
DeviceManager.super.updateConfig(config);
}

Expand Down Expand Up @@ -72,22 +79,15 @@ public DiscoveryManager getDiscoveryManager() {
*/
@Override
public void shutdown() {
getGatewayManager().shutdown();
getLocalnetManager().shutdown();
getPointsetManager().shutdown();
getSystemManager().shutdown();
subManagers.forEach(SubBlockManager::shutdown);
}


/**
* Stop periodic senders.
*/
@Override
public void stop() {
getGatewayManager().stop();
getLocalnetManager().stop();
getPointsetManager().stop();
getSystemManager().stop();
subManagers.forEach(SubBlockManager::stop);
}

/**
Expand All @@ -98,5 +98,4 @@ protected void setSiteModel(SiteModel siteModel) {
gatewayManager.setSiteModel(siteModel);
localnetManager.setSiteModel(siteModel);
}

}
4 changes: 2 additions & 2 deletions pubber/src/main/java/daq/pubber/PubberGatewayManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public void activate() {

@Override
public ProxyDeviceHost makeExtraDevice() {
return new ProxyDevice(getHost(), EXTRA_PROXY_DEVICE, config);
return new PubberProxyDevice(getHost(), EXTRA_PROXY_DEVICE, config);
}

/**
Expand Down Expand Up @@ -140,6 +140,6 @@ public Map<String, ProxyDeviceHost> createProxyDevices(List<String> proxyIds) {

@Override
public ProxyDeviceHost createProxyDevice(ManagerHost host, String id) {
return new ProxyDevice(host, id, config);
return new PubberProxyDevice(host, id, config);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
Expand All @@ -24,7 +25,7 @@
/**
* Wrapper for family of IP-based protocols.
*/
public class IpProvider extends ManagerBase implements FamilyProvider {
public class PubberIpProvider extends ManagerBase implements FamilyProvider {

public static final int DEFAULT_METRIC = 0;
private static final List<Pattern> familyPatterns = ImmutableList.of(
Expand All @@ -39,13 +40,15 @@ public class IpProvider extends ManagerBase implements FamilyProvider {
);

private final LocalnetManager localnetHost;
private final String family;

/**
* Create a basic provider instance.
*/
public IpProvider(ManagerHost host, String family, String deviceId) {
public PubberIpProvider(ManagerHost host, String family, String deviceId) {
super(host, deviceId);
localnetHost = (LocalnetManager) host;
this.family = family;
populateInterfaceAddresses();
}

Expand Down Expand Up @@ -127,16 +130,19 @@ private String getDefaultInterface() {
*/
private void populateInterfaceAddresses() {
String defaultInterface = getDefaultInterface();
info("Using addresses from default interface " + defaultInterface);
info("Using addresses from default interface: " + defaultInterface + " for family: " + family);
Map<String, String> interfaceAddresses = ofNullable(
getInterfaceAddresses(defaultInterface)).orElse(ImmutableMap.of());
interfaceAddresses.entrySet().forEach(this::addStateMapEntry);
}

private void addStateMapEntry(Entry<String, String> entry) {
String family = entry.getKey();
if (!Objects.equals(this.family, family)) {
return;
}
FamilyLocalnetState stateEntry = new FamilyLocalnetState();
stateEntry.addr = entry.getValue();
String family = entry.getKey();
info("Family " + family + " address is " + stateEntry.addr);
localnetHost.update(family, stateEntry);
}
Expand Down
21 changes: 13 additions & 8 deletions pubber/src/main/java/daq/pubber/PubberLocalnetManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import com.google.udmi.util.SiteModel;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
import udmi.lib.ProtocolFamily;
import udmi.lib.client.LocalnetManager;
import udmi.lib.intf.FamilyProvider;
Expand All @@ -23,10 +22,10 @@ public class PubberLocalnetManager extends PubberManager implements LocalnetMana

static Map<String, Class<? extends FamilyProvider>> LOCALNET_PROVIDERS =
Map.of(
ProtocolFamily.VENDOR, VendorProvider.class,
ProtocolFamily.IPV_4, IpProvider.class,
ProtocolFamily.IPV_6, IpProvider.class,
ProtocolFamily.ETHER, IpProvider.class);
ProtocolFamily.VENDOR, PubberVendorProvider.class,
ProtocolFamily.IPV_4, PubberIpProvider.class,
ProtocolFamily.IPV_6, PubberIpProvider.class,
ProtocolFamily.ETHER, PubberIpProvider.class);

/**
* Create a new container with the given host.
Expand All @@ -35,8 +34,13 @@ public PubberLocalnetManager(ManagerHost host, PubberConfiguration configuration
super(host, configuration);
localnetState = new LocalnetState();
localnetState.families = new HashMap<>();
localnetProviders = LOCALNET_PROVIDERS
.keySet().stream().collect(Collectors.toMap(family -> family, this::instantiateProvider));

localnetProviders = new HashMap<>();
LOCALNET_PROVIDERS.forEach((family, providerClass) -> {
if (host instanceof Pubber || providerClass != PubberIpProvider.class) {
localnetProviders.put(family, instantiateProvider(family));
}
});
}

/**
Expand All @@ -53,7 +57,8 @@ FamilyProvider instantiateProvider(String family) {
}

public void setSiteModel(SiteModel siteModel) {
((VendorProvider) getLocalnetProviders().get(ProtocolFamily.VENDOR)).setSiteModel(siteModel);
((PubberVendorProvider) getLocalnetProviders().get(ProtocolFamily.VENDOR))
.setSiteModel(siteModel);
}


Expand Down
2 changes: 1 addition & 1 deletion pubber/src/main/java/daq/pubber/PubberManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,6 @@ public PubberManager(ManagerHost host, PubberConfiguration configuration) {

@Override
protected int getIntervalSec(Integer sampleRateSec) {
return (int) ofNullable(options.fixedSampleRate).orElse(super.getIntervalSec(sampleRateSec));
return ofNullable(options.fixedSampleRate).orElse(super.getIntervalSec(sampleRateSec));
}
}
4 changes: 2 additions & 2 deletions pubber/src/main/java/daq/pubber/PubberPointsetManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@ private static PointPointsetModel makePointPointsetModel(boolean writable) {

private AbstractPoint makePoint(String name, PointPointsetModel point) {
if (BOOLEAN_UNITS.contains(point.units)) {
return new RandomBoolean(name, point);
return new PubberRandomBoolean(name, point);
} else {
return new RandomPoint(name, point);
return new PubberRandomPoint(name, point);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,19 @@

import static com.google.udmi.util.GeneralUtils.catchToNull;
import static com.google.udmi.util.GeneralUtils.deepCopy;
import static com.google.udmi.util.GeneralUtils.friendlyStackTrace;
import static java.lang.String.format;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import udmi.lib.base.MqttDevice;
import udmi.lib.client.DeviceManager;
import udmi.lib.client.ProxyDeviceHost;
import udmi.lib.intf.ManagerHost;
import udmi.schema.Config;
import udmi.schema.Metadata;
import udmi.schema.PubberConfiguration;

/**
* Wrapper for a complete device construct.
*/
public class ProxyDevice extends PubberManager implements ProxyDeviceHost {
public class PubberProxyDevice extends PubberManager implements ProxyDeviceHost {

private static final long STATE_INTERVAL_MS = 1000;
final PubberDeviceManager deviceManager;
Expand All @@ -28,12 +24,13 @@ public class ProxyDevice extends PubberManager implements ProxyDeviceHost {
/**
* New instance.
*/
public ProxyDevice(ManagerHost host, String id, PubberConfiguration pubberConfig) {
public PubberProxyDevice(ManagerHost host, String id, PubberConfiguration pubberConfig) {
super(host, makeProxyConfiguration(host, id, pubberConfig));
// Simple shortcut to get access to some foundational mechanisms inside of Pubber.
pubberHost = (Pubber) host;
deviceManager = new PubberDeviceManager(this, makeProxyConfiguration(host, id,
pubberConfig));
deviceManager.setSiteModel(pubberHost.getSiteModel());
executor.scheduleAtFixedRate(this::publishDirtyState, STATE_INTERVAL_MS, STATE_INTERVAL_MS,
TimeUnit.MILLISECONDS);
}
Expand All @@ -42,7 +39,7 @@ private static PubberConfiguration makeProxyConfiguration(ManagerHost host, Stri
PubberConfiguration config) {
PubberConfiguration proxyConfiguration = deepCopy(config);
proxyConfiguration.deviceId = id;
Metadata metadata = ((Pubber) host).getMetadata(id);
Metadata metadata = ((Pubber) host).getSiteModel().getMetadata(id);
proxyConfiguration.serialNo = catchToNull(() -> metadata.system.serial_no);
return proxyConfiguration;
}
Expand Down
Loading

0 comments on commit 3515783

Please sign in to comment.