Skip to content

Commit

Permalink
Merge branch 'master' into runlocal
Browse files Browse the repository at this point in the history
  • Loading branch information
grafnu committed Apr 17, 2024
2 parents c3971ba + 7a04afa commit e490751
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 30 deletions.
1 change: 0 additions & 1 deletion .github/workflows/testing.yml
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,6 @@ jobs:
runlocal:
name: UDMIS Local Setup
runs-on: ubuntu-latest
needs: pretest
timeout-minutes: 10
steps:
- uses: actions/checkout@v4
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package com.google.bos.udmi.service.core;

import static com.google.common.base.Preconditions.checkState;
import static com.google.udmi.util.GeneralUtils.deepCopy;
import static com.google.udmi.util.GeneralUtils.ifTrueThen;
import static com.google.udmi.util.GeneralUtils.isTrue;
import static com.google.udmi.util.GeneralUtils.requireNull;
import static com.google.udmi.util.GeneralUtils.toDate;
import static com.google.udmi.util.JsonUtil.stringifyTerse;
import static java.util.Objects.requireNonNull;

import com.google.bos.udmi.service.access.IotAccessBase;
Expand All @@ -28,16 +31,26 @@ public class CloudQueryHandler {
private final ControlProcessor controller;
private final IotAccessBase iotAccess;
private final TargetProcessor target;
private CloudQuery query;
private Envelope envelope;
private final CloudQuery query;
private final Envelope envelope;
private final String savedEnvelope;
private final String savedQuery;

/**
* Create a query handler for cloud queries.
*/
public CloudQueryHandler(ControlProcessor controlProcessor) {
public CloudQueryHandler(ControlProcessor controlProcessor, CloudQuery cloudQuery) {
controller = controlProcessor;
iotAccess = controller.iotAccess;
target = controller.targetProcessor;
query = cloudQuery;
envelope = controller.getContinuation(cloudQuery).getEnvelope();
savedQuery = stringifyTerse(query);
savedEnvelope = stringifyTerse(envelope);
}

public static void processQuery(ControlProcessor controlProcessor, CloudQuery query) {
new CloudQueryHandler(controlProcessor, query).process();
}

@NotNull
Expand All @@ -58,10 +71,9 @@ private void issueModifiedDevice(String deviceId) {
}

private void issueModifiedQuery(Consumer<Envelope> mutator) {
CloudQuery cloudQuery = new CloudQuery();
cloudQuery.generation = query.generation;
mutator.accept(envelope);
controller.sideProcess(envelope, cloudQuery);
Envelope mutated = deepCopy(envelope);
mutator.accept(mutated);
controller.sideProcess(mutated, query);
}

private void issueModifiedRegistry(String registryId) {
Expand Down Expand Up @@ -151,20 +163,15 @@ private boolean shouldTraverseRegistries() {
/**
* Process an individual cloud query.
*/
public synchronized void process(CloudQuery newQuery) {
try {
query = newQuery;
envelope = controller.getContinuation(newQuery).getEnvelope();
if (envelope.deviceRegistryId == null) {
queryAllRegistries();
} else if (envelope.deviceId == null) {
queryRegistryDevices();
} else {
queryDeviceDetails();
}
} finally {
query = null;
envelope = null;
public synchronized void process() {
if (envelope.deviceRegistryId == null) {
queryAllRegistries();
} else if (envelope.deviceId == null) {
queryRegistryDevices();
} else {
queryDeviceDetails();
}
checkState(savedEnvelope.equals(stringifyTerse(envelope)), "mutated envelope");
checkState(savedQuery.equals(stringifyTerse(query)), "mutated query");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
public class ControlProcessor extends ProcessorBase {

TargetProcessor targetProcessor;
private CloudQueryHandler cloudQueryHandler;

public ControlProcessor(EndpointConfiguration config) {
super(config);
Expand All @@ -21,14 +20,13 @@ public ControlProcessor(EndpointConfiguration config) {
public void activate() {
super.activate();
targetProcessor = UdmiServicePod.getComponent(TargetProcessor.class);
cloudQueryHandler = new CloudQueryHandler(this);
}

/**
* Handle a cloud query command.
*/
@MessageHandler
public void cloudQueryHandler(CloudQuery query) {
cloudQueryHandler.process(query);
CloudQueryHandler.processQuery(this, query);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ class CloudQueryHandlerTest implements MessageContinuation {
private static final Date QUERY_GENERATION = new Date();
private final ControlProcessor controlProcessor = mock(ControlProcessor.class);
private final Envelope envelope = new Envelope();
private CloudQueryHandler queryHandler;
private final ArgumentCaptor<Object> targetCapture = ArgumentCaptor.forClass(Object.class);
private final ArgumentCaptor<Object> controlCapture = ArgumentCaptor.forClass(Object.class);
private final ArgumentCaptor<Envelope> envelopeCapture = ArgumentCaptor.forClass(Envelope.class);
private final Set<String> mockRegistries = ImmutableSet.of(TEST_REGISTRY);
private final CloudQuery query = new CloudQuery();
private CloudQueryHandler queryHandler;

@Override
public Envelope getEnvelope() {
Expand All @@ -49,7 +49,7 @@ public void publish(Object message) {

@Test
public void queryAllRegistries() {
queryHandler.process(query);
queryHandler.process();

List<Object> targetMessages = targetCapture.getAllValues();
assertEquals(1, targetMessages.size(), "published messages");
Expand All @@ -72,6 +72,9 @@ public void queryAllRegistries() {

@BeforeEach
public void setupMock() {
query.generation = QUERY_GENERATION;
query.depth = Depth.ENTRIES;

doReturn(this).when(controlProcessor).getContinuation(eq(query));
doNothing().when(controlProcessor).publish(targetCapture.capture());
doNothing().when(controlProcessor)
Expand All @@ -84,8 +87,7 @@ public void setupMock() {
TargetProcessor targetProcessor = mock(TargetProcessor.class);
controlProcessor.targetProcessor = targetProcessor;
doReturn(LAST_SEEN).when(targetProcessor).getLastSeen(eq(TEST_REGISTRY));
queryHandler = new CloudQueryHandler(controlProcessor);
query.generation = QUERY_GENERATION;
query.depth = Depth.ENTRIES;

queryHandler = new CloudQueryHandler(controlProcessor, query);
}
}

0 comments on commit e490751

Please sign in to comment.