From d750fb6c7b9648dc0661a6439208147a13bf663a Mon Sep 17 00:00:00 2001 From: Eduwer Camacaro Date: Wed, 31 Jan 2024 11:27:07 -0500 Subject: [PATCH] Support principal ids for e2e --- .../core/processors/CommandProcessor.java | 10 +++---- .../server/TestProcessorExecutionContext.java | 2 ++ .../core/processors/CommandProcessorTest.java | 29 ++++++++++++++----- 3 files changed, 28 insertions(+), 13 deletions(-) diff --git a/server/src/main/java/io/littlehorse/server/streams/topology/core/processors/CommandProcessor.java b/server/src/main/java/io/littlehorse/server/streams/topology/core/processors/CommandProcessor.java index 4c565c35f..efaa11f4b 100644 --- a/server/src/main/java/io/littlehorse/server/streams/topology/core/processors/CommandProcessor.java +++ b/server/src/main/java/io/littlehorse/server/streams/topology/core/processors/CommandProcessor.java @@ -140,7 +140,12 @@ private ProcessorExecutionContext buildExecutionContext(Record } public void onPartitionClaimed() { + if (partitionIsClaimed) { + throw new RuntimeException("Re-claiming partition! Yikes!"); + } + partitionIsClaimed = true; ClusterScopedStore clusterStore = ClusterScopedStore.newInstance(this.globalStore, new BackgroundContext()); + rehydrateTenant(new TenantModel(LHConstants.DEFAULT_TENANT)); try (LHKeyValueIterator storedTenants = clusterStore.range( GetableClassEnum.TENANT.getNumber() + "/", GetableClassEnum.TENANT.getNumber() + "/~", @@ -155,11 +160,6 @@ public void onPartitionClaimed() { private void rehydrateTenant(TenantModel tenant) { TenantScopedStore coreDefaultStore = TenantScopedStore.newInstance(this.nativeStore, tenant.getId(), new BackgroundContext()); - if (partitionIsClaimed) { - throw new RuntimeException("Re-claiming partition! Yikes!"); - } - partitionIsClaimed = true; - try (LHKeyValueIterator iter = coreDefaultStore.prefixScan("", ScheduledTaskModel.class)) { while (iter.hasNext()) { LHIterKeyValue next = iter.next(); diff --git a/server/src/test/java/io/littlehorse/server/TestProcessorExecutionContext.java b/server/src/test/java/io/littlehorse/server/TestProcessorExecutionContext.java index a927d2019..23f47d02e 100644 --- a/server/src/test/java/io/littlehorse/server/TestProcessorExecutionContext.java +++ b/server/src/test/java/io/littlehorse/server/TestProcessorExecutionContext.java @@ -31,6 +31,7 @@ public class TestProcessorExecutionContext extends ProcessorExecutionContext { private final TenantScopedStore tenantMetadataStore; private final ClusterScopedStore clusterMetadataStore; private final Headers recordMetadata; + private final KafkaStreamsServerImpl server; public TestProcessorExecutionContext( Command currentCommand, @@ -47,6 +48,7 @@ public TestProcessorExecutionContext( this.globalTaskQueueManager = globalTaskQueueManager; String tenantId = HeadersUtil.tenantIdFromMetadata(recordMetadata); + this.server = server; this.coreStore = TenantScopedStore.newInstance( processorContext.getStateStore(ServerTopology.CORE_STORE), tenantId, this); diff --git a/server/src/test/java/io/littlehorse/server/streams/topology/core/processors/CommandProcessorTest.java b/server/src/test/java/io/littlehorse/server/streams/topology/core/processors/CommandProcessorTest.java index 0039c5668..74ef0a510 100644 --- a/server/src/test/java/io/littlehorse/server/streams/topology/core/processors/CommandProcessorTest.java +++ b/server/src/test/java/io/littlehorse/server/streams/topology/core/processors/CommandProcessorTest.java @@ -72,7 +72,8 @@ public class CommandProcessorTest { private final MockProcessorContext mockProcessorContext = new MockProcessorContext<>(); - private TestProcessorExecutionContext processorContext; + private TestProcessorExecutionContext tenantProcessorContext; + private TestProcessorExecutionContext defaultProcessorContext; @BeforeEach public void setup() { @@ -85,22 +86,34 @@ void supportTaskQueueRehydrationOnInitialization() { RunWfRequest.newBuilder().setWfSpecName("name").build(); Command commandToExecute = Command.newBuilder().setRunWf(runWfSubCommand).build(); - processorContext = TestProcessorExecutionContext.create( + + tenantProcessorContext = TestProcessorExecutionContext.create( commandToExecute, HeadersUtil.metadataHeadersFor("my-tenant", "tyler"), mockProcessorContext); + defaultProcessorContext = new TestProcessorExecutionContext( + commandToExecute, + HeadersUtil.metadataHeadersFor(LHConstants.DEFAULT_TENANT, LHConstants.DEFAULT_TENANT), + tenantProcessorContext.getLhConfig(), + mockProcessorContext, + tenantProcessorContext.getGlobalTaskQueueManager(), + tenantProcessorContext.getMetadataCache(), + tenantProcessorContext.getServer()); + defaultProcessorContext.getableManager(); ClusterScopedStore clusterStore = ClusterScopedStore.newInstance( mockProcessorContext.getStateStore(ServerTopology.GLOBAL_METADATA_STORE), executionContext); NodeRunModel nodeRun = TestUtil.nodeRun(); UserTaskRunModel userTaskRunModel = - TestUtil.userTaskRun(UUID.randomUUID().toString(), nodeRun, processorContext); - processorContext.getableManager().put(nodeRun); + TestUtil.userTaskRun(UUID.randomUUID().toString(), nodeRun, tenantProcessorContext); + tenantProcessorContext.getableManager().put(nodeRun); final ScheduledTaskModel scheduledTask = new ScheduledTaskModel( - TestUtil.taskDef("my-task").getObjectId(), List.of(), userTaskRunModel, processorContext); - processorContext.getTaskManager().scheduleTask(scheduledTask); - processorContext.endExecution(); + TestUtil.taskDef("my-task").getObjectId(), List.of(), userTaskRunModel, tenantProcessorContext); + tenantProcessorContext.getTaskManager().scheduleTask(scheduledTask); + defaultProcessorContext.getTaskManager().scheduleTask(scheduledTask); + tenantProcessorContext.endExecution(); + defaultProcessorContext.endExecution(); defaultStore.put(scheduledTask); clusterStore.put(new StoredGetable<>(new TenantModel("my-tenant"))); commandProcessor.init(mockProcessorContext); - verify(server, times(1)) + verify(server, times(2)) .onTaskScheduled( eq(scheduledTask.getTaskDefId()), any(), eq(new TenantIdModel(LHConstants.DEFAULT_TENANT))); }