diff --git a/.gitignore b/.gitignore index 596401d78..dfd1ec023 100644 --- a/.gitignore +++ b/.gitignore @@ -33,6 +33,7 @@ lhctl/lhctl local-dev/certs/ build/ .config +*.pem # Python __pycache__ diff --git a/docs/docs/06-operations/01-server-configuration.md b/docs/docs/06-operations/01-server-configuration.md index 05d622c9e..3538b403a 100644 --- a/docs/docs/06-operations/01-server-configuration.md +++ b/docs/docs/06-operations/01-server-configuration.md @@ -478,6 +478,26 @@ The number of threads to execute stream processing in the Core Topology. [Kafka --- +### `LHS_KAFKA_TRANSACTION_TIMEOUT_MS` + +The transaction timeout configured for the Core Topology producer. + +- **Type:** int, >= 1 +- **Default:** 60000 +- **Importance:** medium + +--- + +### `LHS_CORE_KS_CONFIG_` + +Any configurations prefixed with this prefix will be appended to the Kafka Streams configuration properties for the Core Topology. For example, setting `LHS_CORE_KS_CONFIG_RESTORE_CONSUMER_CLIENT_RACK` would set the `restore.consumer.client.rack` configuration. + +- **Type:** any +- **Default:** none +- **Importance:** low + +--- + ### `LHS_TIMER_STREAM_THREADS` The number of threads to execute stream processing in the Timer Topology. [Kafka Official](https://kafka.apache.org/documentation/#streamsconfigs_num.stream.threads). For a server with `N` cores, we recommend setting this to `N * 0.4`. diff --git a/local-dev/configs/server-1.config b/local-dev/configs/server-1.config index 1afe967f0..f561a5877 100644 --- a/local-dev/configs/server-1.config +++ b/local-dev/configs/server-1.config @@ -11,6 +11,8 @@ LHS_SHOULD_CREATE_TOPICS=true LHS_CORE_STREAM_THREADS=2 LHS_STREAMS_METRICS_LEVEL=info +LHS_STREAMS_TRANSACTION_TIMEOUT_MS=10000 + LHS_HEALTH_SERVICE_PORT=1822 LHS_INTERNAL_BIND_PORT=2011 diff --git a/sdk-dotnet/Examples/MaskedFieldsExample/MaskedFieldsExample.csproj b/sdk-dotnet/Examples/MaskedFieldsExample/MaskedFieldsExample.csproj new file mode 100644 index 000000000..f09e3ff5c --- /dev/null +++ b/sdk-dotnet/Examples/MaskedFieldsExample/MaskedFieldsExample.csproj @@ -0,0 +1,13 @@ + + + + net8.0 + enable + enable + enable + + + + + + diff --git a/sdk-dotnet/Examples/MaskedFieldsExample/Program.cs b/sdk-dotnet/Examples/MaskedFieldsExample/Program.cs new file mode 100644 index 000000000..7a378c616 --- /dev/null +++ b/sdk-dotnet/Examples/MaskedFieldsExample/Program.cs @@ -0,0 +1,56 @@ +using Examples.BasicExample; +using LittleHorse.Sdk; +using LittleHorse.Sdk.Worker; + +public class Program +{ + private static ServiceProvider? _serviceProvider; + private static void SetupApplication() + { + _serviceProvider = new ServiceCollection() + .AddLogging(config => + { + config.AddConsole(); + config.SetMinimumLevel(LogLevel.Debug); + }) + .BuildServiceProvider(); + } + + private static LHConfig GetLHConfig(string[] args, ILoggerFactory loggerFactory) + { + var config = new LHConfig(loggerFactory); + + string filePath = Path.Combine(Directory.GetCurrentDirectory(), ".config/littlehorse.config"); + if (File.Exists(filePath)) + config = new LHConfig(filePath, loggerFactory); + + return config; + } + + static void Main(string[] args) + { + SetupApplication(); + if (_serviceProvider != null) + { + var loggerFactory = _serviceProvider.GetRequiredService(); + var config = GetLHConfig(args, loggerFactory); + + MyWorker executableCreateGreet = new MyWorker(); + var taskWorkerCreate = new LHTaskWorker(executableCreateGreet, "create-greet", config); + MyWorker executableUpdateGreet = new MyWorker(); + var taskWorkerUpdate = new LHTaskWorker(executableUpdateGreet, "update-greet", config); + MyWorker executableDeleteGreet = new MyWorker(); + var taskWorkerDelete = new LHTaskWorker(executableDeleteGreet, "delete-greet", config); + + taskWorkerCreate.RegisterTaskDef(); + taskWorkerUpdate.RegisterTaskDef(); + taskWorkerDelete.RegisterTaskDef(); + + Thread.Sleep(1000); + + taskWorkerCreate.Start(); + taskWorkerUpdate.Start(); + taskWorkerDelete.Start(); + } + } +} diff --git a/sdk-dotnet/Examples/MaskedFieldsExample/README.md b/sdk-dotnet/Examples/MaskedFieldsExample/README.md new file mode 100644 index 000000000..345eb8c48 --- /dev/null +++ b/sdk-dotnet/Examples/MaskedFieldsExample/README.md @@ -0,0 +1,32 @@ +## Running MaskedFields Example + +This is a simple example, that masks input params and output results in LH Task Methods. + +Let's run the example in `MaskedFieldsExample` + +``` +dotnet build +dotnet run +``` + +In another terminal, use `lhctl` to run the workflow: + +``` +# The "masked-name" variable should mask the value +# And the input-name variable value will mantain the original plain text + +lhctl run example-basic masked-name pii-info input-name foo +``` + +In addition, you can check the result with: + +``` +# This call shows the result +lhctl get wfRun + +# This will show you all nodes in tha run +lhctl list nodeRun + +# This shows the task run information +lhctl get taskRun +``` diff --git a/sdk-dotnet/Examples/MaskedFieldsExample/Worker.cs b/sdk-dotnet/Examples/MaskedFieldsExample/Worker.cs new file mode 100644 index 000000000..9760e3363 --- /dev/null +++ b/sdk-dotnet/Examples/MaskedFieldsExample/Worker.cs @@ -0,0 +1,32 @@ +using LittleHorse.Sdk.Worker; + +namespace Examples.BasicExample +{ + public class MyWorker + { + [LHTaskMethod("create-greet")] + [LHType(masked: true)] + public string CreateGreeting([LHType(masked: true)] string name) + { + var message = $"Hello team, This is a New Greeting for {name}"; + Console.WriteLine($"Executing task create greet {name}"); + return message; + } + + [LHTaskMethod("update-greet")] + public string UpdateGreeting([LHType(masked: true)] string name) + { + var message = $"Hello team, This is Greeting Modification {name}"; + Console.WriteLine($"Executing task update greet {name}"); + return message; + } + + [LHTaskMethod("delete-greet")] + public string DeleteGreeting(string name) + { + var message = $"Hello team, This is a Greeting Deletion {name}"; + Console.WriteLine($"Executing task delete greet {name}"); + return message; + } + } +} \ No newline at end of file diff --git a/sdk-go/littlehorse/wf_lib_internal.go b/sdk-go/littlehorse/wf_lib_internal.go index d83291d59..d8fe56934 100644 --- a/sdk-go/littlehorse/wf_lib_internal.go +++ b/sdk-go/littlehorse/wf_lib_internal.go @@ -2,12 +2,13 @@ package littlehorse import ( "errors" - "github.com/littlehorse-enterprises/littlehorse/sdk-go/lhproto" "log" "strconv" "strings" "unicode" + "github.com/littlehorse-enterprises/littlehorse/sdk-go/lhproto" + "github.com/ztrue/tracerr" ) @@ -745,7 +746,8 @@ func (t *WorkflowThread) addVariable( } threadVarDef := &lhproto.ThreadVarDef{ - VarDef: varDef, + VarDef: varDef, + AccessLevel: lhproto.WfRunVariableAccessLevel_PRIVATE_VAR, } t.spec.VariableDefs = append(t.spec.VariableDefs, threadVarDef) diff --git a/sdk-go/littlehorse/wf_lib_internal_test.go b/sdk-go/littlehorse/wf_lib_internal_test.go index 715309a4b..dcc5d718d 100644 --- a/sdk-go/littlehorse/wf_lib_internal_test.go +++ b/sdk-go/littlehorse/wf_lib_internal_test.go @@ -1,9 +1,10 @@ package littlehorse_test import ( + "testing" + "github.com/littlehorse-enterprises/littlehorse/sdk-go/lhproto" "github.com/littlehorse-enterprises/littlehorse/sdk-go/littlehorse" - "testing" "github.com/stretchr/testify/assert" ) @@ -538,10 +539,10 @@ func TestJsonPath(t *testing.T) { func TestVariableAccessLevel(t *testing.T) { wf := littlehorse.NewWorkflow(func(t *littlehorse.WorkflowThread) { - inheritedVar := t.AddVariable("my-var", lhproto.VariableType_BOOL) - inheritedVar.WithAccessLevel(lhproto.WfRunVariableAccessLevel_PRIVATE_VAR) + publicVar := t.AddVariable("my-var", lhproto.VariableType_BOOL) + publicVar.AsPublic() - // Test that default is PUBLIC_VAR + // Test that default is PRIVATE_VAR t.AddVariable("default-access", lhproto.VariableType_INT) t.Execute("some-task") @@ -550,11 +551,11 @@ func TestVariableAccessLevel(t *testing.T) { putWf, _ := wf.Compile() entrypoint := putWf.ThreadSpecs[putWf.EntrypointThreadName] varDef := entrypoint.VariableDefs[0] - assert.Equal(t, varDef.AccessLevel, lhproto.WfRunVariableAccessLevel_PRIVATE_VAR) + assert.Equal(t, varDef.AccessLevel, lhproto.WfRunVariableAccessLevel_PUBLIC_VAR) assert.Equal(t, varDef.VarDef.Name, "my-var") varDef = entrypoint.VariableDefs[1] - assert.Equal(t, varDef.AccessLevel, lhproto.WfRunVariableAccessLevel_PUBLIC_VAR) + assert.Equal(t, varDef.AccessLevel, lhproto.WfRunVariableAccessLevel_PRIVATE_VAR) assert.Equal(t, varDef.VarDef.Name, "default-access") } diff --git a/sdk-go/littlehorse/wf_lib_public.go b/sdk-go/littlehorse/wf_lib_public.go index 09acb9d13..4340087d5 100644 --- a/sdk-go/littlehorse/wf_lib_public.go +++ b/sdk-go/littlehorse/wf_lib_public.go @@ -110,6 +110,14 @@ func (w *WfRunVariable) WithAccessLevel(accessLevel lhproto.WfRunVariableAccessL return w.withAccessLevel(accessLevel) } +func (w *WfRunVariable) AsPublic() WfRunVariable { + return w.withAccessLevel(lhproto.WfRunVariableAccessLevel_PUBLIC_VAR) +} + +func (w *WfRunVariable) AsInherited() WfRunVariable { + return w.withAccessLevel(lhproto.WfRunVariableAccessLevel_INHERITED_VAR) +} + func (w *WfRunVariable) Searchable() *WfRunVariable { return w.searchableImpl() } diff --git a/sdk-java/build.gradle b/sdk-java/build.gradle index 4a8545c65..3030ab4c6 100644 --- a/sdk-java/build.gradle +++ b/sdk-java/build.gradle @@ -92,7 +92,7 @@ dependencies { implementation 'org.awaitility:awaitility:4.2.0' // OAuth - implementation 'com.nimbusds:oauth2-oidc-sdk:10.9.2' + implementation 'com.nimbusds:oauth2-oidc-sdk:11.20.1' // Lombok stuffs compileOnly "org.projectlombok:lombok:${lombokVersion}" diff --git a/sdk-java/src/main/java/io/littlehorse/sdk/wfsdk/WfRunVariable.java b/sdk-java/src/main/java/io/littlehorse/sdk/wfsdk/WfRunVariable.java index 3d31a85c7..b59f8f19a 100644 --- a/sdk-java/src/main/java/io/littlehorse/sdk/wfsdk/WfRunVariable.java +++ b/sdk-java/src/main/java/io/littlehorse/sdk/wfsdk/WfRunVariable.java @@ -27,14 +27,36 @@ public interface WfRunVariable extends Serializable { WfRunVariable required(); /** - * Marks the Variable as "Searchable", which: - * - Creates an Index on the Variable in the LH Data Store - * - Due to the fact that the Variable is now Searchable, all future WfSpec - * versions must use the same Type for this Variable. + * Marks the Variable as "Searchable", which creates an Index on the Variable + * in the LH Data Store. * @return same {@link WfRunVariable} instance */ WfRunVariable searchable(); + /** + * Marks the Variable as a `PUBLIC_VAR`, which does three things: + * 1. Considers this variable in determining whether a new version of this WfSpec + * should be a major version or minor revision. + * 2. Freezes the type of this variable so that you cannot create future WfSpec + * versions with a variable of the same name and different type. + * 3. Allows defining child WfSpec's that use this variable. + * + * This is an advanced feature that you should use in any of the following cases: + * - You are treating a WfSpec as a data model and a WfRun as an instance of data. + * - You need child workflows to access this variable. + * @return same {@link WfRunVariable} instance + */ + WfRunVariable asPublic(); + + /** + * Marks the Variable as a `INHERITED_VAR`, which means that it comes from the + * parent `WfRun`. This means that: + * - There must be a parent WfSpec reference. + * - The parent must have a PUBLIC_VAR variable of the same name and type. + * @return same {@link WfRunVariable} instance + */ + WfRunVariable asInherited(); + /** * Marks the JSON_OBJ or JSON_ARR Variable as "Searchable", and creates an * index on the specified field. diff --git a/sdk-java/src/main/java/io/littlehorse/sdk/wfsdk/internal/WfRunVariableImpl.java b/sdk-java/src/main/java/io/littlehorse/sdk/wfsdk/internal/WfRunVariableImpl.java index cd6b03023..b982d1e53 100644 --- a/sdk-java/src/main/java/io/littlehorse/sdk/wfsdk/internal/WfRunVariableImpl.java +++ b/sdk-java/src/main/java/io/littlehorse/sdk/wfsdk/internal/WfRunVariableImpl.java @@ -33,8 +33,8 @@ public WfRunVariableImpl(String name, Object typeOrDefaultVal) { this.name = name; this.typeOrDefaultVal = typeOrDefaultVal; - // This is the default zero value. - this.accessLevel = WfRunVariableAccessLevel.PUBLIC_VAR; + // As per GH Issue #582, the default is now PRIVATE_VAR. + this.accessLevel = WfRunVariableAccessLevel.PRIVATE_VAR; initializeType(); } @@ -122,4 +122,14 @@ public ThreadVarDef getSpec() { .setAccessLevel(accessLevel) .build(); } + + @Override + public WfRunVariableImpl asPublic() { + return this.withAccessLevel(WfRunVariableAccessLevel.PUBLIC_VAR); + } + + @Override + public WfRunVariable asInherited() { + return this.withAccessLevel(WfRunVariableAccessLevel.INHERITED_VAR); + } } diff --git a/sdk-python/littlehorse/model/__init__.py b/sdk-python/littlehorse/model/__init__.py index 85f573b7e..c956189c7 100644 --- a/sdk-python/littlehorse/model/__init__.py +++ b/sdk-python/littlehorse/model/__init__.py @@ -5,8 +5,8 @@ from .node_run_pb2 import * from .object_id_pb2 import * from .scheduled_wf_run_pb2 import * -from .service_pb2 import * from .service_pb2_grpc import * +from .service_pb2 import * from .task_def_pb2 import * from .task_run_pb2 import * from .user_tasks_pb2 import * diff --git a/sdk-python/littlehorse/workflow.py b/sdk-python/littlehorse/workflow.py index 4e97a5f7b..71a7788af 100644 --- a/sdk-python/littlehorse/workflow.py +++ b/sdk-python/littlehorse/workflow.py @@ -56,6 +56,7 @@ WfRunVariableAccessLevel, WorkflowRetentionPolicy, ) +from littlehorse.model.wf_spec_pb2 import PRIVATE_VAR from littlehorse.utils import negate_comparator, to_variable_value from littlehorse.worker import _create_task_def @@ -364,6 +365,28 @@ def with_json_path(self, json_path: str) -> "WfRunVariable": out.json_path = json_path return out + def as_public(self) -> "WfRunVariable": + """Sets the access level to PUBLIC_VAR, which has three implications: + - Future versions of this WfSpec cannot define a variable with the + same name and a different type. + - Child workflows can access this variable. + - This variable is now considered in determining whether a new + version of the WfSpec is a majorVersion or revision. + """ + self._access_level = WfRunVariableAccessLevel.PUBLIC_VAR + return self + + def as_inherited(self) -> "WfRunVariable": + """Sets the access level to INHERITED_VAR, which has three implications: + - Future versions of this WfSpec cannot define a variable with the + same name and a different type. + - Child workflows can access this variable. + - This variable is now considered in determining whether a new + version of the WfSpec is a majorVersion or revision. + """ + self._access_level = WfRunVariableAccessLevel.INHERITED_VAR + return self + def with_access_level( self, access_level: WfRunVariableAccessLevel ) -> "WfRunVariable": @@ -1474,7 +1497,7 @@ def add_variable( self, variable_name: str, variable_type: VariableType, - access_level: Optional[Union[WfRunVariableAccessLevel, str]] = None, + access_level: Optional[Union[WfRunVariableAccessLevel, str]] = PRIVATE_VAR, default_value: Any = None, ) -> WfRunVariable: """Defines a Variable in the ThreadSpec and returns a handle to it. diff --git a/sdk-python/tests/test_workflow.py b/sdk-python/tests/test_workflow.py index 0618844ba..6f3c1c59c 100644 --- a/sdk-python/tests/test_workflow.py +++ b/sdk-python/tests/test_workflow.py @@ -199,6 +199,7 @@ def my_entrypoint(thread: WorkflowThread) -> None: variable_defs=[ ThreadVarDef( var_def=VariableDef(name="input-name", type=VariableType.STR), + access_level=WfRunVariableAccessLevel.PRIVATE_VAR, ), ], nodes={ @@ -884,6 +885,7 @@ def my_entrypoint(thread: WorkflowThread) -> None: variable_defs=[ ThreadVarDef( var_def=VariableDef(name="input-name", type=VariableType.STR), + access_level=WfRunVariableAccessLevel.PRIVATE_VAR, ) ], nodes={ @@ -1046,6 +1048,7 @@ def my_entrypoint(thread: WorkflowThread) -> None: variable_defs=[ ThreadVarDef( var_def=VariableDef(name="value", type=VariableType.INT), + access_level=WfRunVariableAccessLevel.PRIVATE_VAR, ), ], nodes={ @@ -1609,7 +1612,8 @@ def my_entrypoint(thread: WorkflowThread) -> None: ThreadVarDef( var_def=VariableDef( name="input-name", type=VariableType.STR - ) + ), + access_level=WfRunVariableAccessLevel.PRIVATE_VAR, ), ], nodes={ @@ -1642,7 +1646,8 @@ def my_entrypoint(thread: WorkflowThread) -> None: ThreadVarDef( var_def=VariableDef( name="input-name", type=VariableType.STR - ) + ), + access_level=WfRunVariableAccessLevel.PRIVATE_VAR, ), ], nodes={ diff --git a/server/build.gradle b/server/build.gradle index e782b13ce..ed4f47415 100644 --- a/server/build.gradle +++ b/server/build.gradle @@ -47,7 +47,7 @@ dependencies { testImplementation "org.apache.kafka:kafka-streams-test-utils:${kafkaVersion}" // Auth - implementation 'com.nimbusds:oauth2-oidc-sdk:10.9.2' + implementation 'com.nimbusds:oauth2-oidc-sdk:11.20.1' // Utils implementation 'org.apache.commons:commons-lang3:3.12.0' diff --git a/server/src/main/java/io/littlehorse/common/LHConstants.java b/server/src/main/java/io/littlehorse/common/LHConstants.java index 08bda5959..2dac9236e 100644 --- a/server/src/main/java/io/littlehorse/common/LHConstants.java +++ b/server/src/main/java/io/littlehorse/common/LHConstants.java @@ -11,7 +11,7 @@ public class LHConstants { // Other various constants used by code - public static final Duration PUNCTUATOR_INERVAL = Duration.ofMillis(500); + public static final Duration TIMER_PUNCTUATOR_INTERVAL = Duration.ofMillis(500); public static final String EXT_EVT_HANDLER_VAR = "INPUT"; // Make all global metadata use the same partition key so that they're processed diff --git a/server/src/main/java/io/littlehorse/common/LHServerConfig.java b/server/src/main/java/io/littlehorse/common/LHServerConfig.java index 885a27f66..7a96a93c9 100644 --- a/server/src/main/java/io/littlehorse/common/LHServerConfig.java +++ b/server/src/main/java/io/littlehorse/common/LHServerConfig.java @@ -34,6 +34,7 @@ import java.util.stream.Collectors; import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.NewTopic; @@ -91,6 +92,8 @@ public class LHServerConfig extends ConfigBase { public static final String ROCKSDB_COMPACTION_THREADS_KEY = "LHS_ROCKSDB_COMPACTION_THREADS"; public static final String STREAMS_METRICS_LEVEL_KEY = "LHS_STREAMS_METRICS_LEVEL"; public static final String LINGER_MS_KEY = "LHS_KAFKA_LINGER_MS"; + public static final String TRANSACTION_TIMEOUT_MS_KEY = "LHS_STREAMS_TRANSACTION_TIMEOUT_MS"; + public static final String CORE_KAFKA_STREAMS_OVERRIDE_PREFIX = "LHS_CORE_KS_CONFIG_"; // General LittleHorse Runtime Behavior Config Env Vars public static final String NUM_NETWORK_THREADS_KEY = "LHS_NUM_NETWORK_THREADS"; @@ -657,6 +660,7 @@ public long getTimerMemtableSize() { public Properties getKafkaProducerConfig(String component) { Properties conf = new Properties(); conf.put("client.id", this.getClientId(component)); + conf.put(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG, "rebootstrap"); conf.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getBootstrapServers()); conf.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); conf.put( @@ -773,18 +777,18 @@ public boolean leaveGroupOnShutdown() { } public Properties getCoreStreamsConfig() { - Properties props = getBaseStreamsConfig(); - props.put("application.id", getKafkaGroupId("core")); - props.put("client.id", this.getClientId("core")); + Properties result = getBaseStreamsConfig(); + result.put("application.id", getKafkaGroupId("core")); + result.put("client.id", this.getClientId("core")); if (getOrSetDefault(X_USE_AT_LEAST_ONCE_KEY, "false").equals("true")) { log.warn("Using experimental override config to use at-least-once for Core topology"); - props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.AT_LEAST_ONCE); + result.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.AT_LEAST_ONCE); } else { - props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2); + result.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2); } - props.put("num.stream.threads", Integer.valueOf(getOrSetDefault(CORE_STREAM_THREADS_KEY, "1"))); + result.put("num.stream.threads", Integer.valueOf(getOrSetDefault(CORE_STREAM_THREADS_KEY, "1"))); // The Core Topology is EOS. Note that we have engineered the application to not be sensitive // to commit latency (long story). The only thing that is affected by commit latency is the // time at which metrics updates are processed by the repartition processor, but those @@ -802,16 +806,34 @@ public Properties getCoreStreamsConfig() { // // That's not to mention that we will be writing fewer times to RocksDB. Huge win. int commitInterval = Integer.valueOf(getOrSetDefault(LHServerConfig.CORE_STREAMS_COMMIT_INTERVAL_KEY, "3000")); - props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, commitInterval); - props.put( + result.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, commitInterval); + result.put( "statestore.cache.max.bytes", Long.valueOf(getOrSetDefault(CORE_STATESTORE_CACHE_BYTES_KEY, String.valueOf(1024L * 1024L * 32)))); - // Kafka Streams calls KafkaProducer#commitTransaction() which flushes messages anyways. Sending earlier does - // not help at all in any way (this is because the Core topology is EOS). Therefore, having a big linger.ms - // does not have any downsides in theory. - props.put(StreamsConfig.producerPrefix("linger.ms"), 1000); - return props; + // Kafka Streams calls KafkaProducer#commitTransaction() which flushes messages upon committing the kafka + // transaction. We _could_ linger.ms to the commit interval; however, the problem with this is that the + // timer topology needs to be able to read the records. The Timer Topology is set to read_uncommitted and + // has a requirement that all Command's in the Timer Topology are idempotent, so this is okay. + // + // We can make some of our end-to-end tests fail if we set linger.ms to something big (i.e. 3,000ms) because + // it delays the sending of timers to the Timer Topology, so certain time-triggered events that are expected + // to happen end up not happening (eg. in RetryTest, exponential-backoff retries are not scheduled on time). + // + // We set linger.ms to the same interval as the Timer Punctuator interval (500ms). This gives us approximately + // 1-second precision on timers. + result.put(StreamsConfig.producerPrefix("linger.ms"), LHConstants.TIMER_PUNCTUATOR_INTERVAL.toMillis()); + + for (Object keyObj : props.keySet()) { + String key = (String) keyObj; + if (key.startsWith(CORE_KAFKA_STREAMS_OVERRIDE_PREFIX)) { + String kafkaKey = key.substring(CORE_KAFKA_STREAMS_OVERRIDE_PREFIX.length()) + .replace("_", ".") + .toLowerCase(); + result.put(kafkaKey, props.get(key)); + } + } + return result; } public Properties getTimerStreamsConfig() { @@ -853,6 +875,17 @@ public Properties getTimerStreamsConfig() { private Properties getBaseStreamsConfig() { Properties props = new Properties(); + props.put(StreamsConfig.producerPrefix(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG), "rebootstrap"); + props.put(StreamsConfig.consumerPrefix(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG), "rebootstrap"); + props.put( + StreamsConfig.restoreConsumerPrefix(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG), + "rebootstrap"); + props.put( + StreamsConfig.globalConsumerPrefix(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG), + "rebootstrap"); + props.put( + StreamsConfig.adminClientPrefix(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG), "rebootstrap"); + if (getOrSetDefault(X_LEAVE_GROUP_ON_SHUTDOWN_KEY, "false").equals("true")) { log.warn("Using experimental internal config to leave group on shutdonw!"); props.put(StreamsConfig.consumerPrefix("internal.leave.group.on.close"), true); @@ -876,7 +909,6 @@ private Properties getBaseStreamsConfig() { props.put("bootstrap.servers", this.getBootstrapServers()); props.put("state.dir", getStateDirectory()); props.put("request.timeout.ms", 1000 * 60); - props.put("producer.transaction.timeout.ms", 1000 * 60); props.put("producer.acks", "all"); props.put("replication.factor", (int) getReplicationFactor()); props.put("num.standby.replicas", Integer.valueOf(getOrSetDefault(NUM_STANDBY_REPLICAS_KEY, "0"))); @@ -885,6 +917,7 @@ private Properties getBaseStreamsConfig() { props.put( "metrics.recording.level", getOrSetDefault(STREAMS_METRICS_LEVEL_KEY, "info").toUpperCase()); + props.put(StreamsConfig.producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), getTransactionTimeoutMs()); // Configs required by KafkaStreams. Some of these are overriden by the application logic itself. props.put("default.deserialization.exception.handler", LogAndContinueExceptionHandler.class); @@ -908,7 +941,7 @@ private Properties getBaseStreamsConfig() { // Those fetch requests can be somewhat costly. props.put("global.consumer.client.rack", getRackId()); - // As of Kafka 3.6, there is nothing we can do to optimize the group coordinator traffic. + // As of Kafka 3.9, there is nothing we can do to optimize the group coordinator traffic. } // Set the RocksDB Config Setter, and inject this LHServerConfig into the options set @@ -924,9 +957,7 @@ private Properties getBaseStreamsConfig() { // in the case of a server failure while a request is being processed, the resulting // `Command` should be processed on a new server within a minute. Issue #479 // should verify this behavior - props.put( - "consumer.session.timeout.ms", - Integer.valueOf(getOrSetDefault(LHServerConfig.SESSION_TIMEOUT_KEY, "40000"))); + props.put("consumer.session.timeout.ms", getStreamsSessionTimeout()); // In case we need to authenticate to Kafka, this sets it. addKafkaSecuritySettings(props); @@ -934,12 +965,17 @@ private Properties getBaseStreamsConfig() { return props; } + private int getTransactionTimeoutMs() { + // Default 60 second transaction timeout. + return Integer.valueOf(getOrSetDefault(LHServerConfig.TRANSACTION_TIMEOUT_MS_KEY, "60000")); + } + private String getClientId(String component) { return this.getLHClusterId() + "-" + this.getLHInstanceName() + "-" + component; } - public long getStreamsSessionTimeout() { - return Long.parseLong(props.getProperty(SESSION_TIMEOUT_KEY)); + public int getStreamsSessionTimeout() { + return Integer.valueOf(getOrSetDefault(LHServerConfig.SESSION_TIMEOUT_KEY, "40000")); } public int getNumNetworkThreads() { diff --git a/server/src/main/java/io/littlehorse/common/model/corecommand/subcommand/DeleteWfRunRequestModel.java b/server/src/main/java/io/littlehorse/common/model/corecommand/subcommand/DeleteWfRunRequestModel.java index 8a0f3ce2c..8bb5d9a79 100644 --- a/server/src/main/java/io/littlehorse/common/model/corecommand/subcommand/DeleteWfRunRequestModel.java +++ b/server/src/main/java/io/littlehorse/common/model/corecommand/subcommand/DeleteWfRunRequestModel.java @@ -5,6 +5,12 @@ import io.littlehorse.common.LHSerializable; import io.littlehorse.common.LHServerConfig; import io.littlehorse.common.model.corecommand.CoreSubCommand; +import io.littlehorse.common.model.getable.core.events.WorkflowEventModel; +import io.littlehorse.common.model.getable.core.externalevent.ExternalEventModel; +import io.littlehorse.common.model.getable.core.noderun.NodeRunModel; +import io.littlehorse.common.model.getable.core.taskrun.TaskRunModel; +import io.littlehorse.common.model.getable.core.usertaskrun.UserTaskRunModel; +import io.littlehorse.common.model.getable.core.variable.VariableModel; import io.littlehorse.common.model.getable.objectId.WfRunIdModel; import io.littlehorse.sdk.common.proto.DeleteWfRunRequest; import io.littlehorse.server.streams.topology.core.ExecutionContext; @@ -35,6 +41,13 @@ public String getPartitionKey() { @Override public Empty process(ProcessorExecutionContext executionContext, LHServerConfig config) { executionContext.getableManager().delete(wfRunId); + executionContext.getableManager().deleteAllByPrefix(getPartitionKey(), TaskRunModel.class); + executionContext.getableManager().deleteAllByPrefix(getPartitionKey(), VariableModel.class); + executionContext.getableManager().deleteAllByPrefix(getPartitionKey(), ExternalEventModel.class); + executionContext.getableManager().deleteAllByPrefix(getPartitionKey(), UserTaskRunModel.class); + executionContext.getableManager().deleteAllByPrefix(getPartitionKey(), WorkflowEventModel.class); + executionContext.getableManager().deleteAllByPrefix(getPartitionKey(), NodeRunModel.class); + return Empty.getDefaultInstance(); } diff --git a/server/src/main/java/io/littlehorse/common/model/corecommand/subcommand/TriggeredTaskRun.java b/server/src/main/java/io/littlehorse/common/model/corecommand/subcommand/TriggeredTaskRun.java index 0f9d0c5bd..b21e4828e 100644 --- a/server/src/main/java/io/littlehorse/common/model/corecommand/subcommand/TriggeredTaskRun.java +++ b/server/src/main/java/io/littlehorse/common/model/corecommand/subcommand/TriggeredTaskRun.java @@ -74,19 +74,19 @@ public void initFrom(Message proto, ExecutionContext context) { public Empty process(ProcessorExecutionContext executionContext, LHServerConfig config) { WfRunIdModel wfRunId = source.getWfRunId(); - log.info("Might schedule a one-off task for wfRun {} due to UserTask", wfRunId); + log.trace("Might schedule a one-off task for wfRun {} due to UserTask", wfRunId); WfRunModel wfRunModel = executionContext.getableManager().get(wfRunId); if (wfRunModel == null) { - log.info("WfRun no longer exists! Skipping the scheduled action trigger"); + log.trace("WfRun no longer exists! Skipping the scheduled action trigger"); return null; } // Now verify that the thing hasn't yet been completed. ThreadRunModel thread = wfRunModel.getThreadRun(source.getThreadRunNumber()); - // Impossible for thread to be null, but check anyways + // This can happen in the case of ThreadRetentionPolicy being set. if (thread == null) { - log.warn("Triggered scheduled task refers to missing thread!"); + log.trace("Triggered scheduled task refers to missing thread!"); return null; } @@ -96,12 +96,12 @@ public Empty process(ProcessorExecutionContext executionContext, LHServerConfig UserTaskRunModel userTaskRun = executionContext.getableManager().get(userTaskRunId); if (userTaskNR.getStatus() != LHStatus.RUNNING) { - log.info("NodeRun is not RUNNING anymore, so can't take action!"); + log.trace("NodeRun is not RUNNING anymore, so can't take action!"); return null; } // At this point, need to update the events. - log.info("Scheduling a one-off task for wfRun {} due to UserTask", wfRunId); + log.trace("Scheduling a one-off task for wfRun {} due to UserTask", wfRunId); try { List inputVars = taskToSchedule.assignInputVars(thread, executionContext); diff --git a/server/src/main/java/io/littlehorse/common/model/getable/global/wfspec/WfSpecModel.java b/server/src/main/java/io/littlehorse/common/model/getable/global/wfspec/WfSpecModel.java index d544e0df3..13174d115 100644 --- a/server/src/main/java/io/littlehorse/common/model/getable/global/wfspec/WfSpecModel.java +++ b/server/src/main/java/io/littlehorse/common/model/getable/global/wfspec/WfSpecModel.java @@ -35,6 +35,7 @@ import io.littlehorse.server.streams.topology.core.MetadataCommandExecution; import io.littlehorse.server.streams.topology.core.ProcessorExecutionContext; import java.util.ArrayList; +import java.util.Collection; import java.util.Date; import java.util.HashMap; import java.util.HashSet; @@ -287,12 +288,6 @@ public Map getSearchableVariables() { return out; } - public Map getRequiredVariables() { - return threadSpecs.get(entrypointThreadName).getInputVariableDefs().entrySet().stream() - .filter(kv -> kv.getValue().isRequired()) - .collect(Collectors.toMap(kv -> kv.getKey(), kv -> kv.getValue())); - } - /* * For now, the only validation we do for variables is to make sure that: * 1. No variable name is defined twice (this will be useful for future @@ -358,17 +353,28 @@ private void validateVariablesHelper(MetadataCommandExecution ctx) throws LHApiE } // Now we curate the list of variables which are "frozen" in time and cannot - // change their types. + // change their types. This includes two types: + // - Required variables in the entrypoint threadRun + // - Any variable with the access_level `PUBLIC_VAR`. + for (ThreadVarDefModel tvd : getEntrypointThread().getRequiredVarDefs()) { + frozenVariables.put(tvd.getVarDef().getName(), tvd); + } for (ThreadSpecModel thread : threadSpecs.values()) { - for (ThreadVarDefModel tvd : thread.getRequiredVarDefs()) { - frozenVariables.put(tvd.getVarDef().getName(), tvd); - } - for (ThreadVarDefModel tvd : thread.getRequiredVarDefs()) { + for (ThreadVarDefModel tvd : thread.getPublicVarDefs()) { frozenVariables.put(tvd.getVarDef().getName(), tvd); } } } + /** + * Returns a ThreadVarDef for every PUBLIC_VAR variable in the WfSpec (all threads). + */ + public Collection getPublicVars() { + return threadSpecs.values().stream() + .flatMap(tspec -> tspec.getPublicVarDefs().stream()) + .toList(); + } + private void checkCompatibilityAndSetVersion(WfSpecModel old) { // First, for every previously-frozen variable, we need to check that either: // - the variable isn't included, or @@ -385,7 +391,7 @@ private void checkCompatibilityAndSetVersion(WfSpecModel old) { if (oldDef.getVarDef().getType() != currentVarDef.getVarDef().getType()) { throw new LHApiException( Status.FAILED_PRECONDITION, - "Variable %s must be of type %s not %s" + "Variable %s must be of type %s not %s as it was formerly declared a PUBLIC_VAR" .formatted( varName, oldDef.getVarDef().getType(), diff --git a/server/src/main/java/io/littlehorse/common/model/getable/global/wfspec/thread/ThreadSpecModel.java b/server/src/main/java/io/littlehorse/common/model/getable/global/wfspec/thread/ThreadSpecModel.java index 57c48b44d..8cde2094a 100644 --- a/server/src/main/java/io/littlehorse/common/model/getable/global/wfspec/thread/ThreadSpecModel.java +++ b/server/src/main/java/io/littlehorse/common/model/getable/global/wfspec/thread/ThreadSpecModel.java @@ -21,6 +21,7 @@ import io.littlehorse.server.streams.topology.core.ExecutionContext; import io.littlehorse.server.streams.topology.core.MetadataCommandExecution; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -128,6 +129,15 @@ public Map getInputVariableDefs() { return out; } + /** + * Returns all ThreadVarDef's that are of type `PUBLIC_VAR`. + */ + public Collection getPublicVarDefs() { + return getVariableDefs().stream() + .filter(varDef -> varDef.getAccessLevel() == WfRunVariableAccessLevel.PUBLIC_VAR) + .toList(); + } + /* * Returns a set of all variable names *used* during thread execution. */ diff --git a/server/src/main/java/io/littlehorse/common/model/getable/global/wfspec/thread/ThreadVarDefModel.java b/server/src/main/java/io/littlehorse/common/model/getable/global/wfspec/thread/ThreadVarDefModel.java index 1aa224235..9bd5614b0 100644 --- a/server/src/main/java/io/littlehorse/common/model/getable/global/wfspec/thread/ThreadVarDefModel.java +++ b/server/src/main/java/io/littlehorse/common/model/getable/global/wfspec/thread/ThreadVarDefModel.java @@ -10,9 +10,11 @@ import io.littlehorse.server.streams.topology.core.ExecutionContext; import java.util.ArrayList; import java.util.List; +import lombok.EqualsAndHashCode; import lombok.Getter; @Getter +@EqualsAndHashCode(callSuper = false) public class ThreadVarDefModel extends LHSerializable { private VariableDefModel varDef; diff --git a/server/src/main/java/io/littlehorse/common/model/getable/global/wfspec/variable/JsonIndexModel.java b/server/src/main/java/io/littlehorse/common/model/getable/global/wfspec/variable/JsonIndexModel.java index c8efadc0a..42468e14d 100644 --- a/server/src/main/java/io/littlehorse/common/model/getable/global/wfspec/variable/JsonIndexModel.java +++ b/server/src/main/java/io/littlehorse/common/model/getable/global/wfspec/variable/JsonIndexModel.java @@ -5,14 +5,15 @@ import io.littlehorse.sdk.common.proto.JsonIndex; import io.littlehorse.sdk.common.proto.VariableType; import io.littlehorse.server.streams.topology.core.ExecutionContext; -import java.util.Objects; import lombok.AllArgsConstructor; +import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.NoArgsConstructor; @Getter @AllArgsConstructor @NoArgsConstructor +@EqualsAndHashCode(callSuper = false) public class JsonIndexModel extends LHSerializable { private String fieldPath; @@ -36,16 +37,4 @@ public void initFrom(Message proto, ExecutionContext context) { fieldPath = p.getFieldPath(); fieldType = p.getFieldType(); } - - @Override - public boolean equals(Object other) { - if (other == null) return false; - - if (!(other instanceof JsonIndexModel)) { - return false; - } - - JsonIndexModel o = (JsonIndexModel) other; - return Objects.equals(fieldPath, o.getFieldPath()) && Objects.equals(fieldType, o.getFieldType()); - } } diff --git a/server/src/main/java/io/littlehorse/common/model/getable/global/wfspec/variable/VariableDefModel.java b/server/src/main/java/io/littlehorse/common/model/getable/global/wfspec/variable/VariableDefModel.java index 557f043cd..501917033 100644 --- a/server/src/main/java/io/littlehorse/common/model/getable/global/wfspec/variable/VariableDefModel.java +++ b/server/src/main/java/io/littlehorse/common/model/getable/global/wfspec/variable/VariableDefModel.java @@ -9,11 +9,13 @@ import io.littlehorse.sdk.common.proto.VariableDef; import io.littlehorse.sdk.common.proto.VariableType; import io.littlehorse.server.streams.topology.core.ExecutionContext; +import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.Setter; @Getter @Setter +@EqualsAndHashCode(callSuper = false) public class VariableDefModel extends LHSerializable { private VariableType type; diff --git a/server/src/main/java/io/littlehorse/common/util/WfSpecUtil.java b/server/src/main/java/io/littlehorse/common/util/WfSpecUtil.java index 1d2c34de0..e2abb5b7f 100644 --- a/server/src/main/java/io/littlehorse/common/util/WfSpecUtil.java +++ b/server/src/main/java/io/littlehorse/common/util/WfSpecUtil.java @@ -3,15 +3,11 @@ import com.google.protobuf.Timestamp; import io.littlehorse.common.model.getable.global.wfspec.WfSpecModel; import io.littlehorse.common.model.getable.global.wfspec.thread.ThreadVarDefModel; -import io.littlehorse.sdk.common.proto.WfRunVariableAccessLevel; import io.littlehorse.sdk.common.proto.WfSpec; import io.littlehorse.sdk.common.proto.WfSpecId; import java.util.Arrays; +import java.util.Collection; import java.util.Date; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; public class WfSpecUtil { private WfSpecUtil() {} @@ -40,43 +36,24 @@ private static void sanitize(WfSpec.Builder spec, Timestamp date) { * Verifies if left WfSpecModel have breaking changes comparing to right * when either: * - set of left required variables does not match right required variables - * - set of left searchable variables does not match right searchable variables + * - set of left PUBLIC_VAR variables does not match right PUBLIC_VAR variables * @param left WfSpecModel to be compared * @param right WfSpecModel compared against * @return true when there is a breaking change, false otherwise */ public static boolean hasBreakingChanges(WfSpecModel left, WfSpecModel right) { - return !variablesMatch(left.getRequiredVariables(), right.getRequiredVariables()) - || !variablesMatch(left.getSearchableVariables(), right.getSearchableVariables()) - || !variableAccessLevelMatch( - left.getEntrypointThread().getVariableDefs(), - right.getEntrypointThread().getVariableDefs()); - } + Collection leftPublicVars = left.getPublicVars(); + Collection rightPublicVars = right.getPublicVars(); - private static boolean variablesMatch(Map left, Map right) { - Set leftVariables = left.keySet(); - Set rightVariables = right.keySet(); - if (leftVariables.size() != rightVariables.size()) return false; + if (leftPublicVars.size() != rightPublicVars.size()) return true; + if (!leftPublicVars.containsAll(rightPublicVars)) return true; - return leftVariables.containsAll(rightVariables); - } + // Check required variables + Collection rightRequiredVars = + right.getEntrypointThread().getRequiredVarDefs(); + Collection leftRequiredVars = + left.getEntrypointThread().getRequiredVarDefs(); - private static boolean variableAccessLevelMatch(List left, List right) { - final Map leftVariables = new HashMap<>(); - for (ThreadVarDefModel leftVarDefinition : left) { - leftVariables.put(leftVarDefinition.getVarDef().getName(), leftVarDefinition.getAccessLevel()); - } - for (ThreadVarDefModel rightVarDefinition : right) { - WfRunVariableAccessLevel rightAccessLevel = rightVarDefinition.getAccessLevel(); - if (rightAccessLevel == WfRunVariableAccessLevel.PUBLIC_VAR - || rightAccessLevel == WfRunVariableAccessLevel.INHERITED_VAR) { - WfRunVariableAccessLevel leftAccessLevel = - leftVariables.get(rightVarDefinition.getVarDef().getName()); - if (leftAccessLevel != null && leftAccessLevel.equals(WfRunVariableAccessLevel.PRIVATE_VAR)) { - return false; - } - } - } - return true; + return leftRequiredVars.size() != rightRequiredVars.size() || !leftRequiredVars.containsAll(rightRequiredVars); } } diff --git a/server/src/main/java/io/littlehorse/server/streams/storeinternals/GetableManager.java b/server/src/main/java/io/littlehorse/server/streams/storeinternals/GetableManager.java index 346eff2a0..8967989bf 100644 --- a/server/src/main/java/io/littlehorse/server/streams/storeinternals/GetableManager.java +++ b/server/src/main/java/io/littlehorse/server/streams/storeinternals/GetableManager.java @@ -115,6 +115,20 @@ public > T delete(CoreObjectId> void deleteAllByPrefix(String prefix, Class cls) { + log.trace("Deleting all {} with prefix {}", cls.getSimpleName(), prefix); + + // Note this iterates in a non-paginated way through all NodeRun's in the + // WfRun. Fine for most use-cases, but if there's a WfRUn that runs for a + // year and has hundreds of tasks per day, it will be a problem. + List> allItems = iterateOverPrefixAndPutInUncommittedChanges(prefix, cls); + + for (GetableToStore itemToDelete : allItems) { + // Marking the objectToStore as null causes the flush() to delete it. + itemToDelete.setObjectToStore(null); + } + } + public void commit() { for (Map.Entry> entry : uncommittedChanges.entrySet()) { String storeableKey = entry.getKey(); diff --git a/server/src/main/java/io/littlehorse/server/streams/topology/timer/TimerProcessor.java b/server/src/main/java/io/littlehorse/server/streams/topology/timer/TimerProcessor.java index d7da16f81..2f6185d73 100644 --- a/server/src/main/java/io/littlehorse/server/streams/topology/timer/TimerProcessor.java +++ b/server/src/main/java/io/littlehorse/server/streams/topology/timer/TimerProcessor.java @@ -25,8 +25,8 @@ public class TimerProcessor implements Processor context) { this.context = context; timerStore = context.getStateStore(ServerTopology.TIMER_STORE); - this.punctuator = - context.schedule(LHConstants.PUNCTUATOR_INERVAL, PunctuationType.WALL_CLOCK_TIME, this::clearTimers); + this.punctuator = context.schedule( + LHConstants.TIMER_PUNCTUATOR_INTERVAL, PunctuationType.WALL_CLOCK_TIME, this::clearTimers); } @Override diff --git a/server/src/test/java/e2e/SharedVariablesTest.java b/server/src/test/java/e2e/SharedVariablesTest.java index 9b27cf86c..9f9812c7d 100644 --- a/server/src/test/java/e2e/SharedVariablesTest.java +++ b/server/src/test/java/e2e/SharedVariablesTest.java @@ -9,6 +9,7 @@ import io.littlehorse.sdk.common.proto.VariableType; import io.littlehorse.sdk.common.proto.WfRunId; import io.littlehorse.sdk.common.proto.WfRunVariableAccessLevel; +import io.littlehorse.sdk.common.proto.WfSpec; import io.littlehorse.sdk.common.util.Arg; import io.littlehorse.sdk.wfsdk.WfRunVariable; import io.littlehorse.sdk.wfsdk.Workflow; @@ -67,6 +68,86 @@ public void shouldResolvePublicVariablesFromParentWf() { .isEqualTo(12); } + @Test + void shouldNotFreezeSearchableVariables() { + // Adding UUID makes it possible to run repeatedly with ExternalBootstrapper + String wfSpecName = "dont-freeze-searchable-" + UUID.randomUUID(); + Workflow workflow = Workflow.newWorkflow(wfSpecName, wf -> { + wf.addVariable("some-var", VariableType.STR).searchable(); + }); + WfSpec result = client.putWfSpec(workflow.compileWorkflow()); + assertThat(result.getFrozenVariablesCount()).isEqualTo(0); + + // Additionally, private searchable variables do not increment the major version, + // and I can change the type of them if they're not public. + Workflow secondWorkflow = Workflow.newWorkflow(wfSpecName, wf -> { + wf.addVariable("some-var", VariableType.INT); + }); + WfSpec secondVersion = client.putWfSpec(secondWorkflow.compileWorkflow()); + assertThat(secondVersion.getId().getMajorVersion()).isEqualTo(0); + assertThat(secondVersion.getId().getRevision()).isEqualTo(1); + } + + @Test + void publicVariablesShouldStayFrozenThroughVersions() { + String wfSpecName = "public-var-frozen-" + UUID.randomUUID(); + Workflow workflow = Workflow.newWorkflow(wfSpecName, wf -> { + wf.addVariable("some-var", VariableType.STR).asPublic(); + }); + WfSpec firstVersion = client.putWfSpec(workflow.compileWorkflow()); + assertThat(firstVersion.getFrozenVariablesCount()).isEqualTo(1); + + Workflow secondWorkflow = Workflow.newWorkflow(wfSpecName, wf -> { + wf.addVariable("some-other-var", "some string value").searchable(); + }); + WfSpec secondVersion = client.putWfSpec(secondWorkflow.compileWorkflow()); + + // Breaking change + assertThat(secondVersion.getId().getMajorVersion()).isEqualTo(1); + assertThat(secondVersion.getId().getRevision()).isEqualTo(0); + + // The first variable should still be frozen + assertThat(secondVersion.getFrozenVariablesCount()).isEqualTo(1); + assertThat(secondVersion.getFrozenVariables(0).getVarDef().getName()).isEqualTo("some-var"); + } + + @Test + void requiredVariablesIncrementMajorVersion() { + String wfSpecName = "required-major-version-" + UUID.randomUUID(); + Workflow workflow = Workflow.newWorkflow(wfSpecName, wf -> { + wf.addVariable("some-var", VariableType.STR).required(); + }); + WfSpec firstVersion = client.putWfSpec(workflow.compileWorkflow()); + assertThat(firstVersion.getFrozenVariablesCount()).isEqualTo(1); + + Workflow secondWorkflow = Workflow.newWorkflow(wfSpecName, wf -> { + wf.addVariable("some-other-var", "not required").searchable(); + }); + WfSpec secondVersion = client.putWfSpec(secondWorkflow.compileWorkflow()); + + assertThat(secondVersion.getId().getMajorVersion()).isEqualTo(1); + assertThat(secondVersion.getId().getRevision()).isEqualTo(0); + } + + @Test + void publicVariablesIncrementMajorVersion() { + String wfSpecName = "public-major-version-" + UUID.randomUUID(); + Workflow workflow = Workflow.newWorkflow(wfSpecName, wf -> { + wf.addVariable("some-var", VariableType.STR).asPublic(); + wf.addVariable("another-var", "will-be-removed").asPublic(); + }); + WfSpec firstVersion = client.putWfSpec(workflow.compileWorkflow()); + assertThat(firstVersion.getFrozenVariablesCount()).isEqualTo(2); + + Workflow secondWorkflow = Workflow.newWorkflow(wfSpecName, wf -> { + wf.addVariable("some-var", "not public anymore").searchable(); + }); + WfSpec thirdVersion = client.putWfSpec(secondWorkflow.compileWorkflow()); + + assertThat(thirdVersion.getId().getMajorVersion()).isEqualTo(1); + assertThat(thirdVersion.getId().getRevision()).isEqualTo(0); + } + @LHWorkflow("shared-variables-parent-wf") public Workflow buildParentWf() { return new WorkflowImpl("shared-variables-parent-wf", thread -> { @@ -82,8 +163,8 @@ public Workflow buildParentWf() { @LHWorkflow("shared-variables-child-wf") public Workflow buildChildWf() { Workflow out = new WorkflowImpl("shared-variables-child-wf", thread -> { - WfRunVariable publicVariable = thread.addVariable("public-variable", VariableType.INT) - .withAccessLevel(WfRunVariableAccessLevel.INHERITED_VAR); + WfRunVariable publicVariable = + thread.addVariable("public-variable", VariableType.INT).asInherited(); WfRunVariable calculatedValue = thread.addVariable("calculated-value", VariableType.INT).searchable(); diff --git a/server/src/test/java/e2e/WfSpecLifecycleTest.java b/server/src/test/java/e2e/WfSpecLifecycleTest.java index 6ec651a87..9d0675f8e 100644 --- a/server/src/test/java/e2e/WfSpecLifecycleTest.java +++ b/server/src/test/java/e2e/WfSpecLifecycleTest.java @@ -71,7 +71,7 @@ void shouldAddMajorVersionWhenWfSpecHasBreakingChanges() { Workflow updatedWorkflow = Workflow.newWorkflow("sample", wf -> { wf.addVariable("variable", VariableType.BOOL).required(); - wf.addVariable("searchable", VariableType.BOOL).searchable(); + wf.addVariable("second-required", VariableType.BOOL).required(); }); WfSpec updatedSpec = client.putWfSpec(updatedWorkflow.compileWorkflow()); @@ -180,7 +180,7 @@ void shouldThrowFailedPreconditionWhenWfSpecHasBreakingChanges() { Workflow updatedWorkflow = Workflow.newWorkflow("sample", wf -> { wf.addVariable("variable", VariableType.BOOL).required(); - wf.addVariable("searchable", VariableType.BOOL).searchable(); + wf.addVariable("searchable", VariableType.BOOL).required(); }) .withUpdateType(AllowedUpdateType.MINOR_REVISION_UPDATES); diff --git a/server/src/test/java/io/littlehorse/TestUtil.java b/server/src/test/java/io/littlehorse/TestUtil.java index 0110d3ec2..58023a42e 100644 --- a/server/src/test/java/io/littlehorse/TestUtil.java +++ b/server/src/test/java/io/littlehorse/TestUtil.java @@ -1,6 +1,7 @@ package io.littlehorse; import io.littlehorse.common.model.ScheduledTaskModel; +import io.littlehorse.common.model.getable.core.events.WorkflowEventModel; import io.littlehorse.common.model.getable.core.externalevent.ExternalEventModel; import io.littlehorse.common.model.getable.core.noderun.NodeRunModel; import io.littlehorse.common.model.getable.core.taskrun.TaskAttemptModel; @@ -32,6 +33,8 @@ import io.littlehorse.common.model.getable.objectId.UserTaskRunIdModel; import io.littlehorse.common.model.getable.objectId.WfRunIdModel; import io.littlehorse.common.model.getable.objectId.WfSpecIdModel; +import io.littlehorse.common.model.getable.objectId.WorkflowEventDefIdModel; +import io.littlehorse.common.model.getable.objectId.WorkflowEventIdModel; import io.littlehorse.common.proto.GetableClassEnum; import io.littlehorse.common.proto.TagStorageType; import io.littlehorse.sdk.common.proto.*; @@ -234,6 +237,25 @@ public static ExternalEventModel externalEvent() { return externalEvent; } + public static ExternalEventModel externalEvent(String wfRunId) { + ExternalEventModel externalEvent = new ExternalEventModel( + variableValue(), + new WfRunIdModel(wfRunId), + new ExternalEventDefIdModel("test-name"), + "0000001", + null, + null, + null); + return externalEvent; + } + + public static WorkflowEventModel workflowEvent(String wfRunId) { + WorkflowEventModel workflowEvent = new WorkflowEventModel( + new WorkflowEventIdModel(new WfRunIdModel(wfRunId), new WorkflowEventDefIdModel("test-name"), 0), + variableValue()); + return workflowEvent; + } + public static VariableDefModel variableDef(String name, VariableType variableTypePb) { VariableDefModel variableDef = new VariableDefModel(); variableDef.setName(name); diff --git a/server/src/test/java/io/littlehorse/common/model/getable/global/wfspec/WfSpecModelTest.java b/server/src/test/java/io/littlehorse/common/model/getable/global/wfspec/WfSpecModelTest.java index 6d9ee3a4f..082c2f86e 100644 --- a/server/src/test/java/io/littlehorse/common/model/getable/global/wfspec/WfSpecModelTest.java +++ b/server/src/test/java/io/littlehorse/common/model/getable/global/wfspec/WfSpecModelTest.java @@ -110,7 +110,7 @@ public void shouldIncreaseTheMajorVersionWhenAPublicVariableChangesItsAccessLeve } @Test - public void shouldIncreaseTheMajorVersionWhenAInheritedVariableChangesItsAccessLevelToPrivate() { + public void shouldNotIncreaseTheMajorVersionWhenAInheritedVariableChangesItsAccessLevelToPrivate() { WfSpecModel oldVersion = TestUtil.wfSpec("my-parent-wf"); ThreadVarDefModel inheritedVar = new ThreadVarDefModel(variableDef, false, false, WfRunVariableAccessLevel.INHERITED_VAR); @@ -127,11 +127,12 @@ public void shouldIncreaseTheMajorVersionWhenAInheritedVariableChangesItsAccessL wfSpec.setThreadSpecs(Map.of(wfSpec.getEntrypointThreadName(), entrypointThread)); Assertions.assertThat(wfSpec.getId().getMajorVersion()).isEqualTo(0); wfSpec.validateAndMaybeBumpVersion(Optional.of(oldVersion), mockContext); - Assertions.assertThat(wfSpec.getId().getMajorVersion()).isEqualTo(1); + Assertions.assertThat(wfSpec.getId().getMajorVersion()).isEqualTo(0); + Assertions.assertThat(wfSpec.getId().getRevision()).isEqualTo(1); } @ParameterizedTest - @MethodSource("provideNonBreakingChangeArguments") + @MethodSource("provideBreakingChangeArguments") public void shouldNotIncreaseTheMajorVersionWhenVariablesAreStillAccessibleFromChildWorkflows( WfRunVariableAccessLevel from, WfRunVariableAccessLevel to) { WfSpecModel oldVersion = TestUtil.wfSpec("my-parent-wf"); @@ -148,19 +149,16 @@ public void shouldNotIncreaseTheMajorVersionWhenVariablesAreStillAccessibleFromC wfSpec.setThreadSpecs(Map.of(wfSpec.getEntrypointThreadName(), entrypointThread)); Assertions.assertThat(wfSpec.getId().getMajorVersion()).isEqualTo(0); wfSpec.validateAndMaybeBumpVersion(Optional.of(oldVersion), mockContext); - Assertions.assertThat(wfSpec.getId().getMajorVersion()).isEqualTo(0); + Assertions.assertThat(wfSpec.getId().getMajorVersion()).isEqualTo(1); } /* Access level combinations when the major version shouldn't increase */ - private static Stream provideNonBreakingChangeArguments() { + private static Stream provideBreakingChangeArguments() { return Stream.of( Arguments.of(WfRunVariableAccessLevel.PUBLIC_VAR, WfRunVariableAccessLevel.INHERITED_VAR), - Arguments.of(WfRunVariableAccessLevel.PUBLIC_VAR, WfRunVariableAccessLevel.PUBLIC_VAR), Arguments.of(WfRunVariableAccessLevel.PRIVATE_VAR, WfRunVariableAccessLevel.PUBLIC_VAR), - Arguments.of(WfRunVariableAccessLevel.PRIVATE_VAR, WfRunVariableAccessLevel.PRIVATE_VAR), - Arguments.of(WfRunVariableAccessLevel.INHERITED_VAR, WfRunVariableAccessLevel.PUBLIC_VAR), - Arguments.of(WfRunVariableAccessLevel.INHERITED_VAR, WfRunVariableAccessLevel.INHERITED_VAR)); + Arguments.of(WfRunVariableAccessLevel.INHERITED_VAR, WfRunVariableAccessLevel.PUBLIC_VAR)); } } diff --git a/server/src/test/java/io/littlehorse/common/model/metadatacommand/subcommand/DeleteWfRunRequestModelTest.java b/server/src/test/java/io/littlehorse/common/model/metadatacommand/subcommand/DeleteWfRunRequestModelTest.java new file mode 100644 index 000000000..8d0c0f3b1 --- /dev/null +++ b/server/src/test/java/io/littlehorse/common/model/metadatacommand/subcommand/DeleteWfRunRequestModelTest.java @@ -0,0 +1,130 @@ +package io.littlehorse.common.model.metadatacommand.subcommand; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.*; + +import io.littlehorse.TestUtil; +import io.littlehorse.common.LHConstants; +import io.littlehorse.common.LHServerConfig; +import io.littlehorse.common.model.getable.core.events.WorkflowEventModel; +import io.littlehorse.common.model.getable.core.externalevent.ExternalEventModel; +import io.littlehorse.common.model.getable.core.noderun.NodeRunModel; +import io.littlehorse.common.model.getable.core.taskrun.TaskRunModel; +import io.littlehorse.common.model.getable.core.usertaskrun.UserTaskRunModel; +import io.littlehorse.common.model.getable.core.variable.VariableModel; +import io.littlehorse.common.model.getable.core.wfrun.WfRunModel; +import io.littlehorse.common.model.getable.global.wfspec.WfSpecModel; +import io.littlehorse.common.model.getable.objectId.PrincipalIdModel; +import io.littlehorse.common.model.getable.objectId.TaskDefIdModel; +import io.littlehorse.common.model.getable.objectId.TaskRunIdModel; +import io.littlehorse.common.model.getable.objectId.TenantIdModel; +import io.littlehorse.common.model.getable.objectId.VariableIdModel; +import io.littlehorse.common.proto.Command; +import io.littlehorse.sdk.common.proto.DeleteWfRunRequest; +import io.littlehorse.sdk.common.proto.WfRunId; +import io.littlehorse.server.LHServer; +import io.littlehorse.server.TestProcessorExecutionContext; +import io.littlehorse.server.streams.storeinternals.GetableManager; +import io.littlehorse.server.streams.taskqueue.TaskQueueManager; +import io.littlehorse.server.streams.topology.core.CommandProcessorOutput; +import io.littlehorse.server.streams.topology.core.processors.CommandProcessor; +import io.littlehorse.server.streams.util.HeadersUtil; +import io.littlehorse.server.streams.util.MetadataCache; +import java.util.List; +import java.util.UUID; +import org.apache.kafka.streams.processor.api.MockProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; + +public class DeleteWfRunRequestModelTest { + + @Mock + private final LHServerConfig lhConfig = mock(); + + @Mock + private final LHServer server = mock(); + + private final MetadataCache metadataCache = new MetadataCache(); + private final TaskQueueManager queueManager = mock(); + private final CommandProcessor processor = new CommandProcessor(lhConfig, server, metadataCache, queueManager); + private final String wfRunId = UUID.randomUUID().toString(); + private final WfRunModel wfRun = TestUtil.wfRun(wfRunId); + private final Command command = commandProto(); + private final MockProcessorContext mockProcessor = new MockProcessorContext<>(); + private final TestProcessorExecutionContext testProcessorContext = TestProcessorExecutionContext.create( + command, + HeadersUtil.metadataHeadersFor( + new TenantIdModel(LHConstants.DEFAULT_TENANT), + new PrincipalIdModel(LHConstants.ANONYMOUS_PRINCIPAL)), + mockProcessor); + private final GetableManager getableManager = testProcessorContext.getableManager(); + + // VariableModel's "index" implementation relies on a litany of other objects + // that we do not need to create or store to test this specific feature. + // So we mock the VariableModel and hide the "index" functionality with our when() statements. + public VariableModel mockVariableModel() { + VariableModel variableModel = spy(); + variableModel.setId(new VariableIdModel(wfRun.getId(), 0, "test-name")); + variableModel.setValue(TestUtil.variableValue()); + variableModel.setMasked(false); + WfSpecModel wfSpec = TestUtil.wfSpec("testWfSpecName"); + variableModel.setWfSpec(wfSpec); + variableModel.setWfSpecId(wfSpec.getId()); + + when(variableModel.getIndexConfigurations()).thenReturn(List.of()); + when(variableModel.getIndexEntries()).thenReturn(List.of()); + + return variableModel; + } + + @Test + public void shouldDeleteWfRunObjects() { + TaskRunModel taskRunModel = TestUtil.taskRun( + new TaskRunIdModel(wfRun.getId(), UUID.randomUUID().toString()), + new TaskDefIdModel(UUID.randomUUID().toString())); + VariableModel variableModel = mockVariableModel(); + ExternalEventModel externalEventModel = TestUtil.externalEvent(wfRunId); + UserTaskRunModel userTaskRunModel = TestUtil.userTaskRun(wfRunId, testProcessorContext); + WorkflowEventModel workflowEventModel = TestUtil.workflowEvent(wfRunId); + NodeRunModel nodeRunModel = TestUtil.nodeRun(wfRunId); + + getableManager.put(wfRun); + getableManager.put(taskRunModel); + getableManager.put(variableModel); + getableManager.put(externalEventModel); + getableManager.put(userTaskRunModel); + getableManager.put(workflowEventModel); + getableManager.put(nodeRunModel); + + getableManager.commit(); + processor.init(mockProcessor); + processor.process(new Record<>("", command, 0L, testProcessorContext.getRecordMetadata())); + + WfRunModel storedWfRunModel = getableManager.get(wfRun.getId()); + TaskRunModel storedTaskRunModel = getableManager.get(taskRunModel.getId()); + VariableModel storedVariableModel = getableManager.get(variableModel.getId()); + ExternalEventModel storedExternalEventModel = getableManager.get(externalEventModel.getId()); + UserTaskRunModel storedUserTaskRunModel = getableManager.get(userTaskRunModel.getId()); + WorkflowEventModel storedWorkflowEventModel = getableManager.get(workflowEventModel.getId()); + NodeRunModel storedNodeRunModel = getableManager.get(nodeRunModel.getId()); + + assertThat(storedWfRunModel).isNull(); + assertThat(storedTaskRunModel).isNull(); + assertThat(storedVariableModel).isNull(); + assertThat(storedExternalEventModel).isNull(); + assertThat(storedUserTaskRunModel).isNull(); + assertThat(storedWorkflowEventModel).isNull(); + assertThat(storedNodeRunModel).isNull(); + verify(server, never()).sendErrorToClient(anyString(), any()); + } + + private Command commandProto() { + DeleteWfRunRequest request = DeleteWfRunRequest.newBuilder() + .setId(WfRunId.newBuilder().setId(wfRun.getObjectId().getId())) + .build(); + return Command.newBuilder().setDeleteWfRun(request).build(); + } +} diff --git a/server/src/test/java/io/littlehorse/server/streams/topology/timer/TimerProcessorTest.java b/server/src/test/java/io/littlehorse/server/streams/topology/timer/TimerProcessorTest.java index b3846ac26..6aa7bd6cf 100644 --- a/server/src/test/java/io/littlehorse/server/streams/topology/timer/TimerProcessorTest.java +++ b/server/src/test/java/io/littlehorse/server/streams/topology/timer/TimerProcessorTest.java @@ -68,7 +68,7 @@ public void supportSystemTimePunctuatorConsistency() { @Test public void supportPunctuatorIntervalConfigConsistency() { - Assertions.assertThat(scheduledPunctuator.getInterval()).isEqualTo(LHConstants.PUNCTUATOR_INERVAL); + Assertions.assertThat(scheduledPunctuator.getInterval()).isEqualTo(LHConstants.TIMER_PUNCTUATOR_INTERVAL); } @Test diff --git a/server/src/test/java/io/littlehorse/storeinternals/GetableManagerTest.java b/server/src/test/java/io/littlehorse/storeinternals/GetableManagerTest.java index 77f8be564..b29fc86b5 100644 --- a/server/src/test/java/io/littlehorse/storeinternals/GetableManagerTest.java +++ b/server/src/test/java/io/littlehorse/storeinternals/GetableManagerTest.java @@ -58,7 +58,6 @@ import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; -import org.mockito.Answers; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; @@ -81,7 +80,7 @@ public class GetableManagerTest { new MockProcessorContext<>(); private GetableManager getableManager; - @Mock(answer = Answers.RETURNS_DEEP_STUBS) + @Mock private ProcessorExecutionContext executionContext; private AuthorizationContext testContext = new AuthorizationContextImpl( @@ -133,6 +132,21 @@ void deleteGetableAndTags() { assertThat(keysAfterDelete).isEmpty(); } + @Test + void deleteAllByPrefix() { + WfRunModel wfRunModel = TestUtil.wfRun("1234"); + TaskRunModel taskRunModel = TestUtil.taskRun(); + + getableManager.put(taskRunModel); + getableManager.commit(); + + getableManager.deleteAllByPrefix(wfRunModel.getPartitionKey().get(), TaskRunModel.class); + getableManager.commit(); + + TaskRunModel storedTaskRunModel = getableManager.get(taskRunModel.getObjectId()); + assertThat(storedTaskRunModel).isNull(); + } + @NotNull private List getAllKeys(KeyValueStore store) { KeyValueIterator all = store.all();