Skip to content

Commit

Permalink
Individual tenant rehydration (#632)
Browse files Browse the repository at this point in the history
Solves #557
  • Loading branch information
eduwercamacaro authored Jan 29, 2024
1 parent 61e7f60 commit b75727d
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,22 @@
import io.littlehorse.common.model.PartitionMetricsModel;
import io.littlehorse.common.model.ScheduledTaskModel;
import io.littlehorse.common.model.corecommand.CommandModel;
import io.littlehorse.common.model.getable.global.acl.TenantModel;
import io.littlehorse.common.model.getable.objectId.TenantIdModel;
import io.littlehorse.common.model.repartitioncommand.RepartitionCommand;
import io.littlehorse.common.model.repartitioncommand.RepartitionSubCommand;
import io.littlehorse.common.model.repartitioncommand.repartitionsubcommand.AggregateTaskMetricsModel;
import io.littlehorse.common.model.repartitioncommand.repartitionsubcommand.AggregateWfMetricsModel;
import io.littlehorse.common.proto.Command;
import io.littlehorse.common.proto.GetableClassEnum;
import io.littlehorse.common.proto.Tenant;
import io.littlehorse.common.proto.WaitForCommandResponse;
import io.littlehorse.common.util.LHUtil;
import io.littlehorse.server.KafkaStreamsServerImpl;
import io.littlehorse.server.streams.ServerTopology;
import io.littlehorse.server.streams.store.LHIterKeyValue;
import io.littlehorse.server.streams.store.LHKeyValueIterator;
import io.littlehorse.server.streams.store.StoredGetable;
import io.littlehorse.server.streams.stores.ClusterScopedStore;
import io.littlehorse.server.streams.stores.TenantScopedStore;
import io.littlehorse.server.streams.taskqueue.TaskQueueManager;
Expand Down Expand Up @@ -48,6 +52,7 @@ public class CommandProcessor implements Processor<String, Command, String, Comm
private final TaskQueueManager globalTaskQueueManager;

private KeyValueStore<String, Bytes> nativeStore;
private KeyValueStore<String, Bytes> globalStore;
private boolean partitionIsClaimed;

public CommandProcessor(
Expand All @@ -65,6 +70,7 @@ public CommandProcessor(
public void init(final ProcessorContext<String, CommandProcessorOutput> ctx) {
this.ctx = ctx;
this.nativeStore = ctx.getStateStore(ServerTopology.CORE_STORE);
this.globalStore = ctx.getStateStore(ServerTopology.GLOBAL_METADATA_STORE);
onPartitionClaimed();
ctx.schedule(Duration.ofSeconds(30), PunctuationType.WALL_CLOCK_TIME, this::forwardMetricsUpdates);
}
Expand Down Expand Up @@ -93,7 +99,6 @@ private void processHelper(final Record<String, Command> commandRecord) {

try {
Message response = command.process(executionContext, config);
// coreDao.commit();
executionContext.endExecution();
if (command.hasResponse() && command.getCommandId() != null) {
WaitForCommandResponse cmdReply = WaitForCommandResponse.newBuilder()
Expand Down Expand Up @@ -165,8 +170,21 @@ private boolean isUserError(Exception exn) {
}

public void onPartitionClaimed() {
ClusterScopedStore clusterStore = ClusterScopedStore.newInstance(this.globalStore, new BackgroundContext());
try (LHKeyValueIterator<?> storedTenants = clusterStore.range(
GetableClassEnum.TENANT.getNumber() + "/",
GetableClassEnum.TENANT.getNumber() + "/~",
StoredGetable.class)) {
storedTenants.forEachRemaining(getable -> {
TenantModel storedTenant = ((StoredGetable<Tenant, TenantModel>) getable.getValue()).getStoredObject();
rehydrateTenant(storedTenant);
});
}
}

private void rehydrateTenant(TenantModel tenant) {
TenantScopedStore coreDefaultStore =
TenantScopedStore.newInstance(this.nativeStore, LHConstants.DEFAULT_TENANT, new BackgroundContext());
TenantScopedStore.newInstance(this.nativeStore, tenant.getId(), new BackgroundContext());
if (partitionIsClaimed) {
throw new RuntimeException("Re-claiming partition! Yikes!");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@
import io.littlehorse.common.model.ScheduledTaskModel;
import io.littlehorse.common.model.getable.core.noderun.NodeRunModel;
import io.littlehorse.common.model.getable.core.usertaskrun.UserTaskRunModel;
import io.littlehorse.common.model.getable.global.acl.TenantModel;
import io.littlehorse.common.model.getable.objectId.TenantIdModel;
import io.littlehorse.common.proto.Command;
import io.littlehorse.sdk.common.proto.RunWfRequest;
import io.littlehorse.server.KafkaStreamsServerImpl;
import io.littlehorse.server.TestProcessorExecutionContext;
import io.littlehorse.server.streams.ServerTopology;
import io.littlehorse.server.streams.store.StoredGetable;
import io.littlehorse.server.streams.stores.ClusterScopedStore;
import io.littlehorse.server.streams.stores.TenantScopedStore;
import io.littlehorse.server.streams.taskqueue.TaskQueueManager;
import io.littlehorse.server.streams.topology.core.CommandProcessorOutput;
Expand Down Expand Up @@ -66,15 +69,14 @@ public class CommandProcessorTest {

private final TenantScopedStore defaultStore =
TenantScopedStore.newInstance(nativeInMemoryStore, LHConstants.DEFAULT_TENANT, executionContext);

private final MockProcessorContext<String, CommandProcessorOutput> mockProcessorContext =
new MockProcessorContext<>();

private TestProcessorExecutionContext processorContext;

@BeforeEach
public void setup() {
nativeInMemoryStore.init(mockProcessorContext.getStateStoreContext(), nativeInMemoryStore);
globalInMemoryStore.init(mockProcessorContext.getStateStoreContext(), globalInMemoryStore);
}

@Test
Expand All @@ -84,9 +86,9 @@ void supportTaskQueueRehydrationOnInitialization() {
Command commandToExecute =
Command.newBuilder().setRunWf(runWfSubCommand).build();
processorContext = TestProcessorExecutionContext.create(
commandToExecute,
HeadersUtil.metadataHeadersFor(LHConstants.DEFAULT_TENANT, LHConstants.ANONYMOUS_PRINCIPAL),
mockProcessorContext);
commandToExecute, HeadersUtil.metadataHeadersFor("my-tenant", "tyler"), mockProcessorContext);
ClusterScopedStore clusterStore = ClusterScopedStore.newInstance(
mockProcessorContext.getStateStore(ServerTopology.GLOBAL_METADATA_STORE), executionContext);
NodeRunModel nodeRun = TestUtil.nodeRun();
UserTaskRunModel userTaskRunModel =
TestUtil.userTaskRun(UUID.randomUUID().toString(), nodeRun, processorContext);
Expand All @@ -96,6 +98,7 @@ void supportTaskQueueRehydrationOnInitialization() {
processorContext.getTaskManager().scheduleTask(scheduledTask);
processorContext.endExecution();
defaultStore.put(scheduledTask);
clusterStore.put(new StoredGetable<>(new TenantModel("my-tenant")));
commandProcessor.init(mockProcessorContext);
verify(server, times(1))
.onTaskScheduled(
Expand Down

0 comments on commit b75727d

Please sign in to comment.