Skip to content

Commit

Permalink
perf(server): avoid rocksdb range scan on deletewfrun (#1259)
Browse files Browse the repository at this point in the history
- Intelligently remember which NodeRuns, TaskRuns, and Variables existed
- No range scans over a key prefix
- Journal ExternalEventIds, since we can't guess them
  • Loading branch information
coltmcnealy-lh authored Jan 21, 2025
1 parent d1a36a7 commit 623bf03
Show file tree
Hide file tree
Showing 17 changed files with 1,488 additions and 99 deletions.
17 changes: 17 additions & 0 deletions schemas/internal/storage.proto
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ option java_multiple_files = true;
option java_package = "io.littlehorse.common.proto";

import "google/protobuf/timestamp.proto";
import "object_id.proto";


enum LHStoreType {
Expand All @@ -23,6 +24,7 @@ enum StoreableType {
PARTITION_METRICS = 6;
METRIC_WINDOW = 7;
INITIALIZATION_LOG = 8;
WFRUN_STORED_INVENTORY = 9;
}

enum GetableClassEnum {
Expand Down Expand Up @@ -72,6 +74,21 @@ message TagPb {
// the code a bit and may just be premature optimization.
}

// Journal of stored objects for a single WfRun. It exists so that the WfRun
// deletion process does not need to do range scans: NodeRuns can be iterated
// just by knowing their ids, but the ExternalEventIds cannot all be guessed,
// so they have to be journalled somewhere.
message WfRunStoredInventory {
// presumably the id of the WfRun that this inventory describes — TODO confirm
WfRunId wf_run_id = 1;
// NOTE(review): field number 2 is skipped; confirm that this is intentional.
// Ids of the ExternalEvents journalled for this WfRun.
repeated ExternalEventId external_events = 3;

// The following are deliberately NOT stored here:
// - NodeRunIds, because we can get them from the WfRun
// - TaskRunIds, because we can get them from the NodeRun
// - Variables, because we can get them from the WfSpec
// - UserTaskRuns, because we can get them from the NodeRun
// - WorkflowEvents, because we can get them from the NodeRuns
}

message TagsCachePb {
message CachedTagPb {
string id = 1;
Expand Down
2 changes: 2 additions & 0 deletions server/src/main/java/io/littlehorse/common/Storeable.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ public static StoreableType getStoreableType(Class<? extends Storeable<?>> cls)
return StoreableType.METRIC_WINDOW;
case "InitializationLogModel":
return StoreableType.INITIALIZATION_LOG;
case "WfRunStoredInventoryModel":
return StoreableType.WFRUN_STORED_INVENTORY;
}
throw new IllegalArgumentException("Unrecognized Storeable class: " + cls);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,20 @@
import io.littlehorse.common.LHSerializable;
import io.littlehorse.common.LHServerConfig;
import io.littlehorse.common.model.corecommand.CoreSubCommand;
import io.littlehorse.common.model.getable.core.events.WorkflowEventModel;
import io.littlehorse.common.model.getable.core.externalevent.ExternalEventModel;
import io.littlehorse.common.model.getable.CoreObjectId;
import io.littlehorse.common.model.getable.core.noderun.NodeRunModel;
import io.littlehorse.common.model.getable.core.taskrun.TaskRunModel;
import io.littlehorse.common.model.getable.core.usertaskrun.UserTaskRunModel;
import io.littlehorse.common.model.getable.core.variable.VariableModel;
import io.littlehorse.common.model.getable.core.wfrun.ThreadRunModel;
import io.littlehorse.common.model.getable.core.wfrun.WfRunModel;
import io.littlehorse.common.model.getable.global.wfspec.thread.ThreadSpecModel;
import io.littlehorse.common.model.getable.global.wfspec.thread.ThreadVarDefModel;
import io.littlehorse.common.model.getable.objectId.VariableIdModel;
import io.littlehorse.common.model.getable.objectId.WfRunIdModel;
import io.littlehorse.sdk.common.proto.DeleteWfRunRequest;
import io.littlehorse.sdk.common.proto.WfRunVariableAccessLevel;
import io.littlehorse.server.streams.storeinternals.GetableManager;
import io.littlehorse.server.streams.topology.core.ExecutionContext;
import io.littlehorse.server.streams.topology.core.ProcessorExecutionContext;
import java.util.Optional;

public class DeleteWfRunRequestModel extends CoreSubCommand<DeleteWfRunRequest> {

Expand All @@ -40,14 +44,44 @@ public String getPartitionKey() {

/**
 * Deletes the WfRun identified by {@code wfRunId} together with the objects stored for it:
 * its NodeRuns, whatever sub-Getable each NodeRun reports having created, the Variables
 * declared by each ThreadRun's ThreadSpec, and its ExternalEvents.
 *
 * @param executionContext supplies the {@code GetableManager} through which all reads and
 *     deletes are issued
 * @param config not read anywhere in this method body
 * @return always {@code Empty.getDefaultInstance()}, including when the WfRun is not found
 */
@Override
public Empty process(ProcessorExecutionContext executionContext, LHServerConfig config) {
// NOTE(review): the next seven statements (an up-front delete of the WfRun followed by
// prefix-based deletes) look like the pre-change side of this commit's diff, shown next to
// the replacement implementation below. Confirm which side is live before relying on them.
executionContext.getableManager().delete(wfRunId);
executionContext.getableManager().deleteAllByPrefix(getPartitionKey(), TaskRunModel.class);
executionContext.getableManager().deleteAllByPrefix(getPartitionKey(), VariableModel.class);
executionContext.getableManager().deleteAllByPrefix(getPartitionKey(), ExternalEventModel.class);
executionContext.getableManager().deleteAllByPrefix(getPartitionKey(), UserTaskRunModel.class);
executionContext.getableManager().deleteAllByPrefix(getPartitionKey(), WorkflowEventModel.class);
executionContext.getableManager().deleteAllByPrefix(getPartitionKey(), NodeRunModel.class);
GetableManager manager = executionContext.getableManager();
WfRunModel wfRun = manager.get(wfRunId);
// No WfRun found for this id: nothing further is deleted.
if (wfRun == null) return Empty.getDefaultInstance();

// Delete objects by their known ids instead of asking the store for a range scan over the
// WfRun's key prefix; the ids are derived from the WfRun, its ThreadRuns and their
// ThreadSpecs.
for (ThreadRunModel thread : wfRun.getThreadRunsUseMeCarefully()) {
// Visits node positions 0 through the thread's current position, inclusive.
// NOTE(review): assumes getNodeRun(i) is non-null for every such position — confirm.
for (int i = 0; i <= thread.getCurrentNodePosition(); i++) {
NodeRunModel nodeRun = thread.getNodeRun(i);

// Delete things created by the NodeRun, eg TaskRun / UserTaskRun / WorkflowEvent
Optional<? extends CoreObjectId<?, ?, ?>> createdGetable =
nodeRun.getSubNodeRun().getCreatedSubGetableId();
if (createdGetable.isPresent()) {
manager.delete((CoreObjectId<?, ?, ?>) createdGetable.get());
}

// Delete the NodeRun
manager.delete(nodeRun.getObjectId());
}

// Delete the variables belonging to that ThreadRun
ThreadSpecModel threadSpec = thread.getThreadSpec();
for (ThreadVarDefModel varDef : threadSpec.getVariableDefs()) {
// INHERITED_VAR definitions are skipped; presumably the Variable is stored under
// another ThreadRun's number — TODO confirm.
if (varDef.getAccessLevel() == WfRunVariableAccessLevel.INHERITED_VAR) continue;

// The Variable id is rebuilt from the WfRun id, the thread number and the variable name.
VariableIdModel id = new VariableIdModel(
wfRunId, thread.getNumber(), varDef.getVarDef().getName());
manager.delete(id);
}
}

// Delete the ExternalEvents, which can be done by the GetableManager itself
manager.deleteAllExternalEventsFor(wfRunId);

// Now we delete the WfRun itself
// NOTE(review): the local `manager` holds the result of this same call and could be used here.
executionContext.getableManager().delete(wfRunId);
return Empty.getDefaultInstance();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.google.protobuf.Message;
import io.littlehorse.common.LHSerializable;
import io.littlehorse.common.model.getable.CoreObjectId;
import io.littlehorse.common.model.getable.core.noderun.NodeFailureException;
import io.littlehorse.common.model.getable.core.noderun.NodeRunModel;
import io.littlehorse.common.model.getable.core.variable.VariableValueModel;
Expand Down Expand Up @@ -99,4 +100,12 @@ public WfRunModel getWfRun() {
public NodeModel getNode() {
return nodeRun.getNode();
}

/**
 * Returns the ID of the sub-Getable created by this sub-NodeRun, if there is one. For
 * example, a task sub-NodeRun would return the ID of its associated TaskRun.
 *
 * <p>This default implementation reports that nothing was created.
 *
 * @return an empty {@code Optional}
 */
public Optional<? extends CoreObjectId<?, ?, ?>> getCreatedSubGetableId() {
return Optional.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -149,4 +149,9 @@ public boolean maybeHalt(ProcessorExecutionContext processorContext) {
// For now, we can't interrupt a TaskRun until it's fully done.
return !processorContext.getableManager().get(getTaskRunId()).isStillRunning();
}

/**
 * Reports the TaskRun id held by this sub-NodeRun as its created sub-Getable.
 *
 * @return the value of {@code taskRunId}, or an empty {@code Optional} when it is null
 */
@Override
public Optional<TaskRunIdModel> getCreatedSubGetableId() {
    TaskRunIdModel createdId = taskRunId;
    return Optional.ofNullable(createdId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,4 +94,9 @@ private Optional<WorkflowEventModel> getCurrentWorkflowEventId(
.max(Comparator.comparingInt(
workflowEvent -> workflowEvent.getId().getId()));
}

/**
 * Reports the WorkflowEvent id held by this sub-NodeRun as its created sub-Getable.
 *
 * @return the value of {@code workflowEventId}, or an empty {@code Optional} when it is null
 */
@Override
public Optional<WorkflowEventIdModel> getCreatedSubGetableId() {
    WorkflowEventIdModel createdId = workflowEventId;
    return Optional.ofNullable(createdId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,4 +112,9 @@ public void arrive(Date time, ProcessorExecutionContext processorContext) throws
out.onArrival(time);
processorContext.getableManager().put(out);
}

/**
 * Reports the UserTaskRun id held by this sub-NodeRun as its created sub-Getable.
 *
 * @return the value of {@code userTaskRunId}, or an empty {@code Optional} when it is null
 */
@Override
public Optional<UserTaskRunIdModel> getCreatedSubGetableId() {
    UserTaskRunIdModel createdId = userTaskRunId;
    return Optional.ofNullable(createdId);
}
}
83 changes: 50 additions & 33 deletions server/src/main/java/io/littlehorse/common/proto/Storage.java

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 623bf03

Please sign in to comment.