diff --git a/bin/start_hivemq b/bin/start_hivemq index da24080b0f..535be780f0 100755 --- a/bin/start_hivemq +++ b/bin/start_hivemq @@ -1,3 +1,5 @@ #!/bin/bash -e +sudo systemctl stop mosquitto || true + docker run -p 1883:1883 hivemq/hivemq-ce diff --git a/udmis/etc/prod_pod.json b/udmis/etc/prod_pod.json index 5a4b91da85..d2ea4e89d6 100644 --- a/udmis/etc/prod_pod.json +++ b/udmis/etc/prod_pod.json @@ -13,6 +13,7 @@ }, "crons": { "reglist": { + "periodic_sec": 10, "payload": "query/cloud:{}", "send_id": "${UDMI_PREFIX}udmi_control" } diff --git a/udmis/src/main/java/com/google/bos/udmi/service/core/ControlProcessor.java b/udmis/src/main/java/com/google/bos/udmi/service/core/ControlProcessor.java index db395bca5f..d715ebc2b2 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/core/ControlProcessor.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/core/ControlProcessor.java @@ -1,5 +1,6 @@ package com.google.bos.udmi.service.core; +import static com.google.udmi.util.JsonUtil.stringifyTerse; import static java.lang.String.format; import java.util.Objects; @@ -18,4 +19,10 @@ public ControlProcessor(EndpointConfiguration config) { private static String makeTransactionId() { return format("CP:%08x", Objects.hash(System.currentTimeMillis(), Thread.currentThread())); } + + @Override + protected void defaultHandler(Object message) { + debug("Received defaulted control message type %s: %s", message.getClass().getSimpleName(), + stringifyTerse(message)); + } } diff --git a/udmis/src/main/java/com/google/bos/udmi/service/core/CronJob.java b/udmis/src/main/java/com/google/bos/udmi/service/core/CronJob.java deleted file mode 100644 index a6becfd15b..0000000000 --- a/udmis/src/main/java/com/google/bos/udmi/service/core/CronJob.java +++ /dev/null @@ -1,69 +0,0 @@ -package com.google.bos.udmi.service.core; - -import static com.google.udmi.util.GeneralUtils.ifTrueGet; -import static com.google.udmi.util.JsonUtil.fromStringStrict; -import static com.google.udmi.util.JsonUtil.stringifyTerse; - -import com.google.bos.udmi.service.messaging.impl.MessageDispatcherImpl; -import com.google.bos.udmi.service.pod.ContainerProvider; -import udmi.schema.EndpointConfiguration; -import udmi.schema.Envelope; -import udmi.schema.Envelope.SubFolder; -import udmi.schema.Envelope.SubType; - -/** - * Simple clas to manage a distributed cron execution environment. - */ -public class CronJob extends ProcessorBase { - - private static final String SELF_ID = "groot"; - private final Envelope envelope; - private final Object message; - - /** - * Create an instance with the given config. - */ - public CronJob(EndpointConfiguration config) { - super(config); - - distributorName = config.distributor; - - String[] targetMessage = config.payload.split(":", 2); - - String[] parts = targetMessage[0].split("/"); - envelope = new Envelope(); - envelope.subType = SubType.fromValue(parts[0]); - envelope.subFolder = SubFolder.fromValue(parts[1]); - envelope.deviceRegistryId = ifTrueGet(parts.length >= 3, () -> parts[3]); - envelope.deviceId = ifTrueGet(parts.length >= 4, () -> parts[4]); - envelope.gatewayId = SELF_ID + DistributorPipe.ROUTE_SEPERATOR + config.name; - - Class messageClass = MessageDispatcherImpl.getMessageClassFor(envelope, false); - String payload = ifTrueGet(targetMessage.length > 1, () -> targetMessage[1], EMPTY_JSON); - message = fromStringStrict(messageClass, payload); - - info("Set-up cron job for %s %s/%s to %s/%s", containerId, envelope.subType, envelope.subFolder, - envelope.deviceRegistryId, envelope.deviceId); - } - - public static ContainerProvider from(EndpointConfiguration config) { - return new CronJob(config); - } - - @Override - protected void periodicTask() { - info("Distributing %s %s/%s to %s/%s", containerId, envelope.subType, envelope.subFolder, - envelope.deviceRegistryId, envelope.deviceId); - distributor.publish(envelope, message, containerId); - handleTick(envelope, message); - } - - @Override - protected void defaultHandler(Object message) { - handleTick(getContinuation(message).getEnvelope(), message); - } - - private void handleTick(Envelope envelope, Object message) { - debug("Cron Job update " + stringifyTerse(envelope) + " " + stringifyTerse(message)); - } -} diff --git a/udmis/src/main/java/com/google/bos/udmi/service/core/CronProcessor.java b/udmis/src/main/java/com/google/bos/udmi/service/core/CronProcessor.java new file mode 100644 index 0000000000..ae1e94ffbc --- /dev/null +++ b/udmis/src/main/java/com/google/bos/udmi/service/core/CronProcessor.java @@ -0,0 +1,113 @@ +package com.google.bos.udmi.service.core; + +import static com.google.udmi.util.GeneralUtils.CSV_JOINER; +import static com.google.udmi.util.GeneralUtils.ifTrueGet; +import static com.google.udmi.util.GeneralUtils.ifTrueThen; +import static com.google.udmi.util.JsonUtil.fromStringStrict; +import static com.google.udmi.util.JsonUtil.isoConvert; +import static com.google.udmi.util.JsonUtil.stringifyTerse; + +import com.google.bos.udmi.service.messaging.impl.MessageDispatcherImpl; +import com.google.udmi.util.JsonUtil; +import java.time.Duration; +import java.util.Date; +import java.util.SortedMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.atomic.AtomicReference; +import udmi.schema.EndpointConfiguration; +import udmi.schema.Envelope; +import udmi.schema.Envelope.SubFolder; +import udmi.schema.Envelope.SubType; + +/** + * Simple clas to manage a distributed cron execution environment. + */ +public class CronProcessor extends ProcessorBase { + + public static final String PAYLOAD_SEPARATOR = ":"; + public static final String PATH_SEPARATOR = "/"; + private final Envelope srcEnvelope; + private final Object message; + private final SortedMap received = new ConcurrentSkipListMap<>(); + private final Duration waitAndListen; + private final AtomicReference previousTick = new AtomicReference<>(); + + /** + * Create an instance with the given config. + */ + public CronProcessor(EndpointConfiguration config) { + super(config); + + distributorName = config.distributor; + waitAndListen = Duration.ofSeconds(periodicSec / 2); + + String[] targetMessage = config.payload.split(PAYLOAD_SEPARATOR, 2); + + String[] parts = targetMessage[0].split(PATH_SEPARATOR, 4); + srcEnvelope = new Envelope(); + srcEnvelope.subType = SubType.fromValue(parts[0]); + srcEnvelope.subFolder = SubFolder.fromValue(parts[1]); + srcEnvelope.deviceRegistryId = ifTrueGet(parts.length >= 3, () -> parts[3]); + srcEnvelope.deviceId = ifTrueGet(parts.length >= 4, () -> parts[4]); + + Class messageClass = MessageDispatcherImpl.getMessageClassFor(srcEnvelope, false); + String payload = ifTrueGet(targetMessage.length > 1, () -> targetMessage[1], EMPTY_JSON); + message = fromStringStrict(messageClass, payload); + } + + @Override + protected void defaultHandler(Object message) { + trackPod(getContinuation(message).getEnvelope()); + } + + @Override + protected void periodicTask() { + info("Distributing %s %s/%s to %s/%s", containerId, srcEnvelope.subType, srcEnvelope.subFolder, + srcEnvelope.deviceRegistryId, srcEnvelope.deviceId); + + Date publishTime = new Date(); + srcEnvelope.publishTime = publishTime; + + distributor.publish(srcEnvelope, message, containerId); + + trackPod(srcEnvelope); + + Date before = previousTick.getAndSet(publishTime); + + // Wait for half the window, to make sure everybody has a chance to report in. + scheduleIn(waitAndListen, () -> ifTrueThen(isAmGroot(before), this::processGroot)); + } + + private boolean isAmGroot(Date cutoffTime) { + debug("Check grootness for %s after %s", srcEnvelope.gatewayId, isoConvert(cutoffTime)); + + if (cutoffTime == null) { + return false; + } + + debug("Received values: " + received.size() + " " + CSV_JOINER.join( + received.values().stream().map(JsonUtil::isoConvert).toList())); + received.entrySet().removeIf(entry -> entry.getValue().before(cutoffTime)); + debug("Received %s is groot: %s", received.firstKey(), CSV_JOINER.join(received.keySet())); + return srcEnvelope.gatewayId.equals(received.firstKey()); + } + + private void processGroot() { + debug("Publishing as %s: %s", stringifyTerse(srcEnvelope), stringifyTerse(message)); + publish(srcEnvelope, message); + } + + private void trackPod(Envelope envelope) { + debug("Pod timestamp update %s to %s", envelope.gatewayId, isoConvert(envelope.publishTime)); + received.put(envelope.gatewayId, envelope.publishTime); + debug("Received values: " + received.size() + " " + CSV_JOINER.join( + received.values().stream().map(JsonUtil::isoConvert).toList())); + } + + @Override + public void activate() { + super.activate(); + srcEnvelope.gatewayId = distributor.getRouteId(containerId); + info("Activated cron as %s", stringifyTerse(srcEnvelope)); + } +} diff --git a/udmis/src/main/java/com/google/bos/udmi/service/core/DistributorPipe.java b/udmis/src/main/java/com/google/bos/udmi/service/core/DistributorPipe.java index 78079333c7..cbae5445be 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/core/DistributorPipe.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/core/DistributorPipe.java @@ -1,7 +1,6 @@ package com.google.bos.udmi.service.core; import static com.google.udmi.util.GeneralUtils.deepCopy; -import static com.google.udmi.util.GeneralUtils.friendlyStackTrace; import static com.google.udmi.util.JsonUtil.stringifyTerse; import static java.lang.String.format; @@ -41,12 +40,15 @@ public DistributorPipe(EndpointConfiguration config) { @Override protected void defaultHandler(Object message) { Envelope envelope = getContinuation(message).getEnvelope(); + debug("Handling distribution from " + stringifyTerse(envelope)); try { String[] routeId = envelope.gatewayId.split(ROUTE_SEPERATOR, 2); if (clientId.equals(routeId[0])) { + debug("Rejecting loopback client " + clientId); return; } Object component = UdmiServicePod.getComponent(routeId[1]); + debug("Routing result to " + component.getClass().getSimpleName()); if (component instanceof ProcessorBase processorBase) { processorBase.processMessage(envelope, message); } else { @@ -63,14 +65,17 @@ protected void defaultHandler(Object message) { public void publish(Envelope rawEnvelope, Object message, String source) { try { Envelope envelope = deepCopy(rawEnvelope); - String routeId = format("%s%s%s", clientId, ROUTE_SEPERATOR, source); + String routeId = getRouteId(source); debug("Distributing %s for %s/%s as %s", message.getClass().getSimpleName(), envelope.deviceRegistryId, envelope.deviceId, routeId); envelope.gatewayId = routeId; - super.publish(envelope, message); + publish(envelope, message); } catch (Exception e) { - error("Error distributing update: " + friendlyStackTrace(e)); + throw new RuntimeException("Error distributing update", e); } } + public String getRouteId(String source) { + return format("%s%s%s", clientId, ROUTE_SEPERATOR, source); + } } diff --git a/udmis/src/main/java/com/google/bos/udmi/service/pod/ContainerBase.java b/udmis/src/main/java/com/google/bos/udmi/service/pod/ContainerBase.java index ebe85cb170..fd9165b9a9 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/pod/ContainerBase.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/pod/ContainerBase.java @@ -15,6 +15,7 @@ import com.google.common.collect.ImmutableSet; import com.google.udmi.util.JsonUtil; import java.io.PrintStream; +import java.time.Duration; import java.util.Arrays; import java.util.Set; import java.util.concurrent.ExecutorService; @@ -157,6 +158,11 @@ private void periodicTaskWrapper() { } protected void periodicTask() { + if (scheduledExecutor != null && !scheduledExecutor.isShutdown()) { + debug("Shutting down unused scheduled executor"); + scheduledExecutor.shutdown(); + return; + } throw new IllegalStateException("Unexpected periodic task execution"); } @@ -225,6 +231,10 @@ public void activate() { }); } + protected void scheduleIn(Duration duration, Runnable task) { + scheduledExecutor.schedule(task, duration.getSeconds(), TimeUnit.SECONDS); + } + public void debug(String format, Object... args) { debug(format(format, args)); } diff --git a/udmis/src/main/java/com/google/bos/udmi/service/pod/UdmiServicePod.java b/udmis/src/main/java/com/google/bos/udmi/service/pod/UdmiServicePod.java index c7106bb3ed..60b50db18c 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/pod/UdmiServicePod.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/pod/UdmiServicePod.java @@ -7,7 +7,6 @@ import static com.google.udmi.util.GeneralUtils.friendlyStackTrace; import static com.google.udmi.util.GeneralUtils.ifNotNullGet; import static com.google.udmi.util.GeneralUtils.ifNotNullThen; -import static com.google.udmi.util.GeneralUtils.ifNotNullThrow; import static com.google.udmi.util.JsonUtil.loadFileStrictRequired; import static java.lang.String.format; import static java.util.Objects.requireNonNull; @@ -15,7 +14,7 @@ import com.google.bos.udmi.service.access.IotAccessProvider; import com.google.bos.udmi.service.core.BridgeProcessor; import com.google.bos.udmi.service.core.ControlProcessor; -import com.google.bos.udmi.service.core.CronJob; +import com.google.bos.udmi.service.core.CronProcessor; import com.google.bos.udmi.service.core.DistributorPipe; import com.google.bos.udmi.service.core.ProcessorBase; import com.google.bos.udmi.service.core.ReflectProcessor; @@ -63,10 +62,10 @@ public UdmiServicePod(String[] args) { try { checkState(args.length == 1, "expected exactly one argument: configuration_file"); + ifNotNullThen(podConfiguration.distributors, dist -> dist.forEach(this::createDistributor)); + ifNotNullThen(podConfiguration.iot_access, access -> access.forEach(this::createAccess)); ifNotNullThen(podConfiguration.flows, flows -> flows.forEach(this::createFlow)); ifNotNullThen(podConfiguration.bridges, bridges -> bridges.forEach(this::createBridge)); - ifNotNullThen(podConfiguration.iot_access, access -> access.forEach(this::createAccess)); - ifNotNullThen(podConfiguration.distributors, dist -> dist.forEach(this::createDistributor)); ifNotNullThen(podConfiguration.crons, dist -> dist.forEach(this::createCron)); } catch (Exception e) { throw new RuntimeException("Fatal error instantiating pod " + CSV_JOINER.join(args), e); @@ -184,7 +183,7 @@ private static void setConfigName(EndpointConfiguration config, String name) { private void createCron(String name, EndpointConfiguration config) { setConfigName(config, name); - putComponent(name, () -> ProcessorBase.create(CronJob.class, makeConfig(config))); + putComponent(name, () -> ProcessorBase.create(CronProcessor.class, makeConfig(config))); } private void createDistributor(String name, EndpointConfiguration config) {