Skip to content

Commit

Permalink
Support principal ids for e2e
Browse files Browse the repository at this point in the history
  • Loading branch information
eduwercamacaro committed Jan 31, 2024
1 parent d53734f commit d750fb6
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,12 @@ private ProcessorExecutionContext buildExecutionContext(Record<String, Command>
}

public void onPartitionClaimed() {
if (partitionIsClaimed) {
throw new RuntimeException("Re-claiming partition! Yikes!");
}
partitionIsClaimed = true;
ClusterScopedStore clusterStore = ClusterScopedStore.newInstance(this.globalStore, new BackgroundContext());
rehydrateTenant(new TenantModel(LHConstants.DEFAULT_TENANT));
try (LHKeyValueIterator<?> storedTenants = clusterStore.range(
GetableClassEnum.TENANT.getNumber() + "/",
GetableClassEnum.TENANT.getNumber() + "/~",
Expand All @@ -155,11 +160,6 @@ public void onPartitionClaimed() {
private void rehydrateTenant(TenantModel tenant) {
TenantScopedStore coreDefaultStore =
TenantScopedStore.newInstance(this.nativeStore, tenant.getId(), new BackgroundContext());
if (partitionIsClaimed) {
throw new RuntimeException("Re-claiming partition! Yikes!");
}
partitionIsClaimed = true;

try (LHKeyValueIterator<ScheduledTaskModel> iter = coreDefaultStore.prefixScan("", ScheduledTaskModel.class)) {
while (iter.hasNext()) {
LHIterKeyValue<ScheduledTaskModel> next = iter.next();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public class TestProcessorExecutionContext extends ProcessorExecutionContext {
private final TenantScopedStore tenantMetadataStore;
private final ClusterScopedStore clusterMetadataStore;
private final Headers recordMetadata;
private final KafkaStreamsServerImpl server;

public TestProcessorExecutionContext(
Command currentCommand,
Expand All @@ -47,6 +48,7 @@ public TestProcessorExecutionContext(
this.globalTaskQueueManager = globalTaskQueueManager;

String tenantId = HeadersUtil.tenantIdFromMetadata(recordMetadata);
this.server = server;

this.coreStore = TenantScopedStore.newInstance(
processorContext.getStateStore(ServerTopology.CORE_STORE), tenantId, this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ public class CommandProcessorTest {
private final MockProcessorContext<String, CommandProcessorOutput> mockProcessorContext =
new MockProcessorContext<>();

private TestProcessorExecutionContext processorContext;
private TestProcessorExecutionContext tenantProcessorContext;
private TestProcessorExecutionContext defaultProcessorContext;

@BeforeEach
public void setup() {
Expand All @@ -85,22 +86,34 @@ void supportTaskQueueRehydrationOnInitialization() {
RunWfRequest.newBuilder().setWfSpecName("name").build();
Command commandToExecute =
Command.newBuilder().setRunWf(runWfSubCommand).build();
processorContext = TestProcessorExecutionContext.create(

tenantProcessorContext = TestProcessorExecutionContext.create(
commandToExecute, HeadersUtil.metadataHeadersFor("my-tenant", "tyler"), mockProcessorContext);
defaultProcessorContext = new TestProcessorExecutionContext(
commandToExecute,
HeadersUtil.metadataHeadersFor(LHConstants.DEFAULT_TENANT, LHConstants.DEFAULT_TENANT),
tenantProcessorContext.getLhConfig(),
mockProcessorContext,
tenantProcessorContext.getGlobalTaskQueueManager(),
tenantProcessorContext.getMetadataCache(),
tenantProcessorContext.getServer());
defaultProcessorContext.getableManager();
ClusterScopedStore clusterStore = ClusterScopedStore.newInstance(
mockProcessorContext.getStateStore(ServerTopology.GLOBAL_METADATA_STORE), executionContext);
NodeRunModel nodeRun = TestUtil.nodeRun();
UserTaskRunModel userTaskRunModel =
TestUtil.userTaskRun(UUID.randomUUID().toString(), nodeRun, processorContext);
processorContext.getableManager().put(nodeRun);
TestUtil.userTaskRun(UUID.randomUUID().toString(), nodeRun, tenantProcessorContext);
tenantProcessorContext.getableManager().put(nodeRun);
final ScheduledTaskModel scheduledTask = new ScheduledTaskModel(
TestUtil.taskDef("my-task").getObjectId(), List.of(), userTaskRunModel, processorContext);
processorContext.getTaskManager().scheduleTask(scheduledTask);
processorContext.endExecution();
TestUtil.taskDef("my-task").getObjectId(), List.of(), userTaskRunModel, tenantProcessorContext);
tenantProcessorContext.getTaskManager().scheduleTask(scheduledTask);
defaultProcessorContext.getTaskManager().scheduleTask(scheduledTask);
tenantProcessorContext.endExecution();
defaultProcessorContext.endExecution();
defaultStore.put(scheduledTask);
clusterStore.put(new StoredGetable<>(new TenantModel("my-tenant")));
commandProcessor.init(mockProcessorContext);
verify(server, times(1))
verify(server, times(2))
.onTaskScheduled(
eq(scheduledTask.getTaskDefId()), any(), eq(new TenantIdModel(LHConstants.DEFAULT_TENANT)));
}
Expand Down

0 comments on commit d750fb6

Please sign in to comment.