From bde9d7f4aca0025b8888ffafc400f1f9546f2d43 Mon Sep 17 00:00:00 2001 From: Jacob Snarr Date: Fri, 8 Nov 2024 16:17:37 -0500 Subject: [PATCH 1/6] feat(server): Delete child objects when deleting WfRun (#1091) Cleans up TaskRuns, UserTaskRuns, Variables, WorkflowEvents, ExternalEvents, and NodeRuns when deleting an associated WfRun. --- .../subcommand/DeleteWfRunRequestModel.java | 13 ++ .../storeinternals/GetableManager.java | 14 ++ .../test/java/io/littlehorse/TestUtil.java | 22 +++ .../DeleteWfRunRequestModelTest.java | 130 ++++++++++++++++++ .../storeinternals/GetableManagerTest.java | 18 ++- 5 files changed, 195 insertions(+), 2 deletions(-) create mode 100644 server/src/test/java/io/littlehorse/common/model/metadatacommand/subcommand/DeleteWfRunRequestModelTest.java diff --git a/server/src/main/java/io/littlehorse/common/model/corecommand/subcommand/DeleteWfRunRequestModel.java b/server/src/main/java/io/littlehorse/common/model/corecommand/subcommand/DeleteWfRunRequestModel.java index 8a0f3ce2c..8bb5d9a79 100644 --- a/server/src/main/java/io/littlehorse/common/model/corecommand/subcommand/DeleteWfRunRequestModel.java +++ b/server/src/main/java/io/littlehorse/common/model/corecommand/subcommand/DeleteWfRunRequestModel.java @@ -5,6 +5,12 @@ import io.littlehorse.common.LHSerializable; import io.littlehorse.common.LHServerConfig; import io.littlehorse.common.model.corecommand.CoreSubCommand; +import io.littlehorse.common.model.getable.core.events.WorkflowEventModel; +import io.littlehorse.common.model.getable.core.externalevent.ExternalEventModel; +import io.littlehorse.common.model.getable.core.noderun.NodeRunModel; +import io.littlehorse.common.model.getable.core.taskrun.TaskRunModel; +import io.littlehorse.common.model.getable.core.usertaskrun.UserTaskRunModel; +import io.littlehorse.common.model.getable.core.variable.VariableModel; import io.littlehorse.common.model.getable.objectId.WfRunIdModel; import io.littlehorse.sdk.common.proto.DeleteWfRunRequest; 
import io.littlehorse.server.streams.topology.core.ExecutionContext; @@ -35,6 +41,13 @@ public String getPartitionKey() { @Override public Empty process(ProcessorExecutionContext executionContext, LHServerConfig config) { executionContext.getableManager().delete(wfRunId); + executionContext.getableManager().deleteAllByPrefix(getPartitionKey(), TaskRunModel.class); + executionContext.getableManager().deleteAllByPrefix(getPartitionKey(), VariableModel.class); + executionContext.getableManager().deleteAllByPrefix(getPartitionKey(), ExternalEventModel.class); + executionContext.getableManager().deleteAllByPrefix(getPartitionKey(), UserTaskRunModel.class); + executionContext.getableManager().deleteAllByPrefix(getPartitionKey(), WorkflowEventModel.class); + executionContext.getableManager().deleteAllByPrefix(getPartitionKey(), NodeRunModel.class); + return Empty.getDefaultInstance(); } diff --git a/server/src/main/java/io/littlehorse/server/streams/storeinternals/GetableManager.java b/server/src/main/java/io/littlehorse/server/streams/storeinternals/GetableManager.java index 346eff2a0..8967989bf 100644 --- a/server/src/main/java/io/littlehorse/server/streams/storeinternals/GetableManager.java +++ b/server/src/main/java/io/littlehorse/server/streams/storeinternals/GetableManager.java @@ -115,6 +115,20 @@ public > T delete(CoreObjectId> void deleteAllByPrefix(String prefix, Class cls) { + log.trace("Deleting all {} with prefix {}", cls.getSimpleName(), prefix); + + // Note this iterates in a non-paginated way through all NodeRun's in the + // WfRun. Fine for most use-cases, but if there's a WfRUn that runs for a + // year and has hundreds of tasks per day, it will be a problem. + List> allItems = iterateOverPrefixAndPutInUncommittedChanges(prefix, cls); + + for (GetableToStore itemToDelete : allItems) { + // Marking the objectToStore as null causes the flush() to delete it. 
+ itemToDelete.setObjectToStore(null); + } + } + public void commit() { for (Map.Entry> entry : uncommittedChanges.entrySet()) { String storeableKey = entry.getKey(); diff --git a/server/src/test/java/io/littlehorse/TestUtil.java b/server/src/test/java/io/littlehorse/TestUtil.java index 0110d3ec2..58023a42e 100644 --- a/server/src/test/java/io/littlehorse/TestUtil.java +++ b/server/src/test/java/io/littlehorse/TestUtil.java @@ -1,6 +1,7 @@ package io.littlehorse; import io.littlehorse.common.model.ScheduledTaskModel; +import io.littlehorse.common.model.getable.core.events.WorkflowEventModel; import io.littlehorse.common.model.getable.core.externalevent.ExternalEventModel; import io.littlehorse.common.model.getable.core.noderun.NodeRunModel; import io.littlehorse.common.model.getable.core.taskrun.TaskAttemptModel; @@ -32,6 +33,8 @@ import io.littlehorse.common.model.getable.objectId.UserTaskRunIdModel; import io.littlehorse.common.model.getable.objectId.WfRunIdModel; import io.littlehorse.common.model.getable.objectId.WfSpecIdModel; +import io.littlehorse.common.model.getable.objectId.WorkflowEventDefIdModel; +import io.littlehorse.common.model.getable.objectId.WorkflowEventIdModel; import io.littlehorse.common.proto.GetableClassEnum; import io.littlehorse.common.proto.TagStorageType; import io.littlehorse.sdk.common.proto.*; @@ -234,6 +237,25 @@ public static ExternalEventModel externalEvent() { return externalEvent; } + public static ExternalEventModel externalEvent(String wfRunId) { + ExternalEventModel externalEvent = new ExternalEventModel( + variableValue(), + new WfRunIdModel(wfRunId), + new ExternalEventDefIdModel("test-name"), + "0000001", + null, + null, + null); + return externalEvent; + } + + public static WorkflowEventModel workflowEvent(String wfRunId) { + WorkflowEventModel workflowEvent = new WorkflowEventModel( + new WorkflowEventIdModel(new WfRunIdModel(wfRunId), new WorkflowEventDefIdModel("test-name"), 0), + variableValue()); + return 
workflowEvent; + } + public static VariableDefModel variableDef(String name, VariableType variableTypePb) { VariableDefModel variableDef = new VariableDefModel(); variableDef.setName(name); diff --git a/server/src/test/java/io/littlehorse/common/model/metadatacommand/subcommand/DeleteWfRunRequestModelTest.java b/server/src/test/java/io/littlehorse/common/model/metadatacommand/subcommand/DeleteWfRunRequestModelTest.java new file mode 100644 index 000000000..8d0c0f3b1 --- /dev/null +++ b/server/src/test/java/io/littlehorse/common/model/metadatacommand/subcommand/DeleteWfRunRequestModelTest.java @@ -0,0 +1,130 @@ +package io.littlehorse.common.model.metadatacommand.subcommand; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.*; + +import io.littlehorse.TestUtil; +import io.littlehorse.common.LHConstants; +import io.littlehorse.common.LHServerConfig; +import io.littlehorse.common.model.getable.core.events.WorkflowEventModel; +import io.littlehorse.common.model.getable.core.externalevent.ExternalEventModel; +import io.littlehorse.common.model.getable.core.noderun.NodeRunModel; +import io.littlehorse.common.model.getable.core.taskrun.TaskRunModel; +import io.littlehorse.common.model.getable.core.usertaskrun.UserTaskRunModel; +import io.littlehorse.common.model.getable.core.variable.VariableModel; +import io.littlehorse.common.model.getable.core.wfrun.WfRunModel; +import io.littlehorse.common.model.getable.global.wfspec.WfSpecModel; +import io.littlehorse.common.model.getable.objectId.PrincipalIdModel; +import io.littlehorse.common.model.getable.objectId.TaskDefIdModel; +import io.littlehorse.common.model.getable.objectId.TaskRunIdModel; +import io.littlehorse.common.model.getable.objectId.TenantIdModel; +import io.littlehorse.common.model.getable.objectId.VariableIdModel; +import io.littlehorse.common.proto.Command; 
+import io.littlehorse.sdk.common.proto.DeleteWfRunRequest; +import io.littlehorse.sdk.common.proto.WfRunId; +import io.littlehorse.server.LHServer; +import io.littlehorse.server.TestProcessorExecutionContext; +import io.littlehorse.server.streams.storeinternals.GetableManager; +import io.littlehorse.server.streams.taskqueue.TaskQueueManager; +import io.littlehorse.server.streams.topology.core.CommandProcessorOutput; +import io.littlehorse.server.streams.topology.core.processors.CommandProcessor; +import io.littlehorse.server.streams.util.HeadersUtil; +import io.littlehorse.server.streams.util.MetadataCache; +import java.util.List; +import java.util.UUID; +import org.apache.kafka.streams.processor.api.MockProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; + +public class DeleteWfRunRequestModelTest { + + @Mock + private final LHServerConfig lhConfig = mock(); + + @Mock + private final LHServer server = mock(); + + private final MetadataCache metadataCache = new MetadataCache(); + private final TaskQueueManager queueManager = mock(); + private final CommandProcessor processor = new CommandProcessor(lhConfig, server, metadataCache, queueManager); + private final String wfRunId = UUID.randomUUID().toString(); + private final WfRunModel wfRun = TestUtil.wfRun(wfRunId); + private final Command command = commandProto(); + private final MockProcessorContext mockProcessor = new MockProcessorContext<>(); + private final TestProcessorExecutionContext testProcessorContext = TestProcessorExecutionContext.create( + command, + HeadersUtil.metadataHeadersFor( + new TenantIdModel(LHConstants.DEFAULT_TENANT), + new PrincipalIdModel(LHConstants.ANONYMOUS_PRINCIPAL)), + mockProcessor); + private final GetableManager getableManager = testProcessorContext.getableManager(); + + // VariableModel's "index" implementation relies on a litany of other objects + // that we do not need to create or store to 
test this specific feature. + // So we mock the VariableModel and hide the "index" functionality with our when() statements. + public VariableModel mockVariableModel() { + VariableModel variableModel = spy(); + variableModel.setId(new VariableIdModel(wfRun.getId(), 0, "test-name")); + variableModel.setValue(TestUtil.variableValue()); + variableModel.setMasked(false); + WfSpecModel wfSpec = TestUtil.wfSpec("testWfSpecName"); + variableModel.setWfSpec(wfSpec); + variableModel.setWfSpecId(wfSpec.getId()); + + when(variableModel.getIndexConfigurations()).thenReturn(List.of()); + when(variableModel.getIndexEntries()).thenReturn(List.of()); + + return variableModel; + } + + @Test + public void shouldDeleteWfRunObjects() { + TaskRunModel taskRunModel = TestUtil.taskRun( + new TaskRunIdModel(wfRun.getId(), UUID.randomUUID().toString()), + new TaskDefIdModel(UUID.randomUUID().toString())); + VariableModel variableModel = mockVariableModel(); + ExternalEventModel externalEventModel = TestUtil.externalEvent(wfRunId); + UserTaskRunModel userTaskRunModel = TestUtil.userTaskRun(wfRunId, testProcessorContext); + WorkflowEventModel workflowEventModel = TestUtil.workflowEvent(wfRunId); + NodeRunModel nodeRunModel = TestUtil.nodeRun(wfRunId); + + getableManager.put(wfRun); + getableManager.put(taskRunModel); + getableManager.put(variableModel); + getableManager.put(externalEventModel); + getableManager.put(userTaskRunModel); + getableManager.put(workflowEventModel); + getableManager.put(nodeRunModel); + + getableManager.commit(); + processor.init(mockProcessor); + processor.process(new Record<>("", command, 0L, testProcessorContext.getRecordMetadata())); + + WfRunModel storedWfRunModel = getableManager.get(wfRun.getId()); + TaskRunModel storedTaskRunModel = getableManager.get(taskRunModel.getId()); + VariableModel storedVariableModel = getableManager.get(variableModel.getId()); + ExternalEventModel storedExternalEventModel = getableManager.get(externalEventModel.getId()); + 
UserTaskRunModel storedUserTaskRunModel = getableManager.get(userTaskRunModel.getId()); + WorkflowEventModel storedWorkflowEventModel = getableManager.get(workflowEventModel.getId()); + NodeRunModel storedNodeRunModel = getableManager.get(nodeRunModel.getId()); + + assertThat(storedWfRunModel).isNull(); + assertThat(storedTaskRunModel).isNull(); + assertThat(storedVariableModel).isNull(); + assertThat(storedExternalEventModel).isNull(); + assertThat(storedUserTaskRunModel).isNull(); + assertThat(storedWorkflowEventModel).isNull(); + assertThat(storedNodeRunModel).isNull(); + verify(server, never()).sendErrorToClient(anyString(), any()); + } + + private Command commandProto() { + DeleteWfRunRequest request = DeleteWfRunRequest.newBuilder() + .setId(WfRunId.newBuilder().setId(wfRun.getObjectId().getId())) + .build(); + return Command.newBuilder().setDeleteWfRun(request).build(); + } +} diff --git a/server/src/test/java/io/littlehorse/storeinternals/GetableManagerTest.java b/server/src/test/java/io/littlehorse/storeinternals/GetableManagerTest.java index 77f8be564..b29fc86b5 100644 --- a/server/src/test/java/io/littlehorse/storeinternals/GetableManagerTest.java +++ b/server/src/test/java/io/littlehorse/storeinternals/GetableManagerTest.java @@ -58,7 +58,6 @@ import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; -import org.mockito.Answers; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; @@ -81,7 +80,7 @@ public class GetableManagerTest { new MockProcessorContext<>(); private GetableManager getableManager; - @Mock(answer = Answers.RETURNS_DEEP_STUBS) + @Mock private ProcessorExecutionContext executionContext; private AuthorizationContext testContext = new AuthorizationContextImpl( @@ -133,6 +132,21 @@ void deleteGetableAndTags() { assertThat(keysAfterDelete).isEmpty(); } + @Test + void deleteAllByPrefix() { + WfRunModel wfRunModel = 
TestUtil.wfRun("1234"); + TaskRunModel taskRunModel = TestUtil.taskRun(); + + getableManager.put(taskRunModel); + getableManager.commit(); + + getableManager.deleteAllByPrefix(wfRunModel.getPartitionKey().get(), TaskRunModel.class); + getableManager.commit(); + + TaskRunModel storedTaskRunModel = getableManager.get(taskRunModel.getObjectId()); + assertThat(storedTaskRunModel).isNull(); + } + @NotNull private List getAllKeys(KeyValueStore store) { KeyValueIterator all = store.all(); From d0c76d2193bdeffa5d691ec95840846f9763e01e Mon Sep 17 00:00:00 2001 From: Colt McNealy Date: Sat, 9 Nov 2024 10:07:19 -0800 Subject: [PATCH 2/6] feat(server): adds configuration for kafka streams transaction timeout (#1114) - Adds `LHS_STREAMS_TRANSACTION_TIMEOUT_MS` - Adds `LHS_CORE_KS_CONFIG_` prefix which allows overriding any kafka streams configuration on the core topology. --- .../06-operations/01-server-configuration.md | 20 ++++++ local-dev/configs/server-1.config | 2 + .../io/littlehorse/common/LHConstants.java | 2 +- .../io/littlehorse/common/LHServerConfig.java | 63 +++++++++++++------ .../topology/timer/TimerProcessor.java | 4 +- .../topology/timer/TimerProcessorTest.java | 2 +- 6 files changed, 69 insertions(+), 24 deletions(-) diff --git a/docs/docs/06-operations/01-server-configuration.md b/docs/docs/06-operations/01-server-configuration.md index 05d622c9e..3538b403a 100644 --- a/docs/docs/06-operations/01-server-configuration.md +++ b/docs/docs/06-operations/01-server-configuration.md @@ -478,6 +478,26 @@ The number of threads to execute stream processing in the Core Topology. [Kafka --- +### `LHS_KAFKA_TRANSACTION_TIMEOUT_MS` + +The transaction timeout configured for the Core Topology producer. + +- **Type:** int, >= 1 +- **Default:** 60000 +- **Importance:** medium + +--- + +### `LHS_CORE_KS_CONFIG_` + +Any configurations prefixed with this prefix will be appended to the Kafka Streams configuration properties for the Core Topology. 
For example, setting `LHS_CORE_KS_CONFIG_RESTORE_CONSUMER_CLIENT_RACK` would set the `restore.consumer.client.rack` configuration. + +- **Type:** any +- **Default:** none +- **Importance:** low + +--- + ### `LHS_TIMER_STREAM_THREADS` The number of threads to execute stream processing in the Timer Topology. [Kafka Official](https://kafka.apache.org/documentation/#streamsconfigs_num.stream.threads). For a server with `N` cores, we recommend setting this to `N * 0.4`. diff --git a/local-dev/configs/server-1.config b/local-dev/configs/server-1.config index 1afe967f0..f561a5877 100644 --- a/local-dev/configs/server-1.config +++ b/local-dev/configs/server-1.config @@ -11,6 +11,8 @@ LHS_SHOULD_CREATE_TOPICS=true LHS_CORE_STREAM_THREADS=2 LHS_STREAMS_METRICS_LEVEL=info +LHS_STREAMS_TRANSACTION_TIMEOUT_MS=10000 + LHS_HEALTH_SERVICE_PORT=1822 LHS_INTERNAL_BIND_PORT=2011 diff --git a/server/src/main/java/io/littlehorse/common/LHConstants.java b/server/src/main/java/io/littlehorse/common/LHConstants.java index 08bda5959..2dac9236e 100644 --- a/server/src/main/java/io/littlehorse/common/LHConstants.java +++ b/server/src/main/java/io/littlehorse/common/LHConstants.java @@ -11,7 +11,7 @@ public class LHConstants { // Other various constants used by code - public static final Duration PUNCTUATOR_INERVAL = Duration.ofMillis(500); + public static final Duration TIMER_PUNCTUATOR_INTERVAL = Duration.ofMillis(500); public static final String EXT_EVT_HANDLER_VAR = "INPUT"; // Make all global metadata use the same partition key so that they're processed diff --git a/server/src/main/java/io/littlehorse/common/LHServerConfig.java b/server/src/main/java/io/littlehorse/common/LHServerConfig.java index 885a27f66..92977881b 100644 --- a/server/src/main/java/io/littlehorse/common/LHServerConfig.java +++ b/server/src/main/java/io/littlehorse/common/LHServerConfig.java @@ -91,6 +91,8 @@ public class LHServerConfig extends ConfigBase { public static final String ROCKSDB_COMPACTION_THREADS_KEY = 
"LHS_ROCKSDB_COMPACTION_THREADS"; public static final String STREAMS_METRICS_LEVEL_KEY = "LHS_STREAMS_METRICS_LEVEL"; public static final String LINGER_MS_KEY = "LHS_KAFKA_LINGER_MS"; + public static final String TRANSACTION_TIMEOUT_MS_KEY = "LHS_STREAMS_TRANSACTION_TIMEOUT_MS"; + public static final String CORE_KAFKA_STREAMS_OVERRIDE_PREFIX = "LHS_CORE_KS_CONFIG_"; // General LittleHorse Runtime Behavior Config Env Vars public static final String NUM_NETWORK_THREADS_KEY = "LHS_NUM_NETWORK_THREADS"; @@ -773,18 +775,18 @@ public boolean leaveGroupOnShutdown() { } public Properties getCoreStreamsConfig() { - Properties props = getBaseStreamsConfig(); - props.put("application.id", getKafkaGroupId("core")); - props.put("client.id", this.getClientId("core")); + Properties result = getBaseStreamsConfig(); + result.put("application.id", getKafkaGroupId("core")); + result.put("client.id", this.getClientId("core")); if (getOrSetDefault(X_USE_AT_LEAST_ONCE_KEY, "false").equals("true")) { log.warn("Using experimental override config to use at-least-once for Core topology"); - props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.AT_LEAST_ONCE); + result.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.AT_LEAST_ONCE); } else { - props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2); + result.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2); } - props.put("num.stream.threads", Integer.valueOf(getOrSetDefault(CORE_STREAM_THREADS_KEY, "1"))); + result.put("num.stream.threads", Integer.valueOf(getOrSetDefault(CORE_STREAM_THREADS_KEY, "1"))); // The Core Topology is EOS. Note that we have engineered the application to not be sensitive // to commit latency (long story). 
The only thing that is affected by commit latency is the // time at which metrics updates are processed by the repartition processor, but those @@ -802,16 +804,34 @@ public Properties getCoreStreamsConfig() { // // That's not to mention that we will be writing fewer times to RocksDB. Huge win. int commitInterval = Integer.valueOf(getOrSetDefault(LHServerConfig.CORE_STREAMS_COMMIT_INTERVAL_KEY, "3000")); - props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, commitInterval); - props.put( + result.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, commitInterval); + result.put( "statestore.cache.max.bytes", Long.valueOf(getOrSetDefault(CORE_STATESTORE_CACHE_BYTES_KEY, String.valueOf(1024L * 1024L * 32)))); - // Kafka Streams calls KafkaProducer#commitTransaction() which flushes messages anyways. Sending earlier does - // not help at all in any way (this is because the Core topology is EOS). Therefore, having a big linger.ms - // does not have any downsides in theory. - props.put(StreamsConfig.producerPrefix("linger.ms"), 1000); - return props; + // Kafka Streams calls KafkaProducer#commitTransaction() which flushes messages upon committing the kafka + // transaction. We _could_ linger.ms to the commit interval; however, the problem with this is that the + // timer topology needs to be able to read the records. The Timer Topology is set to read_uncommitted and + // has a requirement that all Command's in the Timer Topology are idempotent, so this is okay. + // + // We can make some of our end-to-end tests fail if we set linger.ms to something big (i.e. 3,000ms) because + // it delays the sending of timers to the Timer Topology, so certain time-triggered events that are expected + // to happen end up not happening (eg. in RetryTest, exponential-backoff retries are not scheduled on time). + // + // We set linger.ms to the same interval as the Timer Punctuator interval (500ms). This gives us approximately + // 1-second precision on timers. 
+ result.put(StreamsConfig.producerPrefix("linger.ms"), LHConstants.TIMER_PUNCTUATOR_INTERVAL.toMillis()); + + for (Object keyObj : props.keySet()) { + String key = (String) keyObj; + if (key.startsWith(CORE_KAFKA_STREAMS_OVERRIDE_PREFIX)) { + String kafkaKey = key.substring(CORE_KAFKA_STREAMS_OVERRIDE_PREFIX.length()) + .replace("_", ".") + .toLowerCase(); + result.put(kafkaKey, props.get(key)); + } + } + return result; } public Properties getTimerStreamsConfig() { @@ -876,7 +896,6 @@ private Properties getBaseStreamsConfig() { props.put("bootstrap.servers", this.getBootstrapServers()); props.put("state.dir", getStateDirectory()); props.put("request.timeout.ms", 1000 * 60); - props.put("producer.transaction.timeout.ms", 1000 * 60); props.put("producer.acks", "all"); props.put("replication.factor", (int) getReplicationFactor()); props.put("num.standby.replicas", Integer.valueOf(getOrSetDefault(NUM_STANDBY_REPLICAS_KEY, "0"))); @@ -885,6 +904,7 @@ private Properties getBaseStreamsConfig() { props.put( "metrics.recording.level", getOrSetDefault(STREAMS_METRICS_LEVEL_KEY, "info").toUpperCase()); + props.put(StreamsConfig.producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), getTransactionTimeoutMs()); // Configs required by KafkaStreams. Some of these are overriden by the application logic itself. props.put("default.deserialization.exception.handler", LogAndContinueExceptionHandler.class); @@ -908,7 +928,7 @@ private Properties getBaseStreamsConfig() { // Those fetch requests can be somewhat costly. props.put("global.consumer.client.rack", getRackId()); - // As of Kafka 3.6, there is nothing we can do to optimize the group coordinator traffic. + // As of Kafka 3.9, there is nothing we can do to optimize the group coordinator traffic. 
} // Set the RocksDB Config Setter, and inject this LHServerConfig into the options set @@ -924,9 +944,7 @@ private Properties getBaseStreamsConfig() { // in the case of a server failure while a request is being processed, the resulting // `Command` should be processed on a new server within a minute. Issue #479 // should verify this behavior - props.put( - "consumer.session.timeout.ms", - Integer.valueOf(getOrSetDefault(LHServerConfig.SESSION_TIMEOUT_KEY, "40000"))); + props.put("consumer.session.timeout.ms", getStreamsSessionTimeout()); // In case we need to authenticate to Kafka, this sets it. addKafkaSecuritySettings(props); @@ -934,12 +952,17 @@ private Properties getBaseStreamsConfig() { return props; } + private int getTransactionTimeoutMs() { + // Default 60 second transaction timeout. + return Integer.valueOf(getOrSetDefault(LHServerConfig.TRANSACTION_TIMEOUT_MS_KEY, "60000")); + } + private String getClientId(String component) { return this.getLHClusterId() + "-" + this.getLHInstanceName() + "-" + component; } - public long getStreamsSessionTimeout() { - return Long.parseLong(props.getProperty(SESSION_TIMEOUT_KEY)); + public int getStreamsSessionTimeout() { + return Integer.valueOf(getOrSetDefault(LHServerConfig.SESSION_TIMEOUT_KEY, "40000")); } public int getNumNetworkThreads() { diff --git a/server/src/main/java/io/littlehorse/server/streams/topology/timer/TimerProcessor.java b/server/src/main/java/io/littlehorse/server/streams/topology/timer/TimerProcessor.java index d7da16f81..2f6185d73 100644 --- a/server/src/main/java/io/littlehorse/server/streams/topology/timer/TimerProcessor.java +++ b/server/src/main/java/io/littlehorse/server/streams/topology/timer/TimerProcessor.java @@ -25,8 +25,8 @@ public class TimerProcessor implements Processor context) { this.context = context; timerStore = context.getStateStore(ServerTopology.TIMER_STORE); - this.punctuator = - context.schedule(LHConstants.PUNCTUATOR_INERVAL, PunctuationType.WALL_CLOCK_TIME, 
this::clearTimers); + this.punctuator = context.schedule( + LHConstants.TIMER_PUNCTUATOR_INTERVAL, PunctuationType.WALL_CLOCK_TIME, this::clearTimers); } @Override diff --git a/server/src/test/java/io/littlehorse/server/streams/topology/timer/TimerProcessorTest.java b/server/src/test/java/io/littlehorse/server/streams/topology/timer/TimerProcessorTest.java index b3846ac26..6aa7bd6cf 100644 --- a/server/src/test/java/io/littlehorse/server/streams/topology/timer/TimerProcessorTest.java +++ b/server/src/test/java/io/littlehorse/server/streams/topology/timer/TimerProcessorTest.java @@ -68,7 +68,7 @@ public void supportSystemTimePunctuatorConsistency() { @Test public void supportPunctuatorIntervalConfigConsistency() { - Assertions.assertThat(scheduledPunctuator.getInterval()).isEqualTo(LHConstants.PUNCTUATOR_INERVAL); + Assertions.assertThat(scheduledPunctuator.getInterval()).isEqualTo(LHConstants.TIMER_PUNCTUATOR_INTERVAL); } @Test From f39e55a0a0c2e3c514ac97e58757f76d1c2e3590 Mon Sep 17 00:00:00 2001 From: Jacob Snarr Date: Sun, 10 Nov 2024 13:14:13 -0500 Subject: [PATCH 3/6] build(deps): bump com.nimbusds:oauth2-oidc-sdk from 10.9.2 to 11.20.1 (#1117) --- sdk-java/build.gradle | 2 +- server/build.gradle | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk-java/build.gradle b/sdk-java/build.gradle index 4a8545c65..3030ab4c6 100644 --- a/sdk-java/build.gradle +++ b/sdk-java/build.gradle @@ -92,7 +92,7 @@ dependencies { implementation 'org.awaitility:awaitility:4.2.0' // OAuth - implementation 'com.nimbusds:oauth2-oidc-sdk:10.9.2' + implementation 'com.nimbusds:oauth2-oidc-sdk:11.20.1' // Lombok stuffs compileOnly "org.projectlombok:lombok:${lombokVersion}" diff --git a/server/build.gradle b/server/build.gradle index e782b13ce..ed4f47415 100644 --- a/server/build.gradle +++ b/server/build.gradle @@ -47,7 +47,7 @@ dependencies { testImplementation "org.apache.kafka:kafka-streams-test-utils:${kafkaVersion}" // Auth - implementation 
'com.nimbusds:oauth2-oidc-sdk:10.9.2' + implementation 'com.nimbusds:oauth2-oidc-sdk:11.20.1' // Utils implementation 'org.apache.commons:commons-lang3:3.12.0' From bcd2cb458a3a0d9f7ffb6896a0590049f5fdd8ec Mon Sep 17 00:00:00 2001 From: KarlaCarvajal Date: Tue, 12 Nov 2024 09:26:16 -0500 Subject: [PATCH 4/6] docs(dotnet-sdk): add dotnet example to mask input and output fields (#1121) Add example project to use masked params and result in LHTaskMethod --- .gitignore | 1 + .../MaskedFieldsExample.csproj | 13 +++++ .../Examples/MaskedFieldsExample/Program.cs | 56 +++++++++++++++++++ .../Examples/MaskedFieldsExample/README.md | 32 +++++++++++ .../Examples/MaskedFieldsExample/Worker.cs | 32 +++++++++++ 5 files changed, 134 insertions(+) create mode 100644 sdk-dotnet/Examples/MaskedFieldsExample/MaskedFieldsExample.csproj create mode 100644 sdk-dotnet/Examples/MaskedFieldsExample/Program.cs create mode 100644 sdk-dotnet/Examples/MaskedFieldsExample/README.md create mode 100644 sdk-dotnet/Examples/MaskedFieldsExample/Worker.cs diff --git a/.gitignore b/.gitignore index 596401d78..dfd1ec023 100644 --- a/.gitignore +++ b/.gitignore @@ -33,6 +33,7 @@ lhctl/lhctl local-dev/certs/ build/ .config +*.pem # Python __pycache__ diff --git a/sdk-dotnet/Examples/MaskedFieldsExample/MaskedFieldsExample.csproj b/sdk-dotnet/Examples/MaskedFieldsExample/MaskedFieldsExample.csproj new file mode 100644 index 000000000..f09e3ff5c --- /dev/null +++ b/sdk-dotnet/Examples/MaskedFieldsExample/MaskedFieldsExample.csproj @@ -0,0 +1,13 @@ + + + + net8.0 + enable + enable + enable + + + + + + diff --git a/sdk-dotnet/Examples/MaskedFieldsExample/Program.cs b/sdk-dotnet/Examples/MaskedFieldsExample/Program.cs new file mode 100644 index 000000000..7a378c616 --- /dev/null +++ b/sdk-dotnet/Examples/MaskedFieldsExample/Program.cs @@ -0,0 +1,56 @@ +using Examples.BasicExample; +using LittleHorse.Sdk; +using LittleHorse.Sdk.Worker; + +public class Program +{ + private static ServiceProvider? 
_serviceProvider; + private static void SetupApplication() + { + _serviceProvider = new ServiceCollection() + .AddLogging(config => + { + config.AddConsole(); + config.SetMinimumLevel(LogLevel.Debug); + }) + .BuildServiceProvider(); + } + + private static LHConfig GetLHConfig(string[] args, ILoggerFactory loggerFactory) + { + var config = new LHConfig(loggerFactory); + + string filePath = Path.Combine(Directory.GetCurrentDirectory(), ".config/littlehorse.config"); + if (File.Exists(filePath)) + config = new LHConfig(filePath, loggerFactory); + + return config; + } + + static void Main(string[] args) + { + SetupApplication(); + if (_serviceProvider != null) + { + var loggerFactory = _serviceProvider.GetRequiredService(); + var config = GetLHConfig(args, loggerFactory); + + MyWorker executableCreateGreet = new MyWorker(); + var taskWorkerCreate = new LHTaskWorker(executableCreateGreet, "create-greet", config); + MyWorker executableUpdateGreet = new MyWorker(); + var taskWorkerUpdate = new LHTaskWorker(executableUpdateGreet, "update-greet", config); + MyWorker executableDeleteGreet = new MyWorker(); + var taskWorkerDelete = new LHTaskWorker(executableDeleteGreet, "delete-greet", config); + + taskWorkerCreate.RegisterTaskDef(); + taskWorkerUpdate.RegisterTaskDef(); + taskWorkerDelete.RegisterTaskDef(); + + Thread.Sleep(1000); + + taskWorkerCreate.Start(); + taskWorkerUpdate.Start(); + taskWorkerDelete.Start(); + } + } +} diff --git a/sdk-dotnet/Examples/MaskedFieldsExample/README.md b/sdk-dotnet/Examples/MaskedFieldsExample/README.md new file mode 100644 index 000000000..345eb8c48 --- /dev/null +++ b/sdk-dotnet/Examples/MaskedFieldsExample/README.md @@ -0,0 +1,32 @@ +## Running MaskedFields Example + +This is a simple example, that masks input params and output results in LH Task Methods. 
+ +Let's run the example in `MaskedFieldsExample` + +``` +dotnet build +dotnet run +``` + +In another terminal, use `lhctl` to run the workflow: + +``` +# The "masked-name" variable should mask the value +# And the input-name variable value will mantain the original plain text + +lhctl run example-basic masked-name pii-info input-name foo +``` + +In addition, you can check the result with: + +``` +# This call shows the result +lhctl get wfRun + +# This will show you all nodes in tha run +lhctl list nodeRun + +# This shows the task run information +lhctl get taskRun +``` diff --git a/sdk-dotnet/Examples/MaskedFieldsExample/Worker.cs b/sdk-dotnet/Examples/MaskedFieldsExample/Worker.cs new file mode 100644 index 000000000..9760e3363 --- /dev/null +++ b/sdk-dotnet/Examples/MaskedFieldsExample/Worker.cs @@ -0,0 +1,32 @@ +using LittleHorse.Sdk.Worker; + +namespace Examples.BasicExample +{ + public class MyWorker + { + [LHTaskMethod("create-greet")] + [LHType(masked: true)] + public string CreateGreeting([LHType(masked: true)] string name) + { + var message = $"Hello team, This is a New Greeting for {name}"; + Console.WriteLine($"Executing task create greet {name}"); + return message; + } + + [LHTaskMethod("update-greet")] + public string UpdateGreeting([LHType(masked: true)] string name) + { + var message = $"Hello team, This is Greeting Modification {name}"; + Console.WriteLine($"Executing task update greet {name}"); + return message; + } + + [LHTaskMethod("delete-greet")] + public string DeleteGreeting(string name) + { + var message = $"Hello team, This is a Greeting Deletion {name}"; + Console.WriteLine($"Executing task delete greet {name}"); + return message; + } + } +} \ No newline at end of file From 072a7a8e17435eae13e252ea9393aa4f12b79d75 Mon Sep 17 00:00:00 2001 From: Colt McNealy Date: Tue, 12 Nov 2024 11:40:03 -0800 Subject: [PATCH 5/6] breaking(server): refactors variable access level api (#1118) - Fixes ticket #582 - Default access level is now PRIVATE - 
Only PUBLIC_VAR and required input variables are considered in WfSpec versioning - Only PUBLIC_VAR can be frozen --- sdk-go/littlehorse/wf_lib_internal.go | 6 +- sdk-go/littlehorse/wf_lib_internal_test.go | 13 +-- sdk-go/littlehorse/wf_lib_public.go | 8 ++ .../littlehorse/sdk/wfsdk/WfRunVariable.java | 30 ++++++- .../sdk/wfsdk/internal/WfRunVariableImpl.java | 14 ++- sdk-python/littlehorse/model/__init__.py | 2 +- sdk-python/littlehorse/workflow.py | 25 +++++- sdk-python/tests/test_workflow.py | 9 +- .../subcommand/TriggeredTaskRun.java | 12 +-- .../getable/global/wfspec/WfSpecModel.java | 30 ++++--- .../global/wfspec/thread/ThreadSpecModel.java | 10 +++ .../wfspec/thread/ThreadVarDefModel.java | 2 + .../wfspec/variable/JsonIndexModel.java | 15 +--- .../wfspec/variable/VariableDefModel.java | 2 + .../littlehorse/common/util/WfSpecUtil.java | 47 +++------- .../test/java/e2e/SharedVariablesTest.java | 85 ++++++++++++++++++- .../test/java/e2e/WfSpecLifecycleTest.java | 4 +- .../global/wfspec/WfSpecModelTest.java | 16 ++-- 18 files changed, 233 insertions(+), 97 deletions(-) diff --git a/sdk-go/littlehorse/wf_lib_internal.go b/sdk-go/littlehorse/wf_lib_internal.go index d83291d59..d8fe56934 100644 --- a/sdk-go/littlehorse/wf_lib_internal.go +++ b/sdk-go/littlehorse/wf_lib_internal.go @@ -2,12 +2,13 @@ package littlehorse import ( "errors" - "github.com/littlehorse-enterprises/littlehorse/sdk-go/lhproto" "log" "strconv" "strings" "unicode" + "github.com/littlehorse-enterprises/littlehorse/sdk-go/lhproto" + "github.com/ztrue/tracerr" ) @@ -745,7 +746,8 @@ func (t *WorkflowThread) addVariable( } threadVarDef := &lhproto.ThreadVarDef{ - VarDef: varDef, + VarDef: varDef, + AccessLevel: lhproto.WfRunVariableAccessLevel_PRIVATE_VAR, } t.spec.VariableDefs = append(t.spec.VariableDefs, threadVarDef) diff --git a/sdk-go/littlehorse/wf_lib_internal_test.go b/sdk-go/littlehorse/wf_lib_internal_test.go index 715309a4b..dcc5d718d 100644 --- 
a/sdk-go/littlehorse/wf_lib_internal_test.go +++ b/sdk-go/littlehorse/wf_lib_internal_test.go @@ -1,9 +1,10 @@ package littlehorse_test import ( + "testing" + "github.com/littlehorse-enterprises/littlehorse/sdk-go/lhproto" "github.com/littlehorse-enterprises/littlehorse/sdk-go/littlehorse" - "testing" "github.com/stretchr/testify/assert" ) @@ -538,10 +539,10 @@ func TestJsonPath(t *testing.T) { func TestVariableAccessLevel(t *testing.T) { wf := littlehorse.NewWorkflow(func(t *littlehorse.WorkflowThread) { - inheritedVar := t.AddVariable("my-var", lhproto.VariableType_BOOL) - inheritedVar.WithAccessLevel(lhproto.WfRunVariableAccessLevel_PRIVATE_VAR) + publicVar := t.AddVariable("my-var", lhproto.VariableType_BOOL) + publicVar.AsPublic() - // Test that default is PUBLIC_VAR + // Test that default is PRIVATE_VAR t.AddVariable("default-access", lhproto.VariableType_INT) t.Execute("some-task") @@ -550,11 +551,11 @@ func TestVariableAccessLevel(t *testing.T) { putWf, _ := wf.Compile() entrypoint := putWf.ThreadSpecs[putWf.EntrypointThreadName] varDef := entrypoint.VariableDefs[0] - assert.Equal(t, varDef.AccessLevel, lhproto.WfRunVariableAccessLevel_PRIVATE_VAR) + assert.Equal(t, varDef.AccessLevel, lhproto.WfRunVariableAccessLevel_PUBLIC_VAR) assert.Equal(t, varDef.VarDef.Name, "my-var") varDef = entrypoint.VariableDefs[1] - assert.Equal(t, varDef.AccessLevel, lhproto.WfRunVariableAccessLevel_PUBLIC_VAR) + assert.Equal(t, varDef.AccessLevel, lhproto.WfRunVariableAccessLevel_PRIVATE_VAR) assert.Equal(t, varDef.VarDef.Name, "default-access") } diff --git a/sdk-go/littlehorse/wf_lib_public.go b/sdk-go/littlehorse/wf_lib_public.go index 09acb9d13..4340087d5 100644 --- a/sdk-go/littlehorse/wf_lib_public.go +++ b/sdk-go/littlehorse/wf_lib_public.go @@ -110,6 +110,14 @@ func (w *WfRunVariable) WithAccessLevel(accessLevel lhproto.WfRunVariableAccessL return w.withAccessLevel(accessLevel) } +func (w *WfRunVariable) AsPublic() WfRunVariable { + return 
w.withAccessLevel(lhproto.WfRunVariableAccessLevel_PUBLIC_VAR) +} + +func (w *WfRunVariable) AsInherited() WfRunVariable { + return w.withAccessLevel(lhproto.WfRunVariableAccessLevel_INHERITED_VAR) +} + func (w *WfRunVariable) Searchable() *WfRunVariable { return w.searchableImpl() } diff --git a/sdk-java/src/main/java/io/littlehorse/sdk/wfsdk/WfRunVariable.java b/sdk-java/src/main/java/io/littlehorse/sdk/wfsdk/WfRunVariable.java index 3d31a85c7..b59f8f19a 100644 --- a/sdk-java/src/main/java/io/littlehorse/sdk/wfsdk/WfRunVariable.java +++ b/sdk-java/src/main/java/io/littlehorse/sdk/wfsdk/WfRunVariable.java @@ -27,14 +27,36 @@ public interface WfRunVariable extends Serializable { WfRunVariable required(); /** - * Marks the Variable as "Searchable", which: - * - Creates an Index on the Variable in the LH Data Store - * - Due to the fact that the Variable is now Searchable, all future WfSpec - * versions must use the same Type for this Variable. + * Marks the Variable as "Searchable", which creates an Index on the Variable + * in the LH Data Store. * @return same {@link WfRunVariable} instance */ WfRunVariable searchable(); + /** + * Marks the Variable as a `PUBLIC_VAR`, which does three things: + * 1. Considers this variable in determining whether a new version of this WfSpec + * should be a major version or minor revision. + * 2. Freezes the type of this variable so that you cannot create future WfSpec + * versions with a variable of the same name and different type. + * 3. Allows defining child WfSpec's that use this variable. + * + * This is an advanced feature that you should use in any of the following cases: + * - You are treating a WfSpec as a data model and a WfRun as an instance of data. + * - You need child workflows to access this variable. + * @return same {@link WfRunVariable} instance + */ + WfRunVariable asPublic(); + + /** + * Marks the Variable as a `INHERITED_VAR`, which means that it comes from the + * parent `WfRun`. 
This means that: + * - There must be a parent WfSpec reference. + * - The parent must have a PUBLIC_VAR variable of the same name and type. + * @return same {@link WfRunVariable} instance + */ + WfRunVariable asInherited(); + /** * Marks the JSON_OBJ or JSON_ARR Variable as "Searchable", and creates an * index on the specified field. diff --git a/sdk-java/src/main/java/io/littlehorse/sdk/wfsdk/internal/WfRunVariableImpl.java b/sdk-java/src/main/java/io/littlehorse/sdk/wfsdk/internal/WfRunVariableImpl.java index cd6b03023..b982d1e53 100644 --- a/sdk-java/src/main/java/io/littlehorse/sdk/wfsdk/internal/WfRunVariableImpl.java +++ b/sdk-java/src/main/java/io/littlehorse/sdk/wfsdk/internal/WfRunVariableImpl.java @@ -33,8 +33,8 @@ public WfRunVariableImpl(String name, Object typeOrDefaultVal) { this.name = name; this.typeOrDefaultVal = typeOrDefaultVal; - // This is the default zero value. - this.accessLevel = WfRunVariableAccessLevel.PUBLIC_VAR; + // As per GH Issue #582, the default is now PRIVATE_VAR. 
+ this.accessLevel = WfRunVariableAccessLevel.PRIVATE_VAR; initializeType(); } @@ -122,4 +122,14 @@ public ThreadVarDef getSpec() { .setAccessLevel(accessLevel) .build(); } + + @Override + public WfRunVariableImpl asPublic() { + return this.withAccessLevel(WfRunVariableAccessLevel.PUBLIC_VAR); + } + + @Override + public WfRunVariable asInherited() { + return this.withAccessLevel(WfRunVariableAccessLevel.INHERITED_VAR); + } } diff --git a/sdk-python/littlehorse/model/__init__.py b/sdk-python/littlehorse/model/__init__.py index 85f573b7e..c956189c7 100644 --- a/sdk-python/littlehorse/model/__init__.py +++ b/sdk-python/littlehorse/model/__init__.py @@ -5,8 +5,8 @@ from .node_run_pb2 import * from .object_id_pb2 import * from .scheduled_wf_run_pb2 import * -from .service_pb2 import * from .service_pb2_grpc import * +from .service_pb2 import * from .task_def_pb2 import * from .task_run_pb2 import * from .user_tasks_pb2 import * diff --git a/sdk-python/littlehorse/workflow.py b/sdk-python/littlehorse/workflow.py index 4e97a5f7b..71a7788af 100644 --- a/sdk-python/littlehorse/workflow.py +++ b/sdk-python/littlehorse/workflow.py @@ -56,6 +56,7 @@ WfRunVariableAccessLevel, WorkflowRetentionPolicy, ) +from littlehorse.model.wf_spec_pb2 import PRIVATE_VAR from littlehorse.utils import negate_comparator, to_variable_value from littlehorse.worker import _create_task_def @@ -364,6 +365,28 @@ def with_json_path(self, json_path: str) -> "WfRunVariable": out.json_path = json_path return out + def as_public(self) -> "WfRunVariable": + """Sets the access level to PUBLIC_VAR, which has three implications: + - Future versions of this WfSpec cannot define a variable with the + same name and a different type. + - Child workflows can access this variable. + - This variable is now considered in determining whether a new + version of the WfSpec is a majorVersion or revision. 
 + """ + self._access_level = WfRunVariableAccessLevel.PUBLIC_VAR + return self + + def as_inherited(self) -> "WfRunVariable": + """Sets the access level to INHERITED_VAR, which means the variable + comes from the parent WfRun. This implies that: + - There must be a parent WfSpec reference. + - The parent WfSpec must define a PUBLIC_VAR variable with the + same name and type. + Only valid when the WfRun is launched as a child workflow. + """ + self._access_level = WfRunVariableAccessLevel.INHERITED_VAR + return self + def with_access_level( self, access_level: WfRunVariableAccessLevel ) -> "WfRunVariable": @@ -1474,7 +1497,7 @@ def add_variable( self, variable_name: str, variable_type: VariableType, - access_level: Optional[Union[WfRunVariableAccessLevel, str]] = None, + access_level: Optional[Union[WfRunVariableAccessLevel, str]] = PRIVATE_VAR, default_value: Any = None, ) -> WfRunVariable: """Defines a Variable in the ThreadSpec and returns a handle to it. 
diff --git a/sdk-python/tests/test_workflow.py b/sdk-python/tests/test_workflow.py index 0618844ba..6f3c1c59c 100644 --- a/sdk-python/tests/test_workflow.py +++ b/sdk-python/tests/test_workflow.py @@ -199,6 +199,7 @@ def my_entrypoint(thread: WorkflowThread) -> None: variable_defs=[ ThreadVarDef( var_def=VariableDef(name="input-name", type=VariableType.STR), + access_level=WfRunVariableAccessLevel.PRIVATE_VAR, ), ], nodes={ @@ -884,6 +885,7 @@ def my_entrypoint(thread: WorkflowThread) -> None: variable_defs=[ ThreadVarDef( var_def=VariableDef(name="input-name", type=VariableType.STR), + access_level=WfRunVariableAccessLevel.PRIVATE_VAR, ) ], nodes={ @@ -1046,6 +1048,7 @@ def my_entrypoint(thread: WorkflowThread) -> None: variable_defs=[ ThreadVarDef( var_def=VariableDef(name="value", type=VariableType.INT), + access_level=WfRunVariableAccessLevel.PRIVATE_VAR, ), ], nodes={ @@ -1609,7 +1612,8 @@ def my_entrypoint(thread: WorkflowThread) -> None: ThreadVarDef( var_def=VariableDef( name="input-name", type=VariableType.STR - ) + ), + access_level=WfRunVariableAccessLevel.PRIVATE_VAR, ), ], nodes={ @@ -1642,7 +1646,8 @@ def my_entrypoint(thread: WorkflowThread) -> None: ThreadVarDef( var_def=VariableDef( name="input-name", type=VariableType.STR - ) + ), + access_level=WfRunVariableAccessLevel.PRIVATE_VAR, ), ], nodes={ diff --git a/server/src/main/java/io/littlehorse/common/model/corecommand/subcommand/TriggeredTaskRun.java b/server/src/main/java/io/littlehorse/common/model/corecommand/subcommand/TriggeredTaskRun.java index 0f9d0c5bd..b21e4828e 100644 --- a/server/src/main/java/io/littlehorse/common/model/corecommand/subcommand/TriggeredTaskRun.java +++ b/server/src/main/java/io/littlehorse/common/model/corecommand/subcommand/TriggeredTaskRun.java @@ -74,19 +74,19 @@ public void initFrom(Message proto, ExecutionContext context) { public Empty process(ProcessorExecutionContext executionContext, LHServerConfig config) { WfRunIdModel wfRunId = source.getWfRunId(); - 
log.info("Might schedule a one-off task for wfRun {} due to UserTask", wfRunId); + log.trace("Might schedule a one-off task for wfRun {} due to UserTask", wfRunId); WfRunModel wfRunModel = executionContext.getableManager().get(wfRunId); if (wfRunModel == null) { - log.info("WfRun no longer exists! Skipping the scheduled action trigger"); + log.trace("WfRun no longer exists! Skipping the scheduled action trigger"); return null; } // Now verify that the thing hasn't yet been completed. ThreadRunModel thread = wfRunModel.getThreadRun(source.getThreadRunNumber()); - // Impossible for thread to be null, but check anyways + // This can happen in the case of ThreadRetentionPolicy being set. if (thread == null) { - log.warn("Triggered scheduled task refers to missing thread!"); + log.trace("Triggered scheduled task refers to missing thread!"); return null; } @@ -96,12 +96,12 @@ public Empty process(ProcessorExecutionContext executionContext, LHServerConfig UserTaskRunModel userTaskRun = executionContext.getableManager().get(userTaskRunId); if (userTaskNR.getStatus() != LHStatus.RUNNING) { - log.info("NodeRun is not RUNNING anymore, so can't take action!"); + log.trace("NodeRun is not RUNNING anymore, so can't take action!"); return null; } // At this point, need to update the events. 
- log.info("Scheduling a one-off task for wfRun {} due to UserTask", wfRunId); + log.trace("Scheduling a one-off task for wfRun {} due to UserTask", wfRunId); try { List inputVars = taskToSchedule.assignInputVars(thread, executionContext); diff --git a/server/src/main/java/io/littlehorse/common/model/getable/global/wfspec/WfSpecModel.java b/server/src/main/java/io/littlehorse/common/model/getable/global/wfspec/WfSpecModel.java index d544e0df3..13174d115 100644 --- a/server/src/main/java/io/littlehorse/common/model/getable/global/wfspec/WfSpecModel.java +++ b/server/src/main/java/io/littlehorse/common/model/getable/global/wfspec/WfSpecModel.java @@ -35,6 +35,7 @@ import io.littlehorse.server.streams.topology.core.MetadataCommandExecution; import io.littlehorse.server.streams.topology.core.ProcessorExecutionContext; import java.util.ArrayList; +import java.util.Collection; import java.util.Date; import java.util.HashMap; import java.util.HashSet; @@ -287,12 +288,6 @@ public Map getSearchableVariables() { return out; } - public Map getRequiredVariables() { - return threadSpecs.get(entrypointThreadName).getInputVariableDefs().entrySet().stream() - .filter(kv -> kv.getValue().isRequired()) - .collect(Collectors.toMap(kv -> kv.getKey(), kv -> kv.getValue())); - } - /* * For now, the only validation we do for variables is to make sure that: * 1. No variable name is defined twice (this will be useful for future @@ -358,17 +353,28 @@ private void validateVariablesHelper(MetadataCommandExecution ctx) throws LHApiE } // Now we curate the list of variables which are "frozen" in time and cannot - // change their types. + // change their types. This includes two types: + // - Required variables in the entrypoint threadRun + // - Any variable with the access_level `PUBLIC_VAR`. 
+ for (ThreadVarDefModel tvd : getEntrypointThread().getRequiredVarDefs()) { + frozenVariables.put(tvd.getVarDef().getName(), tvd); + } for (ThreadSpecModel thread : threadSpecs.values()) { - for (ThreadVarDefModel tvd : thread.getRequiredVarDefs()) { - frozenVariables.put(tvd.getVarDef().getName(), tvd); - } - for (ThreadVarDefModel tvd : thread.getRequiredVarDefs()) { + for (ThreadVarDefModel tvd : thread.getPublicVarDefs()) { frozenVariables.put(tvd.getVarDef().getName(), tvd); } } } + /** + * Returns a ThreadVarDef for every PUBLIC_VAR variable in the WfSpec (all threads). + */ + public Collection getPublicVars() { + return threadSpecs.values().stream() + .flatMap(tspec -> tspec.getPublicVarDefs().stream()) + .toList(); + } + private void checkCompatibilityAndSetVersion(WfSpecModel old) { // First, for every previously-frozen variable, we need to check that either: // - the variable isn't included, or @@ -385,7 +391,7 @@ private void checkCompatibilityAndSetVersion(WfSpecModel old) { if (oldDef.getVarDef().getType() != currentVarDef.getVarDef().getType()) { throw new LHApiException( Status.FAILED_PRECONDITION, - "Variable %s must be of type %s not %s" + "Variable %s must be of type %s not %s as it was formerly declared a PUBLIC_VAR" .formatted( varName, oldDef.getVarDef().getType(), diff --git a/server/src/main/java/io/littlehorse/common/model/getable/global/wfspec/thread/ThreadSpecModel.java b/server/src/main/java/io/littlehorse/common/model/getable/global/wfspec/thread/ThreadSpecModel.java index 57c48b44d..8cde2094a 100644 --- a/server/src/main/java/io/littlehorse/common/model/getable/global/wfspec/thread/ThreadSpecModel.java +++ b/server/src/main/java/io/littlehorse/common/model/getable/global/wfspec/thread/ThreadSpecModel.java @@ -21,6 +21,7 @@ import io.littlehorse.server.streams.topology.core.ExecutionContext; import io.littlehorse.server.streams.topology.core.MetadataCommandExecution; import java.util.ArrayList; +import java.util.Collection; import 
java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -128,6 +129,15 @@ public Map getInputVariableDefs() { return out; } + /** + * Returns all ThreadVarDef's that are of type `PUBLIC_VAR`. + */ + public Collection getPublicVarDefs() { + return getVariableDefs().stream() + .filter(varDef -> varDef.getAccessLevel() == WfRunVariableAccessLevel.PUBLIC_VAR) + .toList(); + } + /* * Returns a set of all variable names *used* during thread execution. */ diff --git a/server/src/main/java/io/littlehorse/common/model/getable/global/wfspec/thread/ThreadVarDefModel.java b/server/src/main/java/io/littlehorse/common/model/getable/global/wfspec/thread/ThreadVarDefModel.java index 1aa224235..9bd5614b0 100644 --- a/server/src/main/java/io/littlehorse/common/model/getable/global/wfspec/thread/ThreadVarDefModel.java +++ b/server/src/main/java/io/littlehorse/common/model/getable/global/wfspec/thread/ThreadVarDefModel.java @@ -10,9 +10,11 @@ import io.littlehorse.server.streams.topology.core.ExecutionContext; import java.util.ArrayList; import java.util.List; +import lombok.EqualsAndHashCode; import lombok.Getter; @Getter +@EqualsAndHashCode(callSuper = false) public class ThreadVarDefModel extends LHSerializable { private VariableDefModel varDef; diff --git a/server/src/main/java/io/littlehorse/common/model/getable/global/wfspec/variable/JsonIndexModel.java b/server/src/main/java/io/littlehorse/common/model/getable/global/wfspec/variable/JsonIndexModel.java index c8efadc0a..42468e14d 100644 --- a/server/src/main/java/io/littlehorse/common/model/getable/global/wfspec/variable/JsonIndexModel.java +++ b/server/src/main/java/io/littlehorse/common/model/getable/global/wfspec/variable/JsonIndexModel.java @@ -5,14 +5,15 @@ import io.littlehorse.sdk.common.proto.JsonIndex; import io.littlehorse.sdk.common.proto.VariableType; import io.littlehorse.server.streams.topology.core.ExecutionContext; -import java.util.Objects; import lombok.AllArgsConstructor; +import 
lombok.EqualsAndHashCode; import lombok.Getter; import lombok.NoArgsConstructor; @Getter @AllArgsConstructor @NoArgsConstructor +@EqualsAndHashCode(callSuper = false) public class JsonIndexModel extends LHSerializable { private String fieldPath; @@ -36,16 +37,4 @@ public void initFrom(Message proto, ExecutionContext context) { fieldPath = p.getFieldPath(); fieldType = p.getFieldType(); } - - @Override - public boolean equals(Object other) { - if (other == null) return false; - - if (!(other instanceof JsonIndexModel)) { - return false; - } - - JsonIndexModel o = (JsonIndexModel) other; - return Objects.equals(fieldPath, o.getFieldPath()) && Objects.equals(fieldType, o.getFieldType()); - } } diff --git a/server/src/main/java/io/littlehorse/common/model/getable/global/wfspec/variable/VariableDefModel.java b/server/src/main/java/io/littlehorse/common/model/getable/global/wfspec/variable/VariableDefModel.java index 557f043cd..501917033 100644 --- a/server/src/main/java/io/littlehorse/common/model/getable/global/wfspec/variable/VariableDefModel.java +++ b/server/src/main/java/io/littlehorse/common/model/getable/global/wfspec/variable/VariableDefModel.java @@ -9,11 +9,13 @@ import io.littlehorse.sdk.common.proto.VariableDef; import io.littlehorse.sdk.common.proto.VariableType; import io.littlehorse.server.streams.topology.core.ExecutionContext; +import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.Setter; @Getter @Setter +@EqualsAndHashCode(callSuper = false) public class VariableDefModel extends LHSerializable { private VariableType type; diff --git a/server/src/main/java/io/littlehorse/common/util/WfSpecUtil.java b/server/src/main/java/io/littlehorse/common/util/WfSpecUtil.java index 1d2c34de0..e2abb5b7f 100644 --- a/server/src/main/java/io/littlehorse/common/util/WfSpecUtil.java +++ b/server/src/main/java/io/littlehorse/common/util/WfSpecUtil.java @@ -3,15 +3,11 @@ import com.google.protobuf.Timestamp; import 
io.littlehorse.common.model.getable.global.wfspec.WfSpecModel; import io.littlehorse.common.model.getable.global.wfspec.thread.ThreadVarDefModel; -import io.littlehorse.sdk.common.proto.WfRunVariableAccessLevel; import io.littlehorse.sdk.common.proto.WfSpec; import io.littlehorse.sdk.common.proto.WfSpecId; import java.util.Arrays; +import java.util.Collection; import java.util.Date; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; public class WfSpecUtil { private WfSpecUtil() {} @@ -40,43 +36,24 @@ private static void sanitize(WfSpec.Builder spec, Timestamp date) { * Verifies if left WfSpecModel have breaking changes comparing to right * when either: * - set of left required variables does not match right required variables - * - set of left searchable variables does not match right searchable variables + * - set of left PUBLIC_VAR variables does not match right PUBLIC_VAR variables * @param left WfSpecModel to be compared * @param right WfSpecModel compared against * @return true when there is a breaking change, false otherwise */ public static boolean hasBreakingChanges(WfSpecModel left, WfSpecModel right) { - return !variablesMatch(left.getRequiredVariables(), right.getRequiredVariables()) - || !variablesMatch(left.getSearchableVariables(), right.getSearchableVariables()) - || !variableAccessLevelMatch( - left.getEntrypointThread().getVariableDefs(), - right.getEntrypointThread().getVariableDefs()); - } + Collection leftPublicVars = left.getPublicVars(); + Collection rightPublicVars = right.getPublicVars(); - private static boolean variablesMatch(Map left, Map right) { - Set leftVariables = left.keySet(); - Set rightVariables = right.keySet(); - if (leftVariables.size() != rightVariables.size()) return false; + if (leftPublicVars.size() != rightPublicVars.size()) return true; + if (!leftPublicVars.containsAll(rightPublicVars)) return true; - return leftVariables.containsAll(rightVariables); - } + // Check required 
variables + Collection rightRequiredVars = + right.getEntrypointThread().getRequiredVarDefs(); + Collection leftRequiredVars = + left.getEntrypointThread().getRequiredVarDefs(); - private static boolean variableAccessLevelMatch(List left, List right) { - final Map leftVariables = new HashMap<>(); - for (ThreadVarDefModel leftVarDefinition : left) { - leftVariables.put(leftVarDefinition.getVarDef().getName(), leftVarDefinition.getAccessLevel()); - } - for (ThreadVarDefModel rightVarDefinition : right) { - WfRunVariableAccessLevel rightAccessLevel = rightVarDefinition.getAccessLevel(); - if (rightAccessLevel == WfRunVariableAccessLevel.PUBLIC_VAR - || rightAccessLevel == WfRunVariableAccessLevel.INHERITED_VAR) { - WfRunVariableAccessLevel leftAccessLevel = - leftVariables.get(rightVarDefinition.getVarDef().getName()); - if (leftAccessLevel != null && leftAccessLevel.equals(WfRunVariableAccessLevel.PRIVATE_VAR)) { - return false; - } - } - } - return true; + return leftRequiredVars.size() != rightRequiredVars.size() || !leftRequiredVars.containsAll(rightRequiredVars); } } diff --git a/server/src/test/java/e2e/SharedVariablesTest.java b/server/src/test/java/e2e/SharedVariablesTest.java index 9b27cf86c..9f9812c7d 100644 --- a/server/src/test/java/e2e/SharedVariablesTest.java +++ b/server/src/test/java/e2e/SharedVariablesTest.java @@ -9,6 +9,7 @@ import io.littlehorse.sdk.common.proto.VariableType; import io.littlehorse.sdk.common.proto.WfRunId; import io.littlehorse.sdk.common.proto.WfRunVariableAccessLevel; +import io.littlehorse.sdk.common.proto.WfSpec; import io.littlehorse.sdk.common.util.Arg; import io.littlehorse.sdk.wfsdk.WfRunVariable; import io.littlehorse.sdk.wfsdk.Workflow; @@ -67,6 +68,86 @@ public void shouldResolvePublicVariablesFromParentWf() { .isEqualTo(12); } + @Test + void shouldNotFreezeSearchableVariables() { + // Adding UUID makes it possible to run repeatedly with ExternalBootstrapper + String wfSpecName = "dont-freeze-searchable-" + 
UUID.randomUUID(); + Workflow workflow = Workflow.newWorkflow(wfSpecName, wf -> { + wf.addVariable("some-var", VariableType.STR).searchable(); + }); + WfSpec result = client.putWfSpec(workflow.compileWorkflow()); + assertThat(result.getFrozenVariablesCount()).isEqualTo(0); + + // Additionally, private searchable variables do not increment the major version, + // and I can change the type of them if they're not public. + Workflow secondWorkflow = Workflow.newWorkflow(wfSpecName, wf -> { + wf.addVariable("some-var", VariableType.INT); + }); + WfSpec secondVersion = client.putWfSpec(secondWorkflow.compileWorkflow()); + assertThat(secondVersion.getId().getMajorVersion()).isEqualTo(0); + assertThat(secondVersion.getId().getRevision()).isEqualTo(1); + } + + @Test + void publicVariablesShouldStayFrozenThroughVersions() { + String wfSpecName = "public-var-frozen-" + UUID.randomUUID(); + Workflow workflow = Workflow.newWorkflow(wfSpecName, wf -> { + wf.addVariable("some-var", VariableType.STR).asPublic(); + }); + WfSpec firstVersion = client.putWfSpec(workflow.compileWorkflow()); + assertThat(firstVersion.getFrozenVariablesCount()).isEqualTo(1); + + Workflow secondWorkflow = Workflow.newWorkflow(wfSpecName, wf -> { + wf.addVariable("some-other-var", "some string value").searchable(); + }); + WfSpec secondVersion = client.putWfSpec(secondWorkflow.compileWorkflow()); + + // Breaking change + assertThat(secondVersion.getId().getMajorVersion()).isEqualTo(1); + assertThat(secondVersion.getId().getRevision()).isEqualTo(0); + + // The first variable should still be frozen + assertThat(secondVersion.getFrozenVariablesCount()).isEqualTo(1); + assertThat(secondVersion.getFrozenVariables(0).getVarDef().getName()).isEqualTo("some-var"); + } + + @Test + void requiredVariablesIncrementMajorVersion() { + String wfSpecName = "required-major-version-" + UUID.randomUUID(); + Workflow workflow = Workflow.newWorkflow(wfSpecName, wf -> { + wf.addVariable("some-var", 
VariableType.STR).required(); + }); + WfSpec firstVersion = client.putWfSpec(workflow.compileWorkflow()); + assertThat(firstVersion.getFrozenVariablesCount()).isEqualTo(1); + + Workflow secondWorkflow = Workflow.newWorkflow(wfSpecName, wf -> { + wf.addVariable("some-other-var", "not required").searchable(); + }); + WfSpec secondVersion = client.putWfSpec(secondWorkflow.compileWorkflow()); + + assertThat(secondVersion.getId().getMajorVersion()).isEqualTo(1); + assertThat(secondVersion.getId().getRevision()).isEqualTo(0); + } + + @Test + void publicVariablesIncrementMajorVersion() { + String wfSpecName = "public-major-version-" + UUID.randomUUID(); + Workflow workflow = Workflow.newWorkflow(wfSpecName, wf -> { + wf.addVariable("some-var", VariableType.STR).asPublic(); + wf.addVariable("another-var", "will-be-removed").asPublic(); + }); + WfSpec firstVersion = client.putWfSpec(workflow.compileWorkflow()); + assertThat(firstVersion.getFrozenVariablesCount()).isEqualTo(2); + + Workflow secondWorkflow = Workflow.newWorkflow(wfSpecName, wf -> { + wf.addVariable("some-var", "not public anymore").searchable(); + }); + WfSpec thirdVersion = client.putWfSpec(secondWorkflow.compileWorkflow()); + + assertThat(thirdVersion.getId().getMajorVersion()).isEqualTo(1); + assertThat(thirdVersion.getId().getRevision()).isEqualTo(0); + } + @LHWorkflow("shared-variables-parent-wf") public Workflow buildParentWf() { return new WorkflowImpl("shared-variables-parent-wf", thread -> { @@ -82,8 +163,8 @@ public Workflow buildParentWf() { @LHWorkflow("shared-variables-child-wf") public Workflow buildChildWf() { Workflow out = new WorkflowImpl("shared-variables-child-wf", thread -> { - WfRunVariable publicVariable = thread.addVariable("public-variable", VariableType.INT) - .withAccessLevel(WfRunVariableAccessLevel.INHERITED_VAR); + WfRunVariable publicVariable = + thread.addVariable("public-variable", VariableType.INT).asInherited(); WfRunVariable calculatedValue = 
thread.addVariable("calculated-value", VariableType.INT).searchable(); diff --git a/server/src/test/java/e2e/WfSpecLifecycleTest.java b/server/src/test/java/e2e/WfSpecLifecycleTest.java index 6ec651a87..9d0675f8e 100644 --- a/server/src/test/java/e2e/WfSpecLifecycleTest.java +++ b/server/src/test/java/e2e/WfSpecLifecycleTest.java @@ -71,7 +71,7 @@ void shouldAddMajorVersionWhenWfSpecHasBreakingChanges() { Workflow updatedWorkflow = Workflow.newWorkflow("sample", wf -> { wf.addVariable("variable", VariableType.BOOL).required(); - wf.addVariable("searchable", VariableType.BOOL).searchable(); + wf.addVariable("second-required", VariableType.BOOL).required(); }); WfSpec updatedSpec = client.putWfSpec(updatedWorkflow.compileWorkflow()); @@ -180,7 +180,7 @@ void shouldThrowFailedPreconditionWhenWfSpecHasBreakingChanges() { Workflow updatedWorkflow = Workflow.newWorkflow("sample", wf -> { wf.addVariable("variable", VariableType.BOOL).required(); - wf.addVariable("searchable", VariableType.BOOL).searchable(); + wf.addVariable("searchable", VariableType.BOOL).required(); }) .withUpdateType(AllowedUpdateType.MINOR_REVISION_UPDATES); diff --git a/server/src/test/java/io/littlehorse/common/model/getable/global/wfspec/WfSpecModelTest.java b/server/src/test/java/io/littlehorse/common/model/getable/global/wfspec/WfSpecModelTest.java index 6d9ee3a4f..082c2f86e 100644 --- a/server/src/test/java/io/littlehorse/common/model/getable/global/wfspec/WfSpecModelTest.java +++ b/server/src/test/java/io/littlehorse/common/model/getable/global/wfspec/WfSpecModelTest.java @@ -110,7 +110,7 @@ public void shouldIncreaseTheMajorVersionWhenAPublicVariableChangesItsAccessLeve } @Test - public void shouldIncreaseTheMajorVersionWhenAInheritedVariableChangesItsAccessLevelToPrivate() { + public void shouldNotIncreaseTheMajorVersionWhenAInheritedVariableChangesItsAccessLevelToPrivate() { WfSpecModel oldVersion = TestUtil.wfSpec("my-parent-wf"); ThreadVarDefModel inheritedVar = new 
ThreadVarDefModel(variableDef, false, false, WfRunVariableAccessLevel.INHERITED_VAR); @@ -127,11 +127,12 @@ public void shouldIncreaseTheMajorVersionWhenAInheritedVariableChangesItsAccessL wfSpec.setThreadSpecs(Map.of(wfSpec.getEntrypointThreadName(), entrypointThread)); Assertions.assertThat(wfSpec.getId().getMajorVersion()).isEqualTo(0); wfSpec.validateAndMaybeBumpVersion(Optional.of(oldVersion), mockContext); - Assertions.assertThat(wfSpec.getId().getMajorVersion()).isEqualTo(1); + Assertions.assertThat(wfSpec.getId().getMajorVersion()).isEqualTo(0); + Assertions.assertThat(wfSpec.getId().getRevision()).isEqualTo(1); } @ParameterizedTest - @MethodSource("provideNonBreakingChangeArguments") + @MethodSource("provideBreakingChangeArguments") public void shouldNotIncreaseTheMajorVersionWhenVariablesAreStillAccessibleFromChildWorkflows( WfRunVariableAccessLevel from, WfRunVariableAccessLevel to) { WfSpecModel oldVersion = TestUtil.wfSpec("my-parent-wf"); @@ -148,19 +149,16 @@ public void shouldNotIncreaseTheMajorVersionWhenVariablesAreStillAccessibleFromC wfSpec.setThreadSpecs(Map.of(wfSpec.getEntrypointThreadName(), entrypointThread)); Assertions.assertThat(wfSpec.getId().getMajorVersion()).isEqualTo(0); wfSpec.validateAndMaybeBumpVersion(Optional.of(oldVersion), mockContext); - Assertions.assertThat(wfSpec.getId().getMajorVersion()).isEqualTo(0); + Assertions.assertThat(wfSpec.getId().getMajorVersion()).isEqualTo(1); } /* Access level combinations when the major version shouldn't increase */ - private static Stream provideNonBreakingChangeArguments() { + private static Stream provideBreakingChangeArguments() { return Stream.of( Arguments.of(WfRunVariableAccessLevel.PUBLIC_VAR, WfRunVariableAccessLevel.INHERITED_VAR), - Arguments.of(WfRunVariableAccessLevel.PUBLIC_VAR, WfRunVariableAccessLevel.PUBLIC_VAR), Arguments.of(WfRunVariableAccessLevel.PRIVATE_VAR, WfRunVariableAccessLevel.PUBLIC_VAR), - Arguments.of(WfRunVariableAccessLevel.PRIVATE_VAR, 
WfRunVariableAccessLevel.PRIVATE_VAR), - Arguments.of(WfRunVariableAccessLevel.INHERITED_VAR, WfRunVariableAccessLevel.PUBLIC_VAR), - Arguments.of(WfRunVariableAccessLevel.INHERITED_VAR, WfRunVariableAccessLevel.INHERITED_VAR)); + Arguments.of(WfRunVariableAccessLevel.INHERITED_VAR, WfRunVariableAccessLevel.PUBLIC_VAR)); } } From 32c5f2932c5b09dc2c4787395e3714401e92a198 Mon Sep 17 00:00:00 2001 From: Colt McNealy Date: Tue, 12 Nov 2024 11:51:35 -0800 Subject: [PATCH 6/6] feat(server): use kip-899 to rebootstrap clients (#1122) In testing, I observed that when moving from a 3-node Kafka cluster with mixed brokers and controllers to a Kafka cluster with separate brokers and controllers, all of the Kafka clients got stuck and continued trying to connect to the controllers which no longer had the broker role. It turns out that KIP-899 indirectly solves this problem by allowing clients to re-bootstrap and discover which servers are the brokers now. When tested with my fix for KAFKA-17455, we are successfully able to continue processing workflows without a crash / TaskCorruptedException when migrating the Kafka Cluster from mixed mode to separate mode. 
--- .../java/io/littlehorse/common/LHServerConfig.java | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/server/src/main/java/io/littlehorse/common/LHServerConfig.java b/server/src/main/java/io/littlehorse/common/LHServerConfig.java index 92977881b..7a96a93c9 100644 --- a/server/src/main/java/io/littlehorse/common/LHServerConfig.java +++ b/server/src/main/java/io/littlehorse/common/LHServerConfig.java @@ -34,6 +34,7 @@ import java.util.stream.Collectors; import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.NewTopic; @@ -659,6 +660,7 @@ public long getTimerMemtableSize() { public Properties getKafkaProducerConfig(String component) { Properties conf = new Properties(); conf.put("client.id", this.getClientId(component)); + conf.put(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG, "rebootstrap"); conf.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getBootstrapServers()); conf.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); conf.put( @@ -873,6 +875,17 @@ public Properties getTimerStreamsConfig() { private Properties getBaseStreamsConfig() { Properties props = new Properties(); + props.put(StreamsConfig.producerPrefix(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG), "rebootstrap"); + props.put(StreamsConfig.consumerPrefix(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG), "rebootstrap"); + props.put( + StreamsConfig.restoreConsumerPrefix(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG), + "rebootstrap"); + props.put( + StreamsConfig.globalConsumerPrefix(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG), + "rebootstrap"); + props.put( + StreamsConfig.adminClientPrefix(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG), "rebootstrap"); + if (getOrSetDefault(X_LEAVE_GROUP_ON_SHUTDOWN_KEY, "false").equals("true")) { log.warn("Using 
experimental internal config to leave group on shutdonw!"); props.put(StreamsConfig.consumerPrefix("internal.leave.group.on.close"), true);