Skip to content

Commit

Permalink
Merge branch 'master' into documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
grafnu committed Mar 2, 2024
2 parents a7a9882 + b6218e6 commit 613f485
Show file tree
Hide file tree
Showing 8 changed files with 146 additions and 78 deletions.
2 changes: 2 additions & 0 deletions bin/start_hivemq
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#!/bin/bash -e

sudo systemctl stop mosquitto || true

docker run -p 1883:1883 hivemq/hivemq-ce
1 change: 1 addition & 0 deletions udmis/etc/prod_pod.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
},
"crons": {
"reglist": {
"periodic_sec": 10,
"payload": "query/cloud:{}",
"send_id": "${UDMI_PREFIX}udmi_control"
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.google.bos.udmi.service.core;

import static com.google.udmi.util.JsonUtil.stringifyTerse;
import static java.lang.String.format;

import java.util.Objects;
Expand All @@ -18,4 +19,10 @@ public ControlProcessor(EndpointConfiguration config) {
private static String makeTransactionId() {
return format("CP:%08x", Objects.hash(System.currentTimeMillis(), Thread.currentThread()));
}

@Override
protected void defaultHandler(Object message) {
debug("Received defaulted control message type %s: %s", message.getClass().getSimpleName(),
stringifyTerse(message));
}
}
69 changes: 0 additions & 69 deletions udmis/src/main/java/com/google/bos/udmi/service/core/CronJob.java

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package com.google.bos.udmi.service.core;

import static com.google.udmi.util.GeneralUtils.CSV_JOINER;
import static com.google.udmi.util.GeneralUtils.ifTrueGet;
import static com.google.udmi.util.GeneralUtils.ifTrueThen;
import static com.google.udmi.util.JsonUtil.fromStringStrict;
import static com.google.udmi.util.JsonUtil.isoConvert;
import static com.google.udmi.util.JsonUtil.stringifyTerse;

import com.google.bos.udmi.service.messaging.impl.MessageDispatcherImpl;
import com.google.udmi.util.JsonUtil;
import java.time.Duration;
import java.util.Date;
import java.util.SortedMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicReference;
import udmi.schema.EndpointConfiguration;
import udmi.schema.Envelope;
import udmi.schema.Envelope.SubFolder;
import udmi.schema.Envelope.SubType;

/**
* Simple clas to manage a distributed cron execution environment.
*/
public class CronProcessor extends ProcessorBase {

public static final String PAYLOAD_SEPARATOR = ":";
public static final String PATH_SEPARATOR = "/";
private final Envelope srcEnvelope;
private final Object message;
private final SortedMap<String, Date> received = new ConcurrentSkipListMap<>();
private final Duration waitAndListen;
private final AtomicReference<Date> previousTick = new AtomicReference<>();

/**
* Create an instance with the given config.
*/
public CronProcessor(EndpointConfiguration config) {
super(config);

distributorName = config.distributor;
waitAndListen = Duration.ofSeconds(periodicSec / 2);

String[] targetMessage = config.payload.split(PAYLOAD_SEPARATOR, 2);

String[] parts = targetMessage[0].split(PATH_SEPARATOR, 4);
srcEnvelope = new Envelope();
srcEnvelope.subType = SubType.fromValue(parts[0]);
srcEnvelope.subFolder = SubFolder.fromValue(parts[1]);
srcEnvelope.deviceRegistryId = ifTrueGet(parts.length >= 3, () -> parts[3]);
srcEnvelope.deviceId = ifTrueGet(parts.length >= 4, () -> parts[4]);

Class<?> messageClass = MessageDispatcherImpl.getMessageClassFor(srcEnvelope, false);
String payload = ifTrueGet(targetMessage.length > 1, () -> targetMessage[1], EMPTY_JSON);
message = fromStringStrict(messageClass, payload);
}

@Override
protected void defaultHandler(Object message) {
trackPod(getContinuation(message).getEnvelope());
}

@Override
protected void periodicTask() {
info("Distributing %s %s/%s to %s/%s", containerId, srcEnvelope.subType, srcEnvelope.subFolder,
srcEnvelope.deviceRegistryId, srcEnvelope.deviceId);

Date publishTime = new Date();
srcEnvelope.publishTime = publishTime;

distributor.publish(srcEnvelope, message, containerId);

trackPod(srcEnvelope);

Date before = previousTick.getAndSet(publishTime);

// Wait for half the window, to make sure everybody has a chance to report in.
scheduleIn(waitAndListen, () -> ifTrueThen(isAmGroot(before), this::processGroot));
}

private boolean isAmGroot(Date cutoffTime) {
debug("Check grootness for %s after %s", srcEnvelope.gatewayId, isoConvert(cutoffTime));

if (cutoffTime == null) {
return false;
}

debug("Received values: " + received.size() + " " + CSV_JOINER.join(
received.values().stream().map(JsonUtil::isoConvert).toList()));
received.entrySet().removeIf(entry -> entry.getValue().before(cutoffTime));
debug("Received %s is groot: %s", received.firstKey(), CSV_JOINER.join(received.keySet()));
return srcEnvelope.gatewayId.equals(received.firstKey());
}

private void processGroot() {
debug("Publishing as %s: %s", stringifyTerse(srcEnvelope), stringifyTerse(message));
publish(srcEnvelope, message);
}

private void trackPod(Envelope envelope) {
debug("Pod timestamp update %s to %s", envelope.gatewayId, isoConvert(envelope.publishTime));
received.put(envelope.gatewayId, envelope.publishTime);
debug("Received values: " + received.size() + " " + CSV_JOINER.join(
received.values().stream().map(JsonUtil::isoConvert).toList()));
}

@Override
public void activate() {
super.activate();
srcEnvelope.gatewayId = distributor.getRouteId(containerId);
info("Activated cron as %s", stringifyTerse(srcEnvelope));
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.google.bos.udmi.service.core;

import static com.google.udmi.util.GeneralUtils.deepCopy;
import static com.google.udmi.util.GeneralUtils.friendlyStackTrace;
import static com.google.udmi.util.JsonUtil.stringifyTerse;
import static java.lang.String.format;

Expand Down Expand Up @@ -41,12 +40,15 @@ public DistributorPipe(EndpointConfiguration config) {
@Override
protected void defaultHandler(Object message) {
Envelope envelope = getContinuation(message).getEnvelope();
debug("Handling distribution from " + stringifyTerse(envelope));
try {
String[] routeId = envelope.gatewayId.split(ROUTE_SEPERATOR, 2);
if (clientId.equals(routeId[0])) {
debug("Rejecting loopback client " + clientId);
return;
}
Object component = UdmiServicePod.getComponent(routeId[1]);
debug("Routing result to " + component.getClass().getSimpleName());
if (component instanceof ProcessorBase processorBase) {
processorBase.processMessage(envelope, message);
} else {
Expand All @@ -63,14 +65,17 @@ protected void defaultHandler(Object message) {
public void publish(Envelope rawEnvelope, Object message, String source) {
try {
Envelope envelope = deepCopy(rawEnvelope);
String routeId = format("%s%s%s", clientId, ROUTE_SEPERATOR, source);
String routeId = getRouteId(source);
debug("Distributing %s for %s/%s as %s", message.getClass().getSimpleName(),
envelope.deviceRegistryId, envelope.deviceId, routeId);
envelope.gatewayId = routeId;
super.publish(envelope, message);
publish(envelope, message);
} catch (Exception e) {
error("Error distributing update: " + friendlyStackTrace(e));
throw new RuntimeException("Error distributing update", e);
}
}

public String getRouteId(String source) {
return format("%s%s%s", clientId, ROUTE_SEPERATOR, source);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.google.common.collect.ImmutableSet;
import com.google.udmi.util.JsonUtil;
import java.io.PrintStream;
import java.time.Duration;
import java.util.Arrays;
import java.util.Set;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -157,6 +158,11 @@ private void periodicTaskWrapper() {
}

protected void periodicTask() {
if (scheduledExecutor != null && !scheduledExecutor.isShutdown()) {
debug("Shutting down unused scheduled executor");
scheduledExecutor.shutdown();
return;
}
throw new IllegalStateException("Unexpected periodic task execution");
}

Expand Down Expand Up @@ -225,6 +231,10 @@ public void activate() {
});
}

protected void scheduleIn(Duration duration, Runnable task) {
scheduledExecutor.schedule(task, duration.getSeconds(), TimeUnit.SECONDS);
}

public void debug(String format, Object... args) {
debug(format(format, args));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,14 @@
import static com.google.udmi.util.GeneralUtils.friendlyStackTrace;
import static com.google.udmi.util.GeneralUtils.ifNotNullGet;
import static com.google.udmi.util.GeneralUtils.ifNotNullThen;
import static com.google.udmi.util.GeneralUtils.ifNotNullThrow;
import static com.google.udmi.util.JsonUtil.loadFileStrictRequired;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;

import com.google.bos.udmi.service.access.IotAccessProvider;
import com.google.bos.udmi.service.core.BridgeProcessor;
import com.google.bos.udmi.service.core.ControlProcessor;
import com.google.bos.udmi.service.core.CronJob;
import com.google.bos.udmi.service.core.CronProcessor;
import com.google.bos.udmi.service.core.DistributorPipe;
import com.google.bos.udmi.service.core.ProcessorBase;
import com.google.bos.udmi.service.core.ReflectProcessor;
Expand Down Expand Up @@ -63,10 +62,10 @@ public UdmiServicePod(String[] args) {
try {
checkState(args.length == 1, "expected exactly one argument: configuration_file");

ifNotNullThen(podConfiguration.distributors, dist -> dist.forEach(this::createDistributor));
ifNotNullThen(podConfiguration.iot_access, access -> access.forEach(this::createAccess));
ifNotNullThen(podConfiguration.flows, flows -> flows.forEach(this::createFlow));
ifNotNullThen(podConfiguration.bridges, bridges -> bridges.forEach(this::createBridge));
ifNotNullThen(podConfiguration.iot_access, access -> access.forEach(this::createAccess));
ifNotNullThen(podConfiguration.distributors, dist -> dist.forEach(this::createDistributor));
ifNotNullThen(podConfiguration.crons, dist -> dist.forEach(this::createCron));
} catch (Exception e) {
throw new RuntimeException("Fatal error instantiating pod " + CSV_JOINER.join(args), e);
Expand Down Expand Up @@ -184,7 +183,7 @@ private static void setConfigName(EndpointConfiguration config, String name) {

private void createCron(String name, EndpointConfiguration config) {
setConfigName(config, name);
putComponent(name, () -> ProcessorBase.create(CronJob.class, makeConfig(config)));
putComponent(name, () -> ProcessorBase.create(CronProcessor.class, makeConfig(config)));
}

private void createDistributor(String name, EndpointConfiguration config) {
Expand Down

0 comments on commit 613f485

Please sign in to comment.